记一次 spark rdd 写数据到 hbase 报 NPE 的问题排查

网友投稿 298 2022-12-01

记一次 spark rdd 写数据到 hbase 报 NPE 的问题排查

文章目录

​​前言​​​​问题排查​​

前言

最近我们的 ​​EMR​​​ 集群要从 ​​spark2.3.2​​​ 升级到 ​​spark2.4.3​​​ 来解决 ​​spark​​​ 小文件多的问题。但是在升级后发现之前 ​​spark rdd​​​ 写 ​​hbase​​ 正常的任务报错了,花费一番功夫才解决,所以写篇文章记录下。

首先看下报错内容

CONSOLE# 20/06/09 14:26:48 ERROR Start: 同步数据异常CONSOLE# java.lang.NullPointerExceptionCONSOLE# at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:123)CONSOLE# at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)CONSOLE# at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)CONSOLE# at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)CONSOLE# at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:391)CONSOLE# at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)CONSOLE# at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)CONSOLE# at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)CONSOLE# at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)CONSOLE# at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)CONSOLE# at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)CONSOLE# at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)CONSOLE# at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)CONSOLE# at com.tuya.earth.sink.HbaseSink.saveToHbase(HbaseSink.scala:137)CONSOLE# at com.tuya.earth.sink.HbaseSink.insert(HbaseSink.scala:43)CONSOLE# at com.tuya.earth.Flow.lambda$sink$2(Flow.java:63)CONSOLE# at java.util.ArrayList.forEach(ArrayList.java:1257)CONSOLE# at com.tuya.earth.Flow.sink(Flow.java:60)CONSOLE# at com.tuya.earth.Flow.start(Flow.java:36)CONSOLE# at com.tuya.earth.Start.main(Start.java:20)CONSOLE# at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)CONSOLE# at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)CONSOLE# at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)CONSOLE# at java.lang.reflect.Method.invoke(Method.java:498)CONSOLE# at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)CONSOLE# at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)CONSOLE# at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)CONSOLE# at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)CONSOLE# at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)CONSOLE# at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)CONSOLE# at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)CONSOLE# at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

该程序是一个由 ​​hive->hbase​​​ 同步任务,同步工具是我们内部使用 ​​spark​​​ 代码开发的,支持​​es,mysql,kafka,hbase,hive​​的数据互相同步。

问题排查

起初报了这个 ​​NPE​​​,首先想到的是不是由于 ​​spark​​​ 版本升级到 ​​2.4.3​​​ 了,扩展 ​​jar​​​ 的 ​​hbase​​ 版本也变了,于是就到服务器上看了下。

首先看了 ​​2.3.2​​ 集群的

[hadoop@172 jars]$ ll | grep hbase-rw-r--r-- 1 hadoop hadoop 20846 Jan 21 2019 hbase-annotations-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 1332395 Jan 21 2019 hbase-client-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 590351 Jan 21 2019 hbase-common-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 230728 Jan 21 2019 hbase-common-1.3.1-tests.jar-rw-r--r-- 1 hadoop hadoop 131409 Jan 21 2019 hbase-examples-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 19383 Jan 21 2019 hbase-external-blockcache-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 117596 Jan 21 2019 hbase-hadoop2-compat-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 43912 Jan 21 2019 hbase-hadoop-compat-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 13030 Jan 21 2019 hbase-it-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 102340 Jan 21 2019 hbase-prefix-tree-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 125967 Jan 21 2019 hbase-procedure-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 4513305 Jan 21 2019 hbase-protocol-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 55060 Jan 21 2019 hbase-resource-bundle-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 438777 Jan 21 2019 hbase-rest-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 4399312 Jan 21 2019 hbase-server-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 12889 Jan 21 2019 hbase-shell-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 2801459 Jan 21 2019 hbase-thrift-1.3.1.jar-rw-r--r-- 1 hadoop hadoop 113540 Jan 3 14:34 hive-hbase-handler-2.3.3.jar

​​2.4.3​​集群

[hadoop@172 jars]$ ll | grep hbase-rw-r--r-- 1 hadoop hadoop 21221 Feb 21 12:03 hbase-annotations-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 1404274 Feb 21 12:03 hbase-client-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 620026 Feb 21 12:03 hbase-common-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 241949 Feb 21 12:03 hbase-common-1.4.9-tests.jar-rw-r--r-- 1 hadoop hadoop 136241 Feb 21 12:03 hbase-hadoop2-compat-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 37120 Feb 21 12:03 hbase-hadoop2-compat-1.4.9-tests.jar-rw-r--r-- 1 hadoop hadoop 51450 Feb 21 12:03 hbase-hadoop-compat-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 29309 Feb 21 12:03 hbase-metrics-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 21803 Feb 21 12:03 hbase-metrics-api-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 103320 Feb 21 12:03 hbase-prefix-tree-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 131122 Feb 21 12:03 hbase-procedure-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 4950745 Feb 21 12:03 hbase-protocol-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 4686217 Feb 21 12:03 hbase-server-1.4.9.jar-rw-r--r-- 1 hadoop hadoop 113942 Feb 28 23:00 hive-hbase-handler-2.3.5.jar

