kafka (python)

分布式消息發(fā)布和訂閱系統(tǒng)
提供了類似JMS的特性

對用戶行為收集
日志收集

Broker:一個kafka服務(wù)
Producer
consumer
topic: kafka集群的類別,一類數(shù)據(jù)的集合
partition: 每一個topic中具體的物理分區(qū)
consumer group:每一個consumer都有一個對應(yīng)的group 對應(yīng)一個topic,達(dá)到發(fā)布訂閱的功能
官網(wǎng)
博客
視頻

LMS、AQMP消息模型

  • JMS(java消息服務(wù))
    • 點(diǎn)對點(diǎn)(一對一)
      • Quene
  • AMQP(高級消息隊列協(xié)議)
    • 隊列
    • 信箱
    • 綁定


      image.png

創(chuàng)建多broker集群

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

更改屬性

因為原文檔是在一臺機(jī)器上搭建的為分布式,所以在多臺機(jī)器上搭建的時候只用更改brokerid以及對應(yīng)的zookeeper節(jié)點(diǎn)即可
然后我們啟動各個機(jī)器上的broker
多節(jié)點(diǎn)多broker
創(chuàng)建一個新的擁有備份的topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

查看一下這個topic的描述

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

第一行介紹了所有的分區(qū),其他的行,每一行都介紹了一個分區(qū)的信息,因為我們只創(chuàng)建了一個分區(qū)所以這里只有一行數(shù)據(jù)

  • ‘leader’就是該分區(qū)所屬的broker,負(fù)責(zé)這個分區(qū)的一些讀寫操作。
  • replicas 就是這個分區(qū)的日志備份brokers,無論他們是否是leader還是是否alive
  • isr 這個記錄了被leader捕獲并且還活著的上面replicas的子集

使用kafka connect 來導(dǎo)入/導(dǎo)出數(shù)據(jù)

我們經(jīng)常需要用kafka導(dǎo)入別的數(shù)據(jù)源,或者導(dǎo)出到別的系統(tǒng)。所以kafka提供了個工具叫做kafka connect.
Kafka Connect是Kafka附帶的工具,用于向Kafka導(dǎo)入和導(dǎo)出數(shù)據(jù)。 它是一個可擴(kuò)展的工具,運(yùn)行連接器,實現(xiàn)與外部系統(tǒng)交互的自定義邏輯。
官方文檔給的案例是一個kafak connect,它實現(xiàn)了從文件的導(dǎo)入和導(dǎo)出,producer可以從文件1讀取數(shù)據(jù)進(jìn)入kafka, consumer則讀取數(shù)據(jù)并寫入文件2中,實現(xiàn)了在文件系統(tǒng)中的發(fā)布訂閱。

python kafka

安裝
pip install confluent-kafka
官方文檔
github

Admin API

kafka的控制端,創(chuàng)建、瀏覽、改變、刪除主題和資源


class confluent_kafka.admin.AdminClient(conf)

AdminClient 為kafka 的brokers提供一些控制操作,topics、groups、以及其他borker支持的資源類型。
Admin API方法是異步的,并返回由實體鍵入的concurrent.futures.Future對象的dict。
實體是一個topic 名字供create_topics(), delete_topics(), create_partitions()調(diào)用,并且一個ConfigResource 供alter_configs(), describe_configs()調(diào)用。

查看使用案例 examples/adminapi.py

下面是可以調(diào)用的函數(shù):
alter_configs(resources, **kwargs)
改變配置
create_partitions(new_partitions, **kwargs)
為給定的topic創(chuàng)建新分區(qū)
create_topics(new_topics, **kwargs)
集群創(chuàng)建新topic
delete_topics(topics, **kwargs)
刪除topic
describe_configs(resources, **kwargs)
查看某個特定資源的配置


class confluent_kafka.admin.BrokerMetadata

包含kafka broker 信息的類


class confluent_kafka.admin.ClusterMetadata

包含kafka 集群、brokers、topics信息的對象


class confluent_kafka.admin.ConfigEntry(name, value, source=<ConfigSource.UNKNOWN_CONFIG: 0>,  \
is_read_only=False, is_default=False, is_sensitive=False, is_synonym=False, synonyms=[])

describe_configs()的返回對象


class confluent_kafka.admin.ConfigResource(restype, name, set_config=None, described_configs=None, error=None)

展示擁有配置的資源,通過資源類型和名字進(jìn)行實例化。
ConfigResource.Type 描繪了kafka 資源的type


ConfigResource.set_config(name, value, overwrite=True)
設(shè)置或者重寫配置參數(shù)


class confluent_kafka.admin.ConfigSource

Config sources returned in ConfigEntry by describe_configs().



class confluent_kafka.admin.PartitionMetadata

| Variables: |

  • id (int) – Partition id.
  • leader (int) – Current leader broker for this partition, or -1.
  • replicas (list(int)) – List of replica broker ids for this partition.
  • isrs (list(int)) – List of in-sync-replica broker ids for this partition.
  • error (KafkaError) – Partition error, or None. Value is a KafkaError object.

