启动 Kafka
Start the zookeeper and kafka server
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
1 $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-replicated-topic
1 $ bin/kafka-topics.sh --list --zookeeper localhost:2181
发送 Message 往 Kafka 编写 SendMessageToKafka
根据 kafka 中 cluster 的属性,定义好 Producer
利用 Producer.send(KeyedMessage)
方法,将 topic - message
发送给 Kafka
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class SendMessageToKafka { private static Producer<String, String> producer; private static void init () { Properties props = new Properties (); props.put("zk.connect" , "" ); props.put("serializer.class" , "kafka.serializer.StringEncoder" ); props.put("metadata.broker.list" , "" ); ProducerConfig config = new ProducerConfig (props); producer = new Producer <String, String>(config); } public static void main (String... arg) { init(); KeyedMessage<String, String> data = new KeyedMessage <String, String>("my-replicated-topic" , "asdf2015" ); producer.send(data); producer.close(); } }
1 com.yuzhouwan.hadoop.customer_behaviour_analyse.kafka.SendMessageToKafka
Check the message that the broker received
1 $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
Then you will see that the asdf2015
message was sent successfully.
从 Kafka 中获得 Message 编写 TestMessageScheme
在 deserialize(byte[])
方法中将 message 显示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class TestMessageScheme implements Scheme { @Override public List<Object> deserialize (byte [] ser) { String msg; try { msg = new String (ser, "UTF-8" ); System.out.println("$$$$$$$$$$$$$$" + msg); return new Values (msg); } catch (UnsupportedEncodingException e) { LOGGER.error("Can not parse the provide message from bytes." ); throw new RuntimeException (e); } } }
编写 ShowKafkaMessageBolt
在 prepare(Map, TopologyContext, OutputCollector)
中得到 OutputCollector
(emit 方法完成 message 的发射)、Context
(提供 name/id)
在 execute(Tuple)
中完成 message 处理工作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class ShowKafkaMessageBolt implements IRichBolt { private OutputCollector collector; private String name; private int id; @SuppressWarnings("rawtypes") @Override public void prepare (Map stormConf, TopologyContext context, OutputCollector collector) { this .collector = collector; this .name = context.getThisComponentId(); this .id = context.getThisTaskId(); System.out.println("Bolt: " + name + " and Id: " + id + " prepared ##################" ); } @Override public void execute (Tuple input) { if (input != null ) { String message = input.getString(0 ); collector.emit(new Values (message)); System.out.println(message); } collector.ack(input); } }
编写 BehaviourAnalyse
基于 ZooKeeper 属性 定义 Broker
整合 broker、topic、zkRoot、spoutId 和 TestMessageScheme 为 SpoutConfig,完成 KafkaSpout 的实例化
利用 LocalCluster 完成 topology 的提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class BehaviourAnalyse { public static void main (String[] args) { BrokerHosts brokerHosts = new ZkHosts ("" ); String topic = "my-replicated-topic" ; String zkRoot = "/tmp/zookeeper" ; String spoutId = "myKafka" ; SpoutConfig spoutConfig = new SpoutConfig (brokerHosts, topic, zkRoot, spoutId); spoutConfig.scheme = new SchemeAsMultiScheme (new TestMessageScheme ()); TopologyBuilder builder = new TopologyBuilder (); KafkaSpout kafkaSpout = new KafkaSpout (spoutConfig); builder.setSpout("kafka-spout" , kafkaSpout); builder.setBolt("show-message-bolt" , new ShowKafkaMessageBolt ()) .shuffleGrouping("kafka-spout" ); Config conf = new Config (); conf.setDebug(true ); conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1 ); LocalCluster cluster = new LocalCluster (); cluster.submitTopology("Show-Message-From-Kafka" , conf, builder.createTopology()); } }
1 com.yuzhouwan.hadoop.customer_behaviour_analyse.BehaviourAnalyse
1 $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
Type a sentence and press Enter, for example:
1 This is a message from kafka.
You will then see the message shown in the console successfully :-)
Get the value of metadata.broker.list
1 $ vim config/producer.properties