明显看到,随着 ​​spark​​​ 版本从 ​​2.3.2​​​ 升级到 ​​2.4.3​​​, ​​hbase​​​ 的版本从 ​​1.3.1​​​升到了​​1.4.9​​​。 那么就是版本的问题了,然后就是根据上面 ​​​NPE​​​ 的堆栈信息,查看两个版本 ​​hbase​​ 代码的区别。

CONSOLE# at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:123)CONSOLE# at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)CONSOLE# at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)CONSOLE# at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)

根据堆栈发现其中抛 ​​NPE​​​ 异常的代码就在 ​​UserProvider​​​ 类的 ​​123​​ 行,该行的代码是

public static UserProvider instantiate(Configuration conf) { //123行代码 Class clazz = conf.getClass(USER_PROVIDER_CONF_KEY, UserProvider.class, UserProvider.class); return ReflectionUtils.newInstance(clazz, conf); }

仔细看代码,可能为 ​​null​​​ 的地方只有是 ​​conf​​​ 实例,根据堆栈信息向上排查,发现 ​​conf​​​ 实例是从 ​​TableOutputFormat.checkOutputSpecs​​ 传进来的

​​1.4.9​​​ 版本的 ​​TableOutputFormat​​ 类

@Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { //getConf() 可能为 null try (Admin admin = ConnectionFactory.createConnection(getConf()).getAdmin()) { TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE)); if (!admin.tableExists(tableName)) { throw new TableNotFoundException("Can't write, table does not exist:" + tableName.getNameAsString()); } if (!admin.isTableEnabled(tableName)) { throw new TableNotEnabledException("Can't write, table is not enabled: " + tableName.getNameAsString()); } } } @Override public Configuration getConf() { return conf; } @Override public void setConf(Configuration otherConf) { String tableName = otherConf.get(OUTPUT_TABLE); if(tableName == null || tableName.length() <= 0) { throw new IllegalArgumentException("Must specify table name"); } String address = otherConf.get(QUORUM_ADDRESS); int zkClientPort = otherConf.getInt(QUORUM_PORT, 0); String serverClass = otherConf.get(REGION_SERVER_CLASS); String serverImpl = otherConf.get(REGION_SERVER_IMPL); try { this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX); if (serverClass != null) { this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl); } if (zkClientPort != 0) { this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); } } catch(IOException e) { LOG.error(e); throw new RuntimeException(e); } }

查看了 ​​TableOutputFormat​​​ 类,发现并没有什么异常的地方。于是看了下​​hbase1.3.1​​​ 版本,发现该类的 ​​checkOutputSpecs​​ 是个空实现。

@Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { // TODO Check if the table exists? }

这就很尴尬了,对比两个版本目前无法帮助到我们。但是有一点很明确,只要调用了 ​​setConf​​​ 就不会为 ​​null​​​,难道是没调用 ​​setConf​​​ 方法?带着这个问题,继续往下看堆栈,直到进入​​spark sql​​​的​​PairRDDFunctions.saveAsNewAPIHadoopDataset​​ 方法

saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf val job = NewAPIHadoopJob.getInstance(hadoopConf) val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) val stageId = self.id val jobConfiguration = job.getConfiguration val wrappedConf = new SerializableConfiguration(jobConfiguration) val outfmt = job.getOutputFormatClass val jobFormat = outfmt.newInstance if (isOutputSpecValidationEnabled) { // FileOutputFormat ignores the filesystem parameter jobFormat.checkOutputSpecs(job) } ##省略下面代码}

果然是没有调用 ​​TableOutputFormat.setConf​​​ 方法,但是 ​​spark-sql​​ 这边好像也有个开关来开启是否进行检测输出表是否存在。

private def isOutputSpecValidationEnabled: Boolean = { val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value val enabledInConf = self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) enabledInConf && !validationDisabled }

于是我尝试着在同步任务提交时加上 ​​--conf spark.hadoop.validateOutputSpecs=false​​​ 参数 ,任务果然正常了。 算是临时解决的这个问题。

对于我来说,到这里当然不算结束了,因为不管是spark还是hbase应该是发现了这个问题才对,毕竟hbase都2.2.5版本了。于是我到github clone了hbase的源码,在master分支我发现他们已经解决了这个bug

@Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { Configuration hConf = getConf(); if (hConf == null) { hConf = context.getConfiguration(); } try (Admin admin = ConnectionFactory.createConnection(hConf).getAdmin()) { TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE)); if (!admin.tableExists(tableName)) { throw new TableNotFoundException("Can't write, table does not exist:" + tableName.getNameAsString()); } if (!admin.isTableEnabled(tableName)) { throw new TableNotEnabledException("Can't write, table is not enabled: " + tableName.getNameAsString()); } } }

解决方式也很简单,因为我们在写 ​​spark rdd​​​ 写 ​​hbase​​​ 的代码时,会把配置放到 ​​JobContext​​​ ,所以 ​​hbase​​​ 的相关配置事可以从 ​​JobContext​​​ 获取的。本来想提个 ​​pr​​​ 的。。哈哈,发现的太晚了,​​hbase​​​ 在 ​​2.0.0​​​ 版本就已经修复了这个问题。 其实 ​​​spark2.4.3​​​ 也有个 ​​UDAF​​​ 的 ​​bug​​ ,虽然解决了,但是没找到原因,等找到原因了,再发一篇~

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Springboot自动加载配置的原理解析
下一篇:hera源码剖析:一次任务触发的执行流程
相关文章

 发表评论

暂时没有评论,来抢沙发吧~