踩坑:maxwell寫入kafka數(shù)據(jù)傾斜

發(fā)現(xiàn)問(wèn)題

最近接手新項(xiàng)目,使用Maxwell解析MySQL的Binlog,發(fā)送到Kafka進(jìn)行處理。測(cè)試的時(shí)候發(fā)現(xiàn)一個(gè)問(wèn)題,就是Kafka的數(shù)據(jù)嚴(yán)重傾斜,8個(gè)partition,全部寫到同一個(gè)partition中,另外7個(gè)partition沒(méi)有任何meaasge。
Kafka數(shù)據(jù)傾斜的問(wèn)題一般是由于生產(chǎn)者使用的Partition接口實(shí)現(xiàn)類對(duì)分區(qū)處理的問(wèn)題,一般是對(duì)key做hash之后,對(duì)分區(qū)數(shù)取模。當(dāng)出現(xiàn)數(shù)據(jù)傾斜時(shí),小量任務(wù)耗時(shí)遠(yuǎn)高于其它任務(wù),從而使得整體耗時(shí)過(guò)大,未能充分發(fā)揮分布式系統(tǒng)的并行計(jì)算優(yōu)勢(shì)(參考Apache Kafka 0.10 技術(shù)內(nèi)幕:數(shù)據(jù)傾斜詳解)。
而使用Maxwell解析MySQL的Binlog發(fā)送到Kafka的時(shí)候,生產(chǎn)者是Maxwell,那么數(shù)據(jù)傾斜的問(wèn)題明細(xì)就是Maxwell引起的了。

排查

Maxwell官網(wǎng)查文檔得知,在Maxwell沒(méi)有配置的情況下,默認(rèn)使用數(shù)據(jù)庫(kù)名database作為計(jì)算分區(qū)的key,并使用Java默認(rèn)的hashcode算法進(jìn)行計(jì)算,項(xiàng)目中maxwell的binlog就是單個(gè)database,所以造成數(shù)據(jù)傾斜。

官方文檔

A binlog event's partition is determined by the selected hash function and hash string as follows

  HASH_FUNCTION(HASH_STRING) % TOPIC.NUMBER_OF_PARTITIONS

The HASH_FUNCTION is either java's *hashCode* or *murmurhash3*. The default HASH_FUNCTION is *hashCode*. Murmurhash3 may be set with thekafka_partition_hashoption. The seed value for the murmurhash function is hardcoded to 25342 in the MaxwellKafkaPartitioner class.

The HASH_STRING may be (*database*, *table*, *primary_key*, *column*). The default HASH_STRING is the *database*. The partitioning field can be configured using theproducer_partition_byoption.

Maxwell will discover the number of partitions in its kafka topic upon boot. This means that you should pre-create your kafka topics, and with at least as many partitions as you have logical databases:

bin/kafka-topics.sh --zookeeper ZK_HOST:2181 --create \
                    --topic maxwell --partitions 20 --replication-factor 2

出處文檔http://maxwells-daemon.io/producers/#kafka-partitioning。

修改Maxwell的配置文件config.properties中加入對(duì)應(yīng)參數(shù)即可,這里我選擇了primary_key作為分區(qū)key,同時(shí)選用murmurhash3
哈希算法,以獲得更好的效率和分布:

# tl;dr config
log_level=info

producer=kafka
kafka.bootstrap.servers=server1:port1
kafka_topic=topic_name
#######修改partition_by,解決kafka數(shù)據(jù)傾斜######
kafka_partition_hash=murmur3
producer_partition_by=primary_key

# mysql login info
host=******
user=maxwell
password=*****

修改配置后,重啟Maxwell,觀察Offset的變化,隔一段時(shí)間之后,各partition的Offset的增量基本一致,問(wèn)題解決!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容