c语言sscanf函数的用法是什么
298
2022-12-01
记一次 spark rdd 写数据到 hbase 报 NPE 的问题排查
文章目录
前言问题排查
前言
最近我们的 EMR 集群要从 spark2.3.2 升级到 spark2.4.3 来解决 spark 小文件多的问题。但是在升级后发现之前 spark rdd 写 hbase 正常的任务报错了,花费一番功夫才解决,所以写篇文章记录下。
首先看下报错内容
CONSOLE# 20/06/09 14:26:48 ERROR Start: 同步数据异常CONSOLE# java.lang.NullPointerExceptionCONSOLE# at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:123)CONSOLE# at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)CONSOLE# at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)CONSOLE# at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)CONSOLE# at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:391)CONSOLE# at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)CONSOLE# at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)CONSOLE# at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)CONSOLE# at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)CONSOLE# at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)CONSOLE# at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)CONSOLE# at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)CONSOLE# at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)CONSOLE# at com.tuya.earth.sink.HbaseSink.saveToHbase(HbaseSink.scala:137)CONSOLE# at com.tuya.earth.sink.HbaseSink.insert(HbaseSink.scala:43)CONSOLE# at com.tuya.earth.Flow.lambda$sink$2(Flow.java:63)CONSOLE# at java.util.ArrayList.forEach(ArrayList.java:1257)CONSOLE# at com.tuya.earth.Flow.sink(Flow.java:60)CONSOLE# at com.tuya.earth.Flow.start(Flow.java:36)CONSOLE# at com.tuya.earth.Start.main(Start.java:20)CONSOLE# at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)CONSOLE# at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)CONSOLE# at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)CONSOLE# at java.lang.reflect.Method.invoke(Method.java:498)CONSOLE# at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)CONSOLE# at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)CONSOLE# at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)CONSOLE# at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)CONSOLE# at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)CONSOLE# at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)CONSOLE# at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)CONSOLE# at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
该程序是一个由 hive->hbase 同步任务,同步工具是我们内部使用 spark 代码开发的,支持es,mysql,kafka,hbase,hive的数据互相同步。
问题排查
起初报了这个 NPE,首先想到的是不是由于 spark 版本升级到 2.4.3 了,扩展 jar 的 hbase 版本也变了,于是就到服务器上看了下。
首先看了 2.3.2 集群的
[hadoop@172 jars]$ ll | grep hbase-rw-r--r-- 1 hadoop hadoop 20846 Jan 21 2019 hbase-annotations-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 1332395 Jan 21 2019 hbase-client-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 590351 Jan 21 2019 hbase-common-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 230728 Jan 21 2019 hbase-common-1.3.1-tests.jar-rw-r--r-- 1 hadoop hadoop 131409 Jan 21 2019 hbase-examples-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 19383 Jan 21 2019 hbase-external-blockcache-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 117596 Jan 21 2019 hbase-hadoop2-compat-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 43912 Jan 21 2019 hbase-hadoop-compat-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 13030 Jan 21 2019 hbase-it-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 102340 Jan 21 2019 hbase-prefix-tree-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 125967 Jan 21 2019 hbase-procedure-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 4513305 Jan 21 2019 hbase-protocol-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 55060 Jan 21 2019 hbase-resource-bundle-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 438777 Jan 21 2019 hbase-rest-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 4399312 Jan 21 2019 hbase-server-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 12889 Jan 21 2019 hbase-shell-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 2801459 Jan 21 2019 hbase-thrift-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 113540 Jan 3 14:34 hive-hbase-handler-2.3.3.jar
2.4.3集群
[hadoop@172 jars]$ ll | grep hbase-rw-r--r-- 1 hadoop hadoop 21221 Feb 21 12:03 hbase-annotations-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 1404274 Feb 21 12:03 hbase-client-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 620026 Feb 21 12:03 hbase-common-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 241949 Feb 21 12:03 hbase-common-1.4.9-tests.jar-rw-r--r-- 1 hadoop hadoop 136241 Feb 21 12:03 hbase-hadoop2-compat-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 37120 Feb 21 12:03 hbase-hadoop2-compat-1.4.9-tests.jar-rw-r--r-- 1 hadoop hadoop 51450 Feb 21 12:03 hbase-hadoop-compat-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 29309 Feb 21 12:03 hbase-metrics-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 21803 Feb 21 12:03 hbase-metrics-api-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 103320 Feb 21 12:03 hbase-prefix-tree-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 131122 Feb 21 12:03 hbase-procedure-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 4950745 Feb 21 12:03 hbase-protocol-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 4686217 Feb 21 12:03 hbase-server-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 113942 Feb 28 23:00 hive-hbase-handler-2.3.5.jar
明显看到,随着 spark 版本从 2.3.2 升级到 2.4.3, hbase 的版本从 1.3.1升到了1.4.9。 那么就是版本的问题了,然后就是根据上面 NPE 的堆栈信息,查看两个版本 hbase 代码的区别。
CONSOLE# at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:123)CONSOLE# at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)CONSOLE# at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)CONSOLE# at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)
根据堆栈发现其中抛 NPE 异常的代码就在 UserProvider 类的 123 行,该行的代码是
public static UserProvider instantiate(Configuration conf) { //123行代码 Class extends UserProvider> clazz = conf.getClass(USER_PROVIDER_CONF_KEY, UserProvider.class, UserProvider.class); return ReflectionUtils.newInstance(clazz, conf); }
仔细看代码,可能为 null 的地方只有是 conf 实例,根据堆栈信息向上排查,发现 conf 实例是从 TableOutputFormat.checkOutputSpecs 传进来的
1.4.9 版本的 TableOutputFormat 类
@Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { //getConf() 可能为 null try (Admin admin = ConnectionFactory.createConnection(getConf()).getAdmin()) { TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE)); if (!admin.tableExists(tableName)) { throw new TableNotFoundException("Can't write, table does not exist:" + tableName.getNameAsString()); } if (!admin.isTableEnabled(tableName)) { throw new TableNotEnabledException("Can't write, table is not enabled: " + tableName.getNameAsString()); } } } @Override public Configuration getConf() { return conf; } @Override public void setConf(Configuration otherConf) { String tableName = otherConf.get(OUTPUT_TABLE); if(tableName == null || tableName.length() <= 0) { throw new IllegalArgumentException("Must specify table name"); } String address = otherConf.get(QUORUM_ADDRESS); int zkClientPort = otherConf.getInt(QUORUM_PORT, 0); String serverClass = otherConf.get(REGION_SERVER_CLASS); String serverImpl = otherConf.get(REGION_SERVER_IMPL); try { this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX); if (serverClass != null) { this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl); } if (zkClientPort != 0) { this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); } } catch(IOException e) { LOG.error(e); throw new RuntimeException(e); } }
查看了 TableOutputFormat 类,发现并没有什么异常的地方。于是看了下hbase1.3.1 版本,发现该类的 checkOutputSpecs 是个空实现。
@Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { // TODO Check if the table exists? }
这就很尴尬了,对比两个版本目前无法帮助到我们。但是有一点很明确,只要调用了 setConf 就不会为 null,难道是没调用 setConf 方法?带着这个问题,继续往下看堆栈,直到进入spark sql的PairRDDFunctions.saveAsNewAPIHadoopDataset 方法
saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf val job = NewAPIHadoopJob.getInstance(hadoopConf) val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) val stageId = self.id val jobConfiguration = job.getConfiguration val wrappedConf = new SerializableConfiguration(jobConfiguration) val outfmt = job.getOutputFormatClass val jobFormat = outfmt.newInstance if (isOutputSpecValidationEnabled) { // FileOutputFormat ignores the filesystem parameter jobFormat.checkOutputSpecs(job) } ##省略下面代码}
果然是没有调用 TableOutputFormat.setConf 方法,但是 spark-sql 这边好像也有个开关来开启是否进行检测输出表是否存在。
private def isOutputSpecValidationEnabled: Boolean = { val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value val enabledInConf = self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) enabledInConf && !validationDisabled }
于是我尝试着在同步任务提交时加上 --conf spark.hadoop.validateOutputSpecs=false 参数 ,任务果然正常了。 算是临时解决的这个问题。
对于我来说,到这里当然不算结束了,因为不管是spark还是hbase应该是发现了这个问题才对,毕竟hbase都2.2.5版本了。于是我到github clone了hbase的源码,在master分支我发现他们已经解决了这个bug
@Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { Configuration hConf = getConf(); if (hConf == null) { hConf = context.getConfiguration(); } try (Admin admin = ConnectionFactory.createConnection(hConf).getAdmin()) { TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE)); if (!admin.tableExists(tableName)) { throw new TableNotFoundException("Can't write, table does not exist:" + tableName.getNameAsString()); } if (!admin.isTableEnabled(tableName)) { throw new TableNotEnabledException("Can't write, table is not enabled: " + tableName.getNameAsString()); } } }
解决方式也很简单,因为我们在写 spark rdd 写 hbase 的代码时,会把配置放到 JobContext ,所以 hbase 的相关配置事可以从 JobContext 获取的。本来想提个 pr 的。。哈哈,发现的太晚了,hbase 在 2.0.0 版本就已经修复了这个问题。 其实 spark2.4.3 也有个 UDAF 的 bug ,虽然解决了,但是没找到原因,等找到原因了,再发一篇~
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~