1 前置條件
Linux-centos7 && docker && docker compose
2. 鏡像
3. docker-compose.yml示例
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 172.18.55.63 # 此ip是我自己的內(nèi)網(wǎng)ID
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
備注:亦可參照https://hub.docker.com/r/wurstmeister/kafka/站點(diǎn)的信息
3.啟動(dòng)
在docker-compose.yml所在的目錄執(zhí)行以下命令:
docker-compose up -d
運(yùn)行:
docker ps -a
查看docker的容器信息:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
ffa65c7bce19 wurstmeister/kafka "start-kafka.sh" 6 days ago Up 6 days 0.0.0.0:32768->9092/tcp kafka_kafka_1
6d9cd607fc78 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 6 days ago Up 6 days 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32769->2181/tcp kafka_zookeeper_1
4. 測(cè)試
- 進(jìn)入容器
docker exec -it kafka_kafka_1 /bin/bash
- 創(chuàng)建topic
$KAFKA_HOME/bin/kafka-topics.sh --create --topic xiaojun --zookeeper kafka_zookeeper_1:2181 --replication-factor 1 --partitions 1
- 查看topic信息
$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper kafka_zookeeper_1 --topic xiaojun
輸出:
Topic:xiaojun PartitionCount:1 ReplicationFactor:1 Configs:
Topic: xiaojun Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
- 發(fā)布消息[發(fā)布完按ctrl+C退出]
$KAFKA_HOME/bin/kafka-console-producer.sh --topic=xiaojun --broker-list kafka_kafka_1:9092
我發(fā)布了三條信息:
bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun
Today I am very upset
Tomorrow is another day
Can you help me
- 消費(fèi)信息
1 從頭消費(fèi)
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun
輸出:
bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun
Today I am very upset
Tomorrow is another day
Can you help me
2 按offset消費(fèi)
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --partition 0 --offset 2 --topic xiaojun
輸出:
bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --partition 0 --offset 2 --topic xiaojun
Can you help me
- 刪除topic
$KAFKA_HOME/bin/kafka-topics.sh --delete --topic test --zookeeper kafka_zookeeper_1:2181
5. PHP實(shí)踐
- 使用擴(kuò)展:php-rdkafka
- 文檔:https://arnaud-lb.github.io/php-rdkafka/phpdoc/index.html
- docker php-kafka示例:https://github.com/10xjzheng/php-kafka
- PHP Manual Examples :
1. Producer:
<?php
$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");
$topic = $rk->newTopic("test");
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$rk->poll(0);
}
while ($rk->getOutQLen() > 0) {
$rk->poll(50);
}
?>
2. Consumer:
<?php
$conf = new RdKafka\Conf();
// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
echo "Assign: ";
var_dump($partitions);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
echo "Revoke: ";
var_dump($partitions);
$kafka->assign(NULL);
break;
default:
throw new \Exception($err);
}
});
// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup');
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.1');
$topicConf = new RdKafka\TopicConf();
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);
$consumer = new RdKafka\KafkaConsumer($conf);
// Subscribe to topic 'test'
$consumer->subscribe(['test']);
echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";
while (true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
?>
6 最后簡(jiǎn)要說(shuō)一下kafka快的原理
6.1 先說(shuō)幾個(gè)概念
- producer:生產(chǎn)者
- consumer:消費(fèi)者
- topic:主題,可以理解為隊(duì)列的名稱
- broker:集群的一個(gè)實(shí)例,可以是指定的服務(wù)器IP+端口
- group:消費(fèi)者可以是一個(gè)群組
6.2 原理
參考:
kafka效率
為什么Kafka那么快
總結(jié):
寫入優(yōu)化
讀寫數(shù)據(jù)的效率瓶頸主要在于磁盤IO,所謂磁盤IO,需要經(jīng)歷機(jī)械磁盤的磁臂移動(dòng)(尋道時(shí)間)和旋轉(zhuǎn)延遲(尋址時(shí)間),如果是隨機(jī)IO,那么這個(gè)時(shí)間耗費(fèi)就更大了,但是呢,如果是順序IO,則可以大大加快讀寫操作的效率。
以上說(shuō)的簡(jiǎn)單的優(yōu)化產(chǎn)生了數(shù)量級(jí)的加速。 批處理導(dǎo)致更大的網(wǎng)絡(luò)數(shù)據(jù)包,更大的順序磁盤操作,連續(xù)的內(nèi)存塊等,這些都允許Kafka將隨機(jī)消息寫入的突發(fā)流轉(zhuǎn)換成流向消費(fèi)者的線性寫入。
當(dāng)然,即便是順序?qū)懭胗脖P,硬盤的訪問(wèn)速度還是不可能追上內(nèi)存。所以Kafka的數(shù)據(jù)并不是實(shí)時(shí)的寫入硬盤,它充分利用了現(xiàn)代操作系統(tǒng)分頁(yè)存儲(chǔ)來(lái)提高I/O效率。
Kafka還使用了Memory Mapped Files進(jìn)行寫入優(yōu)化。
Memory Mapped Files(后面簡(jiǎn)稱mmap)也被翻譯成內(nèi)存映射文件,在64位操作系統(tǒng)中一般可以表示20G的數(shù)據(jù)文件,它的工作原理是直接利用操作系統(tǒng)的Page來(lái)實(shí)現(xiàn)文件到物理內(nèi)存的直接映射。完成映射之后你對(duì)物理內(nèi)存的操作會(huì)被同步到硬盤上(操作系統(tǒng)在適當(dāng)?shù)臅r(shí)候進(jìn)行操作)。
通過(guò)mmap,進(jìn)程像讀寫硬盤一樣讀寫內(nèi)存(當(dāng)然是虛擬機(jī)內(nèi)存),也不必關(guān)心內(nèi)存的大小有虛擬內(nèi)存為我們兜底。
使用這種方式可以獲取很大的I/O提升,省去了用戶空間到內(nèi)核空間復(fù)制的開銷(調(diào)用文件的read會(huì)把數(shù)據(jù)先放到內(nèi)核空間的內(nèi)存中,然后再?gòu)?fù)制到用戶空間的內(nèi)存中。)。但這種做法也有一個(gè)很明顯的缺陷——不可靠,寫到mmap中的數(shù)據(jù)并沒(méi)有被真正的寫到硬盤,操作系統(tǒng)會(huì)在程序主動(dòng)調(diào)用flush的時(shí)候才把數(shù)據(jù)真正的寫到硬盤。Kafka提供了一個(gè)參數(shù)——producer.type來(lái)控制是不是主動(dòng)flush,如果Kafka寫入到mmap之后就立即flush然后再返回Producer叫同步(sync);寫入mmap之后立即返回Producer不調(diào)用flush叫異步(async)。
同時(shí),Kafka采用由生產(chǎn)者,經(jīng)紀(jì)人和消費(fèi)者共享的標(biāo)準(zhǔn)化二進(jìn)制消息格式(樣數(shù)據(jù)塊就可以在它們之間自由傳輸,無(wú)需轉(zhuǎn)換)來(lái)避免無(wú)效率的字節(jié)復(fù)制。
讀取優(yōu)化
另外一個(gè)很重要的優(yōu)化是:
通常來(lái)說(shuō),數(shù)據(jù)從文件傳輸?shù)絪ocket的公共數(shù)據(jù)路徑是:
- 操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁(yè)緩存
- 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
- 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到socket緩存中
- 操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出
這樣做明顯是低效的,這里有四次拷貝,兩次系統(tǒng)調(diào)用。如果使用sendfile,再次拷貝可以被避免:允許操作系統(tǒng)將數(shù)據(jù)直接從頁(yè)緩存發(fā)送到網(wǎng)絡(luò)上。所以在這個(gè)優(yōu)化的路徑中,只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的。
以上就是所謂的sendfile和zero copy。