Integrating Apache Storm with Kafka


 For background on Apache Storm and Apache Kafka, see 《Apache Storm 简介》 and 《Apache Kafka 分布式消息队列框架》 respectively.

Setting up the Base Environment for Storm and Kafka

Building a Storm / Kafka Cluster

 For the detailed installation steps, see my other blog post 《Apache Eagle

Starting Kafka

  • Start the ZooKeeper and Kafka servers

    $ bin/zookeeper-server-start.sh config/zookeeper.properties
    $ bin/kafka-server-start.sh config/server.properties
  • Create a topic

    $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-replicated-topic
  • List topics

    $ bin/kafka-topics.sh --list --zookeeper localhost:2181

Sending Messages to Kafka

Writing SendMessageToKafka

  • Define a Producer based on the Kafka cluster properties
  • Use the Producer.send(KeyedMessage) method to send the topic - message pair to Kafka

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class SendMessageToKafka {

        private static Producer<String, String> producer;

        // Build the Producer from the Kafka cluster properties.
        private static void init() {
            Properties props = new Properties();
            props.put("zk.connect", "192.168.1.201:2181");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("metadata.broker.list", "192.168.1.201:9092");
            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<String, String>(config);
        }

        public static void main(String... args) {
            init();
            // Send a single message to the "my-replicated-topic" topic.
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("my-replicated-topic", "asdf2015");
            producer.send(data);
            producer.close();
        }
    }
  • Run the main method

    com.yuzhouwan.hadoop.customer_behaviour_analyse.kafka.SendMessageToKafka
  • Check the messages that the broker received

    $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

 Then, you will see that the asdf2015 message was sent successfully.
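The classes above come from the old Scala Kafka producer API and Storm's storm-kafka module. A minimal sketch of the Maven dependencies this series of examples assumes — the version numbers are my assumptions, so align them with your own cluster:

```
<!-- Versions are assumptions; match them to your Storm / Kafka cluster. -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.6</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.6</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
    <exclusions>
        <!-- Storm ships its own logging binding; avoid a clash with Kafka's. -->
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```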

Consuming Messages from Kafka

Writing TestMessageScheme

  • Print the message in the deserialize(byte[]) method

    import java.io.UnsupportedEncodingException;
    import java.util.List;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import backtype.storm.spout.Scheme;
    import backtype.storm.tuple.Values;

    public class TestMessageScheme implements Scheme {

        private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);

        // ...
        @Override
        public List<Object> deserialize(byte[] ser) {
            String msg;
            try {
                msg = new String(ser, "UTF-8");
                System.out.println("$$$$$$$$$$$$$$" + msg);
                return new Values(msg);
            } catch (UnsupportedEncodingException e) {
                LOGGER.error("Can not parse the provided message from bytes.");
                throw new RuntimeException(e);
            }
        }
        // ...
    }
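The heart of deserialize is simply a UTF-8 decode of the raw Kafka bytes. A standalone sketch of that conversion, runnable without any Storm or Kafka dependencies — the class name is mine, not from the original project:

```java
// Standalone sketch of the UTF-8 decode that TestMessageScheme.deserialize performs.
// The class name Utf8DecodeSketch is hypothetical; only the conversion mirrors the Scheme above.
public class Utf8DecodeSketch {

    // Convert raw message bytes into a String, as the Scheme does.
    static String decode(byte[] ser) {
        try {
            return new String(ser, "UTF-8");
        } catch (java.io.UnsupportedEncodingException e) {
            // "UTF-8" is always supported by the JVM, so this branch is effectively unreachable.
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        byte[] raw = "asdf2015".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(decode(raw)); // prints "asdf2015"
    }
}
```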

Writing ShowKafkaMessageBolt

  • In prepare(Map, TopologyContext, OutputCollector), keep the OutputCollector (its emit method sends messages downstream) and read attributes such as name / id from the TopologyContext
  • Do the actual message processing in execute(Tuple)
    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class ShowKafkaMessageBolt implements IRichBolt {

        private OutputCollector collector;
        private String name;
        private int id;

        @SuppressWarnings("rawtypes")
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.name = context.getThisComponentId();
            this.id = context.getThisTaskId();
            System.out.println("Bolt: " + name + " and Id: " + id + " prepared ##################");
        }

        @Override
        public void execute(Tuple input) {
            if (input != null) {
                String message = input.getString(0);
                collector.emit(new Values(message));
                System.out.println(message);
                // Only ack tuples we actually received, to avoid acking null.
                collector.ack(input);
            }
        }
        // ...
    }

Writing BehaviourAnalyse

  • Define the BrokerHosts from the ZooKeeper address
  • Combine broker, topic, zkRoot, spoutId and TestMessageScheme into a SpoutConfig, then instantiate the KafkaSpout
  • Submit the topology with a LocalCluster

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.ZkHosts;

    public class BehaviourAnalyse {

        public static void main(String[] args) {
            BrokerHosts brokerHosts = new ZkHosts("192.168.1.201:2181");
            String topic = "my-replicated-topic";
            /**
             * We can get the param from the 'config/zookeeper.properties' path.<BR>
             * # the directory where the snapshot is stored.<BR>
             * dataDir=/tmp/zookeeper
             */
            String zkRoot = "/tmp/zookeeper";
            String spoutId = "myKafka";
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
            spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());

            TopologyBuilder builder = new TopologyBuilder();
            KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
            builder.setSpout("kafka-spout", kafkaSpout);
            builder.setBolt("show-message-bolt", new ShowKafkaMessageBolt())
                    .shuffleGrouping("kafka-spout");

            Config conf = new Config();
            conf.setDebug(true);
            conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Show-Message-From-Kafka", conf, builder.createTopology());
        }
    }
  • Run the main method

    com.yuzhouwan.hadoop.customer_behaviour_analyse.BehaviourAnalyse
  • Start a Kafka console producer

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
  • Input a sentence and press Enter, like:

    This is a message from kafka.

 You will see the message show up in the console successfully :-)

The Two-way Integration of Storm and Kafka Is Complete

 For the complete integration and its applications, see here.

Tips

 Other resources: Source


  • Author: Benedict Jin
  • Link: https://yuzhouwan.com/posts/25015/
  • Copyright: Unless otherwise stated, all posts on this blog are licensed under CC BY-NC-SA 4.0. Please credit the source when republishing!