第一个spark应用开发详解(java版)

网友投稿 287 2022-11-29

第一个spark应用开发详解(java版)

欢迎访问我的GitHub

WordCount是大数据学习最好的入门demo,今天就一起开发java版本的WordCount,然后提交到Spark2.3.2环境运行;

版本信息

操作系统:CentOS7; JDK:1.8.0_191; Spark:2.3.3; Scala:2.11.12; Hadoop:2.7.7; Maven:3.5.0;

关于hadoop环境

本次实战用到了hadoop的hdfs,关于hadoop的部署,请参考《Linux部署hadoop2.7.7集群》

关于spark环境

本次实战用到了spark2.3.3,关于spark集群的部署,请参考《部署spark2.2集群(standalone模式)》请注意,由于2.3.3版本的spark-core的jar包不支持scala2.12,所以在部署spark的时候,scala版本请使用2.11;

关于本次实战开发的应用

本次实战开发的应用是经典的WorkCount,也就是指定一个文本文件,统计其中每个单词出现的次数,再取出现次数最多的10个,打印出来,并保存在hdfs文件中;

本次统计单词数用到的文本

本次用到的txt文件,是我在网上找到的pdf版本的《乱世佳人》英文版,然后导出为txt,读者您可以自行选择适合的txt文件来测试; 在hdfs服务所在的机器上执行以下命令,创建input文件夹:

~/hadoop-2.7.7/bin/hdfs dfs -mkdir /input

在hdfs服务所在的机器上执行以下命令,创建output文件夹:

~/hadoop-2.7.7/bin/hdfs dfs -mkdir /output

把本次用到的text文件上传到hdfs服务所在的机器,再执行以下命令将文本文件上传到hdfs的/input文件夹下:

~/hadoop-2.7.7/bin/hdfs dfs -put ~/GoneWiththeWind.txt /input

源码下载

接下来详细讲述应用的编码过程,如果您不想自己写代码,也可以在GitHub下载完整的应用源码,地址和链接信息如下表所示:

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

package com.bolingcavalry.sparkwordcount; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.List; /** * @Description: spark的WordCount实战 * @author: willzhao E-mail: zq2599@gmail.com * @date: 2019/2/8 17:21 */ public class WordCount { public static void main(String[] args) { String hdfsHost = args[0]; String hdfsPort = args[1]; String textFileName = args[2]; SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)"); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); String hdfsBasePath = "hdfs://" + hdfsHost + ":" + hdfsPort; //文本文件的hdfs路径 String inputPath = hdfsBasePath + "/input/" + textFileName; //输出结果文件的hdfs路径 String outputPath = hdfsBasePath + "/output/" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()); System.out.println("input path : " + inputPath); System.out.println("output path : " + outputPath); //导入文件 JavaRDD textFile = javaSparkContext.textFile(inputPath); JavaPairRDD counts = textFile //每一行都分割成单词,返回后组成一个大集合 .flatMap(s -> Arrays.asList(s.split(" ")).iterator()) //key是单词,value是1 .mapToPair(word -> new Tuple2<>(word, 1)) //基于key进行reduce,逻辑是将value累加 .reduceByKey((a, b) -> a + b); //先将key和value倒过来,再按照key排序 JavaPairRDD sorts = counts //key和value颠倒,生成新的map .mapToPair(tuple2 -> new Tuple2<>(tuple2._2(), tuple2._1())) //按照key倒排序 .sortByKey(false); //取前10个 List> top10 = sorts.take(10); //打印出来 for(Tuple2 tuple2 : top10){ System.out.println(tuple2._2() + "\t" + tuple2._1()); } //分区合并成一个,再导出为一个txt保存在hdfs javaSparkContext.parallelize(top10).coalesce(1).saveAsTextFile(outputPath); //关闭context javaSparkContext.close(); } }

在pom.xml目录下执行以下命令,编译构建jar包:

mvn clean package -Dmaven.test.skip=true

构建成功后,在target目录下生成文件sparkwordcount-1.0-SNAPSHOT.jar,上传到spark服务器的~/jars/目录下; 假设spark服务器的IP地址为192.168.119.163,在spark服务器执行以下命令即可提交任务:

~/spark-2.3.2-bin-hadoop2.7/bin/spark-submit \ --master spark://192.168.119.163:7077 \ --class com.bolingcavalry.sparkwordcount.WordCount \ --executor-memory 512m \ --total-executor-cores 2 \ ~/jars/sparkwordcount-1.0-SNAPSHOT.jar \ 192.168.119.163 \ 8020 \ GoneWiththeWind.txt

上述命令的最后三个参数,是java的main方法的入参,具体的使用请参照WordCount类的源码; 提交成功后立即开始执行任务,看到类似如下信息表示任务完成:

2019-02-08 21:26:04 INFO BlockManagerMaster:54 - BlockManagerMaster stopped 2019-02-08 21:26:04 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped! 2019-02-08 21:26:04 INFO SparkContext:54 - Successfully stopped SparkContext 2019-02-08 21:26:04 INFO ShutdownHookManager:54 - Shutdown hook called 2019-02-08 21:26:04 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-c3e2ea9e-7daf-4cab-a207-26f0a0394017 2019-02-08 21:26:04 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-d60e4d75-4189-4f33-a5e2-fbe9b06bdae7

往前翻滚一下控制台输出的信息,如下所示,可以见到单词统计的前十名已经输出在控制台了: 2019-02-08 21:36:15 INFO DAGScheduler:54 - Job 1 finished: take at WordCount.java:61, took 0.313008 s the 18264 and 14150 to 10020 of 8615 a 7571 her 7086 she 6217 was 5912 in 5751 had 4502 2019-02-08 21:36:15 INFO deprecation:1173 - mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir 2019-02-08 21:36:15 INFO FileOutputCommitter:108 - File Output Committer Algorithm version is 1 在hdfs服务器执行查看文件的命令,可见/output下新建了子目录20190208213610:

[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -ls /output Found 1 items drwxr-xr-x - hadoop supergroup 0 2019-02-08 21:36 /output/20190208213610

查看子目录,发现里面有两个文件:

[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -ls /output/20190208213610 Found 2 items -rw-r--r-- 3 hadoop supergroup 0 2019-02-08 21:36 /output/20190208213610/_SUCCESS -rw-r--r-- 3 hadoop supergroup 108 2019-02-08 21:36 /output/20190208213610/part-00000

上面看到的/output/20190208213610/part-00000就是输出结果,用cat命令查看其内容:

[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -cat /output/20190208213610/part-00000 (18264,the) (14150,and) (10020,to) (8615,of) (7571,a) (7086,her) (6217,she) (5912,was) (5751,in) (4502,had)

可见与前面控制台输出的一致; 在spark的web页面,可见刚刚执行的任务信息:

至此,第一个spark应用的开发和运行就完成了,接下来的文章中,咱们一起来完成更多的spark实战;

欢迎关注51CTO博客:程序员欣宸

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:/usr/share/neo4j/bin/neo4j: line 451: /var/run/neo4j/neo4j.pid: No such file or directory
下一篇:解决使用@RequestParam注解和泛型遇到的问题
相关文章

 发表评论

暂时没有评论,来抢沙发吧~