1.基本概念
broker
kafka由一臺(tái)或多臺(tái)機(jī)器組成,每一臺(tái)機(jī)器都是一個(gè)brokertopic
每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為T(mén)opic。(物理上不同Topic的消息分開(kāi)存儲(chǔ),邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)partition
Parition是物理上的概念,每個(gè)Topic包含一個(gè)或多個(gè)Partition.-
Segment
partition物理上由多個(gè)segment組成
offset
每個(gè)partition都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到partition中。partition中的每個(gè)消息都有一個(gè)連續(xù)的序列號(hào)叫做offset,用于partition唯一標(biāo)識(shí)一條消息.producer
負(fù)責(zé)發(fā)布消息到Kafka brokerconsumer
消息消費(fèi)者,向Kafka broker讀取消息的客戶端。Consumer Group
每個(gè)Consumer屬于一個(gè)特定的Consumer Group(可為每個(gè)Consumer指定group name,若不指定group name則屬于默認(rèn)的group)
2. kafka拓?fù)浣Y(jié)構(gòu)
3.小結(jié)
- 每個(gè)topic對(duì)應(yīng)一個(gè)或多個(gè)partition,每個(gè)partion都是一個(gè)單獨(dú)的文件夾
- 消費(fèi)者消費(fèi)完消息之后并不會(huì)真的從物理上刪除這條數(shù)據(jù),這條數(shù)據(jù)依舊會(huì)被保留,刪除的時(shí)間根據(jù)配置文件決定
- 使用Consumer high level API時(shí),同一Topic的一條消息只能被同一個(gè)Consumer Group內(nèi)的一個(gè)Consumer消費(fèi),但多個(gè)Consumer Group可同時(shí)消費(fèi)這一消息
4.常用命令
# kafka集群?jiǎn)?dòng)
nohup /home/kafka/bin/kafka-server-start.sh -daemon /home/kafka/config/server.properties 1>/export/logs/kafka/stdout.log 2>/export/logs/kafka/stderr.log &
# 關(guān)閉kafka集群
sudo ./kafka-server-stop.sh
# 創(chuàng)建topic
bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 3 --partitions 3 --topic test
# 查看topic信息
./kafka-topics.sh --zookeeper emr-header-1:2181 --topic test --describe
# 獲取topic的最大位移
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list emr-header-1:9092 --topic test2 --time -1
# 生產(chǎn)者
./kafka-console-producer.sh --broker-list emr-header-1:9092 --topic test
# 消費(fèi)者
./kafka-console-consumer.sh --zookeeper emr-header-1:2181 --topic test --from-beginning
# 刪除topic
# 法一
./bin/kafka-topics.sh --delete --zookeeper emr-header-1:2181 --topic test
# 法二
# 登錄zookeeper客戶端
/usr/lib/zookeeper-current/bin/zkCli.sh
# 找到topic所在的目錄
ls /brokers/topics/
ls /admin/delete_topics/
# 刪除topic
rmr /brokers/topics/名稱
rmr /admin/delete_topics/名稱
刪除log存儲(chǔ)位置對(duì)應(yīng)的partition
5.python操作Kafka
生產(chǎn)者
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer
def test_producer():
producer = KafkaProducer(bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8'), acks='all', api_version=(0, 10))
for i in range(10):
producer.send('test2', str(i), partition=i % 3)
producer.flush()
producer.close()
print 'ok'
if __name__ == '__main__':
test_producer()
消費(fèi)者
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
def test_consumer():
consumer = KafkaConsumer('test2', bootstrap_servers=['localhost:9092'], group_id='test_group', api_version=(0, 10))
while True:
for message in consumer:
print message.value
import time
time.sleep(1)
if __name__ == '__main__':
test_consumer()