Apache Kafka: A Distributed Message Queue Framework

What Is Kafka?

 Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design – Official website

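Why Kafka?

Distributed

 Offers the usual benefits of a distributed system: cost-effectiveness, speed, reliability, easy scaling, data sharing, device sharing, convenient communication, and flexibility

High Throughput

 Increases throughput for both data producers and consumers

High Reliability

 Supports multiple consumers; when one consumer fails, the load is rebalanced automatically

Offline & Real-Time

 Messages can be persisted and processed in batches

Decoupling

 Acts as a bridge between systems, avoiding tight coupling among them

How Kafka Works

Key Concepts

  • Topic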
    A topic is a category or feed name to which messages are published.

  • Producers
    Producers publish data to the topics of their choice. The producer is responsible for choosing which message to assign to which partition within the topic.

  • Consumers
    Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
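
The two delivery rules above can be sketched in a few lines of Python. This is a minimal illustration, not Kafka's implementation: it assumes a CRC32 hash of the record key as a stand-in partitioner (Kafka's own clients use a different hash) and a simple modulo assignment of partitions to group members. The names `choose_partition` and `deliver` are made up for this sketch.

```python
import zlib
from collections import defaultdict


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Pick a partition for a keyed record; equal keys map to the same partition.

    CRC32 is an assumption made for this sketch, not the hash Kafka clients use.
    """
    return zlib.crc32(key) % num_partitions


def deliver(records, groups):
    """Deliver each (partition, value) record to exactly one member of every group.

    groups maps a consumer group name to its list of consumer instances.
    Each partition is owned by one member per group (modulo assignment).
    """
    inbox = defaultdict(list)
    for partition, value in records:
        for group, members in groups.items():
            owner = members[partition % len(members)]
            inbox[(group, owner)].append(value)
    return dict(inbox)
```

With two groups subscribed to the same topic, every record reaches both groups, but only one instance inside each group.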

Diagram

(Image source: Kafka official website)

Kafka Connect

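What Is Kafka Connect

 Kafka Connect is a scalable and reliable framework for streaming data between Apache Kafka and other systems. It makes it quick to define connectors that move large amounts of data into and out of Kafka. A source connector can ingest an entire database and stream table updates into Kafka topics; it can also collect metrics from application servers into Kafka topics, making the data available for low-latency stream processing. A sink connector delivers data from Kafka topics to a search engine (such as ElasticSearch) or an offline analytics system (such as Hadoop)

(Image source: Kafka™ official website)

Features

  • A common framework for Kafka connectors, with a unified integration API
  • Supports both distributed and standalone modes
  • Provides a RESTful interface for viewing and managing Kafka connectors
  • Automatic offset management
  • Distributed and scalable: built on the existing group management protocol, it scales dynamically by adding tasks or workers
  • Easier integration with other stream and batch processing systems
  • Rich monitoring metrics
  • Clients available in many languages, including C, C++, Go, Java, JMS, .NET, and Python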

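Design Goals

  • Focus only on copying data reliably and scalably (leave transformation, extraction, and similar tasks to dedicated data processing frameworks)
  • Stay as coarse-grained as possible (for example, when syncing a database, the default unit of work is the whole database rather than individual tables)
  • Copy data in parallel (with automatic scaling of processing capacity)
  • Support strong exactly-once semantics (including for storage systems such as HDFS that have no primary key for detecting duplicates)
  • When the source system provides data structures and types, preserve that metadata during the copy
  • Keep the API concise, reusable, and easy to understand, so that custom connectors are easy to implement
  • Support both standalone development and testing and distributed production use

Core Concepts

Connector

 A connector is a high-level abstraction introduced to coordinate data streaming tasks

(Image source: Confluent™ official website)

Task

 Implements the actual copying of data into or out of Kafka

 Tasks also support automatic failover (although the worker itself does not automatically restart or scale its process resources)

(Image source: Confluent™ official website)

Worker

 The process that executes connectors and tasks

(Image source: Confluent™ official website)

Converter

 Converts the data format exchanged between Kafka Connect and other systems

(Image source: Confluent™ official website)

Transform

 Applies simple processing logic to the data a connector receives or sends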

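Architecture

Connector model

 The connector model defines how Kafka connectors interface with external systems; it comprises Connector and Task

 A connector is a specific Connector implementation class, configured with properties that describe which data to copy and how to handle it. A connector instance is made up of a set of tasks. Kafka Connect manages the tasks itself; the connector is only responsible for generating tasks and for applying configuration changes when the framework requests an update. The source-side and sink-side Connector/Task APIs are defined separately, mainly to keep each API simple

Worker model

 The worker model manages the lifecycle of connectors and tasks, including start, pause, resume, restart, and stop

 A Kafka Connect cluster consists of a set of worker processes, which are the containers that execute connectors and tasks. Workers automatically coordinate with one another to provide scalability and fault tolerance. Resource management for worker processes can be delegated to YARN / Mesos, configuration management can be integrated with Chef / Puppet, and lifecycle management can use Oozie / Falcon

Data model

 The data model defines the data structures that Kafka connectors manage and how records are serialized

 Connectors copy streams of data from a partitioned input stream to a partitioned output stream, where at least one of the input and output is always Kafka. Each stream is an ordered sequence of messages, and every message has an associated offset. The format and semantics of these offsets are defined by the connector, to support integration with a wide range of systems; however, to achieve certain delivery semantics in the face of failures, offsets must be unique within a stream, and the stream must be able to seek to an arbitrary offset. Kafka Connect also supports pluggable converters, so that the data can be stored in a variety of serialization formats. In addition, schemas are built in, which allows important metadata about data formats to propagate through complex data pipelines. When a schema is not available, schemaless data can be used instead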
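
The requirement that offsets be unique and seekable is what makes recovery after a failure possible. The sketch below is only an illustration under simple assumptions: the stream is an in-memory list, a message's offset is its list index, and `SeekableStream` and `copy_from` are hypothetical names rather than Kafka Connect classes.

```python
class SeekableStream:
    """An ordered in-memory stream; a message's offset is its list index."""

    def __init__(self, messages):
        self._messages = list(messages)
        self._position = 0

    def seek(self, offset):
        """Move the read position to any valid offset."""
        if not 0 <= offset <= len(self._messages):
            raise ValueError(f"offset {offset} out of range")
        self._position = offset

    def poll(self, max_records):
        """Return up to max_records (offset, message) pairs and advance."""
        start = self._position
        chunk = self._messages[start:start + max_records]
        self._position += len(chunk)
        return [(start + i, message) for i, message in enumerate(chunk)]


def copy_from(stream, sink, committed_offset, batch_size=2, max_batches=None):
    """Copy messages into sink starting at committed_offset.

    Returns the next offset to read, which the caller stores as the new
    committed offset. max_batches lets a caller stop early to mimic a crash.
    """
    stream.seek(committed_offset)
    batches = 0
    while max_batches is None or batches < max_batches:
        batch = stream.poll(batch_size)
        if not batch:
            break
        sink.extend(message for _, message in batch)
        committed_offset = batch[-1][0] + 1
        batches += 1
    return committed_offset
```

Stopping after one batch and then calling `copy_from` again with the returned offset resumes exactly where the first run left off, with nothing skipped or copied twice.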

Quick Start

Base Environment

# Add a user and set its password
$ adduser connect
$ passwd connect # set a password for the connect user
# Grant the user sudo privileges
$ chmod u+w /etc/sudoers
$ vim /etc/sudoers
# Find the line `root ALL=(ALL) ALL` and add the connect user below it
connect ALL=(ALL) ALL
$ chmod u-w /etc/sudoers
# Switch to the connect user
$ su - connect
$ cd /home/connect
# Create the install, software, and logs directories
$ mkdir install && mkdir software && mkdir logs

Confluent

Download

 Download the confluent-oss-3.3.1-2.11.tar.gz package from https://www.confluent.io/download/

Installation
$ cd ~/install/
$ tar zxvf confluent-oss-3.3.1-2.11.tar.gz -C ~/software/
$ cd ~/software/
$ ln -s confluent-3.3.1/ confluent
$ vim ~/.bashrc
export CONFLUENT_HOME=/home/connect/software/confluent
export PATH=$CONFLUENT_HOME/bin:$PATH
$ source ~/.bashrc
Startup
# Start ZooKeeper, Kafka, and Schema Registry
$ confluent start schema-registry
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
$ jps -ml
32680 org.apache.zookeeper.server.quorum.QuorumPeerMain /tmp/confluent.A8TzcjSE/zookeeper/zookeeper.properties
348 io.confluent.support.metrics.SupportedKafka /tmp/confluent.A8TzcjSE/kafka/kafka.properties
483 io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain /tmp/confluent.A8TzcjSE/schema-registry/schema-registry.properties
Send Avro Data
$ cd ~/software/confluent/
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
# Enter the following three JSON lines
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
# Press Ctrl+C to stop the process
Receive Avro Data
$ ./bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181 --from-beginning
# The three JSON lines sent by the producer are received
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
# Press Ctrl+C to stop the process
Send Avro Data in the Wrong Format
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"int"}'
# Enter the following JSON line
{"f1":"value1"}
# The following error is reported
org.apache.kafka.common.errors.SerializationException: Error deserializing json {"f1":"value1"} to Avro of schema "int"
Caused by: org.apache.avro.AvroTypeException: Expected int. Got START_OBJECT
at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:698)
at org.apache.avro.io.JsonDecoder.readInt(JsonDecoder.java:172)
at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:83)
at org.apache.avro.generic.GenericDatumReader.readInt(GenericDatumReader.java:503)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at io.confluent.kafka.formatter.AvroMessageReader.jsonToAvro(AvroMessageReader.java:191)
at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:158)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:58)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Shutdown
$ confluent stop schema-registry
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]

Kafka Connect

Start Confluent
$ confluent start
zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
View the Connect Log
$ confluent log connect
$ confluent current
/tmp/confluent.A8TzcjSE
$ cd /tmp/confluent.A8TzcjSE
$ less connect/connect.stderr
List the Supported Connector Types
$ confluent list connectors
Bundled Predefined Connectors (edit configuration under etc/):
elasticsearch-sink
file-source
file-sink
jdbc-source
jdbc-sink
hdfs-sink
s3-sink
$ ll etc/
drwxr-xr-x 2 connect connect 4096 Jul 28 08:07 camus
drwxr-xr-x 2 connect connect 4096 Jul 28 07:41 confluent-common
drwxr-xr-x 2 connect connect 4096 Jul 28 07:28 kafka
drwxr-xr-x 2 connect connect 4096 Jul 28 07:50 kafka-connect-elasticsearch
drwxr-xr-x 2 connect connect 4096 Jul 28 07:58 kafka-connect-hdfs
drwxr-xr-x 2 connect connect 4096 Jul 28 08:06 kafka-connect-jdbc
drwxr-xr-x 2 connect connect 4096 Jul 28 08:04 kafka-connect-s3
drwxr-xr-x 2 connect connect 4096 Jul 28 07:52 kafka-connect-storage-common
drwxr-xr-x 2 connect connect 4096 Jul 28 07:48 kafka-rest
drwxr-xr-x 2 connect connect 4096 Jul 28 07:42 rest-utils
drwxr-xr-x 2 connect connect 4096 Jul 28 07:45 schema-registry
Use file-source
# Configure file-source
$ cat ./etc/kafka/connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
# If you do not need Schema Registry, set both key.converter and value.converter to org.apache.kafka.connect.json.JsonConverter
# Write some test data into test.txt
$ for i in {1..3}; do echo "log line $i"; done > test.txt
# Load the file-source connector and confirm its configuration
$ confluent load file-source
{"name":"file-source","config":{"connector.class":"FileStreamSource","tasks.max":"1","file":"test.txt","topic":"connect-test","name":"file-source"},"tasks":[]}
# Confirm that the connector has been loaded
$ confluent status connectors
["file-source"]
# Check the running status of the file-source connector
$ confluent status file-source
{"name":"file-source","connector":{"state":"RUNNING","worker_id":"192.168.1.101:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"192.168.1.101:8083"}]}
# Check whether Kafka has received the data
$ kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
"log line 1"
"log line 2"
"log line 3"
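
The JSON printed by `confluent load` above has the same `{"name", "config"}` shape as a connector definition sent to the Connect REST API. As a rough sketch, the conversion from a .properties file to that body could look like the following. The helper name `properties_to_payload` is hypothetical, and the `topic=connect-test` line is taken from the `confluent load` output because the `cat` listing does not show it.

```python
import json


def properties_to_payload(text):
    """Turn connector .properties text into a {"name", "config"} request body."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return {"name": config["name"], "config": config}


# Assumed file contents, based on the listing and load output shown above.
FILE_SOURCE = """
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
"""

payload = properties_to_payload(FILE_SOURCE)
print(json.dumps(payload, indent=2))
```

All values stay strings, which matches how `"tasks.max":"1"` appears in the output above.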
Use file-sink
# Configure file-sink
$ cat ./etc/kafka/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
# Load the file-sink connector and confirm its configuration
$ confluent load file-sink
{"name":"file-sink","config":{"connector.class":"FileStreamSink","tasks.max":"1","file":"test.sink.txt","topics":"connect-test","name":"file-sink"},"tasks":[]}
# Confirm that the connector has been loaded
$ confluent status connectors
["file-source","file-sink"]
# Check the running status of the file-sink connector
$ confluent status file-sink
{"name":"file-sink","connector":{"state":"RUNNING","worker_id":"192.168.1.101:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"192.168.1.101:8083"}]}
# Check whether the Kafka data has been written to the file
$ tail -f test.sink.txt
log line 1
log line 2
log line 3
# In another terminal, append more data to test.txt; the new lines then show up in test.sink.txt
$ for i in {4..1000}; do echo "log line $i"; done >> test.txt
Cleanup
# Unload the file-source and file-sink connectors
$ confluent unload file-source
$ confluent unload file-sink
# Stop the connect process
$ confluent stop connect
Stopping connect
connect is [DOWN]
# Stop all Confluent processes
$ confluent stop
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
# Stop all Confluent processes and delete the temporary files
$ confluent destroy
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
Deleting: /tmp/confluent.A8TzcjSE

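Supported Components

Kafka 2 HDFS

Features
Exactly Once Delivery

 The connector uses a WAL (write-ahead log) file to ensure that each record is written to HDFS exactly once
 Kafka offset information is encoded in the WAL file, so the last committed offset can be recovered when a task fails or restarts

Extensible Data Format

 Avro and Parquet are supported out of the box; other formats can be supported by extending the Format class

Hive Integration

 Supports Hive integration
 When enabled, the connector automatically creates an external partitioned Hive table for each topic exported to HDFS

Schema Evolution

 Supports schema evolution and different schema compatibility levels
 With Hive integration, schema.compatibility can be set to BACKWARD, FORWARD, or FULL
 Hive can then query all data written to a topic, even when it was written under different schemas

Secure HDFS and Hive Metastore Support

 Supports Kerberos authentication, so the connector can integrate with a secured HDFS or Hive metastore

Pluggable Partitioner

 Supports the default partitioner, a field-based partitioner, and time-based partitioners (including daily and hourly granularity)
 You can implement the Partitioner class to write your own partitioner
 You can also extend the TimeBasedPartitioner class to write a custom time-based partitioner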
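
To give a feel for what the three partitioner families compute, here is a small Python sketch that maps a record to a directory name. The `partition=N` form matches the paths that appear later in this post; the `field=value` and `year=/month=/day=/hour=` layouts are illustrative assumptions, and the function names are not part of the connector's Java API.

```python
from datetime import datetime, timezone


def default_partition(kafka_partition):
    """Directory for the default partitioner: one per Kafka partition."""
    return f"partition={kafka_partition}"


def field_partition(record, field):
    """Directory derived from the value of one record field."""
    return f"{field}={record[field]}"


def time_partition(timestamp_ms, granularity):
    """Directory derived from a UTC timestamp, at daily or hourly granularity."""
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    path = f"year={ts:%Y}/month={ts:%m}/day={ts:%d}"
    if granularity == "hourly":
        path += f"/hour={ts:%H}"
    elif granularity != "daily":
        raise ValueError(f"unsupported granularity: {granularity}")
    return path
```

A custom partitioner would follow the same idea: take a record, return the sub-directory it belongs in.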

HDFS in Practice
Install HDFS

 Omitted here; see my other blog post on Apache Eagle

Configure Confluent
$ vim etc/kafka-connect-hdfs/quickstart-hdfs.properties
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs
hdfs.url=hdfs://localhost:9000
flush.size=3
Enable
$ confluent start
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
$ cd ~/software/confluent
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_hdfs --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
# Enter the following three lines
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
$ confluent load hdfs-sink -d etc/kafka-connect-hdfs/quickstart-hdfs.properties
{
"name":"hdfs-sink",
"config":{
"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max":"1",
"topics":"test_hdfs",
"hdfs.url":"hdfs://localhost:9000",
"flush.size":"3",
"name":"hdfs-sink"
},
"tasks":[
{
"connector":"hdfs-sink",
"task":0
}
]
}
Verify
$ hadoop fs -ls /topics/test_hdfs/partition=0
Found 1 items
-rw-r--r-- 3 connect supergroup 197 2017-11-12 16:58 /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro
$ wget http://mirror.metrocast.net/apache/avro/avro-1.8.2/java/avro-tools-1.8.2.jar
# Convert Avro to JSON directly against the file on HDFS
$ hadoop jar avro-tools-1.8.2.jar tojson hdfs://localhost:9000/topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro
# Or copy the Avro file to the local disk first, then convert it to JSON
$ hadoop fs -copyToLocal /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro /tmp/test_hdfs+0+0000000000+0000000002.avro
$ java -jar avro-tools-1.8.2.jar tojson /tmp/test_hdfs+0+0000000000+0000000002.avro
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
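
The file name in the listing above, test_hdfs+0+0000000000+0000000002.avro, appears to encode topic, partition, start offset, and end offset. Under that assumption, a reader can work out where consumption should resume just from the committed file names. The Python sketch below illustrates the idea only; it is not the connector's recovery code, and `parse_committed_file` and `next_offset` are hypothetical helpers.

```python
import re

# topic+partition+startOffset+endOffset.extension (layout assumed from the listing)
FILE_NAME = re.compile(
    r"^(?P<topic>.+)\+(?P<partition>\d+)\+(?P<start>\d+)\+(?P<end>\d+)\.(?P<ext>\w+)$"
)


def parse_committed_file(name):
    """Split a committed file name into topic, partition, and offset range."""
    match = FILE_NAME.match(name)
    if match is None:
        raise ValueError(f"unexpected file name: {name}")
    return {
        "topic": match["topic"],
        "partition": int(match["partition"]),
        "start_offset": int(match["start"]),
        "end_offset": int(match["end"]),
    }


def next_offset(file_names, topic, partition):
    """Return the first offset not yet covered by any committed file."""
    ends = [
        info["end_offset"]
        for info in map(parse_committed_file, file_names)
        if info["topic"] == topic and info["partition"] == partition
    ]
    return max(ends) + 1 if ends else 0
```

For the single file shown above, the three records at offsets 0 to 2 are covered, so the next offset to read would be 3.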
Hive in Practice
Install Hive
  • Download apache-hive-2.1.1-bin.tar.gz

  • Set the environment variables

    $ vim ~/.bashrc
    export HIVE_HOME=/usr/local/hadoop/hive
    export PATH=$HIVE_HOME/bin:$HIVE_HOME/conf:$PATH
  • Edit the configuration files

    $ vim hive-env.sh
    export HADOOP_HEAPSIZE=1024
    # Set HADOOP_HOME to point to a specific hadoop install directory
    HADOOP_HOME=/usr/local/hadoop
    # Hive Configuration Directory can be controlled by:
    export HIVE_CONF_DIR=/usr/local/hadoop/hive/conf
    # Folder containing extra libraries required for hive compilation/execution can be controlled by:
    export HIVE_AUX_JARS_PATH=/usr/local/hadoop/hive/lib
    $ vim hive-site.xml
    # Sets Hive's data storage directory; the default is /user/hive/warehouse on HDFS
    hive.metastore.warehouse.dir
    # Sets Hive's scratch directory for temporary files; the default is /tmp/hive on HDFS
    hive.exec.scratchdir
  • Initialize and start

    # Initialize
    $ ./schematool -initSchema -dbType derby
    # Start
    $ nohup bin/hive --service metastore > logs/metastore.log &
    $ nohup bin/hive --service hiveserver2 > logs/hiveserver2.log &
    # Verify
    $ bin/hive
    > show databases;
Configure Confluent
$ vim etc/kafka-connect-hdfs/quickstart-hdfs.properties
# HDFS settings
name=hdfs_sink_18
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
topics.dir=/user/connect/topics
flush.size=1
topics=hdfs_sink_18
tasks.max=1
hdfs.url=hdfs://192.168.1.101:9000
logs.dir=/user/connect/logs
schema.cache.size=1
value.converter.schema.registry.url=http://localhost:8081
format.class=io.confluent.connect.hdfs.avro.AvroFormat
value.converter=io.confluent.connect.avro.AvroConverter
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
# Hive settings
hive.integration=true
hive.metastore.uris=thrift://192.168.1.101:9083
schema.compatibility=BACKWARD
hive.database=hive
hive.conf.dir=/home/connect/software/hive/conf
hive.home=/home/connect/software/hive
Enable
$ confluent load hdfs-sink-18 -d etc/kafka-connect-hdfs/quickstart-schema.properties
$ cd ~/software/confluent
$ ./bin/kafka-avro-console-producer --broker-list 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092 --topic hdfs_sink_18 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
# Enter the following data
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
{"f1":"value4"}
{"f1":"value4"}
{"f1":"value4"}
{"f1":"value4"}
{"f1":"value4"}
{"f1":"value4"}
{"f1":"value5"}
{"f1":"value6"}
Verify
$ bin/hive
> use hive;
> select * from hdfs_sink_18 limit 10;
OK
value4 0
value4 0
value6 0
value2 1
value4 1
value4 1
value5 1
value1 2
value4 2
value4 2
Time taken: 8.458 seconds, Fetched: 10 row(s)

Kafka 2 ElasticSearch

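Features
Exactly Once Delivery

 The connector uses ElasticSearch's idempotent write semantics together with ES document IDs to ensure that each record is written to ES exactly once
 If Kafka messages carry keys, those keys are automatically used as ES document IDs. If messages carry no keys, or the connector is explicitly told to ignore them, Kafka Connect uses topic+partition+offset as the key instead, so that every record written to ES maps to a unique document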
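
The reason this gives exactly-once results is that a redelivered record produces the same document ID and therefore overwrites itself instead of creating a duplicate. A minimal Python sketch of that idea follows; a plain dict stands in for the ES index, and `document_id` and `index_all` are illustrative names, not connector APIs.

```python
def document_id(topic, partition, offset, key=None, key_ignore=False):
    """Use the record key as the ID when allowed, else topic+partition+offset."""
    if key is not None and not key_ignore:
        return str(key)
    return f"{topic}+{partition}+{offset}"


def index_all(records, key_ignore=False):
    """Write records into a dict keyed by document ID (a stand-in for ES).

    Each record is a (topic, partition, offset, key, value) tuple.
    Writing the same record twice leaves a single document.
    """
    index = {}
    for topic, partition, offset, key, value in records:
        doc_id = document_id(topic, partition, offset, key, key_ignore)
        index[doc_id] = value
    return index
```

With `key_ignore=True`, the first record of partition 0 in the topic `test-elasticsearch-sink` gets the ID `test-elasticsearch-sink+0+0`, which matches the `_id` values in the search output later in this post.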

Mapping Inference

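 When enabled, the connector can infer the ES mapping automatically from the Schema Registry schemas
 However, the inference is limited to field types and default values. If you need more customization (for example, user-defined analyzers), you must create the mapping manually

Schema Evolution

 Supports schema evolution and different schema compatibility levels
 It can also handle some incompatible schema changes, for example

  • Adding fields

    When one or more fields are added to Kafka messages and ES has dynamic mapping enabled, ES automatically adds the new fields to the mapping

  • Removing fields

    When one or more fields are removed from Kafka messages, the missing values are treated as null

  • Changing field types

    For example, when a field changes from string to integer, ES automatically converts the string to an integer

Delivery Semantics

 The connector supports two modes of writing to ES, batch and pipeline, to increase throughput. In batch mode, multiple batches can be processed in parallel
 Document-level update ordering is guaranteed by using the partition-level Kafka offset as the ES document version, together with version_mode = external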
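
With external versioning, a write is accepted only when its version is greater than the version already stored, so a late-arriving older update cannot overwrite a newer one. The sketch below models that rule in plain Python with the Kafka offset as the version; `VersionedIndex` is a toy stand-in written for this post, not an ES client.

```python
class VersionedIndex:
    """Toy document store that applies the external-version rule."""

    def __init__(self):
        self._docs = {}  # doc_id -> (version, source)

    def index(self, doc_id, source, version):
        """Store the document only if version is newer; return True if stored."""
        current = self._docs.get(doc_id)
        if current is not None and version <= current[0]:
            return False  # stale or duplicate write is rejected
        self._docs[doc_id] = (version, source)
        return True

    def get(self, doc_id):
        """Return the stored source of a document."""
        return self._docs[doc_id][1]
```

If two parallel batches deliver offsets 7 and 5 for the same document in that order, the write at offset 5 is rejected and the document keeps the value from offset 7.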

Reindexing with Zero Downtime

 With ES's Index Aliases API, reindexing can be done with zero downtime (ES also provides a Reindex API, but its performance is not great). The steps to perform on the ES cluster are as follows

  1. Create an alias for the index with the old mapping.
  2. Point the applications that use the index to the alias.
  3. Create a new index with the updated mapping.
  4. Move data from the old index to the new index.
  5. Atomically move the alias to the new index.
  6. Delete the old index.
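
Step 5 is the critical one: the alias must leave the old index and join the new one in a single request. The Python sketch below builds a request body with one `remove` and one `add` action, in the shape used by the ES `_aliases` endpoint, and applies it to an in-memory alias table to show the all-or-nothing behaviour. The alias and index names are hypothetical, and `apply_actions` only simulates what the cluster would do.

```python
import json


def alias_swap_actions(alias, old_index, new_index):
    """Build one request body that removes and adds the alias together."""
    return {
        "actions": [
            {"remove": {"index": old_index, "alias": alias}},
            {"add": {"index": new_index, "alias": alias}},
        ]
    }


def apply_actions(aliases, body):
    """Apply all actions to a copy of the alias table and return the copy.

    Working on a copy means a failing action leaves the original untouched,
    which mimics the atomic behaviour of the real endpoint.
    """
    updated = {name: set(indices) for name, indices in aliases.items()}
    for action in body["actions"]:
        (kind, spec), = action.items()
        targets = updated.setdefault(spec["alias"], set())
        if kind == "add":
            targets.add(spec["index"])
        elif kind == "remove":
            targets.remove(spec["index"])  # raises KeyError if not aliased
        else:
            raise ValueError(f"unsupported action: {kind}")
    return updated


body = alias_swap_actions("logs", "logs_v1", "logs_v2")
print(json.dumps(body, indent=2))
```

Because readers only ever see the alias before or after the swap, they never observe a moment where it points at no index.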

 To keep the change invisible to users, write requests must still be handled during reindexing. An alias, however, cannot write to the old and new indices at the same time. To work around this, two ElasticSearch connectors can write the same data to both the old and new indices simultaneously. The steps to perform on the ES connectors are as follows

  1. The connector job that ingests data into the old indices continues writing to the old indices.
  2. Create a new connector job that writes to the new indices. This will copy both some old data and new data to the new indices as long as the data is in Kafka.
  3. Once the data in the old indices has been moved to the new indices by the reindexing process, we can stop the old connector job.
In Practice
Install ElasticSearch

 Abbreviated here; see my other blog post on ElasticSearch

# Download
$ cd ~/install
$ curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.4.tar.gz
$ tar zxvf elasticsearch-5.6.4.tar.gz -C ~/software/
$ cd ~/software
$ ln -s elasticsearch-5.6.4/ elasticsearch
# Environment variables
$ vim ~/.bashrc
export JAVA_HOME=~/software/java
export ELASTIC_SEARCH_HOME=~/software/elasticsearch
export PATH=$JAVA_HOME/bin:$ELASTIC_SEARCH_HOME/bin:$PATH
$ source ~/.bashrc
$ elasticsearch -version
Version: 5.6.4, Build: 8bbedf5/2017-10-31T18:55:38.105Z, JVM: 1.8.0_131
# Start in the background
$ elasticsearch -Ecluster.name=yuzhouwan -Enode.name=yuzhouwan_kafka_connect_test -d
# Check
$ jps -ml
25332 org.elasticsearch.bootstrap.Elasticsearch -Ecluster.name=yuzhouwan -Enode.name=yuzhouwan_kafka_connect_test -d
$ netstat -nap | grep 9200
tcp 0 0 ::ffff:127.0.0.1:9200 :::* LISTEN 25332/java
tcp 0 0 ::1:9200 :::* LISTEN 25332/java
$ curl -XGET 'http://localhost:9200/_cluster/health' | jq
{
"cluster_name": "yuzhouwan",
"status": "yellow",
"timed_out": false,
"number_of_nodes": 1,
"number_of_data_nodes": 1,
"active_primary_shards": 5,
"active_shards": 5,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 5,
"delayed_unassigned_shards": 0,
"number_of_pending_tasks": 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 50
}
Configure Confluent
$ vim etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-elasticsearch-sink
key.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect
Enable
$ cd ~/software/confluent
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
$ ./bin/kafka-server-start ./etc/kafka/server.properties
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
$ ./bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties
$ cd ~/software/confluent
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test-elasticsearch-sink --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
# Enter the following three lines
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
$ ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
$ confluent load elasticsearch-sink | jq
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "test-elasticsearch-sink",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"type.name": "kafka-connect",
"name": "elasticsearch-sink"
},
"tasks": [
{
"connector": "elasticsearch-sink",
"task": 0
}
]
}
Verify
$ curl -XGET 'http://localhost:9200/test-elasticsearch-sink/_search?pretty'
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 1.0,
"hits" : [
{
"_index" : "test-elasticsearch-sink",
"_type" : "kafka-connect",
"_id" : "test-elasticsearch-sink+0+0",
"_score" : 1.0,
"_source" : {
"f1" : "value1"
}
},
{
"_index" : "test-elasticsearch-sink",
"_type" : "kafka-connect",
"_id" : "test-elasticsearch-sink+0+2",
"_score" : 1.0,
"_source" : {
"f1" : "value3"
}
},
{
"_index" : "test-elasticsearch-sink",
"_type" : "kafka-connect",
"_id" : "test-elasticsearch-sink+0+1",
"_score" : 1.0,
"_source" : {
"f1" : "value2"
}
}
]
}
}
Pitfalls
max file descriptors [32768] for elasticsearch process is too low, increase to at least [65536]
  • Fix
$ su root
$ vim /etc/security/limits.d/90-nproc.conf
* - nproc 20480
* - nofile 65536
* - memlock unlimited

Kafka 2 Druid

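 The Druid project has chosen the Kafka Indexing Service as its official approach, which makes it easier to build out its own ecosystem. However, the Kafka Indexing Service currently still has some shortcomings. As a result, a Kafka Connect to Druid pipeline has to be custom-built on top of Druid's Tranquility component. With Kafka auto commit disabled, you can also control commits yourself to ensure that no data is lost or duplicated, which compensates for Tranquility's lack of exactly-once semantics

Tip: Stream Reactor implements Kafka-to-Druid (K2D) on top of Tranquility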

Kafka 2 Kafka

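 Connect Replicator copies topics from one Kafka cluster to another easily and reliably. In addition to copying messages, this connector creates topics as needed, preserving the topic configuration of the source cluster. This includes preserving the number of partitions, the replication factor, and any configuration overrides specified for individual topics

 The figure below shows a typical multi-datacenter deployment, in which data from two Kafka clusters located in different datacenters is aggregated in a separate cluster located in another datacenter. Here, the origin of the copied data is called the source cluster, and the target of the copied data is called the destination cluster

(Image source: Confluent™ official website)
Features
  • Topics can be selected using whitelists, blacklists, and regular expressions
  • Topics are created dynamically in the destination cluster with matching partition counts, replication factors, and topic configuration overrides
  • When new partitions are added in the source cluster, the corresponding topics in the destination cluster are expanded automatically
  • Other configuration changes in the source cluster are synchronized to the destination cluster automatically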
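
To illustrate how whitelist, blacklist, and regular-expression selection can combine, here is a small Python sketch. The precedence used here (a topic is selected when it is whitelisted or matches the pattern, unless it is blacklisted) is an assumption made for illustration, and `select_topics` is not a Replicator API.

```python
import re


def select_topics(topics, whitelist=(), blacklist=(), regex=None):
    """Return the topics to replicate, keeping the input order.

    Assumed rule: whitelisted or regex-matching topics are included,
    and the blacklist always wins.
    """
    pattern = re.compile(regex) if regex else None
    selected = []
    for topic in topics:
        matches = pattern is not None and pattern.fullmatch(topic) is not None
        if (topic in whitelist or matches) and topic not in blacklist:
            selected.append(topic)
    return selected
```

For example, with the whitelist `["logs"]`, the blacklist `["orders-eu"]`, and the pattern `orders.*`, the topics `orders` and `logs` are selected while `orders-eu` and `_schemas` are skipped.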

Standalone Worker

Startup

$ bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]

Configuration

# File that stores the connector offsets
offset.storage.file.filename
# REST port that accepts HTTP requests
rest.port

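Tip: In standalone mode, offsets are stored in the file set by offset.storage.file.filename (/tmp/connect.offsets in the bundled sample configuration)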

Distributed Worker

Start Confluent

$ cd ~/software/confluent
# Start ZooKeeper
$ nohup ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties > zookeeper.log &
# Start Kafka
$ nohup ./bin/kafka-server-start ./etc/kafka/server.properties > kafka.log &
# Start Schema Registry
$ nohup ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties > schema.log &
# Start Kafka REST
$ nohup ./bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties > kafka-rest.log &
# Start Connect in distributed mode
$ nohup ./bin/connect-distributed ./etc/kafka/connect-distributed.properties > connect-distribute.log &

Create Topics

# Show the help text
$ bin/kafka-topics --help
# Create the three topics connect-configs, connect-offsets, and connect-status
# connect-configs stores connector and task configuration
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
# connect-offsets stores connector and task offsets
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
# connect-status stores connector and task status changes
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
# List the existing topics
$ bin/kafka-topics --list --zookeeper localhost:2181
__consumer_offsets
_schemas
connect-configs
connect-offsets
connect-statuses
# Check the topic status
$ bin/kafka-topics --describe --zookeeper localhost:2181 --topic connect-configs
Topic:connect-configs PartitionCount:1 ReplicationFactor:1 Configs:
Topic:connect-configs Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Delete a Topic

$ vim etc/kafka/server.properties
# To delete a topic, first set delete.topic.enable to true
delete.topic.enable=true
$ bin/kafka-topics --delete --zookeeper localhost:2181 --topic connect-configs

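Connector and Task States

 All connectors, and the tasks that belong to them, publish status updates through status.storage.topic. Because workers consume status.storage.topic asynchronously, there can be a short delay before the latest state is visible through the API

 A connector or task can be in one of the following states:

  • UNASSIGNED

    The connector or task has not yet been assigned to a worker

  • RUNNING

    Running

  • PAUSED

    The connector or task has been paused by an administrator

  • FAILED

    The connector or task has failed, usually because an exception was raised

 The Pause / Resume APIs are typically used when the downstream system needs maintenance: pausing the Kafka connector keeps data from piling up against it. A pause is persistent rather than temporary: even after the cluster restarts, a paused connector returns to RUNNING only once the Resume API is called. In addition, a task in the FAILED state cannot be paused; it has to be restarted and brought back to RUNNING first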
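
These operations map onto the Connect REST interface. The Python sketch below only builds the HTTP requests for checking status, pausing, resuming, and restarting a connector, so it runs without a cluster. The worker address comes from the status output earlier in this post; the helper name `connector_request` is made up for illustration.

```python
from urllib.parse import quote
from urllib.request import Request

# action -> (HTTP method, path suffix under /connectors/{name}/)
ACTIONS = {
    "status": ("GET", "status"),
    "pause": ("PUT", "pause"),
    "resume": ("PUT", "resume"),
    "restart": ("POST", "restart"),
}


def connector_request(base_url, name, action):
    """Build (but do not send) the REST request for one connector action."""
    method, suffix = ACTIONS[action]
    url = f"{base_url.rstrip('/')}/connectors/{quote(name, safe='')}/{suffix}"
    return Request(url, method=method)


request = connector_request("http://192.168.1.101:8083", "file-sink", "pause")
print(request.get_method(), request.full_url)
```

Passing the request to `urllib.request.urlopen` would send it to a live worker; since status updates propagate asynchronously, a status check right after a pause may briefly still report RUNNING.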

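How Workers Discover Each Other

 As long as all workers use the same group.id (which can be thought of as the cluster name) and the same three topics for config.storage.topic, offset.storage.topic, and status.storage.topic, each worker automatically discovers the other worker processes in the same cluster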
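
A quick way to catch a worker that will not join the cluster is to compare these four settings across all worker configurations before starting them. The following Python sketch does that for configurations already loaded as dictionaries; `mismatched_keys` is a hypothetical helper, and the topic names follow the ones created earlier in this post.

```python
SHARED_KEYS = (
    "group.id",
    "config.storage.topic",
    "offset.storage.topic",
    "status.storage.topic",
)


def mismatched_keys(worker_configs):
    """Return the shared keys that are missing or differ between workers."""
    problems = []
    for key in SHARED_KEYS:
        values = {config.get(key) for config in worker_configs}
        if len(values) != 1 or None in values:
            problems.append(key)
    return problems


worker_a = {
    "group.id": "connect-cluster",
    "config.storage.topic": "connect-configs",
    "offset.storage.topic": "connect-offsets",
    "status.storage.topic": "connect-status",
}
worker_b = {**worker_a, "group.id": "another-cluster"}
print(mismatched_keys([worker_a, worker_b]))
```

An empty result means the workers agree on all four settings and should end up in the same Connect cluster.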

Rebalance Mechanism

[2017-12-04 11:32:18,607] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1187)
[2017-12-04 11:32:18,608] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1217)
[2017-12-04 11:32:18,608] INFO (Re-)joining group connect-cluster (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:442)
[2017-12-04 11:32:18,612] INFO Successfully joined group connect-cluster with generation 2 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:409)
[2017-12-04 11:32:18,612] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-efb9e92c-27a6-4062-9fcc-92480f8e9e03', leaderUrl='http://192.168.1.101:8083/', offset=-1, connectorIds=[], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1166)
[2017-12-04 11:32:18,612] INFO Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:815)
[2017-12-04 11:32:18,613] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:825)

Fully Distributed

Replace the Bundled ZooKeeper

 Installation details are omitted here; see my other blog post, Zookeeper: Principles and Optimization

$ cd /home/connect/software/confluent
$ vim ./etc/kafka/server.properties
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
zookeeper.connection.timeout.ms=6000
$ vim ./etc/schema-registry/connect-avro-distributed.properties
id=kafka-rest-test-server
schema.registry.url=http://localhost:8081
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
$ zkServer.sh start

Replace the Bundled Kafka

# Create the log directory
$ mkdir -p /home/connect/kafka_log
$ cd /home/connect/software/kafka
# Configure the Kafka server
$ vim config/server.properties
# Give each Kafka broker node a different broker.id
broker.id=0
# Allow topics to be deleted
delete.topic.enable=true
# Set to the IP of the current Kafka broker node
listeners=PLAINTEXT://192.168.1.101:9092
advertised.listeners=PLAINTEXT://192.168.1.101:9092
log.dirs=/home/connect/kafka_log
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
# Configure ZooKeeper for Kafka
$ vim config/zookeeper.properties
dataDir=/home/connect/zkdata
clientPort=2181
maxClientCnxns=60
# Configure the Kafka consumer
$ vim config/consumer.properties
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group
# Start Kafka
$ nohup /home/connect/software/kafka/bin/kafka-server-start.sh /home/connect/software/kafka/config/server.properties > /home/connect/kafka_log/kafka.server.log 2>&1 &
# View the Kafka log
$ tail -f ~/kafka_log/kafka.server.log
# Update the Confluent configuration
$ cd /home/connect/software/confluent
$ vim etc/kafka/connect-distributed.properties
bootstrap.servers=192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092
# Create the three topics connect-configs, connect-offsets, and connect-status
# connect-configs stores connector and task configuration
$ bin/kafka-topics.sh --create --zookeeper 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
# connect-offsets stores connector and task offsets
$ bin/kafka-topics.sh --create --zookeeper 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181 --topic connect-offsets --replication-factor 3 --partitions 1 --config cleanup.policy=compact
# connect-status stores connector and task status changes
$ bin/kafka-topics.sh --create --zookeeper 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181 --topic connect-status --replication-factor 3 --partitions 1 --config cleanup.policy=compact
# List the existing topics
$ bin/kafka-topics.sh --list --zookeeper 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
__consumer_offsets
connect-configs
connect-offsets
connect-status
test-elasticsearch-sink
# Check the topic status
$ bin/kafka-topics --describe --zookeeper 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181 --topic connect-configs
Topic:connect-configs PartitionCount:1 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic:connect-configs Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 2,0,1

Update the Kafka REST Configuration

$ cd /home/connect/software/confluent
$ vim etc/kafka-rest/kafka-rest.properties
id=kafka-rest-test-server
schema.registry.url=http://192.168.1.103:8081
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
$ nohup ./bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties &

Update the Schema Registry Configuration

$ cd /home/connect/software/confluent
$ vim etc/schema-registry/schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.connection.url=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
kafkastore.topic=_schemas
debug=false
$ nohup ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &
$ tail -f logs/schema-registry.log

Update the Worker Configuration

$ vim /home/connect/software/confluent/etc/kafka/connect-distributed.properties
bootstrap.servers=192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# To simplify load testing, you can disable schemas by setting the following two options to false
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://192.168.1.103:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://192.168.1.103:8081
$ nohup /home/connect/software/confluent/bin/connect-distributed /home/connect/software/confluent/etc/kafka/connect-distributed.properties > /home/connect/software/confluent/logs/connect-distributed.log &
# Distribute the configuration and start a worker on each node
$ scp /home/connect/software/confluent/etc/schema-registry/connect-avro-distributed.properties 192.168.1.102:/home/connect/software/confluent/etc/schema-registry/connect-avro-distributed.properties
$ scp /home/connect/software/confluent/etc/schema-registry/connect-avro-distributed.properties 192.168.1.103:/home/connect/software/confluent/etc/schema-registry/connect-avro-distributed.properties

Pitfalls

WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!
Solution
$ vim .ssh/known_hosts
# Delete the stale key entry stored under the affected host's IP
Timed out while checking for or creating topic(s) ‘connect-offsets’
Description
[2017-12-01 17:39:13,503] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:206)
org.apache.kafka.connect.errors.ConnectException: Timed out while checking for or creating topic(s) 'connect-offsets'. This could indicate a connectivity issue, unavailable topic partitions, or if this is your first use of the topic it may have taken too long to create.
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:243)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:126)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:146)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:99)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:194)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
Reference
Request to leader to reconfigure connector tasks failed
Description
[2018-03-13 10:30:28,303] ERROR Unexpected error during connector task reconfiguration: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:933)
[2018-03-13 10:30:28,303] ERROR Task reconfiguration for FileStreamSinkConnector failed unexpectedly, this connector will not be properly reconfigured unless manually triggered. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:934)
[2018-03-13 10:30:28,306] INFO 192.168.1.102 - - [13/Mar/2018:02:30:28 +0000] "POST /connectors/FileStreamSinkConnector/tasks?forward=false HTTP/1.1" 409 113 1 (org.apache.kafka.connect.runtime.rest.RestServer:60)
[2018-03-13 10:30:28,307] INFO 192.168.1.102 - - [13/Mar/2018:02:30:28 +0000] "POST /connectors/FileStreamSinkConnector/tasks?forward=true HTTP/1.1" 409 113 3 (org.apache.kafka.connect.runtime.rest.RestServer:60)
[2018-03-13 10:30:28,307] INFO 192.168.1.102 - - [13/Mar/2018:02:30:28 +0000] "POST /connectors/FileStreamSinkConnector/tasks HTTP/1.1" 409 113 4 (org.apache.kafka.connect.runtime.rest.RestServer:60)
[2018-03-13 10:30:28,307] ERROR Request to leader to reconfigure connector tasks failed (org.apache.kafka.connect.runtime.distributed.DistributedHerder:996)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Cannot complete request because of a conflicting operation (e.g. worker rebalance)
at org.apache.kafka.connect.runtime.rest.RestServer.httpRequest(RestServer.java:229)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$18.run(DistributedHerder.java:993)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
[2018-03-13 10:30:28,307] ERROR Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:922)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Cannot complete request because of a conflicting operation (e.g. worker rebalance)
at org.apache.kafka.connect.runtime.rest.RestServer.httpRequest(RestServer.java:229)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$18.run(DistributedHerder.java:993)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
[2018-03-13 10:30:28,557] INFO SinkConnectorConfig values:
connector.class = org.apache.kafka.connect.file.FileStreamSinkConnector
key.converter = null
name = FileStreamSinkConnector
tasks.max = 1
topics = [kafka-connect-ui-file-sink]
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.SinkConnectorConfig:223)
[2018-03-13 10:30:28,558] INFO EnrichedConnectorConfig values:
connector.class = org.apache.kafka.connect.file.FileStreamSinkConnector
key.converter = null
name = FileStreamSinkConnector
tasks.max = 1
topics = [kafka-connect-ui-file-sink]
transforms = null
value.converter = null
Solution
$ vim etc/kafka/connect-distributed.properties
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
rest.host.name=0.0.0.0
rest.port=8083
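# Add the following two settings
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
# Set the host name to the current machine's IP address (a trailing comment on the same line would become part of the value)
rest.advertised.host.name=192.168.1.101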
rest.advertised.port=8083
Reference
io.confluent.kafka.schemaregistry.client.rest.RestService Connection refused
Description
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic kafka-connect-ui-file-sink --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
# Send data
{"f1": "value1"}
[2018-03-13 10:48:24,211] ERROR Failed to send HTTP request to endpoint: http://localhost:8081/subjects/kafka-connect-ui-file-sink-value/versions (io.confluent.kafka.schemaregistry.client.rest.RestService:156)
java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
Solution
# Check whether the Schema Registry process is running, and start it if it is not
$ nohup ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &
$ tail -f logs/schema-registry.log

Docker Images

Installing Docker

Update the package repository
# Add a yum repository mirror hosted in China
$ sudo yum-config-manager --add-repo https://mirrors.ustc.edu.cn/docker-ce/linux/centos/docker-ce.repo
# Refresh the yum package cache
$ sudo yum makecache fast
Install Docker-CE
# Install the dependencies
$ sudo yum install -y yum-utils device-mapper-persistent-data lvm2
# Install docker-ce
$ sudo yum install docker-ce
# Or use the one-step install script instead
$ curl -fsSL get.docker.com -o get-docker.sh
$ sudo sh get-docker.sh --mirror Aliyun
Start
$ sudo systemctl enable docker
$ sudo systemctl start docker
Create the Docker user group
# Create the docker group
$ sudo groupadd docker
# Add the current user to the docker group
$ sudo usermod -aG docker $USER

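MySQL - Kafka Connect - HDFS in Practice

Download

 An official kafka_connect_blog.ova image already provides an integrated MySQL - Kafka Connect - HDFS environment

Installation
VirtualBox

 Vagrant is used to manage and install the VirtualBox virtual machine; for the installation steps, see my other blog post 《Python

Import the virtual machine image

 Choose suitable resources and import the kafka_connect_blog.ova file
 The default username and password are both vagrant

Work with the virtual machine image
# Refresh the package index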
$ sudo apt-get update
$ ./setup.sh
$ ./start.sh

Tips: Because the machine was underpowered and the network proxy was blocked, this walkthrough is unfinished; to be continued…

Pitfalls

bridge-nf-call-iptables is disabled
Description
WARNING: bridge-nf-call-iptables is disabled
WARNING: bridge-nf-call-ip6tables is disabled
Solution
# Add the kernel parameters
$ sudo tee -a /etc/sysctl.conf <<-EOF
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
EOF
# Reload sysctl.conf
$ sudo sysctl -p

Common Configuration

General Worker configuration

bootstrap.servers
# Used to establish the initial connection to Kafka
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092
[key | value].converter
# Specifies how data in Kafka is converted into Connect data
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.[key | value].converter
# Specifies the converters used for Connect's internal data
# The offsets, status, and configurations are written to the topics using converters specified through the following required properties.
# Most users will always want to use the JSON converter without schemas.
# Offset and config data is never visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.host.name & rest.port
# Configure the host and port of the REST service
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
rest.host.name=0.0.0.0
rest.port=8083
plugin.path
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/connect/plugins

Distributed Worker configuration

group.id
# The group ID is a unique identifier for the set of workers that form a single Kafka Connect cluster
group.id=connect-cluster
[config | offset | status].storage.topic
# Internal Storage Topics.
#
# Kafka Connect distributed workers store the connector and task configurations, connector offsets,
# and connector statuses in three internal topics. These topics MUST be compacted.
# When the Kafka Connect distributed worker starts, it will check for these topics and attempt to create them
# as compacted topics if they don't yet exist, using the topic name, replication factor, and number of partitions
# as specified in these properties, and other topic-specific settings inherited from your brokers'
# auto-creation settings. If you need more control over these other topic-specific settings, you may want to
# manually create these topics before starting Kafka Connect distributed workers.
#
# The following properties set the names of these three internal topics for storing configs, offsets, and status.
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
[config | offset | status].storage.replication.factor
# The following properties set the replication factor for the three internal topics, defaulting to 3 for each
# and therefore requiring a minimum of 3 brokers in the cluster. Since we want the examples to run with
# only a single broker, we set the replication factor here to just 1. That's okay for the examples, but
# ALWAYS use a replication factor of AT LEAST 3 for production environments to reduce the risk of
# losing connector offsets, configurations, and status.
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
[offset | status].storage.partitions
# The config storage topic must have a single partition, and this cannot be changed via properties.
# Offsets for all connectors and tasks are written quite frequently and therefore the offset topic
# should be highly partitioned; by default it is created with 25 partitions, but adjust accordingly
# with the number of connector tasks deployed to a distributed worker cluster. Kafka Connect records
# the status less frequently, and so by default the topic is created with 5 partitions.
offset.storage.partitions=25
status.storage.partitions=5
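
The quoted comments above recommend a replication factor of at least 3 in production and note that the config topic always has a single partition. As a rough sketch of how such rules could be checked before starting a worker, the following reads worker properties from a string and prints warnings. The helper names `parse_properties` and `check_internal_topics` are hypothetical and not part of Kafka Connect.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_internal_topics(props, production=True):
    """Return warnings about the internal-topic settings (hypothetical helper)."""
    warnings = []
    for kind in ("config", "offset", "status"):
        key = f"{kind}.storage.replication.factor"
        # The quoted comments state that the factor defaults to 3.
        factor = int(props.get(key, "3"))
        if production and factor < 3:
            warnings.append(f"{key}={factor} is below the recommended minimum of 3")
    if "config.storage.partitions" in props:
        warnings.append("the config storage topic always has a single partition")
    return warnings


sample = """
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
offset.storage.partitions=25
status.storage.partitions=5
"""
for warning in check_internal_topics(parse_properties(sample)):
    print(warning)
```

With the single-broker example values above, the check reports one warning per internal topic; passing `production=False` suppresses them.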

Connector configuration

Description
# Unique name for the connector. Attempting to register again with the same name will fail.
name
# The Java class for the connector
connector.class
# The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
tasks.max
# (optional) Override the default key converter class set by the worker.
key.converter
# (optional) Override the default value converter class set by the worker.
value.converter
# Sink connectors also have one additional option to control their input
# A list of topics to use as input for this connector
topics
Examples
Standalone Connector configuration
name=local-file-sink
connector.class=FileStreamSinkConnector
tasks.max=1
file=test.sink.txt
topics=connect-test
Distributed Connector configuration
$ curl -X POST -H "Content-Type: application/json" --data '{"name": "local-file-sink", "config": {"connector.class":"FileStreamSinkConnector", "tasks.max":"1", "file":"test.sink.txt", "topics":"connect-test" }}' http://localhost:8083/connectors
# Or point curl at a file that contains the JSON configuration
$ curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors
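
The JSON body passed to `POST /connectors` can also be generated rather than typed by hand. The sketch below builds the same payload as the curl example. The function `connector_payload` and its underscore-to-dot keyword mapping are illustrative assumptions, not part of any Kafka Connect client library.

```python
import json


def connector_payload(name, connector_class, **settings):
    """Build the request body for POST /connectors (hypothetical helper)."""
    config = {"connector.class": connector_class}
    # Keyword names use "_" where the setting uses ".", e.g. tasks_max -> tasks.max.
    # Values are converted to strings, as in the curl example.
    for key, value in settings.items():
        config[key.replace("_", ".")] = str(value)
    return {"name": name, "config": config}


payload = connector_payload(
    "local-file-sink",
    "FileStreamSinkConnector",
    tasks_max=1,
    file="test.sink.txt",
    topics="connect-test",
)
body = json.dumps(payload, indent=2)
print(body)
```

Saving `body` to a file named config.json would allow it to be submitted with the `--data @config.json` form shown above.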

RESTful API

Description

 Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default, this service runs on port 8083.

 The following are the currently supported endpoints:

  • GET /connectors - return a list of active connectors
  • POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
  • GET /connectors/{name} - get information about a specific connector
  • GET /connectors/{name}/config - get the configuration parameters for a specific connector
  • PUT /connectors/{name}/config - update the configuration parameters for a specific connector
  • GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
  • GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
  • GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
  • PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
  • PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
  • POST /connectors/{name}/restart - restart a connector (typically because it has failed)
  • POST /connectors/{name}/tasks/{taskId}/restart - restart an individual task (typically because it has failed)
  • DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration

 Kafka Connect also provides a REST API for getting information about connector plugins:

  • GET /connector-plugins - return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars
  • PUT /connector-plugins/{connector-type}/config/validate - validate the provided configuration values against the configuration definition. This API performs per config validation, returns suggested values and error messages during validation.
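
A few of the endpoints listed above can be wrapped in a small request builder, sketched here with only the Python standard library. The base URL, the `ENDPOINTS` table, and `build_request` are assumptions made for illustration; the requests are constructed but not sent, so no running worker is needed.

```python
import urllib.request

BASE_URL = "http://localhost:8083"  # assumed worker address; 8083 is the default port

# Operation name -> (HTTP method, path template), copied from the endpoint list.
ENDPOINTS = {
    "status": ("GET", "/connectors/{name}/status"),
    "pause": ("PUT", "/connectors/{name}/pause"),
    "resume": ("PUT", "/connectors/{name}/resume"),
    "restart": ("POST", "/connectors/{name}/restart"),
    "delete": ("DELETE", "/connectors/{name}"),
}


def build_request(operation, name, base_url=BASE_URL):
    """Create (but do not send) the HTTP request for one connector operation."""
    method, template = ENDPOINTS[operation]
    return urllib.request.Request(base_url + template.format(name=name), method=method)


req = build_request("pause", "file-sink")
print(req.get_method(), req.full_url)
# To send it against a running worker: urllib.request.urlopen(req)
```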

Examples

Worker version information
$ curl localhost:8083/ | jq
{
"version": "0.10.0.1-cp1",
"commit": "ea5fcd28195f168b"
}
Connector plugins supported by the Worker
$ curl localhost:8083/connector-plugins | jq
[
{
"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type": "sink",
"version": "3.3.1"
},
{
"class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"type": "sink",
"version": "3.3.1"
},
{
"class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",
"type": "source",
"version": "0.11.0.0-cp1"
},
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "3.3.1"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "3.3.1"
},
{
"class": "io.confluent.connect.s3.S3SinkConnector",
"type": "sink",
"version": "3.3.1"
},
{
"class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
"type": "source",
"version": "0.11.0.0-cp1"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "0.11.0.0-cp1"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "0.11.0.0-cp1"
}
]
Active Connectors on the Worker
$ curl localhost:8083/connectors | jq
[
"file-source",
"file-sink"
]
Restart a Connector
$ curl -X POST localhost:8083/connectors/file-sink/restart
# On success, nothing is returned
# On failure, a message similar to the following is printed
{
"error_code": 404,
"message": "Unknown connector: local-file-sink"
}
List all Tasks of a Connector
$ curl localhost:8083/connectors/file-sink/tasks | jq
[
{
"id": {
"connector": "file-sink",
"task": 0
},
"config": {
"topics": "connect-test",
"file": "test.sink.txt",
"task.class": "org.apache.kafka.connect.file.FileStreamSinkTask"
}
}
]
Restart a Task
$ curl -X POST localhost:8083/connectors/file-sink/tasks/0/restart | jq
# On success, nothing is returned
# On failure, a message similar to the following is printed
{
"error_code": 404,
"message": "Unknown task: file-sink-1"
}
Pause a Connector
$ curl -X PUT localhost:8083/connectors/file-sink/pause | jq
Resume a Connector
$ curl -X PUT localhost:8083/connectors/file-sink/resume | jq
Update a Connector's configuration
$ curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"FileStreamSinkConnector","file":"test.sink.txt","tasks.max":"2","topics":"connect-test","name":"local-file-sink"}' localhost:8083/connectors/local-file-sink/config
{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSinkConnector",
"file": "test.sink.txt",
"tasks.max": "2",
"topics": "connect-test",
"name": "local-file-sink"
},
"tasks": [
{
"connector": "local-file-sink",
"task": 0
},
{
"connector": "local-file-sink",
"task": 1
}
]
}
Get a Connector's status
$ curl localhost:8083/connectors/file-sink/status | jq
{
"name": "file-sink",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.1.101:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "192.168.1.101:8083"
}
]
}
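
A status response like the one above is easy to check programmatically. The following sketch extracts the tasks that are not in the RUNNING state. The function name `unhealthy_tasks` is hypothetical, and the second, FAILED task in the sample is invented purely to exercise the filter.

```python
import json


def unhealthy_tasks(status):
    """Return (task id, state) pairs for every task that is not RUNNING."""
    return [
        (task["id"], task["state"])
        for task in status.get("tasks", [])
        if task["state"] != "RUNNING"
    ]


sample = json.loads("""
{
  "name": "file-sink",
  "connector": {"state": "RUNNING", "worker_id": "192.168.1.101:8083"},
  "tasks": [
    {"state": "RUNNING", "id": 0, "worker_id": "192.168.1.101:8083"},
    {"state": "FAILED", "id": 1, "worker_id": "192.168.1.102:8083"}
  ]
}
""")
print(unhealthy_tasks(sample))
```

Each reported task id could then be passed to the task restart endpoint.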
Get a Connector's configuration
$ curl localhost:8083/connectors/file-sink | jq
{
"name": "file-sink",
"config": {
"topics": "connect-test",
"file": "test.sink.txt",
"name": "file-sink",
"tasks.max": "1",
"connector.class": "FileStreamSink"
},
"tasks": [
{
"connector": "file-sink",
"task": 0
}
]
}
Delete a Connector
$ curl -X DELETE localhost:8083/connectors/file-sink

Tips: Every Kafka Connect worker process starts an embedded REST server on startup (default port 8083)

Schema Registry

Common commands

# List all schemas (subjects)
$ curl -X GET http://localhost:8081/subjects | jq
[
"hdfs_sink_13-value",
"kafka-connect-ui-file-sink-value",
"hdfs_sink_16-value",
"hdfs_sink_17-value",
"hdfs_sink_18-value"
]
# Delete all versions of hdfs_sink_13-value
$ curl -X DELETE http://localhost:8081/subjects/hdfs_sink_13-value
[1]
# Delete version 1 of hdfs_sink_16-value
$ curl -X DELETE http://localhost:8081/subjects/hdfs_sink_16-value/versions/1
1
# Delete the latest version of hdfs_sink_17-value
$ curl -X DELETE http://localhost:8081/subjects/hdfs_sink_17-value/versions/latest
1
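
The URLs used in the commands above follow a regular pattern, sketched below as two small helpers. The names `value_subject` and `subject_url` are hypothetical, the registry address is an assumption, and the "<topic>-value" naming is taken from the subject listing shown above.

```python
from urllib.parse import quote

REGISTRY_URL = "http://localhost:8081"  # assumed Schema Registry address


def value_subject(topic):
    """Subject name for a topic's value schema, as seen in the listing above."""
    return f"{topic}-value"


def subject_url(subject, version=None, base_url=REGISTRY_URL):
    """URL of a subject, or of one of its versions (a number or 'latest')."""
    url = f"{base_url}/subjects/{quote(subject, safe='')}"
    if version is not None:
        url += f"/versions/{version}"
    return url


print(subject_url(value_subject("hdfs_sink_13")))
print(subject_url(value_subject("hdfs_sink_16"), 1))
print(subject_url(value_subject("hdfs_sink_17"), "latest"))
```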

Visualization

Confluent UI

 Not open source

Landoop UI

kafka-connect-ui
Start
# Download the source
$ git clone https://github.com/Landoop/kafka-connect-ui.git
$ cd kafka-connect-ui
# Install
$ npm install -g bower http-server
$ npm install
# Start
$ http-server -p 8080 .
Enable the Confluent REST interface
# Open the REST interface
$ cd software/confluent
$ vim ./etc/kafka/connect-distributed.properties
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
rest.host.name=0.0.0.0
rest.port=8083
# Without the following two settings, Worker nodes cannot discover each other
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
# Set the host name to the current machine's IP address (a trailing comment on the same line would become part of the value)
rest.advertised.host.name=192.168.1.101
rest.advertised.port=8083
# Without the following two settings, the page shows: prod http://xxxx:8083 N/A N/A N/A
access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS
access.control.allow.origin=*
# Restart the Worker
$ nohup ./bin/connect-distributed ./etc/kafka/connect-distributed.properties > connect-distribute.log &
# Verify from a browser or with curl
$ curl http://192.168.1.101:8083/
{"version":"0.11.0.1-cp1","commit":"3735a6ca8b6432db"}
Create a File Sink
  • Select the Kafka Connect cluster

  • View the Dashboard

  • Click the Create button

  • Configure the Connector

  • Creation succeeds

  • View the Connector details

Send data
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic kafka-connect-ui-file-sink --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
# Send data
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
# Send repeatedly
for i in {1..3}; do echo "{\"f1\": \"value$i\"}" | ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic kafka-connect-ui-file-sink --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'; done
Data received
$ cat ./software/confluent-3.3.1/kafka-connect-ui-file-sink.txt
Struct{f1=value1}
Struct{f1=value2}
Struct{f1=value3}
Create a File Source
# Follow the same steps as for the File Sink above, using this configuration
name=FileStreamSourceConnector
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
file=kafka-connect-ui-file-source.txt
tasks.max=1
topic=kafka-connect-ui-file-sink

 The Dashboard then looks like this:

Write data to the file
$ cd /home/connect/software/confluent
$ echo "{\"f1\": \"400\"}" > kafka-connect-ui-file-source.txt
$ tail -f kafka-connect-ui-file-sink.txt
{"f1": "400"}
# The data has now landed in the local file configured for the File Sink
# However, its format differs from data that is sent to Kafka with a schema and delivered straight to the File Sink (Struct{f1=397})
Pitfalls
  • org.apache.kafka.connect.errors.ConnectException: org.apache.hadoop.security.AccessControlException: Permission denied: user=connect, access=WRITE, inode="/":bigdata:supergroup:drwxr-xr-x

    • Solution

      When creating the Kafka-to-HDFS task, set the logs.dir=/user/connect/logs parameter

  • Neither the Connector nor the HDFS cluster reports an error, but no data lands in HDFS

    • Solution

      # In addition to the basic settings,
      connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
      topics.dir=/user/connect/topics
      flush.size=1
      topics=hdfs_sink_16
      tasks.max=1
      hdfs.url=hdfs://192.168.1.101:9000
      logs.dir=/user/connect/logs
      schema.cache.size=1
      # specify the data-format settings explicitly
      format.class=io.confluent.connect.hdfs.avro.AvroFormat
      key.converter=io.confluent.connect.avro.AvroConverter
      key.converter.schema.registry.url=http://localhost:8081
      value.converter=io.confluent.connect.avro.AvroConverter
      value.converter.schema.registry.url=http://localhost:8081
  • Missing required configuration "schema.registry.url" which has no default value

    • Solution

      Set the schemas.enable=false option

Tips: Live demo is here.

kafka-topics-ui
Start
# Download the source
$ git clone https://github.com/Landoop/kafka-topics-ui.git
$ cd kafka-topics-ui
# Install
$ npm install -g bower
$ npm install -g http-server
$ npm install
$ bower install
# Start
$ http-server -p 8081 .
Start Kafka REST
$ vim etc/kafka-rest/kafka-rest.properties
id=kafka-rest-test-server
schema.registry.url=http://localhost:8081
zookeeper.connect=192.168.1.101:2015,192.168.1.102:2015,192.168.1.103:2015
# Specify the bootstrap servers explicitly
bootstrap.servers=192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092
# Allow access from web pages
access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS
access.control.allow.origin=*
Screenshot

Kafka Topics UI

Pitfalls
  • WARN Connection to node -1 could not be established. Broker may not be available.

    • Solution

      $ vim etc/kafka-rest/kafka-rest.properties
      # Specify the bootstrap servers explicitly
      bootstrap.servers=192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092
schema-registry-ui
Start
# Download the source
$ git clone https://github.com/Landoop/schema-registry-ui.git
$ cd schema-registry-ui
# Install
$ npm install
# Start
$ npm start
Start Schema Registry
$ vim etc/schema-registry/schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.connection.url=192.168.1.101:2015,192.168.1.102:2015,192.168.1.103:2015
kafkastore.topic=_schemas
debug=false
# Allow access from web pages
access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS
access.control.allow.origin=*
Screenshot

Kafka Schema Registry UI

Custom development
# Start from the command line
$ npm start
# Copy the `http://localhost:8080/#/cluster/prod` link from the browser
# In WebStorm, create a `JavaScript Debug` configuration, paste the URL, and run it

Using community Connectors

# Download the source
$ git clone git@github.com:confluentinc/kafka-connect-hdfs.git
# Switch to a stable version, then build and package
$ cd kafka-connect-hdfs; git checkout v3.0.1; mvn package
# Create the kafka-connect-hdfs subdirectory under the plugins directory
$ mkdir -p /usr/local/share/kafka/plugins/kafka-connect-hdfs
# Copy the built jars into the plugins directory
$ cp target/kafka-connect-hdfs-3.0.1-package/share/java/kafka-connect-hdfs/* /usr/local/share/kafka/plugins/kafka-connect-hdfs/

Implementing your own Connector

Tips: Connector Developer Guide

Fixing known bugs

 Use a JIRA query to list the known bugs of the Kafka Connect component. The query is as follows

project = KAFKA AND issuetype = Bug AND component = KafkaConnect

Practical Tips

zkCli

Command | Comment
get /consumers/<topic>/owners | Show the group id currently consuming the topic
get /consumers/<topic>/offsets/<group id>/<partition> | Show the offset (ctime: creation time; mtime: modification time)

kafka-run-class

Delete a Topic

$ bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper <zk host>:2181,<zk host>:2181,<zk host>:2181 --topic <topic>

Consuming from a specified offset in order to backfill data

Java Client

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// bootstrapServers, groupId, maxPollRecords and topic are supplied by the surrounding code
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("enable.auto.commit", false);
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.records", maxPollRecords);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// consumer.subscribe(Arrays.asList(topic));
// Assign partition 2 explicitly and start reading at offset 1024
TopicPartition p = new TopicPartition(topic, 2);
consumer.assign(Arrays.asList(p));
consumer.seek(p, 1024);
// while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
    String value = record.value();
    // ...
}
// }

Shell Console

# Start consuming from a given offset
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-1 --consumer.config config/consumer.properties --offset 90 --partition 0
# Reset the group's offset to a specific value
$ kafka-consumer-groups --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-offset 1024 --all-topics --execute

References

Which settings can be specified in Kafka Connect

 New Consumer Configs

Stopping the Kafka service gracefully

Commands

$ vim config/server.properties
controlled.shutdown.enable=true
$ bin/kafka-server-stop.sh

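Reference

Performance Tuning

Architecture level

RingBuffer + Lock Free

 A RingBuffer combined with lock-free techniques implements an efficient Producer-Consumer pattern, which greatly improves the performance of the Kafka Producer when sending messages

Avro compression

 Using Avro compression keeps decompression efficient while reducing the amount of data sent over the network

Multiple partitions + connection pool

 A connection pool can grow and shrink the number of Kafka connections on demand, and a custom Kafka Partitioner makes full use of multiple partitions to increase concurrency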
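
To illustrate the idea of spreading messages across partitions, here is a minimal sketch of a key-based partition chooser. It is not the author's implementation and not Kafka's default partitioner (which uses a different hash); `choose_partition` is a hypothetical name, and CRC32 is used only because it is stable across processes.

```python
import zlib
from collections import Counter


def choose_partition(key, num_partitions):
    """Map a message key to a partition index using a stable hash (sketch)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# The same key always lands on the same partition, preserving per-key ordering,
# while different keys spread over the available partitions.
spread = Counter(choose_partition(f"user-{i}", 8) for i in range(1000))
print(sorted(spread.items()))
```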

Tips: Full code is here.

Parameter level

Producer

kafka.key.serializer.class=kafka.serializer.StringEncoder
kafka.serializer.class=kafka.serializer.DefaultEncoder
kafka.request.required.acks=0
kafka.async=async
kafka.queue.buffering.max.ms=5000
kafka.queue.buffering.max.messages=10000
kafka.queue.enqueue.timeout.ms=-1
kafka.batch.num.messages=200
kafka.send.buffer.bytes=102400

Pitfalls

After deserialization, every Avro Decimal field becomes [pos=0 lim=0 cap=0]

Description
// Converting a double to a ByteBuffer and back to a double works as expected
val bb: ByteBuffer = ByteBuffer.allocate(8).putDouble("666.8".toDouble)
val dd: Double = ByteBuffer.wrap(bb.array()).getDouble()
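// However, once the buffer is passed into an Avro class as the value of a decimal field and then serialized and deserialized, things go wrong
Solution
  • Define the decimal field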

    {
    "namespace": "com.yuzhouwan.bean",
    "type": "record",
    "name": "BeanA",
    "fields": [
    {"name": "id", "type": "int"},
    {"name": "price", "type": {"type": "bytes", "logicalType": "decimal", "precision": 8, "scale": 4}}
    ]
    }
  • Configure Maven to generate the Avro classes automatically

  • Initialize the Avro class

    import java.nio.ByteBuffer
    import org.apache.avro.{Conversions, LogicalTypes, Schema}
    // Use the scale declared in the schema (4); the decimal logical type stores only the unscaled value
    val decimal: java.math.BigDecimal = java.math.BigDecimal.valueOf("666.8".toDouble).setScale(4)
    val logicalType: LogicalTypes.Decimal = LogicalTypes.decimal(8, 4)
    val schema: Schema = new Schema.Parser().parse(
    """
    |{
    | "namespace": "com.yuzhouwan.bean",
    | "type": "record",
    | "name": "BeanA",
    | "fields": [
    | {"name": "id", "type": "int"},
    | {"name": "price", "type": {"type": "bytes", "logicalType": "decimal", "precision": 8, "scale": 4}}
    | ]
    |}
    """.stripMargin)
    // Converts between java.math.BigDecimal and Avro's bytes-backed decimal
    val decimalConversion = new Conversions.DecimalConversion
    val conversion: ByteBuffer = decimalConversion.toBytes(decimal, schema, logicalType)
    val trans: Transaction = new Transaction(1, conversion)
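  • Deserialize the decimal field

    // precision and scale are properties of the decimal field's schema, not of the record schema
    val priceSchema: Schema = schema.getField("price").schema()
    val precision = priceSchema.getJsonProp("precision")
    val scale = priceSchema.getJsonProp("scale")
    val logicalType = LogicalTypes.decimal(precision.toString.toInt, scale.toString.toInt)
    // getSale is the getter of the decimal field on the author's class (getPrice for the schema above)
    val d: java.math.BigDecimal = decimalConversion.fromBytes(trans.getSale, priceSchema, logicalType)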
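To see why a raw double buffer cannot stand in for a decimal value, it helps to look at what the decimal logical type stores: the two's-complement, big-endian bytes of the unscaled integer, with the scale kept in the schema. The following Python sketch mimics that encoding with the standard library only. The helper names encode_decimal and decode_decimal are hypothetical, and the code is an illustration rather than Avro's own implementation.

```python
from decimal import Decimal


def encode_decimal(value: Decimal, scale: int) -> bytes:
    """Encode value as big-endian two's-complement bytes of its unscaled integer."""
    scaled = value.scaleb(scale)  # e.g. 666.8 with scale 4 -> 6668000
    if scaled != scaled.to_integral_value():
        raise ValueError(f"{value} does not fit scale {scale}")
    unscaled = int(scaled)
    # Reserve one extra bit for the sign, then round up to whole bytes.
    length = max(1, (unscaled.bit_length() + 8) // 8)
    return unscaled.to_bytes(length, byteorder="big", signed=True)


def decode_decimal(data: bytes, scale: int) -> Decimal:
    """Inverse of encode_decimal: rebuild the Decimal from bytes and scale."""
    unscaled = int.from_bytes(data, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)


if __name__ == "__main__":
    raw = encode_decimal(Decimal("666.8"), 4)
    print(raw.hex(), decode_decimal(raw, 4))
```

The bytes produced here differ completely from the eight IEEE 754 bytes that putDouble writes, which is why the field has to go through a decimal conversion.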
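Addendum

 By now you can probably tell that decimal is tedious to work with, and this is far from the only pitfall. So why invent such a type at all? Wouldn't float (single precision) or double (double precision) be much simpler?

 Not really. Floating-point numbers built on the IEEE 754 standard (in Java, Scala, Python, ...) are helpless against the 0.30000000000000004 problem. A simple command-line experiment confirms it. In Python, for example, print(.1 + .2) yields 0.30000000000000004 rather than the expected 0.3. Seeing this for the first time can be a shock, and you may wonder whether you downloaded a fake Python. Once the principle is understood, though, it is no longer surprising. Working it out is straightforward: convert the operands to binary with the radix method (repeated division for the integer part, repeated multiplication for the fractional part), then add them. After conversion, 0.1 + 0.2 becomes 0.00011001100110011001100110011001100110011001100110011001 + 0.00110011001100110011001100110011001100110011001100110011, which gives 0.01001100110011001100110011001100110011001100110011001100. Converting that binary result back to a decimal floating-point number gives the 0.30000000000000004 we saw.

 Fundamentally, a binary fraction with a finite number of digits can only represent sums of negative powers of two, so it cannot hit every decimal fraction. For example, four binary digits after the point cover the values from 0.0000 to 0.1111, which in decimal are only the combinations of the four values 0.0625, 0.125, 0.25 and 0.5. In addition, some decimal fractions become repeating fractions in binary, and because a computer stores only a finite number of bits for a floating-point number, they cannot be represented exactly either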
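The conversion described above can be reproduced with a few lines of Python. The sketch below expands a fraction into binary digits by repeated doubling, using exact rational arithmetic so that the expansion itself introduces no rounding. The function name to_binary_fraction is hypothetical.

```python
from decimal import Decimal
from fractions import Fraction


def to_binary_fraction(x: Fraction, digits: int) -> str:
    """Expand 0 <= x < 1 into `digits` binary digits by repeated doubling."""
    bits = []
    for _ in range(digits):
        x *= 2
        if x >= 1:
            bits.append("1")
            x -= 1
        else:
            bits.append("0")
    return "0." + "".join(bits)


# 1/10 never terminates in binary: the pattern 0011 repeats forever.
print(to_binary_fraction(Fraction(1, 10), 20))  # 0.00011001100110011001
# The stored double is therefore only close to 0.1, and the errors add up.
print(0.1 + 0.2 == 0.3)  # False
# Decimal(float) shows the exact value that the literal 0.1 is stored as.
print(Decimal(0.1))
```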

 Now look at decimal, which does not run into this problem:

>>> from decimal import *
>>> d = Decimal(".1") + Decimal(".2")
>>> d
Decimal('0.3')
>>> float(d)
0.3

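 Besides exact arithmetic, decimal also makes precision easy to control, automatically keeps the precision of its inputs, and allows division by zero to yield Infinity. So decimal is not useless after all :D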

>>> Context(prec=6, Emax=999, clamp=1).create_decimal('1.23e999')
Decimal('1.23000E+999')
>>> 1.00 + 0.10
1.1
>>> Decimal("1.00") + Decimal("0.10")
Decimal('1.10')
>>> setcontext(ExtendedContext)
>>> Decimal(1) / Decimal(0)
Decimal('Infinity')
>>> Decimal(-1) / Decimal(0)
Decimal('-Infinity')
>>> Decimal(1) / Decimal(0) + Decimal(-1) / Decimal(0)
Decimal('NaN')

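Reading the source code

Setting up an environment for reading Kafka

Install gradle

 From the gradle download page, download gradle-4.3.1-all.zip, extract it to D:\apps\gradle, and add the environment variable PATH=D:\apps\gradle\gradle-4.3.1\bin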

# Check that the installation succeeded
$ gradle -v
------------------------------------------------------------
Gradle 4.3.1
------------------------------------------------------------
Build time: 2017-11-08 08:59:45 UTC
Revision: e4f4804807ef7c2829da51877861ff06e07e006d
Groovy: 2.4.12
Ant: Apache Ant(TM) version 1.9.6 compiled on June 29 2015
JVM: 1.8.0_111 (Oracle Corporation 25.111-b14)
OS: Windows 7 6.1 amd64

gradle proxy settings

$ gradle xxx -Dhttp.proxyHost=127.0.0.1 -Dhttp.proxyPort=1080
# Or edit the gradle.properties configuration file
$ vim gradle.properties
systemProp.http.proxyHost=192.168.1.101
systemProp.http.proxyPort=8080
systemProp.http.nonProxyHosts=*.nonproxyrepos.com|localhost
systemProp.https.proxyHost=192.168.1.101
systemProp.https.proxyPort=8080
systemProp.https.nonProxyHosts=*.nonproxyrepos.com|localhost
#systemProp.http.proxyUser=userid
#systemProp.http.proxyPassword=password
# [Note] If the gradle.properties edited inside the Kafka source directory does not take effect, copy it into the $USER_HOME/.gradle directory

Compile the source

# Download dependencies
$ gradle
> Configure project :
Building project 'core' with Scala version 2.11.11
Download https://repo1.maven.org/maven2/org/scoverage/scalac-scoverage-plugin_2.11/1.3.1/scalac-scoverage-plugin_2.11-1.3.1.pom
<===----------> 26% CONFIGURING [4m 11s]
> Resolve files of :core:scoverage
> IDLE
> scala-library-2.11.11.jar > 1.58 MB/5.48 MB downloaded
BUILD SUCCESSFUL in 7m 9s
# Add the `-x test` option to skip unit tests
$ gradle -x test
# Then generate a project that IntelliJ IDEA can open
$ gradle idea

Tips: If the idea plugin is not configured in the gradle build file, the project modules will not be recognized. In that case, delete the .idea folder and open build.gradle as a project

Kafka Connect

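Setting up the reading environment

 The Kafka Connect project does an excellent job with interface definitions and component modularization, and the whole codebase is split into many subprojects. When setting up the source-reading environment, you will find that many dependencies are missing from the central repository, so their sources have to be downloaded, compiled, and installed locally. First, download the sources of the two parent projects kafka-connect-common / kafka-connect-storage-common, together with the dependent subprojects rest-utils / schema-registry. After fetching all tags with git fetch --tags, pick a stable version (v3.3.1 here), create a new branch named v3.3.1, and run git reset --hard <commit> on that branch so that every project is at v3.3.1 (one more dependency, aggdesigner, also has to be compiled and installed locally; the matching version is pentaho-aggdesigner-5.1.5-jhyde). Kafka itself is referenced with an unconventional version, 0.11.0.1-cp1; choosing the slightly higher version 0.11.0.1 is enough to stay compatible (alternatively, run gradlew installAll on the Kafka trunk branch to build the matching version). Finally, run mvn clean install on each project in turn to compile and package them

Main architecture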

ConnectDistributed

 Command line utility that runs Kafka Connect in distributed mode. In this mode, the process joins a group of other workers and work is distributed among them. This is useful for running Connect as a service, where connectors can be submitted to the cluster to be automatically executed in a scalable, distributed fashion. This also allows you to easily scale out horizontally, elastically adding or removing capacity simply by starting or stopping worker instances.

Connect

 This class ties together all the components of a Kafka Connect process (herder, worker, storage, command interface), managing their lifecycle.

Connector

 Connectors manage integration of Kafka Connect with another system, either as an input that ingests data into Kafka or an output that passes data to an external system. Implementations should not use this class directly; they should inherit from SourceConnector or SinkConnector.

 Connectors have two primary tasks. First, given some configuration, they are responsible for creating configurations for a set of {@link Task}s that split up the data processing. For example, a database Connector might create Tasks by dividing the set of tables evenly among tasks. Second, they are responsible for monitoring inputs for changes that require reconfiguration and notifying the Kafka Connect runtime via the ConnectorContext. Continuing the previous example, the connector might periodically check for new tables and notify Kafka Connect of additions and deletions. Kafka Connect will then request new configurations and update the running Tasks.
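The first responsibility, splitting work into task configurations, can be sketched in Python. This is a toy illustration of "dividing the set of tables evenly among tasks", not Kafka Connect's Java API: the function names group_evenly and task_configs and the "tables" config key are hypothetical.

```python
def group_evenly(items, num_groups):
    """Split items into num_groups contiguous groups whose sizes differ by at most one."""
    if num_groups <= 0:
        raise ValueError("num_groups must be positive")
    base, extra = divmod(len(items), num_groups)
    groups, start = [], 0
    for i in range(num_groups):
        size = base + (1 if i < extra else 0)
        groups.append(items[start:start + size])
        start += size
    return groups


def task_configs(tables, max_tasks):
    """Build one config per task; never create more tasks than there are tables."""
    if not tables:
        return []
    num_tasks = min(len(tables), max_tasks)
    # Assumption: each task receives its tables as a comma-separated string.
    return [{"tables": ",".join(group)} for group in group_evenly(tables, num_tasks)]


if __name__ == "__main__":
    print(task_configs(["a", "b", "c", "d", "e"], 2))
```

When the connector later notices a new table, it would recompute these configurations and the runtime would restart the tasks with the new split.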

SinkConnector

 SinkConnectors implement the Connector interface to send Kafka data to another system.

SourceConnector

 SourceConnectors implement the connector interface to pull data from another system and send it to Kafka.

Herder

 The herder interface tracks and manages workers and connectors. It is the main interface for external components to make changes to the state of the cluster. For example, in distributed mode, an implementation of this class knows how to accept a connector configuration, may need to route it to the current leader worker for the cluster so the config can be written to persistent storage, and then ensures the new connector is correctly instantiated on one of the workers.

 This class must implement all the actions that can be taken on the cluster (add/remove connectors, pause/resume tasks, get state of connectors and tasks, etc). The non-Java interfaces to the cluster (REST API and CLI) are very simple wrappers of the functionality provided by this interface.

 In standalone mode, this implementation of this class will be trivial because no coordination is needed. In that case, the implementation will mainly be delegating tasks directly to other components. For example, when creating a new connector in standalone mode, there is no need to persist the config and the connector and its tasks must run in the same process, so the standalone herder implementation can immediately instantiate and start the connector and its tasks.

Worker

 Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving data to/from Kafka.

 Since each task has a dedicated thread, this is mainly just a container for them.

Task

 Tasks contain the code that actually copies data to/from another system. They receive a configuration from their parent Connector, assigning them a fraction of a Kafka Connect job's work. The Kafka Connect framework then pushes/pulls data from the Task. The Task must also be able to respond to reconfiguration requests.

 Task only contains the minimal shared functionality between {@link org.apache.kafka.connect.source.SourceTask} and {@link org.apache.kafka.connect.sink.SinkTask}.

SourceTask

 SourceTask is a Task that pulls records from another system for storage in Kafka.

SinkTask

 SinkTask is a Task that takes records loaded from Kafka and sends them to another system. Each task instance is assigned a set of partitions by the Connect framework and will handle all records received from those partitions. As records are fetched from Kafka, they will be passed to the sink task using the {@link #put(Collection)} API, which should either write them to the downstream system or batch them for later writing. Periodically, Connect will call {@link #flush(Map)} to ensure that batched records are actually pushed to the downstream system.

State machine

  • Initialization

    SinkTasks are first initialized using {@link #initialize(SinkTaskContext)} to prepare the task's context and {@link #start(Map)} to accept configuration and start any services needed for processing.

  • Partition Assignment

    After initialization, Connect will assign the task a set of partitions using {@link #open(Collection)}. These partitions are owned exclusively by this task until they have been closed with {@link #close(Collection)}.

  • Record Processing

    Once partitions have been opened for writing, Connect will begin forwarding records from Kafka using the {@link #put(Collection)} API. Periodically, Connect will ask the task to flush records using {@link #flush(Map)} as described above.

  • Partition Rebalancing

    Occasionally, Connect will need to change the assignment of this task. When this happens, the currently assigned partitions will be closed with {@link #close(Collection)} and the new assignment will be opened using {@link #open(Collection)}.

  • Shutdown

    When the task needs to be shutdown, Connect will close active partitions (if there are any) and stop the task using {@link #stop()}
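The lifecycle above can be walked through with a toy model in Python. The class ToySinkTask and the driver run_demo are hypothetical; the method names only mirror the Javadoc, and the real Java signatures (contexts, offset maps, record objects) are deliberately left out. Flushing before each close is a choice made by this driver for the demo.

```python
class ToySinkTask:
    """Toy model of the sink-task lifecycle; not the real Java SinkTask API."""

    def __init__(self):
        self.running = False
        self.assigned = set()  # partitions currently owned by this task
        self.batch = []        # records accepted by put() but not yet flushed
        self.sink = []         # stands in for the downstream system

    def start(self, config):
        self.running = True

    def open(self, partitions):
        self.assigned.update(partitions)

    def put(self, records):
        for partition, value in records:
            if partition not in self.assigned:
                raise RuntimeError(f"partition {partition} is not assigned")
            self.batch.append((partition, value))

    def flush(self):
        self.sink.extend(self.batch)
        self.batch.clear()

    def close(self, partitions):
        self.assigned.difference_update(partitions)

    def stop(self):
        self.running = False


def run_demo():
    task = ToySinkTask()
    task.start({})                  # Initialization
    task.open({0, 1})               # Partition assignment
    task.put([(0, "a"), (1, "b")])  # Record processing
    task.flush()
    task.close({0, 1})              # Rebalancing: close the old assignment
    task.open({1})                  # and open the new one
    task.put([(1, "c")])
    task.flush()
    task.close({1})                 # Shutdown
    task.stop()
    return task.sink


if __name__ == "__main__":
    print(run_demo())
```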

Storage
OffsetBackingStore

 OffsetBackingStore is an interface for storage backends that store key-value data. The backing store doesn't need to handle serialization or deserialization. It only needs to support reading/writing bytes. Since it is expected these operations will require network operations, only bulk operations are supported.

 Since OffsetBackingStore is a shared resource that may be used by many OffsetStorage instances that are associated with individual tasks, the caller must be sure keys include information about the connector so that the shared namespace does not result in conflicting keys.
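A minimal Python sketch can make the two points above concrete: the store only sees bytes and only offers bulk operations, and the caller builds keys that carry the connector name. The class InMemoryOffsetStore and the helper offset_key are hypothetical, and the JSON key layout is an assumption made for this example rather than Kafka Connect's actual wire format.

```python
import json


class InMemoryOffsetStore:
    """Toy byte-oriented key-value store that supports only bulk get/set."""

    def __init__(self):
        self._data = {}

    def set(self, values):
        """Store many key/value pairs (bytes -> bytes) in one call."""
        self._data.update(values)

    def get(self, keys):
        """Fetch many keys in one call; missing keys map to None."""
        return {key: self._data.get(key) for key in keys}


def offset_key(connector, partition):
    """Serialize a key that includes the connector name to avoid collisions."""
    return json.dumps([connector, partition], sort_keys=True).encode("utf-8")


if __name__ == "__main__":
    store = InMemoryOffsetStore()
    key_a = offset_key("jdbc-a", {"table": "orders"})
    key_b = offset_key("jdbc-b", {"table": "orders"})
    store.set({key_a: b"42", key_b: b"7"})
    print(store.get([key_a, key_b]))
```

Both connectors track the same source partition here, yet their offsets stay separate because the connector name is part of the key.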

  • KafkaOffsetBackingStore

     Implementation of OffsetBackingStore that uses a Kafka topic to store offset data.
     Internally, this implementation both produces to and consumes from a Kafka topic which stores the offsets. It accepts producer and consumer overrides via its configuration but forces some settings to specific values to ensure correct behavior (e.g. acks, auto.offset.reset).
StatusBackingStore
  • KafkaStatusBackingStore

     StatusBackingStore implementation which uses a compacted topic for storage of connector and task status information. When a state change is observed, the new state is written to the compacted topic. The new state will not be visible until it has been read back from the topic.
     In spite of their names, the putSafe() methods cannot guarantee the safety of the write (since Kafka itself cannot provide such guarantees currently), but it can avoid specific unsafe conditions. In particular, putSafe() allows writes in the following conditions:
     1) It is (probably) safe to overwrite the state if there is no previous value.
     2) It is (probably) safe to overwrite the state if the previous value was set by a worker with the same workerId.
     3) It is (probably) safe to overwrite the previous state if the current generation is higher than the previous.
     Basically all these conditions do is reduce the window for conflicts. They obviously cannot take into account in-flight requests.
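The three conditions quoted above translate into a short predicate. The Python sketch below follows the wording of the Javadoc only; the Status record and the function is_probably_safe are hypothetical names, and the actual Java implementation may differ in details such as how equal generations are treated.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Status:
    state: str
    worker_id: str
    generation: int


def is_probably_safe(previous: Optional[Status], new: Status) -> bool:
    """Return True when overwriting `previous` with `new` meets one of the three conditions."""
    if previous is None:                         # 1) no previous value
        return True
    if previous.worker_id == new.worker_id:      # 2) written by the same worker
        return True
    return new.generation > previous.generation  # 3) newer generation wins


if __name__ == "__main__":
    old = Status("RUNNING", "worker-1", 3)
    print(is_probably_safe(old, Status("FAILED", "worker-2", 4)))  # True
    print(is_probably_safe(old, Status("FAILED", "worker-2", 3)))  # False
```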
ConfigBackingStore
  • KafkaConfigBackingStore

     Provides persistent storage of Kafka Connect connector configurations in a Kafka topic.
     This class manages both connector and task configurations. It tracks three types of configuration entries:
     1. Connector config: map of string -> string configurations passed to the Connector class, with support for expanding this format if necessary. (Kafka key: connector-[connector-id]). These configs are *not* ephemeral. They represent the source of truth. If the entire Connect cluster goes down, this is all that is really needed to recover.
     2. Task configs: map of string -> string configurations passed to the Task class, with support for expanding this format if necessary. (Kafka key: task-[connector-id]-[task-id]). These configs are ephemeral; they are stored here to
      a) disseminate them to all workers while ensuring agreement and
      b) to allow faster cluster/worker recovery since the common case of recovery (restoring a connector) will simply result in the same configuration as before the failure.
     3. Task commit "configs": records indicating that previous task config entries should be committed and all task configs for a connector can be applied. (Kafka key: commit-[connector-id]). This config has two effects. First, it records the number of tasks the connector is currently running (and can therefore increase/decrease parallelism). Second, because each task config is stored separately but they need to be applied together to ensure each partition is assigned to a single task, this record also indicates that task configs for the specified connector can be "applied" or "committed".
     This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows us to clean up outdated configurations over time. However, this combination has some important implications for the implementation of this class and the configuration state that it may expose.
     Connector configurations are independent of all other configs, so they are handled easily. Writing a single record is already atomic, so these can be applied as soon as they are read. One connector's config does not affect any others, and they do not need to coordinate with the connector's task configuration at all.
     The most obvious implication for task configs is the need for the commit messages. Because Kafka does not currently have multi-record transactions or support atomic batch record writes, task commit messages are required to ensure that readers do not end up using inconsistent configs. For example, consider if a connector wrote configs for its tasks, then was reconfigured and only managed to write updated configs for half its tasks. If task configs were applied immediately you could be using half the old configs and half the new configs. In that condition, some partitions may be double-assigned because the old config and new config may use completely different assignments. Therefore, when reading the log, we must buffer config updates for a connector's tasks and only apply them atomically once a commit message has been read.
     However, there are also further challenges. This simple buffering approach would work fine as long as the entire log was always available, but we would like to be able to enable compaction so our configuration topic does not grow indefinitely. Compaction may break a normal log because old entries will suddenly go missing. A new worker reading from the beginning of the log in order to build up the full current configuration will see task commits, but some records required for those commits will have been removed because the same keys have subsequently been rewritten. For example, if you have a sequence of record keys "[connector-foo-config, task-foo-1-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)]", we can end up with a compacted log containing "[connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)]". When read back, the first commit will see an invalid state because the first task-foo-1-config has been cleaned up.
     Compaction can further complicate things if writing new task configs fails mid-write. Consider a similar scenario as the previous one, but in this case both the first and second update will write 2 task configs. However, the second write fails half of the way through: "[connector-foo-config, task-foo-1-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]". Now compaction occurs and we are left with "[connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]". At the first commit, we do not have a complete set of configs. And because of the failure, there is no second commit. We are left in an inconsistent state with no obvious way to resolve the issue -- we can try to keep on reading, but the failed node may never recover and write the updated config. Meanwhile, other workers may have seen the entire log; they will see the second task-foo-1-config waiting to be applied, but will otherwise think everything is ok -- they have a valid set of task configs for connector "foo".
     Because we can encounter these inconsistencies and addressing them requires support from the rest of the system (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data. This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle of updating task configurations).
     Note that the expectation is that this config storage system has only a single writer at a time. The caller (Herder) must ensure this is the case. In distributed mode this will require forwarding config change requests to the leader in the cluster (i.e. the worker group coordinated by the Kafka broker).
     Since processing of the config log occurs in a background thread, callers must take care when using accessors. To simplify handling this correctly, this class only exposes a mechanism to snapshot the current state of the cluster. Updates may continue to be applied (and callbacks invoked) in the background. Callers must take care that they are using a consistent snapshot and only update when it is safe. In particular, if task configs are updated which require synchronization across workers to commit offsets and update the configuration, callbacks and updates during the rebalance must be deferred.
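The buffering rule and the compaction hazard described above can be replayed with a toy log reader in Python. This is a sketch under stated assumptions, not the KafkaConfigBackingStore code: the function replay is hypothetical, keys follow the task-[connector-id]-[task-id] and commit-[connector-id] patterns from the text, task ids are 1-based as in the Javadoc examples, and a commit record is assumed to carry its task count as {"tasks": n}.

```python
def replay(log):
    """Replay (key, value) records; return (applied task configs, inconsistent connectors)."""
    pending = {}          # connector -> {task_id: config}, buffered until a commit
    applied = {}          # connector -> {task_id: config}, visible to readers
    inconsistent = set()  # connectors whose last commit lacked some task configs
    for key, value in log:
        kind, _, rest = key.partition("-")
        if kind == "task":
            connector, _, task_id = rest.rpartition("-")
            pending.setdefault(connector, {})[int(task_id)] = value
        elif kind == "commit":
            connector = rest
            buffered = pending.pop(connector, {})
            expected = set(range(1, value["tasks"] + 1))
            if expected <= set(buffered):
                # All task configs for this commit are present: apply them together.
                applied[connector] = {i: buffered[i] for i in sorted(expected)}
                inconsistent.discard(connector)
            else:
                inconsistent.add(connector)
        # "connector-..." records are independent and ignored in this sketch.
    return applied, inconsistent


if __name__ == "__main__":
    compacted = [
        ("connector-foo", {}),
        ("task-foo-2", "old-2"),
        ("commit-foo", {"tasks": 2}),
        ("task-foo-1", "new-1"),
        ("commit-foo", {"tasks": 1}),
    ]
    print(replay(compacted))      # the second commit repairs the state
    print(replay(compacted[:4]))  # failed mid-write: "foo" stays inconsistent
```

In the second call the reader can tell that connector "foo" is inconsistent, which is the information the class exposes so that a Herder can ask the connector to regenerate its task configs.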
RestServer

 Embedded server for the REST API that provides the control plane for Kafka Connect workers.

Plugin
PluginDesc
DelegatingClassLoader

Confluent open-source edition

Module architecture diagram

(Drawn with Axure™)
kafka-connect-common

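 This project is the root-level parent of the whole Kafka Connect codebase. It mainly contains the common interface definitions, configuration loading, Metrics / JMX metric design, and shared utility classes, plus project-level concerns such as how to build, which files to package, and the license notices

kafka-connect-storage-common

 The official description of this project is a single sentence: "Kafka Connect common packages for connectors transferring data between Kafka and distributed filesystems or cloud storage". In practice it contains a good deal of substance: the Common, Core, Format, Partitioner, WAL, and Hive modules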

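Common module

 Mainly defines the ComposableConfig / SchemaGenerator interfaces, used for composing configurations and generating Schemas respectively

Core module

 Mainly defines the Storage interface for distributed storage. Depending on the implementation, a Storage object can be a file or a directory in a distributed filesystem, or an object in an object store. Similarly, a path corresponds to an actual path in a distributed filesystem or to a lookup key in an object store

Format module

 Mainly defines the Format / SchemaFileReader / RecordWriter interfaces, used respectively for formatting the storage-specific path type (e.g. Path in HDFS, String in S3), reading the Schema from Storage, and writing records to Storage

Partitioner module

 This module mainly defines the Partitioner interface, which partitions incoming messages and generates the corresponding directory and file names. It supports the Default / Hourly / Daily / TimeBased / Field partitioners as well as custom ones, and the TimeBased partitioner additionally supports two kinds of time: ingestion time and event time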
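How a time-based partitioner turns a record timestamp into a directory can be sketched in a few lines of Python. The function encode_partition and the year=/month=/day=/hour= layout are assumptions chosen for this illustration; the actual path format of the Confluent partitioners is configurable and is not reproduced here.

```python
from datetime import datetime, timezone


def encode_partition(topic: str, timestamp_ms: int, granularity: str = "hourly") -> str:
    """Build a directory path for a record from its timestamp (UTC)."""
    formats = {
        "hourly": "year=%Y/month=%m/day=%d/hour=%H",
        "daily": "year=%Y/month=%m/day=%d",
    }
    if granularity not in formats:
        raise ValueError(f"unsupported granularity: {granularity}")
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return f"{topic}/{ts.strftime(formats[granularity])}"


if __name__ == "__main__":
    # Passing the record's own timestamp gives event-time partitioning;
    # passing the wall-clock time at write would give ingestion-time partitioning.
    print(encode_partition("logs", 1510131585000, "hourly"))
    print(encode_partition("logs", 1510131585000, "daily"))
```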

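WAL module

 Defines the WAL interface, which specifies how Kafka offset information is encoded into the WAL

Hive module

 Mainly defines the HiveUtil abstract class, the HiveFactory interface (the factory for HiveUtil), and the related Schema / Config / Exception / MetaStore implementation classes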

kafka-connect-hdfs

 First, the State enum lists 11 task states: RECOVERY_STARTED, RECOVERY_PARTITION_PAUSED, WAL_APPLIED, WAL_TRUNCATED, OFFSET_RESET, WRITE_STARTED, WRITE_PARTITION_PAUSED, SHOULD_ROTATE, TEMP_FILE_CLOSED, WAL_APPENDED, FILE_COMMITTED. All of the write logic lives in the write method of TopicPartitionWriter, while the failure-recovery logic is implemented in the recover method

(Drawn with Axure™)

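Focus points

ClassLoader isolation for Kafka Connect components

 The Worker starts a Task

In distributed mode, how is a ClassLoader update on one node broadcast to the whole cluster?

 No broadcast is needed; a rolling upgrade with a canary release is enough

Community follow-up

 See: "Open Source Community"

Resources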

Doc

Kafka Connect

Docker

Blog

Common

Kafka Connect

Introduction
Community
Docker Image
Principle

Kafka Stream

Book


  • Author: Benedict Jin
  • Permalink: https://yuzhouwan.com/posts/26002/
  • License: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit the source when reposting!