class confluent_kafka.admin.TopicMetadata

Variables:
topic (str) – Topic name.
partitions (dict) – Map of partitions indexed by partition id. Value is PartitionMetadata object.
error (KafkaError) – Topic error, or None. Value is a KafkaError object.


Producer API

  • 采用異步發(fā)送消息
    image.png
  • 采用同步發(fā)送消息
  • 批量發(fā)送消息
    • 消息大小
    • 延遲時間
  • acks
    生產(chǎn)者發(fā)送消息后,服務(wù)器的回執(zhí)
    • 0
      生產(chǎn)者不等待服務(wù)器,消息發(fā)送到緩沖區(qū)就ok了
    • 1
      broker收到就ok, 不管follower是否備份
    • -1/all
      等到broker收到, follower備份
  • retries
    當(dāng)消息發(fā)送失敗重復(fù)的次數(shù),默認(rèn)為0
  • 至多一次
    acks=0 或acks=1
  • 至少一次
    acks =-1或all
    retries >0
  • 精確一次與冪等
    enable.idempotence = true
    //retries = integer.MAX_VALUE
    acts = all
  • 事務(wù)
    消息要么全部成功,要么全部失敗

class confluent_kafka.Producer

kafka 異步 producer
Producer(config)
config(dict)—參數(shù)屬性,至少需要設(shè)置bootstrap.severs


len()
返回等待傳遞給broker的消息數(shù)和請求數(shù), type:int


flush([timeout])
等待producer隊列中的所有消息被發(fā)送。timeout是最大的堵塞時間,返回仍在隊列中的消息數(shù)


poll([timeout])
輪詢生產(chǎn)者的事件并調(diào)用相應(yīng)的回調(diào)(已注冊的)

  • on_delivery :produce()的回調(diào)
    參數(shù):timeout-最大堵塞時間(秒)
    返回:被處理的事件數(shù)(調(diào)用回調(diào))(int)

list_topics([topic=None][,timeout=-1])
從集群中請求元數(shù)據(jù)。這個方法提供了listTopics(), describeTopics() and describeCluster() 在java客戶端中同樣的信息
參數(shù)

  • topic(str) - 如果提供了這個參數(shù),那么僅僅顯示有關(guān)這個topic的信息,兜著返回所有集群中的topic信息。
  • timeout 最大的響應(yīng)時間在超時之間, -1是無限timeout
    Return type:ClusterMetadata
    Raises: KafkaException

produce()
produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])
向topic發(fā)送消息。這是一個異步操作,一個應(yīng)用可能會使用回調(diào)(別名 on_delivery)參數(shù)來傳遞一個函數(shù)或者匿名函數(shù),當(dāng)消息被成功發(fā)送或者永久失敗時,就會被poll()調(diào)用。
參數(shù)

  • topic(str)
  • value(str|bytes)- 消息負(fù)載
  • key(str|bytes)-消息key
  • partition(int)- 發(fā)送消息的分區(qū),否則使用默認(rèn)的
  • on_delivery(err,msg) (func) -被調(diào)用的報告回調(diào)函數(shù)
  • timestamp(int)-消息時間戳
  • dict|list (headers)
    消息的消息頭部。頭部的key必須是一個字符串,值必須是二進(jìn)制,unicode或者None。 Accepts a list of (key,value) or a dict。
    Return type:None

Consumer API

class confluent_kafka.Consumer

Consumer(config)
Parameters: config (dict) – 配置參數(shù)。至少得設(shè)置bootstrap.servers和group.id
創(chuàng)建一個新的消費(fèi)端
特殊參數(shù):on_commit:當(dāng)一個commit request 成功或失敗時調(diào)用的回調(diào)


on_commit(err, partitions)
參數(shù):

  • consumer (Consumer) –consumer實例
  • err (KafkaError) – commit error object
  • partitions (list(TopicPartition)) –分區(qū)列表包括他們的 committed offsets or per-partition errors.

assign()
assign(partitions)
將消費(fèi)的消息分區(qū)設(shè)置為提供的TopicPartition列表并開始消費(fèi)
參數(shù)

  • partitions (list(TopicPartition)) – 主題+分區(qū)的列表以及可選擇的消費(fèi)的初始o(jì)ffset

assignment()
返回目前的分區(qū)設(shè)置情況


close()關(guān)閉消費(fèi)者
進(jìn)行的操作:

  • 結(jié)束消費(fèi)
  • 提交offsets 除非消費(fèi)者屬性'enable.auto.commit'設(shè)置為False
  • 離開用戶組

commit()
commit([message=None][, offsets=None][, asynchronous=True])
提交一個信息或者offsets列表
消息和偏移是互斥的,如果兩者都沒有設(shè)置,則使用當(dāng)前分區(qū)分配的偏移。 如果您將'enable.auto.commit'設(shè)置為False,則consumer依賴于您使用此方法


