Kafka 原理介紹及安裝部署
標簽:kafka 安裝
簡介
Kafka 是 Linkedin 于 2010 年 12 月份開源的消息系統(tǒng),它主要用于處理活躍的流式數(shù)據(jù),包括網(wǎng)站的點擊量、用戶訪問或搜索的內(nèi)容等。
Kafka 是一個輕量級的/分布式的/具備 replication 能力的日志采集組件,通常被集成到應用系統(tǒng)中,收集“用戶行為日志”等,并可以使用各種消費終端(consumer)將消息轉(zhuǎn)存到 HDFS 等其他結(jié)構(gòu)化數(shù)據(jù)存儲系統(tǒng)中。
Kafka 的作用類似于緩存,即活躍的數(shù)據(jù)和離線處理系統(tǒng)之間的緩存。
特性
高吞吐率:即使在普通的節(jié)點上每秒鐘也能處理成百上千的 message。
顯式分布式:即所有的 producer、broker 和 consumer 都會有多個,均為分布式的。
易于擴展:可以由一個節(jié)點擴展至數(shù)千個節(jié)點,不需要停止集群。
使用場景
Messaging
對于一些常規(guī)的消息系統(tǒng),kafka 是個不錯的選擇。Kafka 的 partitons/replication 和容錯,使其具有良好的擴展性和性能優(yōu)勢。
但是,kafka 并沒有提供 JMS 中的“事務性”、“消息傳輸擔保(消息確認機制)”、“消息分組”等企業(yè)級特性。kafka 只能使用作為"常規(guī)"的消息系統(tǒng),在一定程度上,尚未確保消息的發(fā)送與接收絕對可靠(比如,消息重發(fā),消息發(fā)送丟失等)。
Websit activity tracking
Kafka 可以作為“網(wǎng)站活性跟蹤”的最佳工具,可以將網(wǎng)頁/用戶操作等信息發(fā)送到 kafka 中,并進行實時監(jiān)控,或者離線統(tǒng)計分析等。
Log Aggregation
kafka 的特性決定它非常適合作為“日志收集中心”,應用程序可以將操作日志“批量”“異步”的發(fā)送到 kafka 集群中,而不是保存在本地或者數(shù)據(jù)庫中。
Kafka 可以批量提交消息/壓縮消息等,這對生產(chǎn)者而言,幾乎感覺不到性能的開支。此時消費者可以使用 Hadoop 等其他系統(tǒng)化的存儲和分析系統(tǒng)。
原理架構(gòu)
原理
Kafka 的設計初衷是希望做為一個統(tǒng)一的信息收集平臺,能夠?qū)崟r的收集反饋信息,并需要能夠支撐較大的數(shù)據(jù)量,且具備良好的容錯能力。
Kafka 使用文件存儲消息(append only log),這就直接決定了 kafka 在性能上嚴重依賴文件系統(tǒng)的本身特性。為了減少磁盤寫入的次數(shù),broker 會將消息暫時緩存起來,當消息的個數(shù)(或尺寸)達到一定閥值時,再一起刷新到磁盤,這樣會減少磁盤 IO 調(diào)用的次數(shù)。
Producer 將會和 topic 下所有 partition leader 保持 socket 連接。消息由 producer 直接通過 socket 發(fā)送到 broker,中間不會經(jīng)過任何“路由層”。
事實上,消息被路由到哪個 partition 上,由 producer 客戶端決定,默認方式為“輪詢”。
Consumer 端向 broker 發(fā)送 “fetch” 請求,并告知其獲取消息的 offset,此后 consumer 將會獲得一定條數(shù)的消息。Consumer 端也可以重置 offset 來重新消費消息。
Kafka 將每個 partition 數(shù)據(jù)復制到多個 server 上,任何一個 partition 都有一個 leader 和多個 follower (可以沒有)。
備份的個數(shù)可以通過 broker 配置文件來設定,其中 leader 處理所有的讀寫請求,follower 需要和 leader 保持同步。
當 leader 失效時,需在 followers 中重新選取出新的 leader,可能此時 follower 落后于 leader,因此需要選擇一個 “up-to-date” 的 follower。選擇 follower 時需要兼顧一個問題,就是新的 leader server 上所已經(jīng)承載的 partition leader 的個數(shù),如果一個 server 上有過多的 partition leader,意味著此 server 將承受著更多的 IO 壓力,因此在選舉新 leader 時,需要考慮到“負載均衡”。
Kafka 中所有的 topic 內(nèi)容都是以日志的形式存儲在 broker 上。如果一個 topic 的名稱為 “my_topic”,它有 2 個 partitions,那么日志將會保存在 my_topic_0 和 my_topic_1 兩個目錄中。
日志文件中保存了一序列 “l(fā)og entries” (日志條目),每個 log entry 格式為“4個字節(jié)的數(shù)字 N 表示消息的長度” + “N 個字節(jié)的消息內(nèi)容”。每個日志都有一個 offset 來唯一的標記一條消息,offset 的值為8個字節(jié)的數(shù)字,表示此消息在此 partition 中所處的起始位置。
部署架構(gòu)
Kafka 集群、producer 和 consumer 都依賴于 zookeeper 來保證系統(tǒng)的可用性,保存一些元數(shù)據(jù)信息。
kafka 集群幾乎不需要維護任何 consumer 和 producer 狀態(tài)信息,這些信息由 zookeeper 保存,因此 producer 和 consumer 的客戶端實現(xiàn)非常輕量級,它們可以隨意離開,而不會對集群造成額外的影響。
Producer 端使用 zookeeper 用來發(fā)現(xiàn) broker 列表,以及和 Topic 下每個 partition leader 建立 socket 連接并發(fā)送消息。
Broker 端使用 zookeeper 用來注冊 broker 信息,監(jiān)測 partition leader 存活性。
Consumer 端使用 zookeeper 用來注冊 consumer 信息,其中包括 consumer 消費的 partition 列表等,同時也用來發(fā)現(xiàn) broker 列表,并和 partition leader 建立 socket 連接,獲取消息。
安裝部署
安裝
Kafka 的安裝比較簡單,只需要保證 zookeeper 集群運行正常,并配置好 server.properties 文件即可。
修改配置文件中的以下幾項,并保證在各節(jié)點上保持一致:
broker.id=0 //該屬性的值要保證各個節(jié)點之間不能重復,該值可以為隨意的整數(shù)
port=9092
log.dirs=/opt/kafka-0.8.2/data
zookeeper.connect=localhost:2181 //此處需要修改成使用的 zookeeper 集群的信息,逗號分隔
啟動
保證 zookeeper 集群正常運行,然后在每個節(jié)點上執(zhí)行以下命令,啟動進程:
/opt/kafka-0.8.2/bin/kafka-server-start.sh /opt/kafka-0.8.2/config/server.properties &
驗證
可以使用 kafka 自帶的 producer 和 consumer 來驗證集群是否能正常工作。
使用 bin 目錄下的 kafka-console-consumer.sh 和 kafka-console-producer.sh 腳本可以啟動 consumer 和 producer 客戶端。
- 進入 kafka 的安裝目錄,執(zhí)行以下命令(假設 zookeeper 集群信息為:server1:2181,server2:2181,server3:2181),創(chuàng)建一個名為 “my_topic”的topic:
bin/kafka-topics.sh --create --zookeeper server1:2181,server2:2181,server3:2181 --replication-factor 1 --partitions 1 --topic my_topic
- 啟動一個 producer,將消息發(fā)送到 “my_topic”,執(zhí)行以下命令(假設 kafka 集群信息為:server1:9092,server2:9092,server3:9092):
bin/kafka-console-producer.sh --borker-list server1:9092,server2:9092,server3:9092 --topic my_topic
- 輸入以下消息:
This is a message.
This is another message.
- 在集群中的另一個節(jié)點上,進入 kafka 的安裝目錄,然后啟動一個 consumer,訂閱 “my_topic” 的消息,執(zhí)行以下命令:
bin/kafka-console-consumer.sh --zookeeper server1:2181,server2:2181,server3:2181 --topic my_topic --from-beginning
- 然后可以看到終端上輸出以下內(nèi)容,證明集群可以正常使用:
This is a message.
This is another message.
API
Producer
0.8 以前版本的 Procuder API 有兩種:kafka.producer.SyncProducer 和 kafka.producer.async.AsyncProducer.它們都實現(xiàn)了同一個接口。
0.8 以后的新版本 Producer API 提供了以下功能:
可以將多個消息緩存到本地隊列里,然后異步的批量發(fā)送到 broker,可以通過參數(shù) producer.type=async 做到。
自己編寫 Encoder 來序列化消息,只需實現(xiàn)下面這個接口。默認的 Encoder 是 kafka.serializer.DefaultEncoder。
提供了基于 Zookeeper 的 broker 自動感知能力,可以通過參數(shù) zk.connect 實現(xiàn)。如果不使用 Zookeeper,也可以使用 broker.list 參數(shù)指定一個靜態(tài)的 brokers 列表,這樣消息將被隨機的發(fā)送到一個 broker 上,一旦選中的 broker 失敗了,消息發(fā)送也就失敗了。
通過分區(qū)函數(shù) kafka.producer.Partitioner 類對消息分區(qū),可以通過參數(shù) partitioner.class 定制分區(qū)函數(shù)。
Consumer
Consumer API 有兩個級別:低級別和高級別。
低級別的和一個指定的 broker 保持連接,并在接收完消息后關(guān)閉連接,這個級別是無狀態(tài)的,每次讀取消息都帶著 offset。
高級別的 API 隱藏了和 brokers 連接的細節(jié),在不必關(guān)心服務端架構(gòu)的情況下和服務端通信。還可以自己維護消費狀態(tài),并可以通過一些條件指定訂閱特定的 topic,比如白名單黑名單或者正則表達式。
低級別 API
低級別的 API 是高級別 API 實現(xiàn)的基礎(chǔ),也是為了一些對維持消費狀態(tài)有特殊需求的場景,比如 Hadoop consumer 這樣的離線 consumer。
高級別 API
這個 API 圍繞著由 KafkaStream 實現(xiàn)的迭代器展開,每個流代表一系列從一個或多個分區(qū)的 broker 上匯聚來的消息,每個流由一個線程處理,所以客戶端可以在創(chuàng)建的時候通過參數(shù)指定想要幾個流。
一個流是多個分區(qū)多個 broker 的合并,但是每個分區(qū)的消息只會流向一個流。
代碼示例
以下是兩個簡單的 Producer 和 Consumer 的代碼示例。
Producer(循環(huán)向topic中發(fā)送消息):
import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class Producer extends Thread{
private final kafka.javaapi.producer.Producer<Integer, String> producer;
private final String topic;
private final Properties props = new Properties();
public Producer(String topic){
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "10.106.1.234:9092"); //需要替換成自己的broker信息
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
}
public void run() {
int messageNo = 1;
while(true){
String messageStr = new String("Message_" + messageNo);
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
messageNo++;
}
}
}
Consumer(訂閱topic消息,并在控制臺輸出):
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class Consumer extends Thread{
private final ConsumerConnector consumer;
private final String topic;
public Consumer(String topic){
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfig createConsumerConfig(){
Properties props = new Properties();
props.put("zookeeper.connect", zkConnect); //需要將zkConnect替換成自己的Zookeeper集群信息
props.put("group.id", "group1");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while(it.hasNext())
System.out.println(new String(it.next().message()));
}
}
對比
Kafka VS Flume
Kafka 是一個非常通用的系統(tǒng)。多個生產(chǎn)者和消費者可以共享多個主題。相比之下,F(xiàn)lume 被設計為一個旨在往 HDFS 或 HBase 發(fā)送數(shù)據(jù)的專用工具,它對 HDFS 有特殊的優(yōu)化,并且集成了 Hadoop 的安全特性。
Flume 內(nèi)置了很多的 source 和 sink 組件。而 Kafka 只有一個更小的生產(chǎn)消費者生態(tài)系統(tǒng),并且 Kafka 的社區(qū)支持不好。使用 Kafka 通常需要自己編寫生產(chǎn)者和消費者代碼。
Flume 可以使用攔截器實時處理數(shù)據(jù),這對于數(shù)據(jù)屏蔽或者過量是很有用的。而 Kafka 需要外部的流處理系統(tǒng)才能做到。
Kafka 和 Flume 都是可靠的系統(tǒng),通過適當?shù)呐渲枚寄鼙WC零數(shù)據(jù)丟失。然而,F(xiàn)lume 不支持副本事件,如果 Flume 代理的一個節(jié)點奔潰了,即使使用了可靠的文件管道方式,也會丟失這些事件直到恢復這些磁盤。而 Kafka 則沒有這個問題。
Kafka VS RabbitMQ
RabbitMQ 遵循 AMQP 協(xié)議,以 broker 為中心,有消息的確認機制。Kafka 遵從一般的 MQ 結(jié)構(gòu),以 consumer 為中心,無消息確認機制。
Kafka 具有很高的吞吐量,內(nèi)部采用消息的批量處理,消息處理的效率很高。RabbitMQ 在吞吐量方面稍遜于 Kafka,支持對消息的可靠的傳遞,支持事務,但不支持批量的操作,基于存儲的可靠性的要求存儲可以采用內(nèi)存或者硬盤。
Kafka 采用 Zookeeper 對集群中的 broker、consumer 進行管理,可以注冊 topic 到 Zookeeper 上;通過 Zookeeper 的協(xié)調(diào)機制,producer 保存對應 topic 的 broker 信息,可以隨機或者輪詢發(fā)送到 broker 上;并且 producer 可以基于語義指定分片,消息發(fā)送到 broker 的某分片上。而 RabbitMQ 的負載均衡需要單獨的 loadbalancer 進行支持。