Kafka、Zookeeper基于docker安裝,kafka-python簡單使用

Kafka、Zookeeper基于docker安裝,kafka-python簡單使用

kafka-python文檔:KafkaConsumer - kafka-python 2.0.2-dev documentation

一、基本概念

  • Topic:一組消息數(shù)據(jù)的標記符;

  • Producer:生產(chǎn)者,用于生產(chǎn)數(shù)據(jù),可將生產(chǎn)后的消息送入指定的Topic;

  • Consumer:消費者,獲取數(shù)據(jù),可消費指定的Topic;

  • Group:消費者組,同一個group可以有多個消費者,一條消息在一個group中,只會被一個消費者獲?。?/p>

  • Partition:分區(qū),為了保證kafka的吞吐量,一個Topic可以設置多個分區(qū)。同一分區(qū)只能被一個消費者訂閱。

二、本地安裝與啟動(基于Docker)

docker安裝教程參考菜鳥教程:https://www.runoob.com/docker/centos-docker-install.html

  1. 下載zookeeper鏡像與kafka鏡像:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
  1. 本地啟動zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper 
  1. 本地啟動kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=外網(wǎng)IP:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://外網(wǎng)IP:9092 \       //
--env KAFKA_LISTENERS=PLAINTEXT://:9092 \
--env KAFKA_JVM_PERFORMANCE_OPTS=" -Xmx256m -Xms256m" \
wurstmeister/kafka:latest

注意:

  1. 上述代碼,將kafka啟動在9092端口
  1. 如果內(nèi)容不足可能會報一個內(nèi)存溢出的錯誤(OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Out of memory' (errno=12)),此時只需要加上 --env KAFKA_JVM_PERFORMANCE_OPTS=" -Xmx256m -Xms256m"指定jvm的內(nèi)存大小即可
–link 用于容器直接的互通。
-e KAFKA_BROKER_ID=0 一個 kafka節(jié)點 就是一個 broker。一個集群由多個 broker 組成。一個 broker可以容納多個 topic
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 配置zookeeper管理kafka的路徑
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://139.226.179.239:9092 把kafka的地址端口注冊給zookeeper,若遠程訪問要改成外網(wǎng)IP,千萬注意是外網(wǎng)IP,很多文章只說是宿主機IP, 演示例子上寫的是內(nèi)網(wǎng)IP,很容易被誤導
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的監(jiān)聽端口
-v /etc/localtime:/etc/localtime 容器時間同步虛擬機的時間
  1. 進入kafka bash
docker exec -it kafka bash
cd /opt/kafka/bin
  1. 創(chuàng)建Topic,分區(qū)為2,Topic name為'kafka_demo'
kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic kafka_demo
  1. 查看當前所有topic
kafka-topics.sh --zookeeper zookeeper:2181 --list
  1. 刪除Topic
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic myDeleteTopic
  1. 安裝kafka-python
pip install kafka-python

三、生產(chǎn)者(Producer)與消費者(Consumer)

生產(chǎn)者和消費者的簡易Demo,這里一起演示:

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json


def producer_demo():
    # 假設生產(chǎn)的消息為鍵值對(不是一定要鍵值對),且序列化方式為json
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'], 
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    # 發(fā)送三條消息
    for i in range(0, 3):
        future = producer.send(
            'kafka_demo',
            key='count_num',  # 同一個key值,會被送至同一個分區(qū)
            value=str(i),
            partition=1)  # 向分區(qū)1發(fā)送消息
        print("send {}".format(str(i)))
        try:
            future.get(timeout=10) # 監(jiān)控是否發(fā)送成功           
        except kafka_errors:  # 發(fā)送失敗拋出kafka_errors
            traceback.format_exc()


def consumer_demo():
    consumer = KafkaConsumer(
        'kafka_demo', 
        bootstrap_servers=':9092',
        group_id='test'
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
            )
        )

這里建議起兩個terminal,或者兩個jupyter notebook頁面來驗證。

先執(zhí)行消費者:

consumer_demo()

再執(zhí)行生產(chǎn)者:

producer_demo()

會看到如下輸出:

>>> producer_demo()
send 0
send 1
send 2
>>> consumer_demo()
receive, key: count_num, value: 0
receive, key: count_num, value: 1
receive, key: count_num, value: 2

四、消費者進階操作

(1)初始化參數(shù):

列舉一些KafkaConsumer初始化時的重要參數(shù):

  • group_id

高并發(fā)量,則需要有多個消費者協(xié)作,消費進度,則由group_id統(tǒng)一。例如消費者A與消費者B,在初始化時使用同一個group_id。在進行消費時,一條消息被消費者A消費后,在kafka中會被標記,這條消息不會再被B消費(前提是A消費后正確commit)。

  • key_deserializer, value_deserializer

與生產(chǎn)者中的參數(shù)一致,自動解析。

  • auto_offset_reset

消費者啟動的時刻,消息隊列中或許已經(jīng)有堆積的未消費消息,有時候需求是從上一次未消費的位置開始讀(則該參數(shù)設置為earliest),有時候的需求為從當前時刻開始讀之后產(chǎn)生的,之前產(chǎn)生的數(shù)據(jù)不再消費(則該參數(shù)設置為latest)。

  • enable_auto_commit, auto_commit_interval_ms

是否自動commit,當前消費者消費完該數(shù)據(jù)后,需要commit,才可以將消費完的信息傳回消息隊列的控制中心。enable_auto_commit設置為True后,消費者將自動commit,并且兩次commit的時間間隔為auto_commit_interval_ms。

(2)手動commit

def consumer_demo():
    consumer = KafkaConsumer(
        'kafka_demo', 
        bootstrap_servers=':9092',
        group_id='test',
        enable_auto_commit=False
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
            )
        )
        consumer.commit()

(3)查看kafka堆積剩余量

在線環(huán)境中,需要保證消費者的消費速度大于生產(chǎn)者的生產(chǎn)速度,所以需要檢測kafka中的剩余堆積量是在增加還是減小。可以用如下代碼,觀測隊列消息剩余量:

consumer = KafkaConsumer(topic, **kwargs)
partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]

print("start to cal offset:")

# total
toff = consumer.end_offsets(partitions)
toff = [(key.partition, toff[key]) for key in toff.keys()]
toff.sort()
print("total offset: {}".format(str(toff)))
    
# current
coff = [(x.partition, consumer.committed(x)) for x in partitions]
coff.sort()
print("current offset: {}".format(str(coff)))

# cal sum and left
toff_sum = sum([x[1] for x in toff])
cur_sum = sum([x[1] for x in coff if x[1] is not None])
left_sum = toff_sum - cur_sum
print("kafka left: {}".format(left_sum))

原文連接:https://zhuanlan.zhihu.com/p/279784873

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容