committed()
committed(partitions[, timeout=None])
檢索分區(qū)中已經(jīng)提交的offsets


consume()
consume([num_messages=1][, timeout=-1])
消費(fèi)消息,調(diào)用回調(diào)函數(shù)以及返回消息列表。應(yīng)用程序必須檢查返回的Message對象的Message.error()方法,以區(qū)分正確的消息(error()返回None),或列表中每個Message的事件或錯誤(請參閱error()。code()以獲取詳細(xì)信息)。


get_watermark_offsets()
get_watermark_offsets(partition[, timeout=None][, cached=False])
檢索分區(qū)的low and high offsets
cached (bool) – 不是查詢broker所使用的內(nèi)存信息。緩存值:定期更新低偏移量(如果設(shè)置了statistics.interval.ms),同時在從此分區(qū)的broker獲取的每條消息上更新高偏移量。
Return : Tuple of (low,high) on success or None on timeout.


list_topics()
list_topics([topic=None][, timeout=-1])
返回元數(shù)據(jù)


offsets_for_times()
offsets_for_times(partitions[, timeout=None])
offsets_for_times按給定分區(qū)的時間戳查找偏移量。
每個分區(qū)的返回偏移量是最早的偏移量,其時間戳大于或等于相應(yīng)分區(qū)中的給定時間戳。


pause()
pause(partitions)
暫停該分區(qū)的消費(fèi)


poll()
poll([timeout=None])
使用消息,調(diào)用回調(diào)并返回事件。


position()
position(partitions[, timeout=None])
檢索給定分區(qū)的當(dāng)前偏移量。


resume()
resume(partitions)
恢復(fù)提供的分區(qū)列表的消耗。


seek(partition)
將分區(qū)的消耗位置設(shè)置為偏移量。 偏移可以是絕對(> = 0)或邏輯偏移(OFFSET_BEGINNING等)。
seek()可以僅用于更新主動消耗的分區(qū)的消耗偏移(即,在assign()之后),以設(shè)置未被消耗的分區(qū)的起始偏移,而不是在assign()調(diào)用中傳遞偏移。


store_offsets()
store_offsets([message=None][, offsets=None])
存儲消息或偏移列表的偏移量。
消息和偏移是互斥的。 存儲的偏移量將根據(jù)'auto.commit.interval.ms'或手動無偏移提交()進(jìn)行提交。 請注意,使用此API時,“enable.auto.offset.store”必須設(shè)置為False。


subscribe()
subscribe(topics[, listener=None])
設(shè)置訂閱提供的主題列表這將替換以前的訂閱。
可以通過正則化進(jìn)行訂閱:

consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])

回調(diào)函數(shù)
on_assign(consumer, partitions)
on_revoke(consumer, partitions)


unassign()
刪除當(dāng)前分區(qū)設(shè)置并停止消費(fèi)。


unsubscribe()
移除當(dāng)前訂閱

TopicPartition API

class confluent_kafka.TopicPartition

TopicPartition是一種通用類型,用于保存單個分區(qū)及其各種信息。
它通常用于為各種操作提供主題或分區(qū)列表,例如Consumer.assign()。
TopicPartition(topic[, partition][, offset])
實例化一個topicpartition對象
參數(shù)

  • topic (string) – Topic name
  • partition (int) – Partition id
  • offset (int) – Initial partition offset

屬性:

  • error
  • offset
  • partition
  • topic

Message API

Message對象表示單個消費(fèi)或生成的消息,或者一個錯誤事件(error()不是None)。
應(yīng)用程序必須檢查error()以查看對象是否是正確的消息(error()返回None)或錯誤/事件。
這個對象不需要用戶初始化
方法:

  • len() 返回消息大小
  • error() Return type: None or KafkaError 用來檢查是否消息是錯誤事件
  • headers() 檢索消息的頭部。每個頭部都是一個鍵值對。注意消息頭的key是有序且可重復(fù)的
  • key() Returns: message key or None if not available.
  • offset() Returns: message offset or None if not available.
  • partition() Returns: partition number or None if not available.
  • set_headers() Set the field ‘Message.headers’ with new value.
  • set_key() Set the field ‘Message.key’ with new value.
  • set_value() Set the field ‘Message.value’ with new value.
  • timestamp()
    從消息中檢索時間戳類型和時間戳。 時間戳類型是以下之一:
    • TIMESTAMP_NOT_AVAILABLE - Timestamps not supported by broker
    • TIMESTAMP_CREATE_TIME - Message creation time (or source / producer time)
    • TIMESTAMP_LOG_APPEND_TIME - Broker receive time
  • topic() Returns: topic name or None if not available.
  • value() Returns: message value (payload) or None if not available.

Offset API

邏輯offset常量:

  • OFFSET_BEGINNING - Beginning of partition (oldest offset)
  • OFFSET_END - End of partition (next offset)
  • OFFSET_STORED - Use stored/committed offset
  • OFFSET_INVALID - Invalid/Default offset
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容