kafka小記

1.基本概念

  • broker
    kafka由一臺(tái)或多臺(tái)機(jī)器組成,每一臺(tái)機(jī)器都是一個(gè)broker

  • topic
    每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為T(mén)opic。(物理上不同Topic的消息分開(kāi)存儲(chǔ),邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)

  • partition
    Parition是物理上的概念,每個(gè)Topic包含一個(gè)或多個(gè)Partition.

  • Segment
    partition物理上由多個(gè)segment組成

  • offset
    每個(gè)partition都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到partition中。partition中的每個(gè)消息都有一個(gè)連續(xù)的序列號(hào)叫做offset,用于partition唯一標(biāo)識(shí)一條消息.

  • producer
    負(fù)責(zé)發(fā)布消息到Kafka broker

  • consumer
    消息消費(fèi)者,向Kafka broker讀取消息的客戶端。

  • Consumer Group
    每個(gè)Consumer屬于一個(gè)特定的Consumer Group(可為每個(gè)Consumer指定group name,若不指定group name則屬于默認(rèn)的group)

2. kafka拓?fù)浣Y(jié)構(gòu)

3.小結(jié)

  1. 每個(gè)topic對(duì)應(yīng)一個(gè)或多個(gè)partition,每個(gè)partion都是一個(gè)單獨(dú)的文件夾
  2. 消費(fèi)者消費(fèi)完消息之后并不會(huì)真的從物理上刪除這條數(shù)據(jù),這條數(shù)據(jù)依舊會(huì)被保留,刪除的時(shí)間根據(jù)配置文件決定
  3. 使用Consumer high level API時(shí),同一Topic的一條消息只能被同一個(gè)Consumer Group內(nèi)的一個(gè)Consumer消費(fèi),但多個(gè)Consumer Group可同時(shí)消費(fèi)這一消息

4.常用命令

# kafka集群?jiǎn)?dòng)
nohup /home/kafka/bin/kafka-server-start.sh -daemon /home/kafka/config/server.properties 1>/export/logs/kafka/stdout.log 2>/export/logs/kafka/stderr.log &

# 關(guān)閉kafka集群
sudo ./kafka-server-stop.sh

# 創(chuàng)建topic
bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 3 --partitions 3 --topic test

# 查看topic信息
./kafka-topics.sh --zookeeper emr-header-1:2181 --topic test --describe

# 獲取topic的最大位移
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list emr-header-1:9092 --topic test2 --time -1

# 生產(chǎn)者
./kafka-console-producer.sh --broker-list emr-header-1:9092 --topic test

# 消費(fèi)者
./kafka-console-consumer.sh --zookeeper emr-header-1:2181 --topic test --from-beginning


# 刪除topic
# 法一
./bin/kafka-topics.sh  --delete --zookeeper emr-header-1:2181  --topic test

# 法二
# 登錄zookeeper客戶端
/usr/lib/zookeeper-current/bin/zkCli.sh
# 找到topic所在的目錄
ls /brokers/topics/
ls /admin/delete_topics/
# 刪除topic
rmr /brokers/topics/名稱
rmr /admin/delete_topics/名稱
刪除log存儲(chǔ)位置對(duì)應(yīng)的partition

5.python操作Kafka

生產(chǎn)者

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
from kafka import KafkaProducer

def test_producer():
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'), acks='all', api_version=(0, 10))
    for i in range(10):
        producer.send('test2', str(i), partition=i % 3)
        producer.flush()

    producer.close()
    print 'ok'


if __name__ == '__main__':
    test_producer()

消費(fèi)者

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from kafka import KafkaConsumer

def test_consumer():
    consumer = KafkaConsumer('test2', bootstrap_servers=['localhost:9092'], group_id='test_group', api_version=(0, 10))
    while True:
        for message in consumer:
            print message.value

        import time
        time.sleep(1)


if __name__ == '__main__':
    test_consumer()

6.優(yōu)秀的文章

【美團(tuán)】Kafka文件存儲(chǔ)機(jī)制那些事

【infoq】Kafka背景及架構(gòu)介紹

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容