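Integrating Apache Storm with Kafka


 For background on Apache Storm and Apache Kafka, see 《Apache Storm 简介》 (An Introduction to Apache Storm) and 《Apache Kafka 分布式消息队列框架》 (Apache Kafka: A Distributed Message Queue Framework), respectively.

Setting up the Storm and Kafka environment

Setting up the Storm / Kafka clusters

 For the detailed installation steps, see my other blog post 《Apache Eagle

Starting Kafka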

  • Start the ZooKeeper and Kafka servers
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
  • Create a topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-replicated-topic
  • List topics
$ bin/kafka-topics.sh --list --zookeeper localhost:2181

Sending messages to Kafka

Writing SendMessageToKafka

  • Define a Producer using the Kafka cluster's connection properties
  • Call Producer.send(KeyedMessage) to send a topic-message pair to Kafka
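import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Uses the Kafka 0.8.x ("old") producer API
public class SendMessageToKafka {

    private static Producer<String, String> producer;

    private static void init() {
        Properties props = new Properties();
        // ZooKeeper address (not required by the 0.8 producer, which uses metadata.broker.list)
        props.put("zk.connect", "192.168.1.201:2181");
        // Serialize message values as strings
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Brokers used to fetch topic metadata (comma-separated host:port list)
        props.put("metadata.broker.list", "192.168.1.201:9092");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }

    public static void main(String... args) {
        init();
        try {
            // No key is given: only the topic and the message payload are set
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("my-replicated-topic", "asdf2015");
            producer.send(data);
        } finally {
            // Release the producer's network resources even if send() throws
            producer.close();
        }
    }
}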
  • Run the main method
com.yuzhouwan.hadoop.customer_behaviour_analyse.kafka.SendMessageToKafka
  • Check the message that the broker received
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

 You should then see that the message asdf2015 was sent successfully.
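The init() step above boils down to filling a java.util.Properties object. Below is a minimal sketch that builds those settings separately, so the broker list is passed in rather than hard-coded. producerProps is a hypothetical helper, not part of Kafka's API. zk.connect is left out on the assumption that the 0.8 producer only needs metadata.broker.list:

```java
import java.util.Properties;

public class ProducerPropsSketch {

    // Hypothetical helper: builds the settings init() passes to ProducerConfig,
    // taking the broker list as a parameter instead of hard-coding it.
    static Properties producerProps(String brokerList) {
        if (brokerList == null || brokerList.trim().isEmpty()) {
            throw new IllegalArgumentException("metadata.broker.list must not be empty");
        }
        Properties props = new Properties();
        // Serialize message values as strings (matches Producer<String, String>)
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Comma-separated host:port list the producer uses to fetch topic metadata
        props.put("metadata.broker.list", brokerList.trim());
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("192.168.1.201:9092");
        System.out.println(props.getProperty("metadata.broker.list")); // prints 192.168.1.201:9092
    }
}
```

The returned Properties object would then go into new ProducerConfig(props), just as in init().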

Consuming messages from Kafka

Writing TestMessageScheme

  • In deserialize(byte[]), decode the message and print it
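// backtype.storm packages are Storm 0.9.x/0.10.x; from Storm 1.0 they live under org.apache.storm
import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Values;

public class TestMessageScheme implements Scheme {
    // ...
    @Override
    public List<Object> deserialize(byte[] ser) {
        String msg;
        try {
            // Kafka delivers raw bytes; decode them as UTF-8 text
            msg = new String(ser, "UTF-8");
            System.out.println("$$$$$$$$$$$$$$" + msg);
            // Emit the message as a single-field tuple
            return new Values(msg);
        } catch (UnsupportedEncodingException e) {
            LOGGER.error("Cannot parse the provided message from bytes.");
            throw new RuntimeException(e);
        }
    }
    // ...
}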
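On Java 7 or later, you can pass StandardCharsets.UTF_8 to the String constructor. This avoids the checked UnsupportedEncodingException, so the try/catch goes away. The stand-alone sketch below shows only that decoding step. The class name Utf8DeserializeSketch is hypothetical. It returns a plain List instead of Storm's Values (which is itself an ArrayList<Object>), so it runs without Storm on the classpath:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class Utf8DeserializeSketch {

    // Same shape as Scheme.deserialize(byte[]): one Kafka message in, one tuple (list of fields) out.
    // A one-element List stands in for new Values(msg).
    static List<Object> deserialize(byte[] ser) {
        // StandardCharsets.UTF_8 cannot throw UnsupportedEncodingException, so no try/catch is needed
        String msg = new String(ser, StandardCharsets.UTF_8);
        return Collections.<Object>singletonList(msg);
    }

    public static void main(String[] args) {
        byte[] payload = "asdf2015".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(payload)); // prints [asdf2015]
    }
}
```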

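Writing ShowKafkaMessageBolt

  • In prepare(Map, TopologyContext, OutputCollector), keep the OutputCollector (its emit method sends messages on to the next component) and read attributes such as the component name and task id from the TopologyContext
  • Do the actual message processing in execute(Tuple)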
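// backtype.storm packages are Storm 0.9.x/0.10.x; from Storm 1.0 they live under org.apache.storm
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ShowKafkaMessageBolt implements IRichBolt {

    private OutputCollector collector;
    private String name;
    private int id;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Component id and task id identify this bolt instance in the log output
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
        System.out.println("Bolt: " + name + " and Id: " + id + " prepared ##################");
    }

    @Override
    public void execute(Tuple input) {
        if (input != null) {
            String message = input.getString(0);
            // Anchor the new tuple to the input so a downstream failure is replayed by the spout
            collector.emit(input, new Values(message));
            System.out.println(message);
            // Ack exactly once so the KafkaSpout treats the message as processed
            collector.ack(input);
        }
    }
    // ...
}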
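You can try out the anchor/ack contract of execute(Tuple) without a cluster. In the sketch below, RecordingCollector is a hypothetical stand-in for Storm's OutputCollector, and a String stands in for Tuple. The sketch only records calls, to show that each input produces one anchored emit and is acked exactly once:

```java
import java.util.ArrayList;
import java.util.List;

public class AckSketch {

    // Hypothetical stand-in for Storm's OutputCollector: records calls instead of sending tuples
    static class RecordingCollector {
        final List<String> emitted = new ArrayList<>();
        final List<String> acked = new ArrayList<>();

        void emit(String anchor, String value) {
            // In Storm, passing the input as anchor links the new tuple into the input's tuple tree
            emitted.add(anchor + "->" + value);
        }

        void ack(String input) {
            acked.add(input);
        }
    }

    // Same shape as ShowKafkaMessageBolt.execute: forward the message, then ack the input once
    static void execute(String input, RecordingCollector collector) {
        if (input != null) {
            collector.emit(input, input);
            collector.ack(input);
        }
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();
        execute("This is a message from kafka.", collector);
        execute(null, collector); // ignored: nothing emitted, nothing acked
        System.out.println(collector.emitted.size() + " emitted, " + collector.acked.size() + " acked");
        // prints "1 emitted, 1 acked"
    }
}
```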

Writing BehaviourAnalyse

  • Define the BrokerHosts from the ZooKeeper address
  • Combine brokerHosts, topic, zkRoot, spoutId and TestMessageScheme into a SpoutConfig, then instantiate the KafkaSpout
  • Submit the topology with LocalCluster
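// Imports assume Storm 0.9.x/0.10.x with the storm-kafka module (storm.kafka package)
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class BehaviourAnalyse {

    public static void main(String[] args) {

        // The spout discovers Kafka brokers through the ZooKeeper that Kafka registers with
        BrokerHosts brokerHosts = new ZkHosts("192.168.1.201:2181");
        String topic = "my-replicated-topic";

        /*
         * zkRoot is a ZooKeeper node path, not a directory on disk: KafkaSpout stores its
         * consumer offsets under zkRoot + "/" + spoutId. It does not need to match the
         * dataDir setting in config/zookeeper.properties.
         */
        String zkRoot = "/tmp/zookeeper";
        String spoutId = "myKafka";

        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());

        TopologyBuilder builder = new TopologyBuilder();
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        builder.setSpout("kafka-spout", kafkaSpout);

        builder.setBolt("show-message-bolt", new ShowKafkaMessageBolt())
                .shuffleGrouping("kafka-spout");

        Config conf = new Config();
        conf.setDebug(true);
        // At most one un-acked tuple in flight per spout task
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        // Runs the topology in-process, without a separate Storm cluster
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Show-Message-From-Kafka", conf, builder.createTopology());
    }
}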
  • Run the main method
com.yuzhouwan.hadoop.customer_behaviour_analyse.BehaviourAnalyse
  • Start a Kafka console producer
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
  • Type a sentence and press Enter, for example:
This is a message from kafka.

 You should see the message printed in the console :-)
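Together, zkRoot and spoutId in BehaviourAnalyse determine where KafkaSpout commits its consumer offsets in ZooKeeper. The sketch below assumes the layout used by storm-kafka's PartitionManager, so please verify it against the version you use. Under that assumption, each partition's offset lives at zkRoot + "/" + spoutId + "/partition_" followed by the partition number. The sketch only builds that path string; offsetPath is a hypothetical helper, not a storm-kafka method:

```java
public class OffsetPathSketch {

    // Hypothetical helper: assumes storm-kafka's layout zkRoot/spoutId/partition_<n>
    static String offsetPath(String zkRoot, String spoutId, int partition) {
        // Drop a trailing '/' so "/a/" and "/a" yield the same node path
        String root = zkRoot.endsWith("/") ? zkRoot.substring(0, zkRoot.length() - 1) : zkRoot;
        return root + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Values from BehaviourAnalyse; the topic was created with a single partition (0)
        System.out.println(offsetPath("/tmp/zookeeper", "myKafka", 0));
        // prints /tmp/zookeeper/myKafka/partition_0
    }
}
```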

Tips

  • Find the value to use for metadata.broker.list in the producer config:
$ vim config/producer.properties
  • Check which versions of Storm are compatible with other frameworks:

 http://mvnrepository.com/artifact/org.apache.storm
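The metadata.broker.list value from the first tip is a comma-separated list of host:port pairs, such as host1:9092,host2:9092. If the value comes from a config file, a quick sanity check before creating the producer can save a confusing startup error. Here is a minimal sketch with a hypothetical parseBrokerList helper (not a Kafka API):

```java
import java.util.ArrayList;
import java.util.List;

public class BrokerListSketch {

    // Hypothetical helper: splits "host1:9092,host2:9092" and checks each entry is host:port
    static List<String> parseBrokerList(String brokerList) {
        List<String> brokers = new ArrayList<>();
        for (String entry : brokerList.split(",")) {
            String broker = entry.trim();
            int colon = broker.lastIndexOf(':');
            if (colon <= 0 || colon == broker.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + broker + "'");
            }
            // Throws NumberFormatException if the port is not numeric
            int port = Integer.parseInt(broker.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in '" + broker + "'");
            }
            brokers.add(broker);
        }
        return brokers;
    }

    public static void main(String[] args) {
        System.out.println(parseBrokerList("localhost:9092, 192.168.1.201:9092"));
        // prints [localhost:9092, 192.168.1.201:9092]
    }
}
```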
