通常好產(chǎn)品可以理解為設(shè)計(jì)的原理很奇妙,能夠健壯地執(zhí)行,使用者實(shí)戰(zhàn)也不用花太多精力學(xué)習(xí)。
Kafka是一款簡單又實(shí)用的產(chǎn)品。知識的深入通常都是實(shí)戰(zhàn)中遇到問題累積,沉綻后得到提升。好的產(chǎn)品設(shè)計(jì)的初衷通常是讓使用者可以快速上手,馬上投入運(yùn)行。 至于是否真要深入,那就看使用者意愿了。 畢竟市面上成熟東西實(shí)在太多了,有足夠的精力更應(yīng)該多挖掘其他有前景的產(chǎn)品。
雖然這樣說,在使用前Kafka該懂的原理還是必須得了解的。 尤其是Partitions分區(qū)(簡單點(diǎn),就是多條隊(duì)列分支)。可謂不懂就不知道如何高性能應(yīng)用。
讀了網(wǎng)上很多的相關(guān)介紹,這一篇個人認(rèn)為其圖解是最通俗易懂的:漫畫:圖解 Kafka,看本篇就足夠啦。
建議沒有掌握其原理的使用者可以先閱讀了解一二。
使用docker-compose 部署Kafka
由于kafka運(yùn)行依賴zookeeper,除了安裝kafka外,還得先在系統(tǒng)上安裝zookeeper。
經(jīng)過網(wǎng)上的各種介紹和試錯。最簡單無腦的安裝方式就是使用docker compose。
創(chuàng)建好docker-compose.yml 文件
version: '3.9'
services:
zookeeper:
image: wurstmeister/zookeeper ## 鏡像
container_name: zookeeper ## 容器名稱
ports:
- "2181:2181" ## 對外暴露的端口號
volumes:
- /etc/localtime:/etc/localtime ## 掛載時(shí)區(qū)(kafka鏡像和宿主機(jī)器之間時(shí)間保持一致)
kafka:
image: wurstmeister/kafka ## 鏡像
container_name: kafka ## 容器名稱
ports:
- "9092:9092" ## 對外暴露的端口號
environment: ## 環(huán)境變量
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 #按實(shí)際應(yīng)該是寫入宿主IP
KAFKA_CREATE_TOPICS: "test:1:1"
depends_on: ## 依賴情況
- zookeeper
構(gòu)建容器即可:
cd [當(dāng)前存放docker-compose.yml的目錄]
docker-compose up -d
成功后,查看容器信息
% docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
bb4a4fc60f84 wurstmeister/kafka "start-kafka.sh" 58 minutes ago Up 58 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
b10413ec436e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 59 minutes ago Up 59 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
測試驗(yàn)證
進(jìn)入容器
docker exec -it kafka bash
1、創(chuàng)建 一個topic
參數(shù)說明:
--create --topic test 指示了要創(chuàng)建topic名字叫 test
--zookeeper zookeeper:2181 指示了 zookeeper 服務(wù)的地址
$ kafka-topics.sh --create --topic test --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1
2、查看 topic 詳情
$ kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test Topic: test TopicId: Pu_5qECuTR-Gfi7Br9qCvA PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
3、獲得topic的列表
$ kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
test
4、消息生產(chǎn)(即發(fā)送消息)
$ kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
#可以發(fā)送消息
>hello
>hello world
5、消費(fèi)消息(即指接收消息方)
$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
#可以開始接收到 producer.sh 發(fā)送的消息。
``
NodeJS連接Kafka(使用Kafka-node組件)
建立一個NodeJS項(xiàng)目(這里略過),下一步就是安裝依賴
npm i kafka-node
第一個步驟就是連接kafka服務(wù)的代碼
#引入組件
const kafka = require('kafka-node');
const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'})
第二步,可以通過代碼的方式創(chuàng)建topic
#創(chuàng)建兩個topic
#其中主題為topic-for-1 擁有1個分區(qū)數(shù)
#主為 topic-for-8 擁有4個分區(qū)數(shù)
function createTopics(){
const topicsToCreate = [{
topic: 'topic-for-1',
partitions: 1, //分區(qū)數(shù)量
replicationFactor: 1 //副本數(shù)量
},
{
topic: 'topic-for-8',
partitions: 4, //分區(qū)數(shù)量
replicationFactor: 1, //副本數(shù)量
// Optional set of config entries
configEntries: [
{
name: 'compression.type',
value: 'gzip'
},
{
name: 'min.compaction.lag.ms',
value: '50'
}
]
}];
client.createTopics(topicsToCreate, (error, result) => {
console.log(result);
console.log(error);
});
}
第三步,先編寫消費(fèi)消息(即指接收消息方)的代碼段
這個組件會提供兩種的類(kafka.Consumer 和kafka.ConsumerGroup)給使用者使用。通常情況下,使用Consumer即可。 ConsumerGroup封裝了kafka.KafkaClient和kafka.Consumer,并且提供了更多可控的參數(shù)。
function createConsumer(callback){
//消費(fèi)者
let self=this;
let consumer = new kafka.Consumer(this.client,
[
{ topic: 'topic-for-1', partition: 0 ,groupId:"my_group"},
{ topic: 'topic-for-8', partition: 1 }
#注:監(jiān)聽兩個topic,分別名為topic-for-1,topic-for-8。其中
#監(jiān)聽topic-for-1的0號分區(qū),設(shè)置組名為 my_group。
#即如果還有機(jī)器加入,并且組名也為my_group時(shí),則代表同屬同一個組。
],{
autoCommit:true //為true,代表自動提交offset
});
consumer.on('message',function(message){
console.log('consumer receive message:')
console.log(message);
callback(message);
});
consumer.on('error',function(err){
console.log('consumer err:')
console.log(err);
});
return consumer;
}
第四步,可以建立另一個項(xiàng)目來作為構(gòu)建消息生產(chǎn),或者同一個項(xiàng)目下使用setTimeout之類函數(shù)做演示。
組件提供兩種消息生產(chǎn)(即發(fā)送消息)的類:kafka.HighLevelProducer和kafka.Producer。用法是一致,其中kafka.HighLevelProducer為高可用消息生產(chǎn)。
function createHighLevelProducter(callback){
const producer=new kafka.HighLevelProducer(client);
producer.on('ready',function(){
console.log('kafka producer is connected and ready');
callback(producer);
});
producer.on('error',function(err){
console.log('kafka producer is error');
});
}
最后,封裝一系列的函數(shù)后, 如果只在一個終端上,我們就可以編寫一小段代碼來做測試。
const kafka = require('kafka-node');
const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'})
#創(chuàng)建topic
createTopics();
#啟動消息消費(fèi)
createConsumer((message)=>{
console.log('event callback receive message:');
console.log(message);
});
createHighLevelProducter((producter)=>{
#對主題:topic-for-1的第1個分區(qū)發(fā)送消息,對主題:topic-for-8的第2個分區(qū)發(fā)送消息。
setTimeout(()=>{
const record={
topic:'topic-for-1',
message:"Hello topic1!",
key:'topic1-1',
attributes:1,
partition:0
},
{
topic:'topic-for-8',
message:"Hello topic8。",
key:'topic8-1',
attributes:1,
partition:1
};
producter.send(record, (err, data) => {
if (err) {
console.log(`producer send err: ${err}`)
}
console.log(data);
});
},500);
});
完成后可以觀察具體的數(shù)據(jù)結(jié)果。