I needed to upgrade a Flink job from 1.13 to 1.14. After swapping the dependencies the job compiled fine, but at runtime it threw an HBase exception:
[info]java.io.IOException: java.lang.reflect.UndeclaredThrowableException
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:232)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:157)
at com.quwan.dao.HbaseDao.getConnection(HbaseDao.java:338)
at com.quwan.dao.HbaseDao.getHbaseTable(HbaseDao.java:312)
at com.quwan.dao.HbaseDao.getSceneListIdV2(HbaseDao.java:287)
at com.quwan.flink.process.RuleCoProcess.queryWhiteList(RuleCoProcess.java:237)
at SC.eval0(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.janino.ScriptEvaluator.evaluate(ScriptEvaluator.java:798)
at org.codehaus.janino.ScriptEvaluator.evaluate(ScriptEvaluator.java:790)
at com.quwan.flink.process.RuleCoProcess.processElement1(RuleCoProcess.java:117)
at com.quwan.flink.process.RuleCoProcess.processElement1(RuleCoProcess.java:30)
at org.apache.flink.streaming.api.operators.co.CoProcessOperator.processElement1(CoProcessOperator.java:69)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.UndeclaredThrowableException
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1862)
at org.apache.hadoop.hbase.security.User$SecureHadoopUser.runAs(User.java:347)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:228)
... 31 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hbase.client.ConnectionFactory.lambda$createConnection$0(ConnectionFactory.java:230)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
... 33 more
Caused by: java.lang.NullPointerException
at org.apache.hadoop.hbase.client.ConnectionImplementation.close(ConnectionImplementation.java:2004)
at org.apache.hadoop.hbase.client.ConnectionImplementation.&lt;init&gt;(ConnectionImplementation.java:330)
... 41 more[/info]
Following the log, the error points at the close() call in the catch block of the ConnectionImplementation constructor in HBase:
[info]ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException {
  ........
  try {
    this.registry = AsyncRegistryFactory.getRegistry(conf);
    retrieveClusterId();
    this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
    // Do we publish the status?
    if (shouldListen) {
      if (listenerClass == null) {
        LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
          ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
      } else {
        clusterStatusListener = new ClusterStatusListener(
          new ClusterStatusListener.DeadServerHandler() {
            @Override
            public void newDead(ServerName sn) {
              clearCaches(sn);
              rpcClient.cancelConnections(sn);
            }
          }, conf, listenerClass);
      }
    }
  } catch (Throwable e) {
    // avoid leaks: registry, rpcClient, ...
    LOG.debug("connection construction failed", e);
    close();
    throw e;
  }
}[/info]
Now look at the line in close() that throws:
[info]public void close() {
  if (this.closed) {
    return;
  }
  closeMaster();
  shutdownPools();
  if (this.metrics != null) {
    this.metrics.shutdown();
  }
  this.closed = true;
  registry.close();
  this.stubs.clear();
  if (clusterStatusListener != null) {
    clusterStatusListener.close();
  }
  if (rpcClient != null) {
    rpcClient.close();
  }
  if (authService != null) {
    authService.shutdown();
  }
}[/info]
So the NPE comes from the registry field being null. But why is registry null? Stepping through with the debugger showed that the ReadOnlyZKClient class used by ZKAsyncRegistry failed to initialize.
ReadOnlyZKClient itself was packaged into the jar correctly. Since it is a ZooKeeper-related class, I checked the ZooKeeper dependency next, and it turned out the ZooKeeper classes were missing from the jar entirely, whereas under 1.13 they had been packaged in.
The final diagnosis: the ZooKeeper dependency of hbase-client is excluded in our build. Under Flink 1.13 the Flink jars happened to bundle ZooKeeper, so everything worked; the Flink 1.14 jars no longer contain ZooKeeper, which triggered the exception.
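One way to restore the missing classes is to declare ZooKeeper explicitly in the job's build. This is only a sketch: the coordinates are standard, but the version below is a placeholder, so check which ZooKeeper version your hbase-client actually expects (e.g. via `mvn dependency:tree`) before copying it.

```xml
<!-- Placeholder version: pin the ZooKeeper version your
     hbase-client release was built against. -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>x.y.z</version>
</dependency>
```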
Because HBase's ConnectionImplementation.close() calls registry.close() without a null check, the exception that finally surfaces is not the root cause (ReadOnlyZKClient failing to initialize) but a NullPointerException; you have to enable DEBUG logging to see the original error.
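As a takeaway, cleanup code that may itself fail should not be allowed to replace the root cause. A sketch of a safer pattern (not HBase's actual fix) is to catch the cleanup failure and attach it to the original error with Throwable.addSuppressed, available since Java 7:

```java
// Sketch: keep the root cause visible even when cleanup fails,
// by attaching the cleanup exception as a suppressed throwable.
public class SuppressedDemo {
    public static void main(String[] args) {
        Throwable rootCause = new NoClassDefFoundError("org/apache/zookeeper/ZooKeeper");
        try {
            // simulate the cleanup path blowing up, like registry.close() did
            throw new NullPointerException("registry is null");
        } catch (Throwable cleanupFailure) {
            rootCause.addSuppressed(cleanupFailure);
        }
        // The original error survives, with the cleanup NPE attached
        System.out.println(rootCause.getClass().getSimpleName());
        System.out.println(rootCause.getSuppressed().length);
    }
}
```

With this pattern the stack trace would have shown the NoClassDefFoundError directly, with the NPE listed as "Suppressed:", and no DEBUG logging would have been needed.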