docker kafka 入門實(shí)踐

1 前置條件

Linux-centos7 && docker && docker compose

2. 鏡像

3. docker-compose.yml示例

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.18.55.63 # 此ip是我自己的內(nèi)網(wǎng)ID
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

備注:亦可參照https://hub.docker.com/r/wurstmeister/kafka/站點(diǎn)的信息

3.啟動(dòng)

在docker-compose.yml所在的目錄執(zhí)行以下命令:

docker-compose up -d

運(yùn)行:

docker ps -a

查看docker的容器信息:

CONTAINER ID        IMAGE                                     COMMAND                  CREATED             STATUS              PORTS                                                                        NAMES
ffa65c7bce19        wurstmeister/kafka                        "start-kafka.sh"         6 days ago          Up 6 days           0.0.0.0:32768->9092/tcp                                                      kafka_kafka_1
6d9cd607fc78        wurstmeister/zookeeper                    "/bin/sh -c '/usr/sb…"   6 days ago          Up 6 days           22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32769->2181/tcp                          kafka_zookeeper_1

4. 測(cè)試

  • 進(jìn)入容器
docker exec -it kafka_kafka_1 /bin/bash
  • 創(chuàng)建topic
$KAFKA_HOME/bin/kafka-topics.sh --create --topic xiaojun --zookeeper kafka_zookeeper_1:2181 --replication-factor 1 --partitions 1
  • 查看topic信息
$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper kafka_zookeeper_1 --topic xiaojun

輸出:

Topic:xiaojun   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: xiaojun  Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
  • 發(fā)布消息[發(fā)布完按ctrl+C退出]
$KAFKA_HOME/bin/kafka-console-producer.sh --topic=xiaojun --broker-list kafka_kafka_1:9092

我發(fā)布了三條信息:

bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun
Today I am very upset
Tomorrow is another day
Can you help me
  • 消費(fèi)信息
    1 從頭消費(fèi)
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun

輸出:

bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun
Today I am very upset
Tomorrow is another day
Can you help me

2 按offset消費(fèi)

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --partition 0 --offset 2 --topic xiaojun

輸出:

bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --partition 0 --offset 2 --topic xiaojun
Can you help me
  • 刪除topic
$KAFKA_HOME/bin/kafka-topics.sh --delete --topic test  --zookeeper kafka_zookeeper_1:2181

5. PHP實(shí)踐

1. Producer:
<?php

$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$topic = $rk->newTopic("test");

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $rk->poll(0);
}

while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}

?>
2. Consumer:
<?php

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
             echo "Revoke: ";
             var_dump($partitions);
             $kafka->assign(NULL);
             break;

         default:
            throw new \Exception($err);
    }
});

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.1');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'test'
$consumer->subscribe(['test']);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

?>

6 最后簡(jiǎn)要說(shuō)一下kafka快的原理

6.1 先說(shuō)幾個(gè)概念

  • producer:生產(chǎn)者
  • consumer:消費(fèi)者
  • topic:主題,可以理解為隊(duì)列的名稱
  • broker:集群的一個(gè)實(shí)例,可以是指定的服務(wù)器IP+端口
  • group:消費(fèi)者可以是一個(gè)群組

6.2 原理

參考:
kafka效率
為什么Kafka那么快
總結(jié):

寫入優(yōu)化

讀寫數(shù)據(jù)的效率瓶頸主要在于磁盤IO,所謂磁盤IO,需要經(jīng)歷機(jī)械磁盤的磁臂移動(dòng)(尋道時(shí)間)和旋轉(zhuǎn)延遲(尋址時(shí)間),如果是隨機(jī)IO,那么這個(gè)時(shí)間耗費(fèi)就更大了,但是呢,如果是順序IO,則可以大大加快讀寫操作的效率。

以上說(shuō)的簡(jiǎn)單的優(yōu)化產(chǎn)生了數(shù)量級(jí)的加速。 批處理導(dǎo)致更大的網(wǎng)絡(luò)數(shù)據(jù)包,更大的順序磁盤操作,連續(xù)的內(nèi)存塊等,這些都允許Kafka將隨機(jī)消息寫入的突發(fā)流轉(zhuǎn)換成流向消費(fèi)者的線性寫入。

當(dāng)然,即便是順序?qū)懭胗脖P,硬盤的訪問(wèn)速度還是不可能追上內(nèi)存。所以Kafka的數(shù)據(jù)并不是實(shí)時(shí)的寫入硬盤,它充分利用了現(xiàn)代操作系統(tǒng)分頁(yè)存儲(chǔ)來(lái)提高I/O效率。

Kafka還使用了Memory Mapped Files進(jìn)行寫入優(yōu)化。

Memory Mapped Files(后面簡(jiǎn)稱mmap)也被翻譯成內(nèi)存映射文件,在64位操作系統(tǒng)中一般可以表示20G的數(shù)據(jù)文件,它的工作原理是直接利用操作系統(tǒng)的Page來(lái)實(shí)現(xiàn)文件到物理內(nèi)存的直接映射。完成映射之后你對(duì)物理內(nèi)存的操作會(huì)被同步到硬盤上(操作系統(tǒng)在適當(dāng)?shù)臅r(shí)候進(jìn)行操作)。

通過(guò)mmap,進(jìn)程像讀寫硬盤一樣讀寫內(nèi)存(當(dāng)然是虛擬機(jī)內(nèi)存),也不必關(guān)心內(nèi)存的大小有虛擬內(nèi)存為我們兜底。
使用這種方式可以獲取很大的I/O提升,省去了用戶空間到內(nèi)核空間復(fù)制的開銷(調(diào)用文件的read會(huì)把數(shù)據(jù)先放到內(nèi)核空間的內(nèi)存中,然后再?gòu)?fù)制到用戶空間的內(nèi)存中。)。但這種做法也有一個(gè)很明顯的缺陷——不可靠,寫到mmap中的數(shù)據(jù)并沒(méi)有被真正的寫到硬盤,操作系統(tǒng)會(huì)在程序主動(dòng)調(diào)用flush的時(shí)候才把數(shù)據(jù)真正的寫到硬盤。Kafka提供了一個(gè)參數(shù)——producer.type來(lái)控制是不是主動(dòng)flush,如果Kafka寫入到mmap之后就立即flush然后再返回Producer叫同步(sync);寫入mmap之后立即返回Producer不調(diào)用flush叫異步(async)。

同時(shí),Kafka采用由生產(chǎn)者,經(jīng)紀(jì)人和消費(fèi)者共享的標(biāo)準(zhǔn)化二進(jìn)制消息格式(樣數(shù)據(jù)塊就可以在它們之間自由傳輸,無(wú)需轉(zhuǎn)換)來(lái)避免無(wú)效率的字節(jié)復(fù)制。

讀取優(yōu)化

另外一個(gè)很重要的優(yōu)化是:
通常來(lái)說(shuō),數(shù)據(jù)從文件傳輸?shù)絪ocket的公共數(shù)據(jù)路徑是:

    1. 操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁(yè)緩存
    1. 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
    1. 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到socket緩存中
    1. 操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出

這樣做明顯是低效的,這里有四次拷貝,兩次系統(tǒng)調(diào)用。如果使用sendfile,再次拷貝可以被避免:允許操作系統(tǒng)將數(shù)據(jù)直接從頁(yè)緩存發(fā)送到網(wǎng)絡(luò)上。所以在這個(gè)優(yōu)化的路徑中,只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的。

以上就是所謂的sendfile和zero copy。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容