Storm 与 Kafka 的整合之二:Kafka

系列文章:


Kafka 是什么?

 Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design – Official website

为什么要有 Kafka?

分布式

 具备分布式系统所特有的经济、快速、可靠、易扩充、数据共享、设备共享、通讯方便、灵活等特性

高吞吐量

 能够同时为数据生产者和消费者提供很高的吞吐量

高可靠性

 支持多个消费者,当某个消费者失败的时候,能够自动负载均衡

离线 & 实时性

 能将消息持久化,进行批量处理

解耦

 作为各个系统连接的桥梁,避免系统之间的耦合

Kafka 工作机制

一些主要概念

  • Topic(主题)
    A topic is a category or feed name to which messages are published.

  • Producers(发布者)
    Producers publish data to the topics of their choice. The producer is responsible for choosing which message to assign to which partition within the topic.

  • Consumers(订阅者)
    Consumers label themselves with a _consumer group_ name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
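
 针对上面的 Topic / Producers / Consumers 三个概念,下面给出一段极简的 Java 客户端草图(仅为示意:Kafka 地址 localhost:9092、topic 名 demo、group 名 demo-group 均为示例取值,未做异常处理):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class QuickDemo {

    public static void main(String[] args) {
        // Producer:将消息发布到自己选定的 topic,具体落到哪个 partition 可交给默认分区器决定
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("demo", "key1", "value1"));
        }

        // Consumer:用 group.id 标识自己所属的 consumer group,同组实例分摊各分区的消费
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "demo-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("demo"));
            for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                System.out.printf("partition=%d, offset=%d, value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}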

示意图

Kafka Connect

Kafka Connect 是什么

 Kafka Connect 是一款可扩展且稳定的、可在 Apache Kafka 和其他系统之间进行数据传输的框架,能够快速定义将大量数据导入导出 Kafka 的连接器。Source Connector 可以接入整个数据库,将表转化为 Stream 更新到 Kafka Topic 中;也支持将应用服务器的指标收集到 Kafka Topic,使得数据可用于低延迟场景的流处理。Sink Connector 则可以将数据从 Kafka Topic 传送到搜索引擎(如 ElasticSearch)或离线分析系统(如 Hadoop)

特性

  • Kafka Connector 通用框架,提供统一的 API 集成接口
  • 同时支持分布式模式和单机模式
  • 提供 RESTful 接口,用来查看和管理 Kafka Connectors
  • 自动化的 offset 管理
  • 分布式、可扩展,基于现有的 Group 管理协议,可以通过增加 Task / Worker 实现动态扩展
  • 更方便集成其他 流 / 批 处理系统
  • 丰富的 Metrics 监控指标
  • 支持 C / C++ / Go / Java / JMS / .Net / Python 等多种语言的客户端

基本概念

Connector

 Connector 是为了方便协调数据流任务,而提出的高层抽象

Task

 具体实现数据从 Kafka 导入导出的功能

 同时具备 Task 自动容灾的能力(不过 Worker 本身不支持自动重启和扩容进程资源的功能)

Worker

 执行 Connector 和 Task 的进程

Converter

 用于转化 Kafka Connect 和 其他系统之间交互的数据格式

Transform

 对 Connector 接收和发送的数据进行一些简单的处理逻辑
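
 下面给出一段 Connector 配置片段示意,演示如何在 Connector 级别覆盖 Converter,并追加一个简单的 Transform(以 Kafka 自带的 InsertField 为例;transforms 别名 addSource 及静态字段的取值均为示例):

# 覆盖 Worker 默认的 Converter(按需)
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# 定义一个 Transform 链,这里只有一个别名为 addSource 的 Transform
transforms=addSource
transforms.addSource.type=org.apache.kafka.connect.transforms.InsertField$Value
# 给每条记录的 value 追加一个静态字段
transforms.addSource.static.field=data_source
transforms.addSource.static.value=kafka-connect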

架构

Connector model

 Connector 是一个指定的 Connector 实现类,配置了需要拷贝哪些数据,以及如何处理数据的相关属性。Connector 实例由一组 Tasks 组成。Kafka Connect 实际管理 Tasks,Connector 只负责生成 Tasks,并在框架指定更新的时候,完成配置更改。Source 端 / Sink 端 的 Connectors/Tasks API 接口是独立定义的,主要为了能够保证 API 定义的简洁性
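
 为了更直观地理解 Connector 与 Task 的分工,下面给出一个极简的 Sink Connector 草图(仅示意 Connect 框架的接口形态,FileLoggerSinkConnector / FileLoggerSinkTask 为本文虚构的示例类名,省略了配置定义与异常处理):

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Connector 只负责描述“拷贝哪些数据”,并把配置切分给各个 Task
public class FileLoggerSinkConnector extends SinkConnector {

    private Map<String, String> configProps;

    @Override public String version() { return "0.0.1"; }

    @Override public void start(Map<String, String> props) { this.configProps = props; }

    @Override public Class<? extends Task> taskClass() { return FileLoggerSinkTask.class; }

    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
        // 最简单的切分方式:每个 Task 使用同一份配置
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(configProps));
        }
        return configs;
    }

    @Override public void stop() { }

    @Override public ConfigDef config() { return new ConfigDef(); }
}

// Task 才是真正搬运数据的角色,由 Worker 进程调度执行
class FileLoggerSinkTask extends SinkTask {

    @Override public String version() { return "0.0.1"; }

    @Override public void start(Map<String, String> props) { }

    @Override public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // 这里仅打印示意,真实场景应写入外部系统(HDFS / ES 等)
            System.out.println(record.topic() + "-" + record.kafkaPartition() + "-" + record.kafkaOffset());
        }
    }

    @Override public void stop() { }
}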

Worker model

 Kafka Connect 群集由一组 Worker 进程组成,这些进程是执行 Connector 和 Task 的容器。Worker 自动和其他分布式的 Worker 协调工作,提供可扩展性和容错性。同时 Worker 进程的资源管理可以托管于 Yarn / Mesos,配置管理可以与 Chef / Puppet 框架整合,生命周期管理也可以使用 Oozie / Falcon

Data model

 Connector 将数据从一个分区化(partitioned)的输入流复制到一个分区化的输出流,其中输入输出端至少有一个总是 Kafka。每一个 Stream 都是一个有序的数据流,流里面的消息都有一个对应的偏移量。这些偏移量的格式和语义由 Connector 定义,以支持与各种系统的集成;然而,为了在故障的情况下实现某些传递语义,需要确保 Stream 内的偏移量是唯一的,并且流可以 seek 到任意的偏移量。同时,Kafka Connect 支持插件式的转换器,以便支持各种序列化格式来存储这些数据。另外,Schema 是内置的,使得关于数据格式的重要元数据,得以在复杂的数据管道中传播。但是,当 Schema 不可用时,也可以使用无模式(schemaless)的数据

支持的组件

Kafka 2 HDFS

特性
Exactly Once Delivery

 Connector 使用 WAL 文件保证每条导入到 HDFS 的数据都是有且仅有一条
 通过将 Kafka offset 信息编码到 WAL 文件中,就可以在 task 失败或者重启的时候,获取到最后一条提交的 offset 了

Extensible Data Format

 原生支持 Avro / Parquet 格式,其他数据格式可以通过扩展 Format 类获得支持

Hive Integration

 支持 Hive 的整合
 激活该功能后,Connector 会自动给每一个导入 HDFS 的 topic 创建 Hive 的外部分区表

Schema Evolution

 支持模式演进和不同的 Schema 兼容级别
 整合 Hive 时,支持给 schema.compatibility 配置 BACKWARD / FORWARD / FULL 三种模式
 Hive 表可以查询同一个 Topic 下,以不同 Schema 写入的所有表数据

Secure HDFS and Hive Metastore Support

 支持 Kerberos 权限控制,所以可以和带权限控制的 HDFS 或 Hive metastore 进行整合

Pluggable Partitioner

 支持默认的 Partitioner、基于 field 的 Partitioner、基于时间的 Partitioner(包括 daily / hourly 等粒度)
 可以实现 Partitioner 类扩展自己的 Partitioner
 也可以实现 TimeBasedPartitioner 类扩展基于时间的 Partitioner
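
 以基于时间的 Partitioner 为例,一个可能的 HDFS Sink Connector 配置片段示意如下(参数名以所使用的 kafka-connect-hdfs 版本文档为准,取值仅作演示):

partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
# 每个分区目录覆盖的时间跨度:1 小时
partition.duration.ms=3600000
# 生成的目录层级,例如 year=2017/month=11/day=12/hour=16
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en_US
timezone=UTC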

实战
安装 HDFS

 此处略,详见我的另一篇博客《Apache Eagle》

配置 Confluent
$ vim etc/kafka-connect-hdfs/quickstart-hdfs.properties
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs
hdfs.url=hdfs://localhost:9000
flush.size=3
启用
$ confluent start
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
$ cd ~/software/confluent
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_hdfs --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
# 输入以下三行数据
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
$ confluent load hdfs-sink -d etc/kafka-connect-hdfs/quickstart-hdfs.properties
{
"name":"hdfs-sink",
"config":{
"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max":"1",
"topics":"test_hdfs",
"hdfs.url":"hdfs://localhost:9000",
"flush.size":"3",
"name":"hdfs-sink"
},
"tasks":[
{
"connector":"hdfs-sink",
"task":0
}
]
}
验证
$ hadoop fs -ls /topics/test_hdfs/partition=0
Found 1 items
-rw-r--r-- 3 connect supergroup 197 2017-11-12 16:58 /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro
$ wget http://mirror.metrocast.net/apache/avro/avro-1.8.2/java/avro-tools-1.8.2.jar
# 直接在线上执行 avro 2 json 操作
$ hadoop jar avro-tools-1.8.2.jar tojson hdfs://localhost:9000/topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro
# 或者,拷贝 avro 文件到本地,再执行 avro 2 json 操作
$ hadoop fs -copyToLocal /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro /tmp/test_hdfs+0+0000000000+0000000002.avro
$ java -jar avro-tools-1.8.2.jar tojson /tmp/test_hdfs+0+0000000000+0000000002.avro
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}

Kafka 2 ElasticSearch

特性
Exactly Once Delivery

 Connector 利用 ElasticSearch 的幂等写入语义,并显式设置 ES Document IDs,来确保写入 ES 的每条数据都有且仅有一条
 如果 Kafka 消息里面包含了 Key 值,那么这些 Key 值会自动转化为 ES Document IDs;相反,如果 Kafka 消息里面没有包含这些 Key 值,或者是明确指定不要使用 Kafka 消息里面的 Key 值,Kafka Connect 将会自动使用 topic+partition+offset 作为 Key 值,以确保每一条写入 ES 的数据,都有一个唯一对应的 Document

Mapping Inference

 启用该功能时,Connector 可以自动依据 Schema Register 来推断 ES mapping 结构
 但是,该功能的推断受限于 field 类型和默认值。如果需要增加更多的限制(例如,用户自定义的解析器),则需要自己手动创建 mapping
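
 例如需要自定义解析器或字段类型时,可以在启动 Connector 之前,先手动创建好索引和 mapping(索引名与 type 名沿用本文后面的实战示例,字段类型仅作演示):

$ curl -XPUT 'http://localhost:9200/test-elasticsearch-sink' -H 'Content-Type: application/json' -d '
{
  "mappings": {
    "kafka-connect": {
      "properties": {
        "f1": { "type": "keyword" }
      }
    }
  }
}'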

Schema Evolution

 支持模式演进和不同的 Schema 兼容级别
 可以处理一些模式不兼容的更改,比如

  • 增加字段

    当增加一个或者多个 field 到 Kafka 消息中,并且 ES 开启了 dynamic mapping 功能,那么 ES 将会自动增加新的 field 到 mapping 中

  • 移除字段

    当从 Kafka 消息中移除一个或者多个 field 时,则这些缺失的值,将会被作为 null 值进行处理

  • 更改字段类型

    例如将一个 field 从 string 改成 integer 类型,ES 将会自动将 string 转化为 integer

Delivery Semantics

 Connector 支持 batch 和 pipeline 两种写入 ES 的模式,以此来增加吞吐量。batch 模式下,允许并行地处理多个 batch
 通过使用分区级 Kafka 偏移量作为 ES 文档版本,并使用 version_mode = external 配置,来确保文档级(Document-level)更新顺序

Reindexing with Zero Downtime

 利用 ES 的 Index Aliases 接口,可以完成零停机 reindexing 操作(ES 本身提供了 Reindex 接口,不过性能不高),具体对 ES 集群操作的步骤如下

  1. Create an alias for the index with the old mapping.
  2. The applications that uses the index are pointed to the alias.
  3. Create a new index with the updated mapping.
  4. Move data from old to the new index.
  5. Atomically move the alias to the new index.
  6. Delete the old index.

 为了保证用户无感知,需要在 reindexing 期间,仍然可以处理写数据的请求。但是别名是无法同时往新旧 index 写入数据的。为了解决这个问题,可以使用两个 ElasticSearch Connector 同时将相同的数据,写入到新旧 index 中,具体对 ES Connector 的操作步骤如下

  1. The connector job that ingest data to the old indices continue writing to the old indices.
  2. Create a new connector job that writes to new indices. This will copy both some old data and new data to the new indices as long as the data is in Kafka.
  3. Once the data in the old indices are moved to the new indices by the reindexing process, we can stop the old connector job.
实战
安装 ElasticSearch

 此处略写,详见我的另一篇博客《ElasticSearch》

# 下载
$ cd ~/install
$ curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.4.tar.gz
$ tar zxvf elasticsearch-5.6.4.tar.gz -C ~/software/
$ cd ~/software
$ ln -s elasticsearch-5.6.4/ elasticsearch
# 环境变量
$ vim ~/.bashrc
export JAVA_HOME=~/software/java
export ELASTIC_SEARCH_HOME=~/software/elasticsearch
export PATH=$JAVA_HOME/bin:$ELASTIC_SEARCH_HOME/bin:$PATH
$ source ~/.bashrc
$ elasticsearch -version
Version: 5.6.4, Build: 8bbedf5/2017-10-31T18:55:38.105Z, JVM: 1.8.0_131
# 后台启动
$ elasticsearch -Ecluster.name=yuzhouwan -Enode.name=yuzhouwan_kafka_connect_test -d
# 检查
$ jps -ml
25332 org.elasticsearch.bootstrap.Elasticsearch -Ecluster.name=yuzhouwan -Enode.name=yuzhouwan_kafka_connect_test -d
$ netstat -nap | grep 9200
tcp 0 0 ::ffff:127.0.0.1:9200 :::* LISTEN 25332/java
tcp 0 0 ::1:9200 :::* LISTEN 25332/java
$ curl -XGET 'http://localhost:9200/_cluster/health' | jq
{
"cluster_name": "yuzhouwan",
"status": "yellow",
"timed_out": false,
"number_of_nodes": 1,
"number_of_data_nodes": 1,
"active_primary_shards": 5,
"active_shards": 5,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 5,
"delayed_unassigned_shards": 0,
"number_of_pending_tasks": 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 50
}
配置 Confluent
$ vim etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-elasticsearch-sink
key.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect
启用
$ cd ~/software/confluent
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
$ ./bin/kafka-server-start ./etc/kafka/server.properties
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
$ ./bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties
$ cd ~/software/confluent
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test-elasticsearch-sink --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
# 输入以下三行数据
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
$ ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
$ confluent load elasticsearch-sink | jq
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "test-elasticsearch-sink",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"type.name": "kafka-connect",
"name": "elasticsearch-sink"
},
"tasks": [
{
"connector": "elasticsearch-sink",
"task": 0
}
]
}
验证
$ curl -XGET 'http://localhost:9200/test-elasticsearch-sink/_search?pretty'
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 1.0,
"hits" : [
{
"_index" : "test-elasticsearch-sink",
"_type" : "kafka-connect",
"_id" : "test-elasticsearch-sink+0+0",
"_score" : 1.0,
"_source" : {
"f1" : "value1"
}
},
{
"_index" : "test-elasticsearch-sink",
"_type" : "kafka-connect",
"_id" : "test-elasticsearch-sink+0+2",
"_score" : 1.0,
"_source" : {
"f1" : "value3"
}
},
{
"_index" : "test-elasticsearch-sink",
"_type" : "kafka-connect",
"_id" : "test-elasticsearch-sink+0+1",
"_score" : 1.0,
"_source" : {
"f1" : "value2"
}
}
]
}
}
踩过的坑
max file descriptors [32768] for elasticsearch process is too low, increase to at least [65536]
  • 解决
$ su root
$ vim /etc/security/limits.d/90-nproc.conf
* - nproc 20480
* - nofile 65536
* - memlock unlimited

Kafka 2 Druid

 Druid 官方决定使用 Kafka Indexing Service 方案,方便构建自己的生态圈。不过,目前的 Kafka Indexing Service 仍然存在一定的缺陷。因此,Kafka Connect 2 Druid 功能需要自己基于 Druid Tranquility 组件进行定制开发。同时,可以在关闭 Kafka Auto Commit 功能后,自己控制事务提交,来保证不丢不重,进而弥补 Tranquility 组件没有 Exactly-once 特性的缺陷
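
 下面是一段关闭 Auto Commit、手动控制 offset 提交的消费端草图(仅为思路示意:topic 名 test_druid、group 名 kafka2druid 为示例取值,sendToDruid 是本文虚构的占位方法,代表经 Tranquility 写入 Druid 的逻辑):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Kafka2DruidSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "kafka2druid");
        props.put("enable.auto.commit", "false");    // 关闭自动提交
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test_druid"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    sendToDruid(record.value());     // 占位:经 Tranquility 写入 Druid
                }
                consumer.commitSync();               // 写入成功后再同步提交 offset,避免丢数据
            }
        }
    }

    private static void sendToDruid(String message) {
        // 真实实现应调用 Tranquility 的发送逻辑,并对发送失败做重试或回滚处理
    }
}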

Kafka 2 Kafka

 Connect Replicator 可以轻松可靠地将 Topic 从一个 Kafka 集群复制到另一个。除了复制消息之外,此连接器还会根据需要创建主题,以保留源群集中的 Topic 配置。这包括保留分区数量,副本数以及为单个 Topic 指定对任何配置的覆盖

 下图显示了一个典型的多数据中心(Data Center)部署,其中来自位于不同数据中心的两个 Kafka 集群的数据,聚合在位于另一个数据中心的单独集群中。这里,复制数据的来源称为源集群(source cluster),而复制数据的目标称为目的集群(destination cluster)

特性
  • Topic 的选择,可以使用白名单、黑名单和正则表达式
  • 支持使用匹配的 Partition 数量、Replication 因子和 Topic 配置覆盖,在目标集群动态创建 Topic
  • 当新的 partition 被加入到源集群之后,目标集群会自动扩容对应的 Topic
  • 其他源集群的配置更改,都会被自动同步到目标集群

RESTful 接口

说明

 Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default, this service runs on port 8083.

 The following are the currently supported endpoints:

  • GET /connectors - return a list of active connectors
  • POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
  • GET /connectors/{name} - get information about a specific connector
  • GET /connectors/{name}/config - get the configuration parameters for a specific connector
  • PUT /connectors/{name}/config - update the configuration parameters for a specific connector
  • GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
  • GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
  • GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
  • PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
  • PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
  • POST /connectors/{name}/restart - restart a connector (typically because it has failed)
  • POST /connectors/{name}/tasks/{taskId}/restart - restart an individual task (typically because it has failed)
  • DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration

 Kafka Connect also provides a REST API for getting information about connector plugins:

  • GET /connector-plugins - return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars
  • PUT /connector-plugins/{connector-type}/config/validate - validate the provided configuration values against the configuration definition. This API performs per config validation, returns suggested values and error messages during validation.

实例

Worker 版本信息
$ curl localhost:8083/ | jq
{
"version": "0.10.0.1-cp1",
"commit": "ea5fcd28195f168b"
}
Worker 支持的 Connector 插件
$ curl localhost:8083/connector-plugins | jq
[
{
"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type": "sink",
"version": "3.3.1"
},
{
"class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"type": "sink",
"version": "3.3.1"
},
{
"class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",
"type": "source",
"version": "0.11.0.0-cp1"
},
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "3.3.1"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "3.3.1"
},
{
"class": "io.confluent.connect.s3.S3SinkConnector",
"type": "sink",
"version": "3.3.1"
},
{
"class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
"type": "source",
"version": "0.11.0.0-cp1"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "0.11.0.0-cp1"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "0.11.0.0-cp1"
}
]
Worker 上激活的 Connector 插件
$ curl localhost:8083/connectors | jq
[
"file-source",
"file-sink"
]
重启 Connector
$ curl -X POST localhost:8083/connectors/file-sink/restart
# 如果成功了,不会返回任何信息
# 如果失败了,会打印如下类似信息
{
"error_code": 404,
"message": "Unknown connector: local-file-sink"
}
获得某个 Connector 上的所有 Tasks
$ curl localhost:8083/connectors/file-sink/tasks | jq
[
{
"id": {
"connector": "file-sink",
"task": 0
},
"config": {
"topics": "connect-test",
"file": "test.sink.txt",
"task.class": "org.apache.kafka.connect.file.FileStreamSinkTask"
}
}
]
重启 Task
$ curl -X POST localhost:8083/connectors/file-sink/tasks/0/restart | jq
# 如果成功了,不会返回任何信息
# 如果失败了,会打印如下类似信息
{
"error_code": 404,
"message": "Unknown task: file-sink-1"
}
暂停 Connector
$ curl -X PUT localhost:8083/connectors/file-sink/pause | jq
恢复 Connector
$ curl -X PUT localhost:8083/connectors/file-sink/resume | jq
更新 Connector 配置信息
$ curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"FileStreamSinkConnector","file":"test.sink.txt","tasks.max":"2","topics":"connect-test","name":"local-file-sink"}' localhost:8083/connectors/local-file-sink/config
{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSinkConnector",
"file": "test.sink.txt",
"tasks.max": "2",
"topics": "connect-test",
"name": "local-file-sink"
},
"tasks": [
{
"connector": "local-file-sink",
"task": 0
},
{
"connector": "local-file-sink",
"task": 1
}
]
}
获取 Connector 状态
$ curl localhost:8083/connectors/file-sink/status | jq
{
"name": "file-sink",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.1.101:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "192.168.1.101:8083"
}
]
}
获取 Connector 配置信息
$ curl localhost:8083/connectors/file-sink | jq
{
"name": "file-sink",
"config": {
"topics": "connect-test",
"file": "test.sink.txt",
"name": "file-sink",
"tasks.max": "1",
"connector.class": "FileStreamSink"
},
"tasks": [
{
"connector": "file-sink",
"task": 0
}
]
}
删除 Connector
$ curl -X DELETE localhost:8083/connectors/file-sink

Quick Start

基础环境

# 增加用户,并赋予其密码
$ adduser connect
$ passwd connect # your password for connect user
# 赋予用户可以 sudo 的权限
$ chmod u+w /etc/sudoers
$ vim /etc/sudoers
# 找到 `root ALL=(ALL) ALL` 这行,并在下面添加 connect 用户
connect ALL=(ALL) ALL
$ chmod u-w /etc/sudoers
# 切换到 connect 用户
$ su - connect
$ cd /home/connect
# 存放软件目录 & 安装目录 & 日志目录
$ mkdir install && mkdir software && mkdir logs

Confluent

下载

 在 https://www.confluent.io/download/ 页面中下载 confluent-oss-3.3.1-2.11.tar.gz 安装包

安装
$ cd ~/install/
$ tar zxvf confluent-oss-3.3.1-2.11.tar.gz -C ~/software/
$ cd ~/software/
$ ln -s confluent-3.3.1/ confluent
$ vim ~/.bashrc
export CONFLUENT_HOME=/home/connect/software/confluent
export PATH=$CONFLUENT_HOME/bin:$PATH
$ source ~/.bashrc
启动
# 启动 Zookeeper, Kafka, Schema Registry
$ confluent start schema-registry
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
$ jps -ml
32680 org.apache.zookeeper.server.quorum.QuorumPeerMain /tmp/confluent.A8TzcjSE/zookeeper/zookeeper.properties
348 io.confluent.support.metrics.SupportedKafka /tmp/confluent.A8TzcjSE/kafka/kafka.properties
483 io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain /tmp/confluent.A8TzcjSE/schema-registry/schema-registry.properties
发送 avro 数据
$ cd ~/software/confluent/
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
# 输入以下三行 JSON 串
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
# Ctrl+C 停止进程
接收 avro 数据
$ ./bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181 --from-beginning
# 接收到 producer 发送的三行 JSON 串
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
# Ctrl+C 停止进程
发送数据格式不对的 avro 数据
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"int"}'
# 输入以下一行 JSON 串
{"f1":"value1"}
# 获得如下报错
org.apache.kafka.common.errors.SerializationException: Error deserializing json {"f1":"value1"} to Avro of schema "int"
Caused by: org.apache.avro.AvroTypeException: Expected int. Got START_OBJECT
at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:698)
at org.apache.avro.io.JsonDecoder.readInt(JsonDecoder.java:172)
at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:83)
at org.apache.avro.generic.GenericDatumReader.readInt(GenericDatumReader.java:503)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at io.confluent.kafka.formatter.AvroMessageReader.jsonToAvro(AvroMessageReader.java:191)
at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:158)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:58)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
停止
$ confluent stop schema-registry
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]

Kafka Connect

启动 confluent
$ confluent start
zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
查看 connect 日志
$ confluent log connect
$ confluent current
/tmp/confluent.A8TzcjSE
$ cd /tmp/confluent.A8TzcjSE
$ less connect/connect.stderr
查看支持的 connect 类型
$ confluent list connectors
Bundled Predefined Connectors (edit configuration under etc/):
elasticsearch-sink
file-source
file-sink
jdbc-source
jdbc-sink
hdfs-sink
s3-sink
$ ll etc/
drwxr-xr-x 2 connect connect 4096 Jul 28 08:07 camus
drwxr-xr-x 2 connect connect 4096 Jul 28 07:41 confluent-common
drwxr-xr-x 2 connect connect 4096 Jul 28 07:28 kafka
drwxr-xr-x 2 connect connect 4096 Jul 28 07:50 kafka-connect-elasticsearch
drwxr-xr-x 2 connect connect 4096 Jul 28 07:58 kafka-connect-hdfs
drwxr-xr-x 2 connect connect 4096 Jul 28 08:06 kafka-connect-jdbc
drwxr-xr-x 2 connect connect 4096 Jul 28 08:04 kafka-connect-s3
drwxr-xr-x 2 connect connect 4096 Jul 28 07:52 kafka-connect-storage-common
drwxr-xr-x 2 connect connect 4096 Jul 28 07:48 kafka-rest
drwxr-xr-x 2 connect connect 4096 Jul 28 07:42 rest-utils
drwxr-xr-x 2 connect connect 4096 Jul 28 07:45 schema-registry
使用 file-source
# 配置 file-source
$ cat ./etc/kafka/connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
# 如果不需要 Schema Registry 功能,需要同时指定 key.converter 和 value.converter 参数为 org.apache.kafka.connect.json.JsonConverter
# 往 test.txt 里面写入测试数据
$ for i in {1..3}; do echo "log line $i"; done > test.txt
# 装载 file-source 连接器,并确认 file-source 的配置
$ confluent load file-source
{"name":"file-source","config":{"connector.class":"FileStreamSource","tasks.max":"1","file":"test.txt","topic":"connect-test","name":"file-source"},"tasks":[]}
# 确认 connector 是否已经装载
$ confluent status connectors
["file-source"]
# 检查 file-source 连接器的运行状态
$ confluent status file-source
{"name":"file-source","connector":{"state":"RUNNING","worker_id":"192.168.1.101:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"192.168.1.101:8083"}]}
# 检查 Kafka 是否接收到数据
$ kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
"log line 1"
"log line 2"
"log line 3"
使用 file-sink
# 配置 file-sink
$ cat ./etc/kafka/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
# 装载 file-sink 连接器,并确认 file-sink 的配置
$ confluent load file-sink
{"name":"file-sink","config":{"connector.class":"FileStreamSink","tasks.max":"1","file":"test.sink.txt","topics":"connect-test","name":"file-sink"},"tasks":[]}
# 确认 connector 是否已经装载
$ confluent status connectors
["file-source","file-sink"]
# 检查 file-sink 连接器的运行状态
$ confluent status file-sink
{"name":"file-sink","connector":{"state":"RUNNING","worker_id":"192.168.1.101:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"192.168.1.101:8083"}]}
# 检查 Kafka 数据是否已经写入到文件
$ tail -f test.sink.txt
log line 1
log line 2
log line 3
# 另起窗口,并执行写入 test.txt 数据的脚本,可以看到对应 test.sink.txt 里面已经被写入数据
$ for i in {4..1000}; do echo "log line $i"; done >> test.txt
清理工作
# 卸载 file-source / file-sink connector
$ confluent unload file-source
$ confluent unload file-sink
# 关闭 connect 进程
$ confluent stop connect
Stopping connect
connect is [DOWN]
# 停止所有 confluent 进程
$ confluent stop
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
# 停止所有 confluent 进程,并删除临时文件
$ confluent destroy
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
Deleting: /tmp/confluent.A8TzcjSE

单机版 Worker

启动

$ bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]

配置

# 存储 connector 的 offset
offset.storage.file.filename
# RESTful 端口,接受 HTTP 请求
rest.port

分布式 Worker

启动 Confluent

$ cd ~/software/confluent
# 启动 ZooKeeper
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
# 启动 Kafka
$ ./bin/kafka-server-start ./etc/kafka/server.properties
# 启动 Schema Registry
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
# 启动 Kafka REST
$ ./bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties
# 以分布式模式启动 Connect
$ ./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties

创建 Topic

# 获取帮助文档
$ bin/kafka-topics --help
# 创建 connect-configs / connect-offsets / connect-status 三个 Topic
# connect-configs 存储 connector 和 task 的配置信息
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
# connect-offsets 存储 connector 和 task 的 offset 信息
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
# connect-status 存储 connector 和 task 的状态变更信息
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
# 查看已存在的 topic
$ bin/kafka-topics --list --zookeeper localhost:2181
__consumer_offsets
_schemas
connect-configs
connect-offsets
connect-statuses
# 检查 topic 状态
$ bin/kafka-topics --describe --zookeeper localhost:2181 --topic connect-configs
Topic:connect-configs PartitionCount:1 ReplicationFactor:1 Configs:
Topic:connect-configs Partition: 0 Leader: 0 Replicas: 0 Isr: 0

删除 Topic

$ vim etc/kafka/server.properties
# 如果需要删除 topic,需要先设置 delete.topic.enable 为 true
delete.topic.enable=true
$ bin/kafka-topics --delete --zookeeper localhost:2181 --topic connect-configs

Connector 和 Task 状态

 所有 Connector 和隶属于这些 Connector 的 Tasks,都通过 status.storage.topic 发布状态更新。因为 Worker 消费 status.storage.topic 是异步的,所以从 API 获取最新状态的时候,会存在一些短暂的延迟

 Connector 或者某个 Task 的状态,可能为:

  • UNASSIGNED

    Connector 或者 Task 尚未被分配到某个 Worker 上

  • RUNNING

    运行中

  • PAUSED

    Connector 或者 Task 被管理员暂停

  • FAILED

    通常因为出现异常,导致 Connector 或者 Task 失败

 一般 Pause / Resume API 的操作,适用的场景是消费端系统需要维护,通过暂停 Kafka Connector,来避免数据一直被积压。同时,暂停操作不是临时性的,即便重启了集群,仍需要再次调用 Resume 接口,暂停的 Connector 才会恢复到 RUNNING 状态。另外,FAILED 状态的任务是不允许执行 Pause 操作的,需要先重启并恢复到 RUNNING 状态才行

Worker 之间如何建立通讯

 只要保证各个 Worker 使用的是统一的 group.id(可以看做是 Cluster Name),还有 config.storage.topic、offset.storage.topic、status.storage.topic 三个 Topic 的配置也需要保持一致,Worker 就会自动发现同一个集群中的其他 Worker 进程
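
 也就是说,集群内每个 Worker 的 connect-distributed.properties 中,下面这几项必须保持一致(示意,取值沿用本文前面的配置):

# 同一个 Kafka Connect 集群内必须一致
group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
# bootstrap.servers 指向同一套 Kafka 即可;rest.port 等节点级配置可以各自不同
bootstrap.servers=192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092
rest.port=8083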

Rebalance 机制

[2017-12-04 11:32:18,607] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1187)
[2017-12-04 11:32:18,608] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1217)
[2017-12-04 11:32:18,608] INFO (Re-)joining group connect-cluster (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:442)
[2017-12-04 11:32:18,612] INFO Successfully joined group connect-cluster with generation 2 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:409)
[2017-12-04 11:32:18,612] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-efb9e92c-27a6-4062-9fcc-92480f8e9e03', leaderUrl='http://192.168.1.101:8083/', offset=-1, connectorIds=[], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1166)
[2017-12-04 11:32:18,612] INFO Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:815)
[2017-12-04 11:32:18,613] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:825)

完全分布式

替换原生的 Zookeeper

 具体安装过程略,详见我的另一篇博客《Zookeeper 原理与优化》

$ cd /home/connect/software/confluent
$ vim ./etc/kafka/server.properties
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
zookeeper.connection.timeout.ms=6000
$ vim ./etc/schema-registry/connect-avro-distributed.properties
id=kafka-rest-test-server
schema.registry.url=http://localhost:8081
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
$ zkServer.sh start

替换原生的 Kafka

# 创建日志目录
$ mkdir -p /home/connect/kafka_log
$ cd /home/connect/software/kafka
# 配置 Kafka Server
$ vim config/server.properties
# 每个 Kafka Broker 节点,配置不同的 broker.id
broker.id=0
# 允许删除 topic
delete.topic.enable=true
# 配置成当前 Kafka Borker 节点的 IP
listeners=PLAINTEXT://192.168.1.101:9092
advertised.listeners=PLAINTEXT://192.168.1.101:9092
log.dirs=/home/connect/kafka_log
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
# 配置 Kafka Zookeeper
$ vim config/zookeeper.properties
dataDir=/home/connect/zkdata
clientPort=2181
maxClientCnxns=60
# 配置 Kafka Consumer
$ vim config/consumer.properties
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group
# 启动 Kafka
$ nohup /home/connect/software/kafka/bin/kafka-server-start.sh /home/connect/software/kafka/config/server.properties > /home/connect/kafka_log/kafka.server.log 2>&1 &
# 查看 Kafka 日志
$ tail -f ~/kafka_log/kafka.server.log
# 更新 Confluent 配置
$ cd /home/connect/software/confluent
$ vim etc/kafka/connect-distributed.properties
bootstrap.servers=192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092
# 创建 connect-configs / connect-offsets / connect-status 三个 Topic
# connect-configs 存储 connector 和 task 的配置信息
$ bin/kafka-topics.sh --create --zookeeper 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
# connect-offsets 存储 connector 和 task 的 offset 信息
$ bin/kafka-topics.sh --create --zookeeper 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181 --topic connect-offsets --replication-factor 3 --partitions 1 --config cleanup.policy=compact
# connect-status 存储 connector 和 task 的状态变更信息
$ bin/kafka-topics.sh --create --zookeeper 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181 --topic connect-status --replication-factor 3 --partitions 1 --config cleanup.policy=compact
# 查看已存在的 topic
$ bin/kafka-topics.sh --list --zookeeper 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
__consumer_offsets
connect-configs
connect-offsets
connect-status
test-elasticsearch-sink
# 检查 topic 状态
$ bin/kafka-topics --describe --zookeeper 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181 --topic connect-configs
Topic:connect-configs PartitionCount:1 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic:connect-configs Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 2,0,1

更新 Kafka Rest 配置

$ cd /home/connect/software/confluent
$ vim etc/kafka-rest/kafka-rest.properties
id=kafka-rest-test-server
schema.registry.url=http://192.168.1.103:8081
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
$ nohup ./bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties &

更新 Schema Register 配置

$ cd /home/connect/software/confluent
$ vim etc/schema-registry/schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.connection.url=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
kafkastore.topic=_schemas
debug=false
$ nohup ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &
$ tail -f logs/schema-registry.log

更新 Worker 配置

$ vim /home/connect/software/confluent/etc/kafka/connect-distributed.properties
bootstrap.servers=192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 考虑到方便压测,可以关闭 schema 功能,将下面两个配置项置为 false
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://192.168.1.103:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://192.168.1.103:8081
$ nohup /home/connect/software/confluent/bin/connect-distributed /home/connect/software/confluent/etc/kafka/connect-distributed.properties > /home/connect/software/confluent/logs/connect-distributed.log &
# 分发配置,并在各节点启动 worker
$ scp /home/connect/software/confluent/etc/schema-registry/connect-avro-distributed.properties 192.168.1.102:/home/connect/software/confluent/etc/schema-registry/connect-avro-distributed.properties
$ scp /home/connect/software/confluent/etc/schema-registry/connect-avro-distributed.properties 192.168.1.103:/home/connect/software/confluent/etc/schema-registry/connect-avro-distributed.properties

踩过的坑

WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!
解决
$ vim .ssh/known_hosts
# 删除本机 IP 下的秘钥
Timed out while checking for or creating topic(s) ‘connect-offsets’
描述
[2017-12-01 17:39:13,503] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:206)
org.apache.kafka.connect.errors.ConnectException: Timed out while checking for or creating topic(s) 'connect-offsets'. This could indicate a connectivity issue, unavailable topic partitions, or if this is your first use of the topic it may have taken too long to create.
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:243)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:126)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:146)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:99)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:194)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
参考

Docker 镜像

更新镜像源

# 添加国内 yum 源
$ sudo yum-config-manager --add-repo https://mirrors.ustc.edu.cn/docker-ce/linux/centos/docker-ce.repo
# 更新 yum 软件源缓存
$ sudo yum makecache fast

安装 Docker-CE

# 安装 依赖包
$ sudo yum install -y yum-utils device-mapper-persistent-data lvm2
# 安装 docker-ce
$ sudo yum install docker-ce
# 或者直接一键安装
$ curl -fsSL get.docker.com -o get-docker.sh
$ sudo sh get-docker.sh --mirror Aliyun

启动

$ sudo systemctl enable docker
$ sudo systemctl start docker

建立 Docker 用户组

# 建立 docker 组
$ sudo groupadd docker
# 将当前用户加入 docker 组
$ sudo usermod -aG docker $USER

踩过的坑

bridge-nf-call-iptables is disabled
描述
WARNING: bridge-nf-call-iptables is disabled
WARNING: bridge-nf-call-ip6tables is disabled
解决
# 添加内核配置参数
$ sudo tee -a /etc/sysctl.conf <<-EOF
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
EOF
# 重新加载 sysctl.conf
$ sudo sysctl -p

MySQL - Kafka Connect - HDFS 实战

下载

 MySQL - Kafka Connect - HDFS 集成环境,官方已经提供了 kafka_connect_blog.ova 镜像文件

安装

VirtualBox

 使用 Vagrant 来管理和安装 VirtualBox 虚拟机,相关安装步骤,参见我的另一篇博客《Python》

导入虚拟机镜像

 选择合适的资源,导入 kafka_connect_blog.ova 文件即可
 默认用户名密码均为 vagrant

操作虚拟机镜像

# 更新下载源索引
$ sudo apt-get update
$ ./setup.sh
$ ./start.sh

常用配置

Worker 通用配置

bootstrap.servers
# 建立到 Kafka 的初始连接
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092
[key | value].converter
# 指定 Kafka 中的数据如何转换为 Connect 的数据格式
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.[key | value].converter
# 指定 Connect 内部的数据转化
# The offsets, status, and configurations are written to the topics using converters specified through the following required properties.
# Most users will always want to use the JSON converter without schemas.
# Offset and config data is never visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.host.name & rest.port
# 配置 RESTful 服务的 IP 和 Port
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
rest.host.name=0.0.0.0
rest.port=8083
plugin.path
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/connect/plugins

分布式 Worker 配置

group.id
# The group ID is a unique identifier for the set of workers that form a single Kafka Connect cluster
group.id=connect-cluster
[config | offset | status].storage.topic
# Internal Storage Topics.
#
# Kafka Connect distributed workers store the connector and task configurations, connector offsets,
# and connector statuses in three internal topics. These topics MUST be compacted.
# When the Kafka Connect distributed worker starts, it will check for these topics and attempt to create them
# as compacted topics if they don't yet exist, using the topic name, replication factor, and number of partitions
# as specified in these properties, and other topic-specific settings inherited from your brokers'
# auto-creation settings. If you need more control over these other topic-specific settings, you may want to
# manually create these topics before starting Kafka Connect distributed workers.
#
# The following properties set the names of these three internal topics for storing configs, offsets, and status.
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
[config | offset | status].storage.replication.factor
# The following properties set the replication factor for the three internal topics, defaulting to 3 for each
# and therefore requiring a minimum of 3 brokers in the cluster. Since we want the examples to run with
# only a single broker, we set the replication factor here to just 1. That's okay for the examples, but
# ALWAYS use a replication factor of AT LEAST 3 for production environments to reduce the risk of
# losing connector offsets, configurations, and status.
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
[offset | status].storage.partitions
# The config storage topic must have a single partition, and this cannot be changed via properties.
# Offsets for all connectors and tasks are written quite frequently and therefore the offset topic
# should be highly partitioned; by default it is created with 25 partitions, but adjust accordingly
# with the number of connector tasks deployed to a distributed worker cluster. Kafka Connect records
# the status less frequently, and so by default the topic is created with 5 partitions.
offset.storage.partitions=25
status.storage.partitions=5

Connector 配置

说明
# Unique name for the connector. Attempting to register again with the same name will fail.
name
# The Java class for the connector
connector.class
# The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
tasks.max
# (optional) Override the default key converter class set by the worker.
key.converter
# (optional) Override the default value converter class set by the worker.
value.converter
# Sink connectors also have one additional option to control their input
# A list of topics to use as input for this connector
topics
实例
单机版 Connector 配置
name=local-file-sink
connector.class=FileStreamSinkConnector
tasks.max=1
file=test.sink.txt
topics=connect-test
分布式 Connector 配置
$ curl -X POST -H "Content-Type: application/json" --data '{"name": "local-file-sink", "config": {"connector.class":"FileStreamSinkConnector", "tasks.max":"1", "file":"test.sink.txt", "topics":"connect-test" }}' http://localhost:8083/connectors
# 或者直接指定一个包含了 JSON 格式的配置文件
$ curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors

使用社区 Connector

# 下载源码
$ git clone git@github.com:confluentinc/kafka-connect-hdfs.git
# 切换至稳定版本,并打包编译
$ cd kafka-connect-hdfs; git checkout v3.0.1; mvn package
# 在 plugins 目录下创建对应的 kafka-connect-hdfs 子目录
$ mkdir -p /usr/local/share/kafka/plugins/kafka-connect-hdfs
# 拷贝编译出来的 jar 包到 plugins 目录
$ cp target/kafka-connect-hdfs-3.0.1-package/share/java/kafka-connect-hdfs/* /usr/local/share/kafka/plugins/kafka-connect-hdfs/

实现自己的 Connector

源码分析

阅读环境搭建

 Kafka Connect 项目的 接口定义 和 组件模块化 做得相当到位,整个工程被拆成很多子项目。同时,搭建源码阅读环境的时候,会发现很多依赖在中央仓库中是找不到的,这时候需要下载源码进行本地编译安装。首先,下载 kafka-connect-common / kafka-connect-storage-common 两个父工程,以及相关的依赖子工程 rest-utils / schema-registry 的源码。通过 git fetch --tags 命令将 tags 全部下载下来之后,选定一个稳定版本 (这里以 v3.3.1 为例),并创建新分支 v3.3.1,再在新分支下执行 git reset --hard <commit> 统一所有项目代码到 v3.3.1 版本 (这里还有一个依赖的子项目 aggdesigner 也需要本地编译安装,对应版本为 pentaho-aggdesigner-5.1.5-jhyde)。另外,Kafka 也用了一个非常规的版本 0.11.0.1-cp1,这里只需要选择一个略高的版本 0.11.0.1 保证兼容性即可(当然也可以在 Kafka trunk 分支下执行 gradlew installAll 命令编译出对应版本),最后,依次执行 mvn clean install 命令完成打包编译
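
 与上面的描述相对应,单个依赖工程的本地编译安装过程大致如下(仓库地址以各项目的 GitHub 主页为准,这里以 <repo-url> 代替):

$ git clone <repo-url>
$ cd <repo-dir>
# 下载全部 tags,并基于稳定版本创建新分支
$ git fetch --tags
$ git checkout -b v3.3.1
$ git reset --hard v3.3.1
# 打包编译,并安装到本地 Maven 仓库
$ mvn clean install -DskipTests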

模块架构图

kafka-connect-common

 该项目是整个 Kafka Connect 工程的根目录级父工程,其中主要包含了,common 接口定义、配置读取、Metrics / JMX 指标设计、公共工具类,以及如何 build 编译、package 打包哪些文件、license 著作权标识 等工程类的部分

kafka-connect-storage-common

 官方对该项目的定位描述为简单的一句话:“Kafka Connect common packages for connectors transferring data between Kafka and distributed filesystems or cloud storage”。实际包含了很多实质性内容:Common 模块、Core 模块、Format 模块、Partitioner 模块、WAL 模块、Hive 模块 等

Common 模块

 主要定义了 ComposableConfig / SchemaGenerator 接口,分别用于 配置组合 和 Schema 生成

Core 模块

 主要定义了 Storage 接口,用于分布式存储。基于 Storage 接口的不同实现,一个 Storage 对象可以是分布式文件系统里面的一个文件或者一个文件夹,也可以是对象存储系统中的一个存储对象。类似地,路径就对应于分布式文件系统中的实际路径和对象存储库中的查找键

Format 模块

 主要定义了 Format / SchemaFileReader / RecordWriter 三个接口,分别用于 存储类型格式化(e.g. Path in HDFS, String in S3)、从 Storage 中读取 Schema 和 如何将记录写入 Storage

Partitioner 模块

 该模块主要定义了 Partitioner 接口,用来对接收到的消息进行分区,生成对应的目录和文件名。支持 Default / Hourly / Daily / TimeBase / Field 和自定义的 Partitioner,另外 TimeBase 的 Partitioner 还同时支持 ingestion time / event time 两种时间类型

WAL 模块

 定义了 WAL 接口,用于规范 Kafka offset 信息如何编码到 WAL

Hive 模块

 主要定义了 HiveUtil 抽象类、HiveFactory 接口(HiveUtil 的工厂类) 和 Schema / Config / Exception / MetaStore 等相关实现类

kafka-connect-hdfs

 首先可以看出 State 枚举类中给出了 RECOVERY_STARTED, RECOVERY_PARTITION_PAUSED, WAL_APPLIED, WAL_TRUNCATED, OFFSET_RESET, WRITE_STARTED, WRITE_PARTITION_PAUSED, SHOULD_ROTATE, TEMP_FILE_CLOSED, WAL_APPENDED, FILE_COMMITTED 一共 11 种任务运行状态;写入的逻辑全都在 TopicPartitionWriter 的 write 方法里面,而异常恢复的逻辑,则在 recover 方法中实现

修复已知 Bug

 使用 jira 语法查询出 Kafka Connect 组件的已知 bug,具体语法如下

project = KAFKA AND issuetype = Bug AND component = KafkaConnect

实用技巧

zkCli

  • get /consumers/<topic>/owners:查看 topic 实时消费的 group id
  • get /consumers/<topic>/offsets/<group id>/<partition>:查看 offset 情况 (ctime: 创建时间; mtime: 修改时间)

kafka-run-class

删除 Topic

$ bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper <zk host>:2181,<zk host>:2181,<zk host>:2181 --topic <topic>

性能优化

架构层面

RingBuffer + Lock Free

 使用 RingBuffer + Lock Free 实现高效的 Producer-Consumer 设计模式,极大地提升 Kafka Producer 发送 Message 的性能
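
 下面是一个极简的单生产者-单消费者(SPSC)无锁环形缓冲区草图,仅用来说明 RingBuffer + Lock Free 的基本思路(生产环境可以直接选用 Disruptor 等成熟实现):

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// 单生产者-单消费者场景下的无锁环形缓冲区,容量必须是 2 的幂
public class SpscRingBuffer<E> {

    private final AtomicReferenceArray<E> buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong();   // 消费位置
    private final AtomicLong tail = new AtomicLong();   // 生产位置

    public SpscRingBuffer(int capacity) {
        if (Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of 2");
        }
        this.buffer = new AtomicReferenceArray<>(capacity);
        this.mask = capacity - 1;
    }

    // 仅由生产者线程调用:缓冲区已满时返回 false,由调用方决定自旋或退避
    public boolean offer(E e) {
        long t = tail.get();
        if (t - head.get() >= buffer.length()) {
            return false;
        }
        buffer.set((int) (t & mask), e);
        tail.lazySet(t + 1);             // 先写数据,再推进 tail,保证对消费者可见
        return true;
    }

    // 仅由消费者线程调用:缓冲区为空时返回 null
    public E poll() {
        long h = head.get();
        if (h >= tail.get()) {
            return null;
        }
        int idx = (int) (h & mask);
        E e = buffer.get(idx);
        buffer.set(idx, null);           // 释放引用,方便 GC
        head.lazySet(h + 1);
        return e;
    }
}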

Avro 压缩

 使用 Avro 压缩,保证了高效地解压缩数据的同时,可以减少网络传输的数据量
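
 与之配合的 Producer 端配置示意如下(使用 Confluent 的 KafkaAvroSerializer 做 Avro 序列化,同时叠加 Producer 端的批量压缩;取值仅作演示):

bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081
# 压缩算法可以按需选择 gzip / snappy / lz4
compression.type=snappy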

多 Partitioner + 连接池

 使用连接池可以动态地增减所需的 Kafka 连接,并利用多 Partitioner 提高并发度

Tips: Full code is here.
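
 其中,多 Partitioner 的思路可以通过实现 Kafka Producer 的 Partitioner 接口来定制分区策略,草图如下(SimpleHashPartitioner 为本文虚构的示例类名):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

// 有 key 时按 key 哈希散列,无 key 时轮询,以便把写入压力摊到多个分区上
public class SimpleHashPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;   // 轮询
        }
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;       // 哈希
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

 使用时,在 Producer 配置中把 partitioner.class 指定为该类的全限定名即可;连接池部分不在此展开,可结合通用的对象池实现。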

参数层面

Producer

kafka.key.serializer.class=kafka.serializer.StringEncoder
kafka.serializer.class=kafka.serializer.DefaultEncoder
kafka.request.required.acks=0
kafka.async=async
kafka.queue.buffering.max.ms=5000
kafka.queue.buffering.max.messages=10000
kafka.queue.enqueue.timeout.ms=-1
kafka.batch.num.messages=200
kafka.send.buffer.bytes=102400

源码阅读

阅读环境搭建

安装 gradle

 在 gradle 下载页面,下载 gradle-4.3.1-all.zip 文件,解压至 D:\apps\gradle,并添加环境变量 PATH=D:\apps\gradle\gradle-4.3.1\bin

# 检查是否安装成功
$ gradle -v
------------------------------------------------------------
Gradle 4.3.1
------------------------------------------------------------
Build time: 2017-11-08 08:59:45 UTC
Revision: e4f4804807ef7c2829da51877861ff06e07e006d
Groovy: 2.4.12
Ant: Apache Ant(TM) version 1.9.6 compiled on June 29 2015
JVM: 1.8.0_111 (Oracle Corporation 25.111-b14)
OS: Windows 7 6.1 amd64

gradle 代理设置

$ gradle xxx -Dhttp.proxyHost=127.0.0.1 -Dhttp.proxyPort=1080
# 或者修改 gradle.properties 配置文件
$ vim gradle.properties
systemProp.http.proxyHost=192.168.1.101
systemProp.http.proxyPort=8080
systemProp.http.nonProxyHosts=*.nonproxyrepos.com|localhost
systemProp.https.proxyHost=192.168.1.101
systemProp.https.proxyPort=8080
systemProp.https.nonProxyHosts=*.nonproxyrepos.com|localhost
#systemProp.http.proxyUser=userid
#systemProp.http.proxyPassword=password
# 【注意】如果在 Kafka 源码目录下修改的 gradle.properties 无法生效,可以直接拷贝到 $USER_HOME/.gradle 目录下

编译源码

# 下载依赖
$ gradle
> Configure project :
Building project 'core' with Scala version 2.11.11
Download https://repo1.maven.org/maven2/org/scoverage/scalac-scoverage-plugin_2.11/1.3.1/scalac-scoverage-plugin_2.11-1.3.1.pom
<===----------> 26% CONFIGURING [4m 11s]
> Resolve files of :core:scoverage
> IDLE
> scala-library-2.11.11.jar > 1.58 MB/5.48 MB downloaded
BUILD SUCCESSFUL in 7m 9s
# 生成可以用 Intellij Idea 打开的工程
$ gradle idea

资料

Doc

Kafka Connect

Docker

Blog

Common

Kafka Connect

Introduction
Community

Kafka Stream

Book

更多资源,欢迎加入,一起交流学习

QQ group: (人工智能 1020982 (高级) & 1217710 (进阶) | BigData 1670647)