Apache Flink

What is Flink?

 Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink also builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.

Core Component Layout

Environment Setup

Basic Environment

Linux

User
# add a user and set its password
$ adduser flink
$ passwd flink # enter a password for the flink user
# grant the user sudo privileges
$ chmod u+w /etc/sudoers
$ vim /etc/sudoers
# find the line `root ALL=(ALL) ALL` and add the flink user below it
flink ALL=(ALL) ALL
$ chmod u-w /etc/sudoers
# switch to the flink user
$ su - flink
Directories
$ cd /home/flink
# directories for downloaded packages, installed software, logs, and temporary data
$ mkdir install software logs data

Dependencies

JDK

 Download jdk-8u131-linux-x64.tar.gz from Oracle's JDK download page

$ cd ~/install
$ tar zxvf jdk-8u131-linux-x64.tar.gz -C ~/software/
$ cd ~/software
$ ln -s jdk-8u131-linux-x64 java
$ vim ~/.bash_profile
JAVA_HOME=/home/flink/software/java
FLINK_HOME=/home/flink/software/flink
CLASSPATH=.:$JAVA_HOME/lib/tools.jar
PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH
export JAVA_HOME FLINK_HOME CLASSPATH PATH
$ source ~/.bash_profile

 If you need to remove a previously installed, older JDK, or to reinstall, the following may help (skip if not applicable):

$ rpm -qa | grep -iE 'jdk|java'
jdk-1.7.0_80-fcs.x86_64
$ sudo rpm -e --nodeps jdk-1.7.0_80-fcs.x86_64

Local Cluster

# download the release packages
$ cd /home/flink/install
$ wget http://archive.apache.org/dist/flink/flink-1.3.1/flink-1.3.1-bin-hadoop2-scala_2.11.tgz
$ wget http://archive.apache.org/dist/flink/flink-1.3.1/flink-1.3.1-bin-hadoop2-scala_2.11.tgz.md5
$ head -n 6 flink-1.3.1-bin-hadoop2-scala_2.11.tgz.md5
161e51c0b78d1fdf196f1c53c112a37f flink-1.3.1-bin-hadoop2-scala_2.11.tgz
$ md5sum flink-1.3.1-bin-hadoop2-scala_2.11.tgz | tr "a-z" "A-Z"
161E51C0B78D1FDF196F1C53C112A37F FLINK-1.3.1-BIN-HADOOP2-SCALA_2.11.TGZ
# after confirming the MD5 checksums match, extract and install
$ tar zxvf flink-1.3.1-bin-hadoop2-scala_2.11.tgz -C ~/software/
$ cd ~/software/ && ln -s flink-1.3.1/ flink
$ cd flink/
$ bin/flink -v
Version: 1.3.1, Commit ID: 1ca6e5b
# start the local cluster
$ bin/start-local.sh
# WEB UI: open http://localhost:8081 in a browser

Distributed Cluster

Node Distribution

$ ssh flink@yuzhouwan00
$ rsync -avuz -e ssh /home/flink/ flink@yuzhouwan01:/home/flink
$ rsync -avuz -e ssh /home/flink/ flink@yuzhouwan02:/home/flink
$ rsync -avuz -e ssh /home/flink/ flink@yuzhouwan03:/home/flink
# activate the environment variables on each node
$ source ~/.bash_profile
$ java -version
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
$ flink -v
Version: 1.3.1, Commit ID: 1ca6e5b

Passwordless SSH Login

# the Flink cluster uses SSH to manage and operate its remote components
$ ssh flink@yuzhouwan01
$ cd ~/.ssh # if this directory does not exist yet, run `ssh localhost` first
$ ssh-keygen -t rsa
$ cat ./id_rsa.pub >> ./authorized_keys
$ scp ~/.ssh/id_rsa.pub flink@yuzhouwan02:/home/flink
$ scp ~/.ssh/id_rsa.pub flink@yuzhouwan03:/home/flink
# run the following commands on each slave node in turn
$ ssh flink@yuzhouwan02
$ mkdir -p ~/.ssh
$ cat ~/id_rsa.pub >> ~/.ssh/authorized_keys
$ rm ~/id_rsa.pub

Configure the Cluster

$ cd $FLINK_HOME
$ vim conf/flink-conf.yaml
jobmanager.rpc.address: yuzhouwan01
jobmanager.heap.mb: 2048
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
parallelism.default: 12
taskmanager.tmp.dirs: /home/flink/data
# configure the list of slave (TaskManager) nodes
$ vim conf/slaves
yuzhouwan02
yuzhouwan03
$ scp -r /home/flink/software/flink-1.3.1/conf/ flink@yuzhouwan02:/home/flink/software/flink-1.3.1/
$ scp -r /home/flink/software/flink-1.3.1/conf/ flink@yuzhouwan03:/home/flink/software/flink-1.3.1/

Start the Cluster

$ start-cluster.sh
Starting cluster.
Starting jobmanager daemon on host yuzhouwan01.
Starting taskmanager daemon on host yuzhouwan02.
Starting taskmanager daemon on host yuzhouwan03.
# WEB UI: open http://yuzhouwan01:8081 in a browser

Deploy Applications

Local

 Obtain the execution environment with ExecutionEnvironment.createLocalEnvironment(parallelism); running the main function locally is all it takes to deploy
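
 A minimal sketch of such a locally run job (the class name and sample data are illustrative):

import org.apache.flink.api.java.ExecutionEnvironment;

public class LocalJob {
    public static void main(String[] args) throws Exception {
        // a local execution environment that runs the job in this JVM with parallelism 4
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(4);
        // print() both triggers execution and writes the result to stdout
        env.fromElements("yuzhouwan", "flink", "local").print();
    }
}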

Remote

 Alternatively, use the local machine as a client and submit the job to a remote cluster; the corresponding code to obtain the execution environment is

ExecutionEnvironment.createRemoteEnvironment("localhost", 6123, "target/yuzhouwan.jar");
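
 For context, a runnable sketch around that call (host, port, and jar path are placeholders; the jar must contain the job's classes):

import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteJob {
    public static void main(String[] args) throws Exception {
        // address and RPC port of the remote JobManager, plus the jar shipped to the cluster
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                "yuzhouwan01", 6123, "target/yuzhouwan.jar");
        env.fromElements(1, 2, 3).print();
    }
}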

Cluster

 A third approach submits the program to the remote cluster through the WEB UI: obtain the execution environment via ExecutionEnvironment.getExecutionEnvironment(), package the program into a single executable jar with the maven-assembly-plugin, then use the Submit new Job page in the WEB UI to upload the jar, preview the execution plan, and finally submit the job to the cluster

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <mainClass>com.yuzhouwan.process.ServerLogProcess</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>
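
 The assembled jar can also be submitted from the command line instead of the WEB UI; a sketch (the jar name assumes the assembly plugin's default `-jar-with-dependencies` suffix):

$ mvn clean package
$ bin/flink run -c com.yuzhouwan.process.ServerLogProcess target/yuzhouwan-jar-with-dependencies.jar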

Tips: `tail -f log/flink-flink-jobmanager-0-federation01.log` shows the details when a job fails

Practical Tips

 As of Flink v1.3.1, the CEP (Complex Event Processing for Flink) library may still fall short of mature CEP frameworks such as Esper / Siddhi in some scenarios (e.g., aggregation operations like groupby and aggregation). Earlier, @haoch developed flink-siddhi to integrate Siddhi into Flink, but the PR was delayed after a committer of Apache Bahir proposed moving the feature into Bahir-Flink. The good news is that Flink is now designing a CEP-on-SQL solution modeled on Oracle's SQL for pattern matching, which looks well worth looking forward to

Components

Event stream
Pattern definition
Individual patterns
  • Begin
  • Filter
  • Subtype
  • OR
  • Continuity
  • Within
Pattern detection
Alert generation (see the sketch below)
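
 A minimal Java sketch of these stages against Flink 1.3, assuming a DataStream<HBaseServerLog> named events and a getLevel() accessor on the log POJO (both are illustrative assumptions):

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;
import java.util.Map;

// pattern definition: Begin + Filter + Within
Pattern<HBaseServerLog, ?> pattern = Pattern.<HBaseServerLog>begin("error")
        .where(new SimpleCondition<HBaseServerLog>() {
            @Override
            public boolean filter(HBaseServerLog log) {
                return "ERROR".equals(log.getLevel()); // getLevel() is an assumed accessor
            }
        })
        .within(Time.seconds(10));

// pattern detection on the event stream
PatternStream<HBaseServerLog> detected = CEP.pattern(events, pattern);

// alert generation
DataStream<String> alerts = detected.select(new PatternSelectFunction<HBaseServerLog, String>() {
    @Override
    public String select(Map<String, List<HBaseServerLog>> match) {
        return "alert: " + match.get("error").get(0);
    }
});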

Performance Tuning

G1GC

$ vim conf/flink-conf.yaml
env.java.opts: -XX:+UseG1GC -XX:G1HeapRegionSize=4M -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=45 -XX:ParallelGCThreads=8

Pitfalls

ElasticSearch Connector

$ git fetch --tags
$ git checkout tags/release-1.3.1
$ D:\apps\Java\jdk1.8.0_111\bin\java -Dmaven.multiModuleProjectDirectory=E:\Github\Flink\asdf2014\flink-connectors\flink-connector-elasticsearch5 -Dmaven.home=D:\apps\maven\apache-maven-3.3.9 -Dclassworlds.conf=D:\apps\maven\apache-maven-3.3.9\bin\m2.conf -Didea.launcher.port=7538 "-Didea.launcher.bin.path=D:\apps\JetBrains\IntelliJ IDEA 2016.2.5\bin" -Dfile.encoding=UTF-8 -classpath "D:\apps\maven\apache-maven-3.3.9\boot\plexus-classworlds-2.5.2.jar;D:\apps\JetBrains\IntelliJ IDEA 2016.2.5\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain org.codehaus.classworlds.Launcher -Didea.version=2016.2.5 -T 1C -DskipTests=true install -P scala-2.11,!scala-2.10,!include-yarn-tests
# specifying the profile via -Dscala-2.11 has no effect because of an issue in v1.3.1, which hard-codes the scala version; the pom file needs to be modified, referring to 1.4.0-SNAPSHOT
# flink-connector-elasticsearch5_2.11-1.3.1.jar should now be in $MAVEN_HOME/repository and usable from maven without errors; if that did not work, import it via install:install-file
$ mvn install:install-file -Dfile=D:\apps\maven\repository\org\apache\flink\flink-connector-elasticsearch5_2.11\1.3.1\flink-connector-elasticsearch5_2.11-1.3.1.jar -DgroupId=org.apache.flink -DartifactId=flink-connector-elasticsearch5_2.11 -Dversion=1.3.1 -Dpackaging=jar -DgeneratePom=true -DpomFile=C:\yuzhouwan\Maven\pom.xml

Tips: the Cassandra Connector has the same problem; a PR has been submitted to fix it

Either cannot be serialized

Description

Program
DataStream<Either<HBaseServerLog, HBaseServerLog>> eventsWarn = cepWarn(events);
AlertEvent alertPoliceEvent = AlertEvent.getInstance();
eventsWarn.map(log -> {
    alertPoliceEvent.alert(log);
    return log;
});
Error
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(ServerLogProcess.java:46)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:374)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:159)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:129)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:121)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1526)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:87)
at com.yuzhouwan.hbase.monitor.server.log.process.ServerLogProcess.main(ServerLogProcess.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Either' are missing.
It seems that your compiler has not stored them into the .class file.
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely.
See the documentation for more information about how to compile jobs containing lambda expressions.
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1503)
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1489)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:426)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:379)
at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
at com.yuzhouwan.hbase.monitor.server.log.process.ServerLogProcess.main(ServerLogProcess.java:46)
... 5 more

Solution

 Do not use a map function to handle alerts; move the alert logic into PatternFlatTimeoutFunction / PatternFlatSelectFunction instead, as sketched below
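
 A sketch of that shape, assuming a PatternStream<HBaseServerLog> named patternStream and a pattern named "warn" (both illustrative), with AlertEvent taken from the code above:

import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Either;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

DataStream<Either<HBaseServerLog, HBaseServerLog>> eventsWarn = patternStream.flatSelect(
        new PatternFlatTimeoutFunction<HBaseServerLog, HBaseServerLog>() {
            @Override
            public void timeout(Map<String, List<HBaseServerLog>> pattern, long timeoutTimestamp,
                                Collector<HBaseServerLog> out) {
                // timed-out partial matches can be alerted on or dropped here
            }
        },
        new PatternFlatSelectFunction<HBaseServerLog, HBaseServerLog>() {
            @Override
            public void flatSelect(Map<String, List<HBaseServerLog>> pattern,
                                   Collector<HBaseServerLog> out) {
                for (HBaseServerLog log : pattern.get("warn")) {
                    AlertEvent.getInstance().alert(log); // alert here instead of inside map()
                    out.collect(log);
                }
            }
        });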

Cannot retrieve Left value on a Right

Description

Program
(ElasticsearchSinkFunction<Either<HBaseServerLog, HBaseServerLog>>) (element, ctx, indexer) -> {
    if (IS_DEBUG && IS_DEEP_DEBUG) _log.debug("Message: {} in ES Sink", element);
    if (element == null) return;
    // calling left() on a Right instance throws IllegalStateException, so the null check never helps
    HBaseServerLog left = element.left();
    if (left != null)
        indexer.add(indexRequest()
                .index(HBASE_SERVER_LOG_INDEX_NAME)
                .type(HBASE_SERVER_LOG_TYPE_NAME)
                .source(left.toJSON()));
    HBaseServerLog right = element.right();
    if (right != null)
        indexer.add(indexRequest()
                .index(HBASE_SERVER_LOG_INDEX_NAME)
                .type(HBASE_SERVER_LOG_TYPE_NAME)
                .source(right.toJSON()));
};
Error
java.lang.IllegalStateException: Cannot retrieve Left value on a Right
at org.apache.flink.types.Either$Right.left(Either.java:172) ~[flink-core-1.3.1.jar:1.3.1]
at com.yuzhouwan.hbase.monitor.server.log.store.StoreData2ES.lambda$createEsSinkFunction$8bb6efc1$1(StoreData2ES.java:102) ~[classes/:na]
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282) ~[flink-connector-elasticsearch-base_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.cep.PatternStream$PatternFlatSelectTimeoutWrapper$RightCollector.collect(PatternStream.java:374) ~[flink-cep_2.11-1.3.1.jar:1.3.1]
at com.yuzhouwan.hbase.monitor.server.log.analyse.func.select.SelectFunctionWarn.lambda$flatSelect$0(SelectFunctionWarn.java:39) ~[classes/:na]
at java.util.ArrayList.forEach(ArrayList.java:1249) ~[na:1.8.0_111]
at com.yuzhouwan.hbase.monitor.server.log.analyse.func.select.SelectFunctionWarn.flatSelect(SelectFunctionWarn.java:37) ~[classes/:na]
at org.apache.flink.cep.PatternStream$PatternFlatSelectTimeoutWrapper.flatMap(PatternStream.java:341) ~[flink-cep_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.cep.PatternStream$PatternFlatSelectTimeoutWrapper.flatMap(PatternStream.java:320) ~[flink-cep_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) ~[flink-runtime_2.11-1.3.1.jar:1.3.1]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]

Solution

// check with isLeft() / isRight() first, then access the corresponding value
HBaseServerLog log = null;
if (element.isLeft()) log = element.left();
else if (element.isRight()) log = element.right();

ElasticsearchSinkFunction is not serializable

Description

java.lang.IllegalArgumentException: The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139) ~[flink-core-1.3.0.jar:1.3.0]
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:195) ~[flink-connector-elasticsearch-base_2.11-1.3.0.jar:1.3.0]
at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:95) ~[flink-connector-elasticsearch5_2.11-1.3.0.jar:1.3.0]
at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:78) ~[flink-connector-elasticsearch5_2.11-1.3.0.jar:1.3.0]
at com.yuzhouwan.hbase.monitor.server.log.store.StoreData2ES.initEsSink(StoreData2ES.java:89) ~[classes/:na]
at com.yuzhouwan.hbase.monitor.server.log.store.StoreData2ES.init(StoreData2ES.java:85) ~[classes/:na]
at com.yuzhouwan.hbase.monitor.server.log.store.StoreData2ES.<init>(StoreData2ES.java:50) ~[classes/:na]
at com.yuzhouwan.hbase.monitor.server.log.process.ServerLogProcess.main(ServerLogProcess.java:52) ~[classes/:na]

Solution

 The cause: two ES-related parameters passed in from the outside made the ElasticsearchSinkFunction class non-serializable. The fix is to implement the ElasticsearchSinkFunction<T> interface, mark it Serializable, and pass the external parameters in through the subclass's constructor (also take care to avoid static fields here). Similarly, with PatternFlatSelectFunction<IN, OUT> and IterativeCondition<T>, externally passed-in instances can also trigger NullPointerException because of serialization; even cloning the external instance by implementing Cloneable does not avoid it. The instances have to be re-initialized inside the PatternFlatSelectFunction<IN, OUT> / IterativeCondition<T> to solve the problem

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import java.io.Serializable;

import static org.elasticsearch.client.Requests.indexRequest;

public class ElasticsearchSinkFunctionWithConf<T> implements ElasticsearchSinkFunction<T>, Serializable {

    private String hbaseServerLogIndexName;
    private String hbaseServerLogTypeName;

    // the external ES parameters come in through the constructor instead of captured references
    public ElasticsearchSinkFunctionWithConf(String hbaseServerLogIndexName, String hbaseServerLogTypeName) {
        this.hbaseServerLogIndexName = hbaseServerLogIndexName;
        this.hbaseServerLogTypeName = hbaseServerLogTypeName;
    }

    @Override
    public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
        if (element == null) return;
        if (!(element instanceof HBaseServerLog)) return;
        indexer.add(indexRequest()
                .index(hbaseServerLogIndexName)
                .type(hbaseServerLogTypeName)
                .source(((HBaseServerLog) element).toJSON()));
    }
}
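
 A usage sketch wiring this sink into a DataStream<HBaseServerLog> named events (cluster name, host, and port are placeholders; the index/type constants come from the code above):

import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "yuzhouwan-es");    // placeholder cluster name
config.put("bulk.flush.max.actions", "1000");
List<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("yuzhouwan01"), 9300));
events.addSink(new ElasticsearchSink<>(config, transports,
        new ElasticsearchSinkFunctionWithConf<HBaseServerLog>(
                HBASE_SERVER_LOG_INDEX_NAME, HBASE_SERVER_LOG_TYPE_NAME)));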

The implementation of the IterativeCondition is not serializable

Description

 Because Flink, like Spark, cannot share a global variable across the whole cluster (even Spark's Accumulators only provide a cluster-wide accumulator), intermediate results generally have to be kept in third-party storage such as Redis / Memcached / MongoDB (a comparison of the three is planned for a later post)

 However, JedisCluster cannot be serialized, hence the not serializable error:

org.apache.flink.api.common.InvalidProgramException: The implementation of the IterativeCondition is not serializable. The object probably contains or references non serializable fields.

Solution

// JedisCluster cannot be serialized, so it must not be stored as a typed field of the IterativeCondition.
// Rebuilding the Redis connection pool on every event is not an option either, so even though the instance
// cannot be distributed to tasks via serialization, make sure it is initialized only once per task.
// An untyped `Object redis` field, never assigned at construction, keeps the reference after the first init.
private Object redis;
// ...
RedisMiddleStatusStore redis;
if (this.redis == null) {
    redis = new RedisMiddleStatusStore(DP);
    this.redis = redis;
} else redis = (RedisMiddleStatusStore) this.redis;
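
 In context, the lazy initialization lives inside the condition itself; a sketch (WarnCondition is an illustrative name, RedisMiddleStatusStore and DP come from the surrounding code):

import org.apache.flink.cep.pattern.conditions.IterativeCondition;

public class WarnCondition extends IterativeCondition<HBaseServerLog> {

    // untyped and never assigned at construction time, so the condition itself stays serializable
    private Object redis;

    @Override
    public boolean filter(HBaseServerLog log, Context<HBaseServerLog> ctx) throws Exception {
        RedisMiddleStatusStore redis;
        if (this.redis == null) this.redis = redis = new RedisMiddleStatusStore(DP);
        else redis = (RedisMiddleStatusStore) this.redis;
        // ... read / write intermediate state via redis ...
        return log != null;
    }
}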

Could not extract key from null

Description

java.lang.RuntimeException: Could not extract key from null
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:104)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:83)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:228)
at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385)
Caused by: java.lang.RuntimeException: Could not extract key from null
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:85)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
... 15 more
Caused by: org.apache.flink.types.NullKeyFieldException: Unable to access field java.lang.Integer com.yuzhouwan.hbase.monitor.server.log.data.model.HBaseServerLog.flumeId on object null
at org.apache.flink.api.java.typeutils.runtime.PojoComparator.accessField(PojoComparator.java:181)
at org.apache.flink.api.java.typeutils.runtime.PojoComparator.extractKeys(PojoComparator.java:329)
at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:185)
at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:162)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59)
... 19 more

Solution

// instead of relying on the default Tuple-based key, build a KeySelector by hand
// KeyedStream<HBaseServerLog, Tuple> keyed = events.keyBy("flumeId");
KeyedStream<HBaseServerLog, Integer> keyed = events.keyBy((KeySelector<HBaseServerLog, Integer>) log -> {
    if (log == null) return 0;
    Integer flumeId;
    if ((flumeId = log.getFlumeId()) == null) return 1;
    return flumeId;
});

Community Follow-up

Pull Request

Issues

See the "开源社区 (Open Source Community)" post for details

Resources

Doc

For more resources, you are welcome to join us and learn together

QQ group: (AI 1020982 (advanced) & 1217710 (intermediate) | BigData 1670647)