Kafka集群安裝步驟:
? ? 1、解壓
?? 2、修改server.properties
???????????? broker.id=1
???????????? zookeeper.connect=weekend05:2181,weekend06:2181,weekend07:2181
? 3、將zookeeper集群啟動
? 4、在每一臺節(jié)點(diǎn)上啟動broker
??????????? bin/kafka-server-start.sh config/server.properties
?? 5、在kafka集群中創(chuàng)建一個topic
?????????? bin/kafka-topics.sh --create --zookeeper zk_hostname:2181 --replication-factor 3 --partitions 1 --topic topic_name
?? 6、用一個producer向某一個topic中寫入消息
????????? bin/kafka-console-producer.sh --broker-list broker_hostname:9092 --topic topic_name
??? 7、用一個comsumer從某一個topic中讀取信息
???????? bin/kafka-console-consumer.sh --zookeeper zk_hostname:2181 --from-beginning --topic topic-name
??? 8、查看一個topic的分區(qū)及副本狀態(tài)信息
???????? bin/kafka-topics.sh --describe --zookeeper zk_hostname:2181 --topic topic_name
Java API
???? Producer端:
?????????????? public class ProducerDemo{
?????????????????????? public static void main(String[] args){
?????????????????????????????? //創(chuàng)建Properties實(shí)例,設(shè)置屬性
????????????????????????????? Properties properties = new Properties();
????????????????????????????? //聲明zk ???????????????????????????
???????????????????????????? properties.put("zk.connect","zk_hostname:2181,....");
?????????????????????????????? //聲明broker ??????????????????????????
????????????????????????????? properties.put("metadata.broker.list","broker_hostname:9092,.....");
????????????????????????????? properties.put("serializer.class","kafka.serializer.StringEncoder");
?????????????????????????????? //創(chuàng)建ProducerConfig配置實(shí)例 ????????????????????????????
????????????????????????????? ProducerConfig config = new ProducerConfig(properties);
?????????????????????????????? //創(chuàng)建Producer實(shí)例 ???????????????????????????
????????????????????????????? Producer<String,String> producer = new Producer<String,String>(config);
????????????????????????????? //發(fā)送消息 ????????????????????????
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? producer.send(new KeyedMessage<String,String>(topic_name,content));
???????????? }
}
Consumer端:
???????? public class ConsumerDemo{
????????????????????? public static void main(String[] args){
???????????????????????????? //創(chuàng)建Properties實(shí)例,設(shè)置屬性
????????????????????????? Properties properties = new Properties();
????????????????????????? properties.put("zookeeper.connect","zk_hostname:2181,....");
????????????????????? ? ? ? ? //如果生產(chǎn)者和消費(fèi)者在同一個組中,則不能訪問同一組Topic內(nèi)的數(shù)據(jù)
????????????????????????? properties.put("group.id","id_name");
?????????????????????????? //聲明ConsumerConnector
??????????????????????? ConsumerConnector consumer =? Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
? ? ? ? ? ? ? ? ? ? ? ? MaptopicCountMap = new HashMap();
??????????????????????? //一次從一個主題中讀取數(shù)據(jù)
?????????????????????? topicCountMap.put(topic, 1);
??????????????????????? topicCountMap.put(“topic_name", 1);
??????????????????????? topicCountMap.put("topic_name1", 1);
? ? ? ? ? ? ? ? ? ? ? ? Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap =
???????????????????????????????????????? consumer.createMessageStreams(topicCountMap);
? ? ? ? ? ? ? ? ? ? ? List<KafkaStream<byte[],byte[]>> streams = consumerMap.get("topic_name");
??????????????????????? for(final KafkaStreamkafkaStream : streams){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? new Thread(new Runnable() {
???????????????????????????????????????????????? @Override
???????????????????????????????????????? public void run() {
??????????????????????????????????????????????? for(MessageAndMetadatamm : kafkaStream){
????????????????????????????????????????????? ? ? ?? ?? String msg = new String(mm.message());
??????????????????????????????????????????? ? ? ? ? ? ?? System.out.println(msg);
?????????????????????????????????????????????????? }
??????????????????????????????????? }
????????? ? ? ? ?? ??? }).start();
??????? ? ? ?? }
???????? }
}