RDD的特性 ---- RDD的checkpoint

网友投稿 308 2022-08-25

RDD的特性 ---- RDD的checkpoint

RDD的特性 ---- RDD的checkpoint

一、Checkpoint的作用

Checkpooint的主要作用是斩断Rdd的依赖链,并将数据存储在引擎中,例如分布式存储和副本机制的HDFS中。本质上Checkpoint也是一种缓存机制。

1.Checkpoint的方式

2.什么是斩断依赖链

3.Checkpoint和cache的区别

注意:

最本质的区别在于Checkpoint将RDD计算出来存储在可靠的HDFS上,并且可以斩断依赖链,若出错可以直接通过赋值HDFS中的文件实现容错。 Cache是将RDD存放至内存中,并且未斩断依赖链,若出错只能通过依赖链回溯上一级RDD,重放计算。

二、Checkpoint的使用

@Test def test: Unit ={ val conf = new SparkConf().setAppName("ip统计").setMaster("local[6]") val sc = new SparkContext(conf) // 设置保存 checkPoint 的目录 sc.setCheckpointDir("checkpoint") val resource = sc.textFile("src/main/scala/RDD的缓存/ip.txt") val ipRDD = resource.map( item => ( item.split(",")(0) , 1) ) val cleanRDD = ipRDD.filter( item => StringUtils.isNotEmpty(item._1) ) var aggRDD = cleanRDD.reduceByKey( (curr,agg) => curr + agg ) // checkPoint // aggRDD = aggRDD.cache() // 不准确的说 checkPoint 是一个Action操作, // 也就是说调用了 checkPoint 方法后会重新计算一遍RDD的值,然后把结果存在HDFS或本地目录中 // 所以因该在 checkPoint 之前进行一次 cache() aggRDD = aggRDD.cache() aggRDD.checkpoint() val maxRDD = aggRDD.sortBy( item => item._2,ascending = true ).first() // Action操作1 val minRDD = aggRDD.sortBy( item => item._2,ascending = false ).first() // Action操作2 println("max:"+maxRDD,"min:"+minRDD) }

不准确的说 checkPoint 是一个Action操作,也就是说调用了 checkPoint 方法后会重新计算一遍RDD的值,然后把结果存在HDFS或本地目录中,这个过程中就又涉及到资源内存消耗的问题。所以因该在 checkPoint 之前进行一次 cache(),存储的是最后一次转换操作的结果,若后面计算有误,直接去从存储路径的文件中查找文件重新计算。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:RDD的特性 ---- RDD的缓存
下一篇:12强赛收官战,国足重点研究阿曼角球战术!(世预赛国足平阿曼)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~