Storm and Kafka Integration, Part 3: Combination

Articles in this series:


Setting Up the Basic Storm and Kafka Environment

Set Up the Storm / Kafka Cluster

 For the concrete installation steps, see my other blog post 《Apache Eagle

Start Kafka

  • Start the ZooKeeper and Kafka servers

    $ bin/zookeeper-server-start.sh config/zookeeper.properties
    $ bin/kafka-server-start.sh config/server.properties
  • Create a topic

    $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-replicated-topic
  • We can now see that topic if we run the list topic command

    $ bin/kafka-topics.sh --list --zookeeper localhost:2181

Sending Messages to Kafka

Writing SendMessageToKafka

  • Define a Producer from the Kafka cluster's connection properties
  • Use the Producer.send(KeyedMessage) method to send a "topic - message" pair to Kafka

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class SendMessageToKafka {

        private static Producer<String, String> producer;

        private static void init() {
            Properties props = new Properties();
            props.put("zk.connect", "192.168.1.201:2181");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("metadata.broker.list", "192.168.1.201:9092");
            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<String, String>(config);
        }

        public static void main(String... args) {
            init();
            // Send a single message to the "my-replicated-topic" topic.
            KeyedMessage<String, String> data =
                new KeyedMessage<String, String>("my-replicated-topic", "asdf2015");
            producer.send(data);
            producer.close();
        }
    }
  • Run the main method

    com.yuzhouwan.hadoop.customer_behaviour_analyse.kafka.SendMessageToKafka
  • Check the message that the broker received

    $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

 Then you will see that the ‘asdf2015’ message was sent successfully.
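
 If you need related messages to land in the same partition, the 0.8-era producer API also accepts an explicit key. A minimal sketch reusing the producer defined above; the key "user-42" is a hypothetical example, not part of the original code:

    // Messages sharing a key are routed to the same partition by the default partitioner.
    KeyedMessage<String, String> keyed =
        new KeyedMessage<String, String>("my-replicated-topic", "user-42", "asdf2015");
    producer.send(keyed);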

Receiving Messages from Kafka

Writing TestMessageScheme

  • Print the message inside the deserialize(byte[]) method
    import java.io.UnsupportedEncodingException;
    import java.util.List;

    import backtype.storm.spout.Scheme;
    import backtype.storm.tuple.Values;

    public class TestMessageScheme implements Scheme {

        // ...

        @Override
        public List<Object> deserialize(byte[] ser) {
            String msg;
            try {
                // Decode the raw Kafka bytes and print them for debugging.
                msg = new String(ser, "UTF-8");
                System.out.println("$$$$$$$$$$$$$$" + msg);
                return new Values(msg);
            } catch (UnsupportedEncodingException e) {
                LOGGER.error("Can not parse the provided message from bytes.");
                throw new RuntimeException(e);
            }
        }

        // ...
    }
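
 The Scheme interface also requires getOutputFields(), which names the fields produced by deserialize. A minimal sketch of the elided method, assuming we call the single output field "message" (the name is our choice here; Fields comes from backtype.storm.tuple):

    @Override
    public Fields getOutputFields() {
        // One output field, matching the single value emitted by deserialize().
        return new Fields("message");
    }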

Writing ShowKafkaMessageBolt

  • In prepare(Map, TopologyContext, OutputCollector), grab the OutputCollector (its emit method sends the message downstream) and the TopologyContext (which provides attributes such as the component name and task ID)
  • Do the actual message processing in execute(Tuple)
    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class ShowKafkaMessageBolt implements IRichBolt {

        private OutputCollector collector;
        private String name;
        private int id;

        @SuppressWarnings("rawtypes")
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.name = context.getThisComponentId();
            this.id = context.getThisTaskId();
            System.out.println("Bolt: " + name + " and Id: " + id + " prepared ##################");
        }

        @Override
        public void execute(Tuple input) {
            if (input != null) {
                String message = input.getString(0);
                collector.emit(new Values(message));
                System.out.println(message);
                // Ack inside the null check, so we never ack a null tuple.
                collector.ack(input);
            }
        }

        // ...
    }
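
 Since execute re-emits the message, the elided part of the bolt needs a matching declareOutputFields. A minimal sketch, assuming the same field name "message" as in the scheme above (OutputFieldsDeclarer and Fields come from backtype.storm.topology and backtype.storm.tuple):

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the single field emitted by execute().
        declarer.declare(new Fields("message"));
    }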

Writing BehaviourAnalyse

  • Define the BrokerHosts from the ZooKeeper address
  • Combine the broker hosts, topic, zkRoot, spoutId, and TestMessageScheme into a SpoutConfig, and use it to instantiate the KafkaSpout
  • Submit the topology through a LocalCluster

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.ZkHosts;

    public class BehaviourAnalyse {

        public static void main(String[] args) {
            BrokerHosts brokerHosts = new ZkHosts("192.168.1.201:2181");
            String topic = "my-replicated-topic";
            /*
             * We can get the param from the 'config/zookeeper.properties' path:
             *   # the directory where the snapshot is stored.
             *   dataDir=/tmp/zookeeper
             */
            String zkRoot = "/tmp/zookeeper";
            String spoutId = "myKafka";
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
            spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());

            // Wire the Kafka spout to the bolt that prints each message.
            TopologyBuilder builder = new TopologyBuilder();
            KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
            builder.setSpout("kafka-spout", kafkaSpout);
            builder.setBolt("show-message-bolt", new ShowKafkaMessageBolt())
                   .shuffleGrouping("kafka-spout");

            Config conf = new Config();
            conf.setDebug(true);
            conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Show-Message-From-Kafka", conf, builder.createTopology());
        }
    }
  • Run the main method

    com.yuzhouwan.hadoop.customer_behaviour_analyse.BehaviourAnalyse
  • Start a Kafka console producer

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
  • Input a sentence, like:

    This is a message from kafka.

 You will see the message shown in the console successfully :-)
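
 One caveat: as written, BehaviourAnalyse submits the topology and then main simply returns, leaving the local cluster running until you kill the process. A common pattern in local-mode examples is to let the topology run for a fixed window and then shut it down cleanly. A minimal sketch to append at the end of main; the 60-second window is an arbitrary choice:

    // Let the topology consume messages for a while, then tear it down.
    backtype.storm.utils.Utils.sleep(60 * 1000);
    cluster.killTopology("Show-Message-From-Kafka");
    cluster.shutdown();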

The Two-Way Integration of Storm and Kafka Is Now Complete

 For the full integration and how to apply it, see here

Tips

 Other resources: _Source_

For more resources, you are welcome to join us to share and learn together

QQ group: (Artificial Intelligence 1020982 (advanced) & 1217710 (intermediate) | BigData 1670647)