kafka消費(fèi)者api分為high api和low api,目前上述demo是都是使用kafka high api,高級(jí)api不用關(guān)心維護(hù)消費(fèi)狀態(tài)信息和負(fù)載均衡,系統(tǒng)會(huì)根據(jù)配置參數(shù),
定期flush offset到zk上,如果有多個(gè)consumer且每個(gè)consumer創(chuàng)建了多個(gè)線程,高級(jí)api會(huì)根據(jù)zk上注冊(cè)consumer信息,進(jìn)行自動(dòng)負(fù)載均衡操作。
注意事項(xiàng):
1.高級(jí)api將會(huì)內(nèi)部實(shí)現(xiàn)持久化每個(gè)分區(qū)最后讀到的消息的offset,數(shù)據(jù)保存在zookeeper中的消費(fèi)組名中(如/consumers/push-token-group/offsets/push-token/2。
其中push-token-group是消費(fèi)組,push-token是topic,最后一個(gè)2表示第3個(gè)分區(qū)),每間隔一個(gè)(默認(rèn)1000ms)時(shí)間更新一次offset,
那么可能在重啟消費(fèi)者時(shí)拿到重復(fù)的消息。此外,當(dāng)分區(qū)leader發(fā)生變更時(shí)也可能拿到重復(fù)的消息。因此在關(guān)閉消費(fèi)者時(shí)最好等待一定時(shí)間(10s)然后再shutdown()
2.消費(fèi)組名是一個(gè)全局的信息,要注意在新的消費(fèi)者啟動(dòng)之前舊的消費(fèi)者要關(guān)閉。如果新的進(jìn)程啟動(dòng)并且消費(fèi)組名相同,kafka會(huì)添加這個(gè)進(jìn)程到可用消費(fèi)線程組中用來消費(fèi)
topic和觸發(fā)重新分配負(fù)載均衡,那么同一個(gè)分區(qū)的消息就有可能發(fā)送到不同的進(jìn)程中。
3.如果消費(fèi)者組中所有consumer的總線程數(shù)量大于分區(qū)數(shù),一部分線程或某些consumer可能無法讀取消息或處于空閑狀態(tài)。
4.如果分區(qū)數(shù)多于線程數(shù)(如果消費(fèi)組中運(yùn)行者多個(gè)消費(fèi)者,則線程數(shù)為消費(fèi)者組內(nèi)所有消費(fèi)者線程總和),一部分線程會(huì)讀取到多個(gè)分區(qū)的消息
5.如果一個(gè)線程消費(fèi)多個(gè)分區(qū)消息,那么接收到的消息是不能保證順序的。
備注:可用zookeeper web ui工具管理查看zk目錄樹數(shù)據(jù): xxx/consumers/push-token-group/owners/push-token/2其中
push-token-group為消費(fèi)組,push-token為topic,2為分區(qū)3.查看里面的內(nèi)容如:
push-token-group-mobile-platform03-1405157976163-7ab14bd1-0表示該分區(qū)被該標(biāo)示的線程所執(zhí)行。
總結(jié):
producer性能優(yōu)化:異步化,消息批量發(fā)送,具體瀏覽上述參數(shù)說明。consumer性能優(yōu)化:如果是高吞吐量數(shù)據(jù),設(shè)置每次拿取消息(fetch.min.bytes)大些,
拿取消息頻繁(fetch.wait.max.ms)些(或時(shí)間間隔短些),如果是低延時(shí)要求,則設(shè)置時(shí)間時(shí)間間隔小,每次從kafka broker拿取消息盡量小些。