RDD_Transfom_转换算子

网友投稿 252 2022-09-27

RDD_Transfom_转换算子

RDD的算子:转换算子和行动算子

1.单value

map,mapPar,mapParIndex,flatMap,glom,groupBy,filter,sample,distinct,coalesce,reparation,sortBy

1.map

object Spark01_RDD_Operation_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 算子, 转换算子 val rdd = sc.makeRDD(List(1, 2, 3, 4)) //转换函数 def mapFuntion(num:Int):Int = { num * 2 } val mapRDD : RDD[Int] = rdd.map(mapFuntion) val mapRDD2 = rdd.map(num => num * 2) val mapRDD1 : RDD[Int] = rdd.map(_*2) mapRDD1.collect().foreach(print) sc.stop() }}

2.mapPar

val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) val RDD = rdd.mapPartitionsWithIndex( (index, item) => { if (index == 1) { item } else { // 空的迭代器 Nil.iterator } } ) RDD.collect().foreach(println)

3.flatMap

扁平化

val rdd = sc.makeRDD(List(List(1, 2), List(3, 4))) // 整体到个体 val RDD = rdd.flatMap( list => { list } ) val rdd1 = sc.makeRDD(List("hello scala", "hello spark")) val RDD1 = rdd1.flatMap( s => { s.split(" ") } ) RDD1.collect().foreach(println)

4.glom

转化类型

// Int - Array val rdd = sc.makeRDD(List(1, 2, 3, 4)) val glomRDD = rdd.glom() //返回一个Array // 可以计算分区内最大值,分区间最大值求和 glomRDD.collect().foreach(data => println(data.mkString(",")))

5.groupBy

分组

groupBy会将数据重新打乱,然后重写组合,为shuffle

// 算子, 转换算子 val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) def groupFunction(num: Int) : Int = { num % 2 } val RDD = rdd.groupBy(groupFunction) RDD.collect().foreach(println)

6.filter:过滤

// 算子, 转换算子 val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) // 1 3 val RDD = rdd.filter(_%2!= 0) RDD.collect().foreach(println)

7.sample:随机抽取

第一个参数为是否放回,第二个参数为抽到的概率,第三个参数为是否为种子

// 算子, 转换算子 val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) // 2个参数:>0.4 抽到 3个参数:确定数字 val RDD = rdd.sample(false, 0.4, 1) println(RDD.collect().mkString(","))

8.distinct

distinct:去重,底层为Hashset

// 算子, 转换算子 val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 7, 9, 10)) // 如何去重 val RDD = rdd.distinct() // List底层为hashSet, RDD.collect().foreach(println)

9.coalesce

coalesce:缩小分区,如果第二个参数为true的话会shuffle

// 缩小分区 // 如果想要不出现数据不均衡,可以进行shuffle处理 val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3) val RDD = rdd.coalesce(2, true) RDD.saveAsTextFile("outPut")

10.repartition

repartition:扩大分区,相当于coalesce扩大分区第二个参数为true

// 扩大分区 val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2) val RDD = rdd.repartition(3) RDD.saveAsTextFile("outPut")

11.sortBy

sortBy:根据排序

val rdd = sc.makeRDD(List(4, 2, 1, 5, 7)) val RDD = rdd.sortBy(num => num) RDD.saveAsTextFile("outPut")

2.双value

intersection,union,substract,zip

1.intersection

交集

val rdd1 = sc.makeRDD(List(1, 2, 3, 4)) val rdd2 = sc.makeRDD(List(3, 4, 5 ,6)) //交集:3 4 val RDD1 = rdd1.intersection(rdd2) println(RDD1.collect().mkString("-"))

2.unIon

并集

val rdd1 = sc.makeRDD(List(1, 2, 3, 4)) val rdd2 = sc.makeRDD(List(3, 4, 5 ,6)) //并集: 1 2 3 4 5 6 val RDD2 = rdd1.union(rdd2) println(RDD2.collect().mkString("-"))

3.substract

差集

val rdd1 = sc.makeRDD(List(1, 2, 3, 4)) val rdd2 = sc.makeRDD(List(3, 4, 5 ,6)) //差集:1 2 val RDD3 = rdd1.subtract(rdd2) println(RDD3.collect().mkString("-"))

4.zip

拉链

//拉链:1-3 2-4 3-5 4-6 val RDD4 = rdd1.zip(rdd2) println(RDD4.collect().mkString("-"))

注意拉链的类型可以不一样,但是分区数必须一样,其他的类型必须一样

3.key-value

groupByKey,reduceByKey,aggregateByKey,foldByKey,combineByKey

object Spark_Test05_Key_Value { def main(args: Array[String]): Unit = { // key-value: val sparkConf = new SparkConf().setAppName("Map").setMaster("local") val sc = new SparkContext(sparkConf) val rdd:RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) val mapRDD = rdd.map((_, 1)) // RDD => PairRDDFunctions:隐式转换,二次编译 // 因为RDD源码中有一个伴生对象,隐式函数 // 根据指定规则重分区:HashPartitionere:根据// mapRDD.partitionBy(new HashPartitioner(2)).saveAsTextFile("output") val rdd1 = sc.makeRDD(List((1, 1), (1, 2), (2, 3), (2, 4))) // reduceByKey:相同key聚合:分区内的聚合 rdd1.reduceByKey((x:Int, y:Int) => x + y).collect().foreach(println) // groupByKey:相同key一个组 rdd1.groupByKey().collect().foreach(println) // aggregateByKey:分区内和分区间分离开 // 结果取决于传入的第一个值的类型 rdd1.aggregateByKey(0)( (x, y) => math.max(x, y), (x, y) => x + y ).collect().foreach(println) // 规则一样的都是(x + y) => x + y rdd1.aggregateByKey(0)(_+_, _+_).collect().foreach(println) // 如果计算规则一样的话为foldByKey rdd1.foldByKey(0)(_+_).collect().foreach(println) // combineByKey:相同key的第一条数据的处理函数,分区内,分区间 rdd1.combineByKey(v => v, (x:Int, y) => x + y, (x: Int, y: Int) => x + y).collect().foreach(println) // reduceByKey, foldByKey, aggregateByKey, combineByKey //都为wordcount sc.stop() }}

4.两个key-value

join,leftOuterJion,rightOuterJoin,cogroup

// 两个key-value // join:会笛卡尔积,几何倍增长,内连接 val rdd2 = sc.makeRDD(List(("A", 1), ("A", 2), ("A", 3), ("B", 4))) val rdd3 = sc.makeRDD(List(("A", 6), ("A", 9), ("A", 0), ("B", 1))) rdd2.join(rdd3).collect().foreach(println) // 左外连接,右外连接:leftOuterJoin, rightOuterJoin rdd2.rightOuterJoin(rdd3).collect().foreach(println) // cogroup:connect + group:相同的key放在一个组,然后group rdd2.cogroup(rdd3).collect().foreach(println)

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:使用Mybatis
下一篇:Bean的生命周期与用到的设计模式
相关文章

 发表评论

暂时没有评论,来抢沙发吧~