Discovering the problem
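I recently took over a new project that uses Maxwell to parse the MySQL binlog and publish the changes to Kafka for processing. During testing I noticed that the Kafka data was severely skewed: the topic had 8 partitions, but every message was written to the same one, and the other 7 partitions received no messages at all.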
Kafka data skew usually comes from how the producer's Partitioner implementation assigns partitions, which typically hashes the message key and takes the result modulo the number of partitions. When data is skewed, a few tasks take far longer than the rest, which stretches the overall processing time and wastes the parallelism a distributed system is supposed to provide (see "Apache Kafka 0.10 Internals: Data Skew Explained").
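The hash-then-modulo scheme can be sketched in a few lines of Java. This is a simplified illustration, not Kafka's own default partitioner (which hashes the serialized key bytes with murmur2); the class name HashModSkew, the helper partitionFor and the sample keys are hypothetical, and String.hashCode with Math.floorMod stands in for whatever hash a real producer uses. It shows why one repeated key sends every record to a single partition.

```java
import java.util.Arrays;

public class HashModSkew {
    // Hypothetical helper: hash the key, then take it modulo the partition count.
    // Math.floorMod keeps the result in [0, numPartitions) even for negative hashes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Count how many keys land in each partition.
    static int[] distribution(String[] keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int partitions = 8;
        String[] sameKey = new String[1000];
        String[] distinctKeys = new String[1000];
        for (int i = 0; i < 1000; i++) {
            sameKey[i] = "shop_db";      // every record carries the same key
            distinctKeys[i] = "k" + i;   // every record carries its own key
        }
        System.out.println("same key:      " + Arrays.toString(distribution(sameKey, partitions)));
        System.out.println("distinct keys: " + Arrays.toString(distribution(distinctKeys, partitions)));
    }
}
```

With the same key, all 1000 records fall into one slot and the other seven stay empty; with distinct keys, every one of the eight partitions receives records.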
When Maxwell parses the MySQL binlog and sends it to Kafka, Maxwell is the producer, so the skew was clearly caused by Maxwell.
Troubleshooting
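The Maxwell documentation on the official site explains that, unless configured otherwise, Maxwell uses the database name as the key for computing the partition and hashes it with Java's default hashCode algorithm. In this project Maxwell reads the binlog of a single database, so every event produced the same hash, landed in the same partition, and caused the skew.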
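To see how the choice of hash string plays out for a single-database binlog, here is a small simulation. The Event record, the hashString formats for each mode and the sample tables are hypothetical; Maxwell's real hash strings may be built differently. Only the idea (pick a string per event, hash it with Java's hashCode, take it modulo the partition count) follows the documentation.

```java
import java.util.HashSet;
import java.util.Set;

public class PartitionByDemo {
    // Hypothetical event shape: only the fields that matter for choosing the hash string.
    record Event(String database, String table, long primaryKey) {}

    // Hypothetical hash-string builders, one per partition_by mode; Maxwell's exact format may differ.
    static String hashString(Event e, String partitionBy) {
        switch (partitionBy) {
            case "database":
                return e.database();
            case "table":
                return e.database() + "." + e.table();
            case "primary_key":
                return e.database() + "." + e.table() + "." + e.primaryKey();
            default:
                throw new IllegalArgumentException("unknown mode: " + partitionBy);
        }
    }

    // Java hashCode, then modulo the partition count (floorMod avoids negative results).
    static int partition(String hashString, int numPartitions) {
        return Math.floorMod(hashString.hashCode(), numPartitions);
    }

    // Number of distinct partitions actually used by the events under a given mode.
    static int partitionsUsed(Event[] events, String partitionBy, int numPartitions) {
        Set<Integer> used = new HashSet<>();
        for (Event e : events) {
            used.add(partition(hashString(e, partitionBy), numPartitions));
        }
        return used.size();
    }

    public static void main(String[] args) {
        String[] tables = {"orders", "users", "items"};
        Event[] events = new Event[300];
        for (int i = 0; i < events.length; i++) {
            // All events come from one database, as in the project described above.
            events[i] = new Event("shop_db", tables[i % tables.length], i);
        }
        for (String mode : new String[] {"database", "table", "primary_key"}) {
            System.out.println(mode + " -> partitions used: " + partitionsUsed(events, mode, 8));
        }
    }
}
```

With these inputs the database mode touches a single partition, the table mode at most three (one per table), and primary_key spreads the events across all eight.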
Official documentation
A binlog event's partition is determined by the selected hash function and hash string as follows
HASH_FUNCTION(HASH_STRING) % TOPIC.NUMBER_OF_PARTITIONS
The HASH_FUNCTION is either java's *hashCode* or *murmurhash3*. The default HASH_FUNCTION is *hashCode*. Murmurhash3 may be set with the kafka_partition_hash option. The seed value for the murmurhash function is hardcoded to 25342 in the MaxwellKafkaPartitioner class.
The HASH_STRING may be (*database*, *table*, *primary_key*, *column*). The default HASH_STRING is the *database*. The partitioning field can be configured using the producer_partition_by option.
Maxwell will discover the number of partitions in its kafka topic upon boot. This means that you should pre-create your kafka topics, and with at least as many partitions as you have logical databases:
bin/kafka-topics.sh --zookeeper ZK_HOST:2181 --create \
--topic maxwell --partitions 20 --replication-factor 2
Source: http://maxwells-daemon.io/producers/#kafka-partitioning
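The formula quoted above, HASH_FUNCTION(HASH_STRING) % TOPIC.NUMBER_OF_PARTITIONS, can be sketched with a plain-Java MurmurHash3 (x86, 32-bit) using the seed 25342 that the documentation mentions. This is a generic textbook implementation written for illustration, not the code in MaxwellKafkaPartitioner. Encoding the hash string as UTF-8 and mapping negative hashes with Math.floorMod are assumptions, so the partition numbers it prints need not match Maxwell's.

```java
import java.nio.charset.StandardCharsets;

public class Murmur3Partition {
    static final int SEED = 25342; // seed value quoted from the Maxwell documentation

    // Generic MurmurHash3 x86_32 over a byte array.
    static int murmur3_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int roundedEnd = data.length & ~3; // length rounded down to a multiple of 4

        // Body: mix each 4-byte little-endian block into the state.
        for (int i = 0; i < roundedEnd; i += 4) {
            int k1 = (data[i] & 0xff)
                    | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16)
                    | ((data[i + 3] & 0xff) << 24);
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }

        // Tail: the remaining 0-3 bytes.
        int k1 = 0;
        switch (data.length & 3) {
            case 3:
                k1 ^= (data[roundedEnd + 2] & 0xff) << 16;
                // fall through
            case 2:
                k1 ^= (data[roundedEnd + 1] & 0xff) << 8;
                // fall through
            case 1:
                k1 ^= data[roundedEnd] & 0xff;
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
                break;
            default:
                break;
        }

        // Finalization: mix in the length and avalanche the bits.
        h1 ^= data.length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // HASH_FUNCTION(HASH_STRING) % NUMBER_OF_PARTITIONS, kept non-negative with floorMod.
    static int partition(String hashString, int numPartitions) {
        byte[] bytes = hashString.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(murmur3_32(bytes, SEED), numPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"shop_db", "shop_db.orders.1", "shop_db.orders.2"}) {
            System.out.println(key + " -> partition " + partition(key, 8));
        }
    }
}
```

Swapping the hash function alone does not fix skew: a single constant hash string still maps to one partition. It is the change of hash string to primary_key that spreads the events.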
The fix is to add the corresponding parameters to Maxwell's config.properties. Here I chose primary_key as the partition key and switched to the murmurhash3 hash algorithm for better efficiency and distribution:
# tl;dr config
log_level=info
producer=kafka
kafka.bootstrap.servers=server1:port1
kafka_topic=topic_name
####### changed partition_by to fix Kafka data skew ######
kafka_partition_hash=murmur3
producer_partition_by=primary_key
# mysql login info
host=******
user=maxwell
password=*****
修改配置后,重啟Maxwell,觀察Offset的變化,隔一段時(shí)間之后,各partition的Offset的增量基本一致,問(wèn)題解決!