oracle竖列的数据怎么变成一行
252
2022-09-27
RDD_Transfom_转换算子
RDD的算子:转换算子和行动算子
1.单value
map,mapPar,mapParIndex,flatMap,glom,groupBy,filter,sample,distinct,coalesce,reparation,sortBy
1.map
object Spark01_RDD_Operation_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // 算子, 转换算子 val rdd = sc.makeRDD(List(1, 2, 3, 4)) //转换函数 def mapFuntion(num:Int):Int = { num * 2 } val mapRDD : RDD[Int] = rdd.map(mapFuntion) val mapRDD2 = rdd.map(num => num * 2) val mapRDD1 : RDD[Int] = rdd.map(_*2) mapRDD1.collect().foreach(print) sc.stop() }}
2.mapPar
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) val RDD = rdd.mapPartitionsWithIndex( (index, item) => { if (index == 1) { item } else { // 空的迭代器 Nil.iterator } } ) RDD.collect().foreach(println)
3.flatMap
扁平化
val rdd = sc.makeRDD(List(List(1, 2), List(3, 4))) // 整体到个体 val RDD = rdd.flatMap( list => { list } ) val rdd1 = sc.makeRDD(List("hello scala", "hello spark")) val RDD1 = rdd1.flatMap( s => { s.split(" ") } ) RDD1.collect().foreach(println)
4.glom
转化类型
// Int - Array val rdd = sc.makeRDD(List(1, 2, 3, 4)) val glomRDD = rdd.glom() //返回一个Array // 可以计算分区内最大值,分区间最大值求和 glomRDD.collect().foreach(data => println(data.mkString(",")))
5.groupBy
分组
groupBy会将数据重新打乱,然后重写组合,为shuffle
// 算子, 转换算子 val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) def groupFunction(num: Int) : Int = { num % 2 } val RDD = rdd.groupBy(groupFunction) RDD.collect().foreach(println)
6.filter:过滤
// 算子, 转换算子 val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) // 1 3 val RDD = rdd.filter(_%2!= 0) RDD.collect().foreach(println)
7.sample:随机抽取
第一个参数为是否放回,第二个参数为抽到的概率,第三个参数为是否为种子
// 算子, 转换算子 val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) // 2个参数:>0.4 抽到 3个参数:确定数字 val RDD = rdd.sample(false, 0.4, 1) println(RDD.collect().mkString(","))
8.distinct
distinct:去重,底层为Hashset
// 算子, 转换算子 val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 7, 9, 10)) // 如何去重 val RDD = rdd.distinct() // List底层为hashSet, RDD.collect().foreach(println)
9.coalesce
coalesce:缩小分区,如果第二个参数为true的话会shuffle
// 缩小分区 // 如果想要不出现数据不均衡,可以进行shuffle处理 val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3) val RDD = rdd.coalesce(2, true) RDD.saveAsTextFile("outPut")
10.repartition
repartition:扩大分区,相当于coalesce扩大分区第二个参数为true
// 扩大分区 val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2) val RDD = rdd.repartition(3) RDD.saveAsTextFile("outPut")
11.sortBy
sortBy:根据排序
val rdd = sc.makeRDD(List(4, 2, 1, 5, 7)) val RDD = rdd.sortBy(num => num) RDD.saveAsTextFile("outPut")
2.双value
intersection,union,substract,zip
1.intersection
交集
val rdd1 = sc.makeRDD(List(1, 2, 3, 4)) val rdd2 = sc.makeRDD(List(3, 4, 5 ,6)) //交集:3 4 val RDD1 = rdd1.intersection(rdd2) println(RDD1.collect().mkString("-"))
2.unIon
并集
val rdd1 = sc.makeRDD(List(1, 2, 3, 4)) val rdd2 = sc.makeRDD(List(3, 4, 5 ,6)) //并集: 1 2 3 4 5 6 val RDD2 = rdd1.union(rdd2) println(RDD2.collect().mkString("-"))
3.substract
差集
val rdd1 = sc.makeRDD(List(1, 2, 3, 4)) val rdd2 = sc.makeRDD(List(3, 4, 5 ,6)) //差集:1 2 val RDD3 = rdd1.subtract(rdd2) println(RDD3.collect().mkString("-"))
4.zip
拉链
//拉链:1-3 2-4 3-5 4-6 val RDD4 = rdd1.zip(rdd2) println(RDD4.collect().mkString("-"))
注意拉链的类型可以不一样,但是分区数必须一样,其他的类型必须一样
3.key-value
groupByKey,reduceByKey,aggregateByKey,foldByKey,combineByKey
object Spark_Test05_Key_Value { def main(args: Array[String]): Unit = { // key-value: val sparkConf = new SparkConf().setAppName("Map").setMaster("local") val sc = new SparkContext(sparkConf) val rdd:RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) val mapRDD = rdd.map((_, 1)) // RDD => PairRDDFunctions:隐式转换,二次编译 // 因为RDD源码中有一个伴生对象,隐式函数 // 根据指定规则重分区:HashPartitionere:根据// mapRDD.partitionBy(new HashPartitioner(2)).saveAsTextFile("output") val rdd1 = sc.makeRDD(List((1, 1), (1, 2), (2, 3), (2, 4))) // reduceByKey:相同key聚合:分区内的聚合 rdd1.reduceByKey((x:Int, y:Int) => x + y).collect().foreach(println) // groupByKey:相同key一个组 rdd1.groupByKey().collect().foreach(println) // aggregateByKey:分区内和分区间分离开 // 结果取决于传入的第一个值的类型 rdd1.aggregateByKey(0)( (x, y) => math.max(x, y), (x, y) => x + y ).collect().foreach(println) // 规则一样的都是(x + y) => x + y rdd1.aggregateByKey(0)(_+_, _+_).collect().foreach(println) // 如果计算规则一样的话为foldByKey rdd1.foldByKey(0)(_+_).collect().foreach(println) // combineByKey:相同key的第一条数据的处理函数,分区内,分区间 rdd1.combineByKey(v => v, (x:Int, y) => x + y, (x: Int, y: Int) => x + y).collect().foreach(println) // reduceByKey, foldByKey, aggregateByKey, combineByKey //都为wordcount sc.stop() }}
4.两个key-value
join,leftOuterJion,rightOuterJoin,cogroup
// 两个key-value // join:会笛卡尔积,几何倍增长,内连接 val rdd2 = sc.makeRDD(List(("A", 1), ("A", 2), ("A", 3), ("B", 4))) val rdd3 = sc.makeRDD(List(("A", 6), ("A", 9), ("A", 0), ("B", 1))) rdd2.join(rdd3).collect().foreach(println) // 左外连接,右外连接:leftOuterJoin, rightOuterJoin rdd2.rightOuterJoin(rdd3).collect().foreach(println) // cogroup:connect + group:相同的key放在一个组,然后group rdd2.cogroup(rdd3).collect().foreach(println)
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~