Apache Flink

What is Flink?

Apache Flink™ is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

Core Component Stack

Apache Flink Stack

(Image source: the Apache Flink™ official website)

Environment Setup

Base Environment

Linux

User
# Create the user and set its password
$ adduser flink
$ passwd flink # enter a password for the flink user

# Grant the user sudo privileges
$ chmod u+w /etc/sudoers
$ vim /etc/sudoers
# Find the line `root ALL=(ALL) ALL` and add the flink user below it
flink ALL=(ALL) ALL
$ chmod u-w /etc/sudoers

# Switch to the flink user
$ su - flink
Directories
$ cd /home/flink

# Directories for installer packages, installed software, logs, and temporary data
$ mkdir install && mkdir software && mkdir logs && mkdir data

Dependencies

JDK

 Download the JDK package from the Oracle website.

$ cd ~/install
$ tar zxvf jdk-8u131-linux-x64.tar.gz -C ~/software/

$ cd ~/software
$ ln -s jdk-8u131-linux-x64 java
$ vim ~/.bash_profile
JAVA_HOME=/home/flink/software/java
FLINK_HOME=/home/flink/software/flink
CLASSPATH=.:$JAVA_HOME/lib/tools.jar
PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH
export JAVA_HOME FLINK_HOME CLASSPATH PATH

$ source ~/.bash_profile

 If you need to remove an older JDK first, or to reinstall, refer to the following (skip this if you have no such need):

$ rpm -qa | grep -iE 'jdk|java'
jdk-1.7.0_80-fcs.x86_64

$ sudo rpm -e --nodeps jdk-1.7.0_80-fcs.x86_64

Local Cluster

# Download the release package
$ cd /home/flink/install
$ wget https://dlcdn.apache.org/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz
$ tar zxvf flink-1.17.0-bin-scala_2.12.tgz
$ rm -f flink
$ ln -s flink-1.17.0 flink
$ cd flink/
$ bin/flink -v
Version: 1.17.0, Commit ID: 69ecda0
# Start the local cluster
$ bin/start-cluster.sh

# Web UI
$ open 'http://localhost:8081'

Apache Flink UI

(Screenshot of the Apache Flink web UI)

Distributed Cluster

Distributing to the Nodes

$ ssh flink@yuzhouwan00
$ rsync -avuz -e ssh /home/flink/ flink@yuzhouwan01:/home/flink
$ rsync -avuz -e ssh /home/flink/ flink@yuzhouwan02:/home/flink
$ rsync -avuz -e ssh /home/flink/ flink@yuzhouwan03:/home/flink

# Activate the environment variables on each node
$ source ~/.bash_profile
$ java -version
java version "1.8.0_241"
Java(TM) SE Runtime Environment (build 1.8.0_241-b07)
Java HotSpot(TM) 64-Bit Server VM (build 25.241-b07, mixed mode)

$ flink -v
Version: 1.3.1, Commit ID: 1ca6e5b

Passwordless SSH Login

# The Flink cluster scripts use ssh to manage and operate remote components
$ ssh flink@yuzhouwan01
$ cd ~/.ssh # if this directory does not exist yet, run `ssh localhost` first
$ ssh-keygen -t rsa
$ cat ./id_rsa.pub >> ./authorized_keys
$ scp ~/.ssh/id_rsa.pub flink@yuzhouwan02:/home/flink
$ scp ~/.ssh/id_rsa.pub flink@yuzhouwan03:/home/flink

# Run the following on each slave node in turn
$ ssh flink@yuzhouwan02
$ mkdir ~/.ssh
$ cat ~/id_rsa.pub >> ~/.ssh/authorized_keys
$ rm ~/id_rsa.pub

Configuring the Cluster

$ cd $FLINK_HOME
$ vim conf/flink-conf.yaml
jobmanager.rpc.address: yuzhouwan01
jobmanager.heap.mb: 2048
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
parallelism.default: 12
taskmanager.tmp.dirs: /home/flink/data

# Configure the list of slave nodes
$ vim conf/slaves
yuzhouwan02
yuzhouwan03

$ scp -r /home/flink/software/flink-1.3.1/conf/ flink@yuzhouwan02:/home/flink/software/flink/conf/
$ scp -r /home/flink/software/flink-1.3.1/conf/ flink@yuzhouwan03:/home/flink/software/flink/conf/

Starting the Cluster

$ start-cluster.sh 
Starting cluster.
Starting jobmanager daemon on host yuzhouwan01.
Starting taskmanager daemon on host yuzhouwan02.
Starting taskmanager daemon on host yuzhouwan03.

# Web UI
$ open 'http://yuzhouwan01:8081'

Submitting Applications

Local

 Obtain the execution environment with ExecutionEnvironment.createLocalEnvironment(parallelism); running the main method locally is all it takes to launch the job, as in the sketch below.
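 A minimal sketch, assuming illustrative class and data names (LocalJob and the sample elements are not from the original project):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class LocalJob {

    public static void main(String[] args) throws Exception {
        // run the job inside the local JVM with a parallelism of 4
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(4);
        DataSet<String> words = env.fromElements("yuzhouwan", "flink", "local");
        // print() triggers execution for DataSet programs
        words.print();
    }
}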

Remote

 Alternatively, the local machine can act as a client that submits the job to a remote cluster; the code to obtain the corresponding execution environment is:

// 6123 is the default JobManager RPC port; the jar holds the user code shipped to the cluster
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", 6123, "target/yuzhouwan.jar");

Cluster

 A third option is to submit the program to the remote cluster itself: obtain the execution environment via ExecutionEnvironment.getExecutionEnvironment(), package the program into a single executable jar with the maven-assembly-plugin, and then use the Submit new Job module in the Web UI to upload the jar, inspect the execution plan, and finally submit the job to the cluster.
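 For reference, a minimal sketch of obtaining the environment this way:

// returns a local environment when run from the IDE, and the cluster environment
// when the packaged jar is submitted through the Web UI or the CLI
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();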

<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <archive>
      <manifest>
        <mainClass>com.yuzhouwan.process.ServerLogProcess</mainClass>
      </manifest>
    </archive>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
</plugin>
Details of failed jobs can be found in log/flink-flink-jobmanager-0-federation01.log.

Practical Tips

 Compared with mature CEP frameworks such as Esper and Siddhi, the CEP (Complex Event Processing for Flink) support in the current Flink v1.3.1 may still fall short in some scenarios (e.g., aggregations such as groupBy). Earlier, @haoch developed flink-siddhi to integrate Siddhi into Flink, but a committer of Apache Bahir proposed moving the feature into Bahir-Flink, which delayed the flink-siddhi PR. The good news is that Flink is now designing a CEP-on-SQL solution modeled after Oracle's SQL for pattern matching, which looks well worth waiting for.

Components

Event stream
Pattern definition (see the sketch below)
Individual pattern operations
  • Begin
  • Filter
  • Subtype
  • OR
  • Continuity
  • Within
Pattern detection
Alert generation
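 A hedged sketch of these pieces with the Flink 1.3 CEP API (HBaseServerLog comes from the project snippets below; getLevel() and the events stream are assumptions made for illustration):

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

// Begin: name the first state of the pattern
Pattern<HBaseServerLog, ?> warnThenError = Pattern.<HBaseServerLog>begin("warn")
        // Filter: keep only WARN events (getLevel() is a hypothetical accessor)
        .where(new SimpleCondition<HBaseServerLog>() {
            @Override
            public boolean filter(HBaseServerLog log) {
                return "WARN".equals(log.getLevel());
            }
        })
        // Continuity: `next` requires the second event to follow immediately
        .next("error")
        .where(new SimpleCondition<HBaseServerLog>() {
            @Override
            public boolean filter(HBaseServerLog log) {
                return "ERROR".equals(log.getLevel());
            }
        })
        // Within: the whole sequence must complete inside 10 seconds
        .within(Time.seconds(10));

// Pattern detection: apply the pattern to the event stream
PatternStream<HBaseServerLog> matched = CEP.pattern(events, warnThenError);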

Performance Tuning

G1GC

$ vim conf/flink-conf.yaml
env.java.opts: -XX:+UseG1GC -XX:G1HeapRegionSize=4M -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=45 -XX:ParallelGCThreads=8

Pitfalls

Elasticsearch Connector

$ git fetch --tags
$ git checkout tags/release-1.3.1

$ D:\apps\Java\jdk1.8.0_111\bin\java -Dmaven.multiModuleProjectDirectory=E:\Github\Flink\asdf2014\flink-connectors\flink-connector-elasticsearch5 -Dmaven.home=D:\apps\maven\apache-maven-3.3.9 -Dclassworlds.conf=D:\apps\maven\apache-maven-3.3.9\bin\m2.conf -Didea.launcher.port=7538 "-Didea.launcher.bin.path=D:\apps\JetBrains\IntelliJ IDEA 2016.2.5\bin" -Dfile.encoding=UTF-8 -classpath "D:\apps\maven\apache-maven-3.3.9\boot\plexus-classworlds-2.5.2.jar;D:\apps\JetBrains\IntelliJ IDEA 2016.2.5\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain org.codehaus.classworlds.Launcher -Didea.version=2016.2.5 -T 1C -DskipTests=true install -P scala-2.11,!scala-2.10,!include-yarn-tests

# The -Dscala-2.11 profile flag has no effect because v1.3.1 hard-codes the Scala version; the pom files must be edited by hand (see 1.4.0-SNAPSHOT for reference)
# After that, flink-connector-elasticsearch5_2.11-1.3.1.jar should appear under $MAVEN_HOME/repository and be usable from Maven without errors; if it does not, import it manually via install:install-file
$ mvn install:install-file -Dfile=D:\apps\maven\repository\org\apache\flink\flink-connector-elasticsearch5_2.11\1.3.1\flink-connector-elasticsearch5_2.11-1.3.1.jar -DgroupId=org.apache.flink -DartifactId=flink-connector-elasticsearch5_2.11 -Dversion=1.3.1 -Dpackaging=jar -DgeneratePom=true -DpomFile=C:\yuzhouwan\Maven\pom.xml
The same issue existed in the Cassandra connector as well; PR#4087 was submitted to fix it.

Either cannot be serialized

Description

Program
DataStream<Either<HBaseServerLog, HBaseServerLog>> eventsWarn = cepWarn(events);

AlertEvent alertPoliceEvent = AlertEvent.getInstance();
eventsWarn.map(log -> {
    alertPoliceEvent.alert(log);
    return log;
});
Error
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(ServerLogProcess.java:46)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:374)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:159)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:129)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:121)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1526)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:87)
at com.yuzhouwan.hbase.monitor.server.log.process.ServerLogProcess.main(ServerLogProcess.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Either' are missing.
It seems that your compiler has not stored them into the .class file.
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely.
See the documentation for more information about how to compile jobs containing lambda expressions.
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1503)
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1489)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:426)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:379)
at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
at com.yuzhouwan.hbase.monitor.server.log.process.ServerLogProcess.main(ServerLogProcess.java:46)
... 5 more

Solution

 Do not handle alerting in a map function; move the alert logic into the PatternFlatTimeoutFunction / PatternFlatSelectFunction instead, as in the sketch below.
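 A minimal sketch of such a select function (the class name mirrors SelectFunctionWarn from the stack traces below; AlertEvent is the helper from the snippet above):

import java.util.List;
import java.util.Map;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.util.Collector;

public class SelectFunctionWarn implements PatternFlatSelectFunction<HBaseServerLog, HBaseServerLog> {

    @Override
    public void flatSelect(Map<String, List<HBaseServerLog>> pattern, Collector<HBaseServerLog> out) {
        for (List<HBaseServerLog> logs : pattern.values()) {
            for (HBaseServerLog log : logs) {
                // fire the alert here, inside the select function, instead of in a downstream map
                AlertEvent.getInstance().alert(log);
                out.collect(log);
            }
        }
    }
}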

Cannot retrieve Left value on a Right

Description

Program
(ElasticsearchSinkFunction<Either<HBaseServerLog, HBaseServerLog>>) (element, ctx, indexer) -> {
    if (IS_DEBUG && IS_DEEP_DEBUG) _log.debug("Message: {} in ES Sink", element);
    if (element == null) return;
    HBaseServerLog left = element.left();
    if (left != null)
        indexer.add(indexRequest()
                .index(HBASE_SERVER_LOG_INDEX_NAME)
                .type(HBASE_SERVER_LOG_TYPE_NAME)
                .source(left.toJSON()));
    HBaseServerLog right = element.right();
    if (right != null)
        indexer.add(indexRequest()
                .index(HBASE_SERVER_LOG_INDEX_NAME)
                .type(HBASE_SERVER_LOG_TYPE_NAME)
                .source(right.toJSON()));
};
Error
java.lang.IllegalStateException: Cannot retrieve Left value on a Right
at org.apache.flink.types.Either$Right.left(Either.java:172) ~[flink-core-1.3.1.jar:1.3.1]
at com.yuzhouwan.hbase.monitor.server.log.store.StoreData2ES.lambda$createEsSinkFunction$8bb6efc1$1(StoreData2ES.java:102) ~[classes/:na]
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282) ~[flink-connector-elasticsearch-base_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.cep.PatternStream$PatternFlatSelectTimeoutWrapper$RightCollector.collect(PatternStream.java:374) ~[flink-cep_2.11-1.3.1.jar:1.3.1]
at com.yuzhouwan.hbase.monitor.server.log.analyse.func.select.SelectFunctionWarn.lambda$flatSelect$0(SelectFunctionWarn.java:39) ~[classes/:na]
at java.util.ArrayList.forEach(ArrayList.java:1249) ~[na:1.8.0_111]
at com.yuzhouwan.hbase.monitor.server.log.analyse.func.select.SelectFunctionWarn.flatSelect(SelectFunctionWarn.java:37) ~[classes/:na]
at org.apache.flink.cep.PatternStream$PatternFlatSelectTimeoutWrapper.flatMap(PatternStream.java:341) ~[flink-cep_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.cep.PatternStream$PatternFlatSelectTimeoutWrapper.flatMap(PatternStream.java:320) ~[flink-cep_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262) ~[flink-streaming-java_2.11-1.3.1.jar:1.3.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) ~[flink-runtime_2.11-1.3.1.jar:1.3.1]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]

Solution

// Check with isLeft() / isRight() first, then process accordingly
HBaseServerLog log = null;
if (element.isLeft()) log = element.left();
else if (element.isRight()) log = element.right();

ElasticsearchSinkFunction is not serializable

Description

java.lang.IllegalArgumentException: The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139) ~[flink-core-1.3.0.jar:1.3.0]
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:195) ~[flink-connector-elasticsearch-base_2.11-1.3.0.jar:1.3.0]
at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:95) ~[flink-connector-elasticsearch5_2.11-1.3.0.jar:1.3.0]
at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:78) ~[flink-connector-elasticsearch5_2.11-1.3.0.jar:1.3.0]
at com.yuzhouwan.hbase.monitor.server.log.store.StoreData2ES.initEsSink(StoreData2ES.java:89) ~[classes/:na]
at com.yuzhouwan.hbase.monitor.server.log.store.StoreData2ES.init(StoreData2ES.java:85) ~[classes/:na]
at com.yuzhouwan.hbase.monitor.server.log.store.StoreData2ES.<init>(StoreData2ES.java:50) ~[classes/:na]
at com.yuzhouwan.hbase.monitor.server.log.process.ServerLogProcess.main(ServerLogProcess.java:52) ~[classes/:na]

Solution

 The cause is that two ES-related parameters passed in from outside made the ElasticsearchSinkFunction non-serializable. The fix is to implement the ElasticsearchSinkFunction<T> interface, mark it Serializable, and pass the external parameters in through the subclass's constructor (taking care to avoid static fields here as well). Similarly, with PatternFlatSelectFunction<IN, OUT> and IterativeCondition<T>, externally supplied instances can also lead to NullPointerException because of serialization. In that case even cloning the external instance via the Cloneable interface does not help; the instance must be re-initialized inside the PatternFlatSelectFunction<IN, OUT> / IterativeCondition<T> itself.

public class ElasticsearchSinkFunctionWithConf<T> implements ElasticsearchSinkFunction<T>, Serializable {

    private String hbaseServerLogIndexName;
    private String hbaseServerLogTypeName;

    public ElasticsearchSinkFunctionWithConf(String hbaseServerLogIndexName, String hbaseServerLogTypeName) {
        this.hbaseServerLogIndexName = hbaseServerLogIndexName;
        this.hbaseServerLogTypeName = hbaseServerLogTypeName;
    }

    @Override
    public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
        if (element == null) return;
        if (!(element instanceof HBaseServerLog)) return;
        indexer.add(indexRequest()
                .index(hbaseServerLogIndexName)
                .type(hbaseServerLogTypeName)
                .source(((HBaseServerLog) element).toJSON()));
    }
}
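 A hedged usage sketch, wiring the serializable sink function into the ES 5.x connector (config, transportAddresses, and the events stream are assumed to be set up as usual):

// the constructor matches org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
ElasticsearchSink<HBaseServerLog> esSink = new ElasticsearchSink<>(config, transportAddresses,
        new ElasticsearchSinkFunctionWithConf<>(HBASE_SERVER_LOG_INDEX_NAME, HBASE_SERVER_LOG_TYPE_NAME));
events.addSink(esSink);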

The implementation of the IterativeCondition is not serializable

Description

 Because Flink, like Spark, cannot share a global variable across the cluster (even Spark's Accumulators only provide cluster-wide counters), intermediate results generally need to be kept in third-party storage such as Redis / Memcached / MongoDB (see my other blog post for a comparison of the three).

 However, JedisCluster cannot be serialized, hence the "not serializable" error:

org.apache.flink.api.common.InvalidProgramException: The implementation of the IterativeCondition is not serializable. The object probably contains or references non serializable fields.

Solution

// Because JedisCluster cannot be serialized, it cannot be kept as a field of the IterativeCondition.
// At the same time, rebuilding the Redis connection pool for every event is out of the question.
// So even though the object cannot be shipped to each task via serialization, we ensure each task initializes it exactly once.
// We therefore declare an `Object redis` field that is never assigned up front, and use it to hold the reference to the Redis connection pool created on first use.

private Object redis;
// ...
RedisMiddleStatusStore redis;
if (this.redis == null) {
    redis = new RedisMiddleStatusStore(DP);
    this.redis = redis;
} else redis = (RedisMiddleStatusStore) this.redis;
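 Put together as a complete condition class, the trick looks roughly like this (RedisMiddleStatusStore and DP are from the snippet above; the class name is illustrative):

import org.apache.flink.cep.pattern.conditions.IterativeCondition;

public class IterativeConditionWithRedis extends IterativeCondition<HBaseServerLog> {

    // never assigned during construction, so the condition itself stays serializable
    private Object redis;

    @Override
    public boolean filter(HBaseServerLog log, Context<HBaseServerLog> ctx) throws Exception {
        RedisMiddleStatusStore store;
        if (this.redis == null) {
            // built lazily, once per task, on the first event it processes
            store = new RedisMiddleStatusStore(DP);
            this.redis = store;
        } else {
            store = (RedisMiddleStatusStore) this.redis;
        }
        // ... use `store` to read / write intermediate state, then decide ...
        return true;
    }
}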

Could not extract key from null

Description

java.lang.RuntimeException: Could not extract key from null
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:104)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:83)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:228)
at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385)
Caused by: java.lang.RuntimeException: Could not extract key from null
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:85)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
... 15 more
Caused by: org.apache.flink.types.NullKeyFieldException: Unable to access field java.lang.Integer com.yuzhouwan.hbase.monitor.server.log.data.model.HBaseServerLog.flumeId on object null
at org.apache.flink.api.java.typeutils.runtime.PojoComparator.accessField(PojoComparator.java:181)
at org.apache.flink.api.java.typeutils.runtime.PojoComparator.extractKeys(PojoComparator.java:329)
at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:185)
at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:162)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59)
... 19 more

Solution

// Instead of keying by the default Tuple, build a custom KeySelector
// KeyedStream<HBaseServerLog, Tuple> keyed = events.keyBy("flumeId");
KeyedStream<HBaseServerLog, Integer> keyed = events.keyBy((KeySelector<HBaseServerLog, Integer>) log -> {
    if (log == null) return 0;
    Integer flumeId;
    if ((flumeId = log.getFlumeId()) == null) return 1;
    return flumeId;
});

Task xxx did not react to cancelling signal in the last 30 seconds, but is stuck in method

Description

A fatal error occurred, forcing the TaskManager to shut down: Task 'KeyedCEPPatternOperator -> Flat Map -> Sink: MULTI_EVENT (2/8)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
java.util.regex.Pattern$GroupTail.match(Pattern.java:4717)
java.util.regex.Pattern$Curly.match0(Pattern.java:4272)
java.util.regex.Pattern$Curly.match(Pattern.java:4234)
java.util.regex.Pattern$GroupHead.match(Pattern.java:4658)
java.util.regex.Pattern$GroupHead.match(Pattern.java:4658)
java.util.regex.Pattern$Curly.match0(Pattern.java:4279)
java.util.regex.Pattern$Curly.match(Pattern.java:4234)
java.util.regex.Pattern$GroupTail.match(Pattern.java:4717)
java.util.regex.Pattern$GroupTail.match(Pattern.java:4717)
java.util.regex.Pattern$Curly.match0(Pattern.java:4272)
java.util.regex.Pattern$Curly.match(Pattern.java:4234)
java.util.regex.Pattern$GroupHead.match(Pattern.java:4658)
java.util.regex.Pattern$GroupHead.match(Pattern.java:4658)
java.util.regex.Pattern$Curly.match0(Pattern.java:4279)
java.util.regex.Pattern$Curly.match(Pattern.java:4234)
java.util.regex.Pattern$GroupTail.match(Pattern.java:4717)
java.util.regex.Pattern$GroupTail.match(Pattern.java:4717)
java.util.regex.Pattern$Curly.match0(Pattern.java:4272)
java.util.regex.Pattern$Curly.match(Pattern.java:4234)
java.util.regex.Pattern$BmpCharProperty.match(Pattern.java:3798)
java.util.regex.Pattern$Curly.match0(Pattern.java:4272)
java.util.regex.Pattern$Curly.match(Pattern.java:4234)
java.util.regex.Pattern$GroupHead.match(Pattern.java:4658)
java.util.regex.Pattern$GroupHead.match(Pattern.java:4658)
java.util.regex.Pattern$Start.match(Pattern.java:3461)
java.util.regex.Matcher.search(Matcher.java:1248)
java.util.regex.Matcher.find(Matcher.java:637)
com.yuzhouwan.hbase.monitor.server.log.analyse.condition.IterativeConditionMultiWithDP.matchAndStore(IterativeConditionMultiWithDP.java:95)
com.yuzhouwan.hbase.monitor.server.log.analyse.condition.IterativeConditionMultiWithDP.filter(IterativeConditionMultiWithDP.java:79)
org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:633)
org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:610)
org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:421)
org.apache.flink.cep.nfa.NFA.process(NFA.java:241)
org.apache.flink.cep.operator.KeyedCEPPatternOperator.processEvent(KeyedCEPPatternOperator.java:56)
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:170)
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
java.lang.Thread.run(Thread.java:745)

Solution

 Analyzing the log shows that the task was blocked inside a method.
 Since the program had been running for a week without ever hitting this, the likely cause was a single malformed record.
 We therefore added data-cleaning logic to guard against overly long strings and, since our scenario allowed it, replaced Matcher.find() with Matcher.matches(), which improved regex-matching performance enormously (about 5000x in our load tests).

Additional Notes

The three commonly used Matcher lookup methods (see the example below):
  • Matcher.matches()

    Matches against the entire string; returns true only if the whole input matches.

  • Matcher.lookingAt()

    Matches from the start of the string; returns true only if a prefix of the input matches the pattern.

  • Matcher.find()

    Matches any substring; the match may occur anywhere in the input (hence performance drops considerably, so avoid it unless the scenario requires it).
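 A quick illustration of the difference (plain JDK regex, nothing Flink-specific):

import java.util.regex.Pattern;

public class MatcherDemo {

    public static void main(String[] args) {
        Pattern p = Pattern.compile("flink");
        System.out.println(p.matcher("flink").matches());         // true : the entire input must match
        System.out.println(p.matcher("flink rocks").matches());   // false
        System.out.println(p.matcher("flink rocks").lookingAt()); // true : only the input's prefix must match
        System.out.println(p.matcher("apache flink").find());     // true : a match anywhere suffices
    }
}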

Community Follow-up

Pull Request

Issues

 See: How to Become an Apache PMC
