What is Kafka?
Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
Why Kafka?
Distributed
Kafka offers the usual benefits of a distributed system: cost efficiency, speed, reliability, easy scaling, data sharing, resource sharing, convenient communication, and flexibility.
High throughput
Provides high throughput for both data producers and consumers.
High reliability
Supports multiple consumers; when one consumer fails, the load is automatically rebalanced across the rest.
Offline & real-time
Messages can be persisted and processed in batches as well as consumed in real time.
Decoupling
Acts as a bridge between systems, avoiding tight coupling between them.
How Kafka works
Key concepts
Topic
A topic is a category or feed name to which messages are published.
Producers
Producers publish data to the topics of their choice. The producer is responsible for choosing which message to assign to which partition within the topic.
Consumers
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
Diagram
Kafka Connect
What is Kafka Connect?
Kafka Connect is a scalable, reliable framework for streaming data between Apache Kafka and other systems. It makes it easy to define connectors that move large amounts of data into and out of Kafka. A source connector can ingest an entire database, turning each table into a stream of updates published to Kafka topics, or collect metrics from application servers into Kafka topics so that the data is available for low-latency stream processing. A sink connector can deliver data from Kafka topics into a search engine (such as ElasticSearch) or an offline analytics system (such as Hadoop).
Features
- A common framework for Kafka connectors, with a unified integration API
- Supports both distributed and standalone modes
- Exposes a RESTful interface for viewing and managing Kafka connectors
- Automatic offset management
- Distributed and scalable: built on the existing group management protocol, capacity can be grown dynamically by adding tasks / workers
- Easy to integrate with other stream / batch processing systems
- Rich metrics for monitoring
- Clients available in many languages, including C / C++ / Go / Java / JMS / .NET / Python
Design goals
- Focus only on reliable, scalable data copying (leave transformation and extraction to dedicated data processing frameworks)
- Stay as coarse-grained as possible (for example, when syncing a database, treat the whole database as the default unit of work rather than individual tables)
- Copy data in parallel (so that processing capacity can scale automatically)
- Support exactly-once semantics (including for storage systems such as HDFS that have no primary key for deduplication)
- Preserve metadata during the copy when the source system provides data structure and type information
- Keep the API simple, reusable, and easy to understand, so that custom connectors are easy to implement
- Support both standalone development / testing and distributed production deployments
Basic concepts
Connector
A connector is the high-level abstraction that coordinates data streaming tasks.
Task
Tasks do the actual work of copying data into or out of Kafka.
Failed tasks are recovered automatically, although workers themselves do not automatically restart or scale their own process resources.
Worker
The process that executes connectors and tasks.
Converter
Translates data between the format used by Kafka Connect and the format used by the systems it exchanges data with.
Transform
Applies simple, per-record processing logic to the data a connector receives or produces.
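To make these concepts concrete, the sketch below registers a hypothetical file source over the REST API and attaches a single-message transform; the connector name, file path, and topic are made up for illustration, while FileStreamSourceConnector and the InsertField transform ship with Kafka Connect. This is just an example wiring, not a prescribed setup.

```bash
# Register a file source with one task and an InsertField transform (illustrative values)
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "demo-file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/tmp/demo-input.txt",
    "topic": "demo-topic",
    "transforms": "addSource",
    "transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addSource.static.field": "data_source",
    "transforms.addSource.static.value": "file-demo"
  }
}'
```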
Architecture
Connector model
The connector model defines how Kafka Connect interacts with external systems, in terms of connectors and tasks.
A connector is a specific connector implementation class, configured with which data to copy and how to process it. A connector instance is made up of a set of tasks. Kafka Connect actually manages the tasks; the connector is only responsible for generating task configurations and for applying configuration changes when the framework requests an update. The Source-side and Sink-side Connector / Task APIs are defined separately, mainly to keep each API definition simple.
Worker model
The worker model manages the lifecycle of connectors and tasks: start, pause, resume, restart, stop, and so on.
A Kafka Connect cluster consists of a set of worker processes, which are the containers that execute connectors and tasks. Workers automatically coordinate with the other distributed workers to provide scalability and fault tolerance. Worker resource management can be delegated to YARN / Mesos, configuration management can be integrated with Chef / Puppet, and lifecycle management can be handled by tools such as Oozie / Falcon.
Data model
The data model defines the data structures and record serialization managed by Kafka Connect.
Connectors copy streams of data from a partitioned input stream to a partitioned output stream, where at least one side (input or output) is always Kafka. Each stream is an ordered sequence of records, and every record has an associated offset. The format and semantics of these offsets are defined by the connector, to allow integration with a wide variety of systems; however, to achieve certain delivery semantics in the face of failures, offsets must be unique within a stream and the stream must be seekable to an arbitrary offset. Kafka Connect also supports pluggable converters, so this data can be stored in a variety of serialization formats. Schemas are built in, so important metadata about the data format can be propagated through complex data pipelines; schemaless data can still be used when no schema is available.
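Converters, for example, are chosen in the worker configuration. A minimal sketch (assuming a local Schema Registry on port 8081; adjust to your environment) that stores record values as Avro and keys as schemaless JSON:

```properties
# Worker-level converter settings (illustrative; AvroConverter requires Schema Registry)
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```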
Quick Start
Base environment
1 | # Add a user and set its password |
Confluent
Download
Download the confluent-oss-3.3.1-2.11.tar.gz package from the https://www.confluent.io/download/ page
Install
1 | $ cd ~/install/ |
Start
1 | # Start Zookeeper, Kafka, Schema Registry |
Produce Avro data
1 | $ cd ~/software/confluent/ |
Consume Avro data
1 | $ ./bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181 --from-beginning |
Produce Avro data with a mismatched schema
1 | $ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"int"}' |
Stop
1 | $ confluent stop schema-registry |
Kafka Connect
Start Confluent
1 | $ confluent start |
View the Connect log
1 | $ confluent log connect |
List the supported connector types
1 | $ confluent list connectors |
Use the file source
1 | # Configure the file source |
Use the file sink
1 | # Configure the file sink |
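The file-source / file-sink configs referenced above are plain property files; the sketch below shows roughly what the Confluent quickstart versions contain (file paths and names are the quickstart defaults, so verify them against your installation):

```properties
# file source (e.g. etc/kafka/connect-file-source.properties)
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

# file sink (e.g. etc/kafka/connect-file-sink.properties)
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
```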
Cleanup
1 | # Unload the file-source / file-sink connectors |
Supported components
Kafka 2 HDFS
Features
Exactly Once Delivery
The connector uses a WAL file to guarantee that every record exported to HDFS is delivered exactly once.
By encoding the Kafka offsets into the WAL file, the last committed offset can be recovered when a task fails or restarts.
Extensible Data Format
Avro and Parquet are supported out of the box; other data formats can be supported by extending the Format class.
Hive Integration
Integration with Hive is supported.
When this feature is enabled, the connector automatically creates an external, partitioned Hive table for every topic exported to HDFS.
Schema Evolution
Schema evolution and different schema compatibility levels are supported.
When integrating with Hive, schema.compatibility can be set to BACKWARD, FORWARD, or FULL.
The Hive table can then query all data written to the same topic under different schemas.
Secure HDFS and Hive Metastore Support
Kerberos authentication is supported, so the connector can integrate with a secured HDFS cluster or Hive metastore.
Pluggable Partitioner
The default partitioner, a field-based partitioner, and time-based partitioners (daily / hourly, etc.) are supported.
You can implement the Partitioner class to build your own partitioner, or extend TimeBasedPartitioner to build a custom time-based partitioner.
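A minimal HDFS sink configuration is sketched below; the NameNode URL and topic are placeholders, and the property names follow the Confluent HDFS connector quickstart, so double-check them against the version you run:

```properties
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs
hdfs.url=hdfs://localhost:9000
flush.size=3
# Optional Hive integration
hive.integration=true
hive.metastore.uris=thrift://localhost:9083
schema.compatibility=BACKWARD
```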
Hands-on: HDFS
Install HDFS
Omitted here; see my other post, "Apache Eagle", for details
Configure Confluent
1 | $ vim etc/kafka-connect-hdfs/quickstart-hdfs.properties |
Enable
1 | $ confluent start |
Verify
1 | $ hadoop fs -ls /topics/test_hdfs/partition=0 |
Hands-on: Hive
Install Hive
a) Download apache-hive-2.1.1-bin.tar.gz
b) Update environment variables
1 | $ vim ~/.bashrc |
c) Update configuration files
1 | $ vim hive-env.sh |
1 | $ vim hive-site.xml |
d) Initialize and start
1 | # Initialize |
Configure Confluent
1 | $ vim etc/kafka-connect-hdfs/quickstart-hdfs.properties |
Enable
1 | $ confluent load hdfs-sink-18 -d etc/kafka-connect-hdfs/quickstart-schema.properties |
Verify
1 | $ bin/hive |
Kafka 2 ElasticSearch
Features
Exactly Once Delivery
The connector relies on ElasticSearch's idempotent write semantics and explicit ES document IDs to ensure that every record written to ES is delivered exactly once.
If a Kafka message has a key, that key is used as the ES document ID; if the message has no key (or you explicitly choose not to use it), Kafka Connect automatically uses topic+partition+offset as the key, so every record written to ES maps to exactly one document.
Mapping Inference
With this feature enabled, the connector can infer the ES mapping from the schema in the Schema Registry.
However, the inference is limited to field types and default values. If you need more control (for example, user-defined analyzers), you must create the mapping manually.
Schema Evolution
Schema evolution and different schema compatibility levels are supported.
The connector can handle some otherwise incompatible schema changes, for example:
Adding a field
When one or more fields are added to the Kafka messages and dynamic mapping is enabled in ES, ES automatically adds the new fields to the mapping.
Removing a field
When one or more fields are removed from the Kafka messages, the missing values are treated as null.
Changing a field's type
For example, when a field changes from string to integer, ES automatically converts the string values to integers.
Delivery Semantics
The connector supports batched and pipelined writes to ES to increase throughput; in batch mode, multiple batches can be processed in parallel.
Document-level update ordering is ensured by using the partition-level Kafka offset as the ES document version, together with the version_mode=external setting.
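A minimal Elasticsearch sink configuration might look like the sketch below (the connection URL and topic are placeholders; property names follow the Confluent Elasticsearch connector quickstart, so confirm them for your version):

```properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-elasticsearch-sink
connection.url=http://localhost:9200
type.name=kafka-connect
# Use topic+partition+offset as the document ID when messages have no key
key.ignore=true
```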
Reindexing with Zero Downtime
Using the ES Index Aliases API, reindexing can be done with zero downtime (ES also provides a Reindex API, but its performance is limited). The steps against the ES cluster are as follows
- Create an alias for the index with the old mapping.
- The applications that uses the index are pointed to the alias.
- Create a new index with the updated mapping.
- Move data from old to the new index.
- Atomically move the alias to the new index.
- Delete the old index.
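The atomic alias move in the last two steps corresponds to a single _aliases call; a sketch with made-up index names:

```bash
# Atomically repoint the alias from the old index to the new one
curl -X POST 'http://localhost:9200/_aliases' -H 'Content-Type: application/json' -d '{
  "actions": [
    { "remove": { "index": "logs_v1", "alias": "logs" } },
    { "add":    { "index": "logs_v2", "alias": "logs" } }
  ]
}'
```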
To keep the change invisible to users, write requests must still be handled during reindexing, but an alias cannot write to the old and new indices at the same time. To work around this, two ElasticSearch connectors can write the same data to both the old and new indices simultaneously; the steps for the ES connectors are as follows
- The connector job that ingest data to the old indices continue writing to the old indices.
- Create a new connector job that writes to new indices. This will copy both some old data and new data to the new indices as long as the data is in Kafka.
- Once the data in the old indices are moved to the new indices by the reindexing process, we can stop the old connector job.
Hands-on
Install ElasticSearch
Mostly omitted here; see my other post, "ElasticSearch", for details
1 | # Download |
Configure Confluent
1 | $ vim etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties |
Enable
1 | $ cd ~/software/confluent |
Verify
1 | $ curl -XGET 'http://localhost:9200/test-elasticsearch-sink/_search?pretty' |
Pitfalls
max file descriptors [32768] for elasticsearch process is too low, increase to at least [65536]
- Solution
1 | $ su root |
Kafka 2 Druid
The Druid project has settled on the Kafka Indexing Service as its official ingestion approach, which makes it easier to build out its own ecosystem. However, the Kafka Indexing Service still has some shortcomings, so a Kafka Connect to Druid sink currently has to be custom-built on top of the Druid Tranquility library. By disabling Kafka auto-commit and controlling transaction commits yourself, you can achieve no-loss, no-duplication delivery, compensating for Tranquility's lack of exactly-once semantics.
Tips: Stream Reactor implements a Kafka-to-Druid sink based on Tranquility
Kafka 2 Kafka
Connect Replicator makes it easy to reliably replicate topics from one Kafka cluster to another. In addition to copying the messages, the connector creates topics as needed and preserves the topic configuration from the source cluster, including the partition count, the replication factor, and any per-topic configuration overrides.
The diagram below shows a typical multi-datacenter deployment, where data from two Kafka clusters located in different data centers is aggregated into a separate cluster in a third data center. The cluster the data is replicated from is called the source cluster, and the cluster it is replicated to is called the destination.
Features
- Topics can be selected with whitelists, blacklists, and regular expressions
- Topics can be created dynamically in the destination cluster with matching partition counts, replication factors, and topic configuration overrides
- When new partitions are added to a topic in the source cluster, the corresponding destination topic is automatically expanded
- Other configuration changes in the source cluster are automatically propagated to the destination cluster
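As a rough sketch, a Replicator connector configuration looks something like the following; the cluster addresses and topic whitelist are placeholders, and the property names are taken from Confluent Replicator's documentation, so verify them against the release you use:

```properties
name=replicator-source
connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
tasks.max=4
topic.whitelist=orders,payments
src.kafka.bootstrap.servers=src-kafka:9092
dest.kafka.bootstrap.servers=dest-kafka:9092
```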
Standalone worker
Start
1 | $ bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...] |
Configuration
1 | # Where connector offsets are stored |
Tips: in standalone mode, offsets are stored in /tmp/connect.offset
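A minimal standalone worker.properties, sketched with local defaults (the offset file path is simply the example mentioned above):

```properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Standalone mode keeps offsets in a local file instead of a Kafka topic
offset.storage.file.filename=/tmp/connect.offset
offset.flush.interval.ms=10000
```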
Distributed worker
Start Confluent
1 | $ cd ~/software/confluent |
Create topics
1 | # Get the help text |
Delete topics
1 | $ vim etc/kafka/server.properties |
Connector and Task status
All connectors and the tasks belonging to them publish status updates to status.storage.topic. Because workers consume status.storage.topic asynchronously, there can be a short delay before the latest status is visible through the API.
A connector or task can be in one of the following states:
UNASSIGNED
The connector or task has not yet been assigned to a worker
RUNNING
The connector or task is running
PAUSED
The connector or task has been paused by an administrator
FAILED
The connector or task has failed, usually because an exception was raised
The pause / resume APIs are typically used when the consuming system needs maintenance: pausing the Kafka connector stops processing so that data does not keep piling up against the sink. The pause is not temporary; even after a cluster restart, a paused connector stays paused until the resume API is called again. Also note that a FAILED task cannot be paused; it must be restarted back into the RUNNING state first.
How workers discover each other
As long as all workers use the same group.id (think of it as the cluster name) and the same three topics (config.storage.topic, offset.storage.topic, and status.storage.topic), each worker automatically discovers the other worker processes in the same cluster.
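In other words, the cluster identity lives entirely in the worker configuration; the relevant connect-distributed.properties entries are sketched below (the topic names are common defaults, not required values):

```properties
group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
```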
Rebalance mechanism
1 | [2017-12-04 11:32:18,607] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1187) |
Fully distributed deployment
Replace the bundled Zookeeper
Installation details omitted; see my other post, "Zookeeper 原理与优化"
1 | $ cd /home/connect/software/confluent |
Replace the bundled Kafka
1 | # Create the log directories |
Update the Kafka REST configuration
1 | $ cd /home/connect/software/confluent |
Update the Schema Registry configuration
1 | $ cd /home/connect/software/confluent |
Update the worker configuration
1 | $ vim /home/connect/software/confluent/etc/kafka/connect-distributed.properties |
Pitfalls
WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!
Solution
1 | $ vim .ssh/known_hosts |
Timed out while checking for or creating topic(s) ‘connect-offsets’
Description
1 | [2017-12-01 17:39:13,503] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:206) |
References
- KAFKA-4942: Kafka Connect: Offset committing times out before expected
- KAFKA-4942: Fixed the commitTimeoutMs being set before the commit actually started #2730
- KAFKA-4942: Fix commitTimeoutMs being set before the commit actually started #2912
Request to leader to reconfigure connector tasks failed
Description
1 | [2018-03-13 10:30:28,303] ERROR Unexpected error during connector task reconfiguration: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:933) |
Solution
1 | $ vim etc/kafka/connect-distributed.properties |
References
io.confluent.kafka.schemaregistry.client.rest.RestService Connection refused
Description
1 | $ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic kafka-connect-ui-file-sink --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' |
1 | [2018-03-13 10:48:24,211] ERROR Failed to send HTTP request to endpoint: http://localhost:8081/subjects/kafka-connect-ui-file-sink-value/versions (io.confluent.kafka.schemaregistry.client.rest.RestService:156) |
Solution
1 | # Check whether the Schema Registry process is running |
Docker images
Installing Docker
Update the package mirrors
1 | # Add a domestic yum mirror |
Install Docker CE
1 | # Install dependency packages |
Start
1 | $ sudo systemctl enable docker |
Create the docker user group
1 | # Create the docker group |
MySQL - Kafka Connect - HDFS hands-on
Download
For the MySQL - Kafka Connect - HDFS integration environment, an official kafka_connect_blog.ova image is provided
Install
VirtualBox
Vagrant is used to manage and install the VirtualBox VM; for the installation steps, see my other post, "Python"
Import the VM image
Choose appropriate resources and import the kafka_connect_blog.ova file
The default username and password are both vagrant
Working with the VM image
1 | # Update the package index |
Tips: blocked by limited machine resources and network proxy issues, to be continued…
Pitfalls
bridge-nf-call-iptables is disabled
Description
1 | WARNING: bridge-nf-call-iptables is disabled |
Solution
1 | # Add the kernel parameters |
Common configuration
Common worker configuration
bootstrap.servers
1 | # Initial connections to the Kafka cluster |
[key | value].converter
1 | # How data in Kafka is converted to and from Connect records |
internal.[key | value].converter
1 | # Converters used for Connect's internal data |
rest.host.name & rest.port
1 | # IP and port of the RESTful service |
plugin.path
1 | # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins |
Distributed worker configuration
group.id
1 | # The group ID is a unique identifier for the set of workers that form a single Kafka Connect cluster |
[config | offset | status].storage.topic
1 | # Internal Storage Topics. |
[config | offset | status].storage.replication.factor
1 | # The following properties set the replication factor for the three internal topics, defaulting to 3 for each |
[offset | status].storage.partitions
1 | # The config storage topic must have a single partition, and this cannot be changed via properties. |
Connector configuration
Notes
1 | # Unique name for the connector. Attempting to register again with the same name will fail. |
Examples
Standalone connector configuration
1 | name=local-file-sink |
Distributed connector configuration
1 | $ curl -X POST -H "Content-Type: application/json" --data '{"name": "local-file-sink", "config": {"connector.class":"FileStreamSinkConnector", "tasks.max":"1", "file":"test.sink.txt", "topics":"connect-test" }}' http://localhost:8083/connectors |
RESTful API
Overview
Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default, this service runs on port 8083.
The following are the currently supported endpoints:
- GET /connectors - return a list of active connectors
- POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
- GET /connectors/{name} - get information about a specific connector
- GET /connectors/{name}/config - get the configuration parameters for a specific connector
- PUT /connectors/{name}/config - update the configuration parameters for a specific connector
- GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
- GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
- GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
- PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
- PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
- POST /connectors/{name}/restart - restart a connector (typically because it has failed)
- POST /connectors/{name}/tasks/{taskId}/restart - restart an individual task (typically because it has failed)
- DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration
Kafka Connect also provides a REST API for getting information about connector plugins:
- GET /connector-plugins - return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars
- PUT /connector-plugins/{connector-type}/config/validate - validate the provided configuration values against the configuration definition. This API performs per config validation, returns suggested values and error messages during validation.
Examples
Worker version info
1 | $ curl localhost:8083/ | jq |
Connector plugins available on the worker
1 | $ curl localhost:8083/connector-plugins | jq |
Connectors active on the worker
1 | $ curl localhost:8083/connectors | jq |
Restart a connector
1 | $ curl -X POST localhost:8083/connectors/file-sink/restart |
List all tasks of a connector
1 | $ curl localhost:8083/connectors/file-sink/tasks | jq |
Restart a task
1 | $ curl -X POST localhost:8083/connectors/file-sink/tasks/0/restart | jq |
Pause a connector
1 | $ curl -X PUT localhost:8083/connectors/file-sink/pause | jq |
Resume a connector
1 | $ curl -X PUT localhost:8083/connectors/file-sink/resume | jq |
Update a connector's configuration
1 | $ curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"FileStreamSinkConnector","file":"test.sink.txt","tasks.max":"2","topics":"connect-test","name":"local-file-sink"}' localhost:8083/connectors/local-file-sink/config |
Get a connector's status
1 | $ curl localhost:8083/connectors/file-sink/status | jq |
Get a connector's configuration
1 | $ curl localhost:8083/connectors/file-sink | jq |
Delete a connector
1 | $ curl -X DELETE localhost:8083/connectors/file-sink |
Tips: each Kafka Connect process starts an embedded REST server when it launches (default port 8083)
Schema Registry
Common commands
1 | # List all schemas |
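These commands go through the Schema Registry REST API; a few representative calls, assuming the registry runs locally on port 8081 and a subject named test-value exists:

```bash
# List all subjects
curl http://localhost:8081/subjects

# List the versions registered under one subject
curl http://localhost:8081/subjects/test-value/versions

# Fetch a specific schema version
curl http://localhost:8081/subjects/test-value/versions/1
```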
Visualization tools
Confluent UI
Not open source
Landoop UI
kafka-connect-ui
Start
1 | # Download the source |
Enable the Confluent REST interface
1 | # Open the REST interface |
Create a file sink
Select the Kafka Connect cluster
View the dashboard
Click the Create button
Configure the connector
Created successfully
View the connector details
Send data
1 | $ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic kafka-connect-ui-file-sink --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' |
1 | # Keep sending |
Data received
1 | $ cat ./software/confluent-3.3.1/kafka-connect-ui-file-sink.txt |
Create a file source
1 | # Follow the file sink steps above; the configuration is as follows |
The dashboard then looks like this:
Write data to the file
1 | $ cd /home/connect/software/confluent |
Pitfalls
Description
1 | org.apache.kafka.connect.errors.ConnectException: org.apache.hadoop.security.AccessControlException: Permission denied: user=connect, access=WRITE, inode="/":bigdata:supergroup:drwxr-xr-x |
Solution
When creating the Kafka-to-HDFS job, set the logs.dir=/user/connect/logs parameter
a) Neither the connector nor the HDFS cluster reports errors, but no data lands in HDFS
1 | # In addition to the basic configuration, |
b) Missing required configuration “schema.registry.url” which has no default value
Set the schemas.enable=false configuration option
Tips: Live demo is here.
kafka-topic-ui
Start
1 | # Download the source |
Start Kafka REST
1 | $ vim etc/kafka-rest/kafka-rest.properties |
Screenshot
Pitfalls
Description
1 | WARN Connection to node -1 could not be established. Broker may not be available. |
Solution
1 | $ vim etc/kafka-rest/kafka-rest.properties |
schema-registry-ui
Start
1 | # Download the source |
Start Schema Registry
1 | $ vim etc/schema-registry/schema-registry.properties |
Screenshot
Custom development
1 | # Start from the cmd command line |
Using community connectors
1 | # Download the source |
Implementing your own connector
Tips: Connector Developer Guide
Fixing known bugs
Use a JIRA query to list the known bugs in the Kafka Connect component; the query syntax is as follows
1 | project = KAFKA AND issuetype = Bug AND component = KafkaConnect |
Handy tips
zkCli
| Command | Comment |
| --- | --- |
| get /consumers/<topic>/owners | Check which group id is consuming the topic in real time |
| get /consumers/<topic>/offsets/<group id>/<partition> | Check the offsets (ctime: creation time; mtime: modification time) |
kafka-run-class
Delete a topic
1 | $ bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper <zk host>:2181,<zk host>:2181,<zk host>:2181 --topic <topic> |
Have a consumer start from a specified offset, e.g. to backfill data
Java Client
1 | Properties props = new Properties(); |
Shell Console
1 | # Specify the offset to start consuming from |
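With the console consumer this is done with --partition and --offset; a sketch (topic, partition, and offset values are placeholders):

```bash
# Start consuming partition 0 of the topic from offset 100
./bin/kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic connect-test --partition 0 --offset 100
```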
References
Which configurations can be specified in Kafka Connect
Gracefully stopping the Kafka service
Command
1 | $ vim config/server.properties |
References
Performance tuning
Architecture level
RingBuffer + Lock Free
Using a lock-free ring buffer to implement an efficient producer-consumer pattern greatly improves the throughput of the Kafka producer when sending messages.
Avro compression
Avro compression keeps (de)serialization efficient while reducing the amount of data transferred over the network.
Multiple partitions + connection pool
A connection pool allows the number of Kafka connections to grow and shrink on demand, and a custom Kafka partitioner makes full use of multiple partitions to increase parallelism.
Tips: Full code is here.
Parameter level
Producer
1 | kafka.key.serializer.class=kafka.serializer.StringEncoder |
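The placeholder above shows legacy (Scala) producer settings; for the Java producer, the usual throughput-related knobs are sketched below with illustrative values rather than recommendations:

```properties
# Batch more records per request and trade a little latency for throughput
batch.size=65536
linger.ms=10
compression.type=snappy
buffer.memory=67108864
# acks=1 favors throughput; use acks=all when durability matters more
acks=1
```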
Pitfalls
After deserialization, Avro decimal fields all come back as [pos=0 lim=0 cap=0]
Description
1 | // Converting a double to a ByteBuffer, and a ByteBuffer back to a double, works fine on its own |
Solution
- Define a decimal-typed field
1 | { |
Configure Maven to generate the Avro classes automatically
Initialize the Avro classes
1 | import org.apache.avro.{Conversions, LogicalTypes, Schema} |
- Deserialize the decimal field
1 | val precision = schema.getJsonProp("precision") |
Further notes
By now you can probably tell that decimal is rather cumbersome to use, and this is far from the only pitfall. So why invent such a thing at all? Wouldn't it be much simpler to just use float (single precision) or double (double precision)?
Not really, because as soon as you hit the 0.30000000000000004 problem, floating-point numbers built on the IEEE 754 standard (in Java / Scala / Python / ...) are of no help. You can verify this with a simple example on the command line. Taking Python as an example, running .1 + .2 does not give the expected 0.3 but 0.30000000000000004. Seeing this for the first time can be unsettling; you may even suspect you downloaded a fake Python. Once you understand the underlying reason, though, it is not surprising at all, and the reason is easy to work out: convert the floating-point numbers to binary (by repeatedly multiplying the fractional part by the base) and then add them. After conversion, 0.1 + 0.2 becomes 0.00011001100110011001100110011001100110011001100110011001 + 0.00110011001100110011001100110011001100110011001100110011, which sums to 0.01001100110011001100110011001100110011001100110011001100; converting that binary result back to decimal gives exactly the 0.30000000000000004 we saw. Fundamentally, the issue is that not every decimal fraction has an exact, finite binary representation. For example, with four binary digits after the point, the representable values run from 0.0000 to 0.1111, yet in decimal they can only be built from combinations of 0.0625, 0.125, 0.25, and 0.5. Moreover, some decimal fractions become repeating fractions in binary, and since a computer can only store a finite number of bits for a floating-point number, they cannot be represented exactly either.
Now compare with decimal, which does not suffer from this problem:
1 | from decimal import * |
Of course, besides exact arithmetic, decimal also makes it easy to control precision, automatically keeps the precision of its inputs, and even allows dividing by 0 to get Infinity. So decimal is not useless after all :D
1 | Context(prec=6, Emax=999, clamp=1).create_decimal('1.23e999') |
Reading the source
Setting up a Kafka source-reading environment
Install Gradle
From the Gradle download page, download gradle-4.3.1-all.zip, extract it to D:\apps\gradle, and add PATH=D:\apps\gradle\gradle-4.3.1\bin to the environment variables
1 | # Check that the installation succeeded |
Gradle proxy settings
1 | $ gradle xxx -Dhttp.proxyHost=127.0.0.1 -Dhttp.proxyPort=1080 |
Build the source
1 | # Download the dependencies |
Tips: if the idea plugin is not configured in the Gradle build files, the project modules will not be recognized; delete the .idea folder and open build.gradle as the project, and it will work.
Kafka Connect
Setting up the reading environment
The Kafka Connect project has well-defined interfaces and is highly modular, so the code base is split into many sub-projects. When setting up a source-reading environment you will find that many dependencies cannot be found in the central Maven repository and have to be built and installed locally from source. First, download the sources of the two parent projects, kafka-connect-common and kafka-connect-storage-common, and of the dependent sub-projects rest-utils and schema-registry. After pulling all tags with git fetch --tags, pick a stable version (v3.3.1 here), create a new branch v3.3.1, and run git reset --hard <commit> on that branch so that every project is aligned to v3.3.1 (there is one more dependent sub-project, aggdesigner, that also needs to be built and installed locally, at version pentaho-aggdesigner-5.1.5-jhyde). Kafka itself uses a non-standard version, 0.11.0.1-cp1; picking the slightly different 0.11.0.1 is enough for compatibility (or you can run gradlew installAll on the Kafka trunk branch to build the exact version). Finally, run mvn clean install in each project to build and install them.
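Condensed into commands, the procedure looks roughly like this (the repository URL and commit hash are placeholders; repeat for each project listed above):

```bash
# Example for one project; repeat for kafka-connect-common, kafka-connect-storage-common,
# rest-utils, schema-registry, and aggdesigner
git clone https://github.com/confluentinc/kafka-connect-storage-common.git
cd kafka-connect-storage-common
git fetch --tags
git checkout -b v3.3.1            # new local branch for the chosen release
git reset --hard <commit>         # align to the commit of the v3.3.1 tag
mvn clean install -DskipTests     # install into the local Maven repository
```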
Main architecture
ConnectDistributed
Command line utility that runs Kafka Connect in distributed mode. In this mode, the process joins a group of other workers and work is distributed among them. This is useful for running Connect as a service, where connectors can be submitted to the cluster to be automatically executed in a scalable, distributed fashion. This also allows you to easily scale out horizontally, elastically adding or removing capacity simply by starting or stopping worker instances.
Connect
This class ties together all the components of a Kafka Connect process (herder, worker, storage, command interface), managing their lifecycle.
Connector
Connectors manage integration of Kafka Connect with another system, either as an input that ingests data into Kafka or an output that passes data to an external system. Implementations should not use this class directly; they should inherit from SourceConnector or SinkConnector.
Connectors have two primary tasks. First, given some configuration, they are responsible for creating configurations for a set of {@link Task}s
that split up the data processing. For example, a database Connector might create Tasks by dividing the set of tables evenly among tasks. Second, they are responsible for monitoring inputs for changes that require reconfiguration and notifying the Kafka Connect runtime via the ConnectorContext. Continuing the previous example, the connector might periodically check for new tables and notify Kafka Connect of additions and deletions. Kafka Connect will then request new configurations and update the running Tasks.
SourceConnector
SourceConnectors implement the connector interface to pull data from another system and send it to Kafka.
SinkConnector
SinkConnectors implement the Connector interface to send Kafka data to another system.
Herder
The herder interface tracks and manages workers and connectors. It is the main interface for external components to make changes to the state of the cluster. For example, in distributed mode, an implementation of this class knows how to accept a connector configuration, may need to route it to the current leader worker for the cluster so the config can be written to persistent storage, and then ensures the new connector is correctly instantiated on one of the workers.
This class must implement all the actions that can be taken on the cluster (add/remove connectors, pause/resume tasks, get state of connectors and tasks, etc). The non-Java interfaces to the cluster (REST API and CLI) are very simple wrappers of the functionality provided by this interface.
In standalone mode, this implementation of this class will be trivial because no coordination is needed. In that case, the implementation will mainly be delegating tasks directly to other components. For example, when creating a new connector in standalone mode, there is no need to persist the config and the connector and its tasks must run in the same process, so the standalone herder implementation can immediately instantiate and start the connector and its tasks.
Worker
Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving data to/from Kafka. Since each task has a dedicated thread, this is mainly just a container for them.
Task
Tasks contain the code that actually copies data to/from another system. They receive a configuration from their parent Connector, assigning them a fraction of a Kafka Connect job's work. The Kafka Connect framework then pushes/pulls data from the Task. The Task must also be able to respond to reconfiguration requests.
Task only contains the minimal shared functionality between {@link org.apache.kafka.connect.source.SourceTask} and {@link org.apache.kafka.connect.sink.SinkTask}.
SourceTask
SourceTask is a Task that pulls records from another system for storage in Kafka.
SinkTask
SinkTask is a Task that takes records loaded from Kafka and sends them to another system. Each task instance is assigned a set of partitions by the Connect framework and will handle all records received from those partitions. As records are fetched from Kafka, they will be passed to the sink task using the {@link #put(Collection)} API, which should either write them to the downstream system or batch them for later writing. Periodically, Connect will call {@link #flush(Map)} to ensure that batched records are actually pushed to the downstream system.
State machine
Initialization
SinkTasks are first initialized using {@link #initialize(SinkTaskContext)} to prepare the task's context and {@link #start(Map)} to accept configuration and start any services needed for processing.
Partition Assignment
After initialization, Connect will assign the task a set of partitions using {@link #open(Collection)}. These partitions are owned exclusively by this task until they have been closed with {@link #close(Collection)}.
Record Processing
Once partitions have been opened for writing, Connect will begin forwarding records from Kafka using the {@link #put(Collection)} API. Periodically, Connect will ask the task to flush records using {@link #flush(Map)} as described above.
Partition Rebalancing
Occasionally, Connect will need to change the assignment of this task. When this happens, the currently assigned partitions will be closed with {@link #close(Collection)} and the new assignment will be opened using {@link #open(Collection)}.
Shutdown
When the task needs to be shutdown, Connect will close active partitions (if there are any) and stop the task using {@link #stop()}
Storage
OffsetBackingStore
OffsetBackingStore is an interface for storage backends that store key-value data. The backing store doesn't need to handle serialization or deserialization. It only needs to support reading/writing bytes. Since it is expected these operations will require network operations, only bulk operations are supported.
Since OffsetBackingStore is a shared resource that may be used by many OffsetStorage instances that are associated with individual tasks, the caller must be sure keys include information about the connector so that the shared namespace does not result in conflicting keys.
- KafkaOffsetBackingStore
1 | Implementation of OffsetBackingStore that uses a Kafka topic to store offset data. |
StatusBackingStore
- KafkaStatusBackingStore
1 | StatusBackingStore implementation which uses a compacted topic for storage of connector and task status information. When a state change is observed, the new state is written to the compacted topic. The new state will not be visible until it has been read back from the topic. |
ConfigBackingStore
- KafkaConfigBackingStore
1 | Provides persistent storage of Kafka Connect connector configurations in a Kafka topic. |
RestServer
Embedded server for the REST API that provides the control plane for Kafka Connect workers.
Plugin
PluginDesc
DelegatingClassLoader
Confluent open-source edition
Module architecture diagram
kafka-connect-common
This project is the root-level parent of the whole Kafka Connect code base. It mainly contains the common interface definitions, configuration loading, Metrics / JMX instrumentation, shared utility classes, and the build plumbing: how to compile, which files to package, license headers, and so on.
kafka-connect-storage-common
The official description of this project is a single sentence: "Kafka Connect common packages for connectors transferring data between Kafka and distributed filesystems or cloud storage". In practice it contains quite a lot: a Common module, a Core module, a Format module, a Partitioner module, a WAL module, a Hive module, and more
Common module
Defines the ComposableConfig and SchemaGenerator interfaces, used for composing configuration and generating schemas respectively
Core module
Defines the Storage interface for distributed storage. Depending on the implementation, a Storage object can be a file or a directory in a distributed filesystem, or an object in an object store. Likewise, a path corresponds to an actual path in a distributed filesystem or a lookup key in an object store
Format module
Defines the Format, SchemaFileReader, and RecordWriter interfaces, which cover the storage-specific path type (e.g. Path in HDFS, String in S3), reading schemas back from Storage, and writing records to Storage
Partitioner module
Defines the Partitioner interface, used to partition incoming records and generate the corresponding directories and file names. Default, Hourly, Daily, TimeBased, Field, and custom partitioners are supported; the time-based partitioners work with both ingestion time and event time
WAL module
Defines the WAL interface, which standardizes how Kafka offsets are encoded into the WAL
Hive module
Defines the HiveUtil abstract class, the HiveFactory interface (a factory for HiveUtil), and the related Schema / Config / Exception / MetaStore implementation classes
kafka-connect-hdfs
The State enum lists 11 task states: RECOVERY_STARTED, RECOVERY_PARTITION_PAUSED, WAL_APPLIED, WAL_TRUNCATED, OFFSET_RESET, WRITE_STARTED, WRITE_PARTITION_PAUSED, SHOULD_ROTATE, TEMP_FILE_CLOSED, WAL_APPENDED, and FILE_COMMITTED. The write logic all lives in the write method of TopicPartitionWriter, and the failure-recovery logic is implemented in its recover method
Points to dig into
ClassLoader isolation in the Kafka Connect components
How a worker starts a task
In distributed mode, how can a ClassLoader updated on one node be propagated to the whole cluster?
No broadcast is needed; a rolling upgrade or canary release is sufficient
Community follow-up
See my other post, "开源社区"
Resources
Doc
Kafka Connect
- Kafka Connect Doc
- Kafka Connect 3.x
- Kafka Connect 4.x
- Kafka Connect Connector Developer Guide
- Kafka Connect Upgrade
- Kafka / Kafka Connect / KCQL version compatibility
- Development Environment
- Kafka 2 HDFS Config
- Kafka 2 ElasticSearch Config
- Schema Deletion Guidelines
- KCQL Syntax
- All Worker Configs
Docker
Blog
Common
Kafka Connect
Community
- TimeBasedPartitioner according to specified field #98
- When I try to compile I get an error about a missing SNAPSHOT dependency
- Web tool for Kafka Connect
- 阿里 RocketMQ 是怎样孵化成 Apache 顶级项目的?
- KIP-128: Add ByteArrayConverter for Kafka Connect
- Streaming reference architecture for ETL with Kafka and Kafka-Connect
- Web tool for Kafka Connect
- A very helpful community member: antwnis
- landoop-community.slack.com
- confluentcommunity.slack.com
Docker Image
Principle
Kafka Stream
Book
- Apache Kafka
- Apache Kafka Cookbook
- Kafka: The Definitive Guide
- Kafka 技术内幕:图文详解 Kafka 源码设计与实现
- Apache Kafka 源码剖析