【新手快速入門】從零開始學(xué)習(xí)Kafka

簡介

kafka是一個(gè)分布式消息隊(duì)列。具有高性能、持久化、多副本備份、橫向擴(kuò)展能力。生產(chǎn)者往隊(duì)列里寫消息,消費(fèi)者從隊(duì)列里取消息進(jìn)行業(yè)務(wù)邏輯。一般在架構(gòu)設(shè)計(jì)中起到解耦、削峰、異步處理的作用。

Kafka核心組件-intsmaze

  • Topic:消息根據(jù)Topic進(jìn)行歸類,可以理解為一個(gè)隊(duì)里。
  • Producer:消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。
  • Consumer:消息消費(fèi)者,向kafka broker取消息的客戶端。
  • broker:每個(gè)kafka實(shí)例(server),一臺kafka服務(wù)器就是一個(gè)broker,一個(gè)集群由多個(gè)broker組成,一個(gè)broker可以容納多個(gè)topic。
  • Zookeeper:依賴集群保存meta信息。
      
    大家先看kafka的介紹或者教程啊,上來都顯示一堆長篇大論,各自文字圖片,看著很懵逼,頭暈。搞程序的,要讓ta跑起來,再針對可運(yùn)行的成果,慢慢了解ta。所以本文會(huì)由淺入深,先實(shí)踐后理論,結(jié)合實(shí)踐講理論。

Kafka安裝配置

下載

wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz

解壓

 tar -zxvf kafka_2.11-2.2.0.tgz
 

修改 kafka-server 的配置文件

 cd kafka_2.11-2.2.0
 
vim  config/server.properties

修改其中的:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka-logs

啟動(dòng)zk【默認(rèn)端口2181】

bin/zookeeper-server-start.sh config/zookeeper.properties
image.png

啟動(dòng)Kafka

使用 kafka-server-start.sh 啟動(dòng) kafka 服務(wù):

bin/kafka-server-start.sh config/server.properties
image.png

測試使用

創(chuàng)建 topic

使用 kafka-topics.sh 創(chuàng)建單分區(qū)單副本的 topic demo

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
image.png

查看 topic 列表:

bin/kafka-topics.sh --list --zookeeper localhost:2181
image.png

發(fā)送消息【生產(chǎn)者】

使用 kafka-console-producer.sh 發(fā)送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo

讀取消息【消費(fèi)者】

使用 kafka-console-consumer.sh 接收消息并在終端打?。?/p>

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning

image.png

注意不要使用
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning,高版本已經(jīng)不支持

查看描述 topics 信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
[root@localhost kafka_2.11-2.2.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
Topic:demo      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: demo     Partition: 0    Leader: 1       Replicas: 1     Isr: 1

image.png

第一行給出了所有分區(qū)的摘要,每個(gè)附加行給出了關(guān)于一個(gè)分區(qū)的信息。 由于我們只有一個(gè)分區(qū),所以只有一行。

  • “Leader”: 是負(fù)責(zé)給定分區(qū)的所有讀取和寫入的節(jié)點(diǎn)。 每個(gè)節(jié)點(diǎn)將成為分區(qū)隨機(jī)選擇部分的領(lǐng)導(dǎo)者。
  • “Replicas”: 是復(fù)制此分區(qū)日志的節(jié)點(diǎn)列表,無論它們是否是領(lǐng)導(dǎo)者,或者即使他們當(dāng)前處于活動(dòng)狀態(tài)。
  • “Isr”: 是一組“同步”副本。這是復(fù)制品列表的子集,當(dāng)前活著并被引導(dǎo)到領(lǐng)導(dǎo)者

擴(kuò)展-集群配置

Kafka 支持兩種模式的集群搭建:可以在單機(jī)上運(yùn)行多個(gè) broker 實(shí)例來實(shí)現(xiàn)集群,也可在多臺機(jī)器上搭建集群,下面介紹下如何實(shí)現(xiàn)單機(jī)多 broker 實(shí)例集群,其實(shí)很簡單,只需要如下配置即可。

單機(jī)多broker 集群配置

利用單節(jié)點(diǎn)部署多個(gè) broker。 不同的 broker 設(shè)置不同的 id,監(jiān)聽端口及日志目錄。 例如:

cp config/server.properties config/server-2.properties
vi config/server-2.properties

修改內(nèi)容:

broker.id=2

listeners = PLAINTEXT://127.0.0.1:9093

log.dirs=/data/kafka-logs2

同樣,配置第三個(gè)broker:

cp config/server-2.properties config/server-3.properties
vi config/server-3.properties

修改內(nèi)容:

broker.id=2

listeners = PLAINTEXT://127.0.0.1:9093

log.dirs=/data/kafka-logs2

listeners 申明此kafka服務(wù)器需要監(jiān)聽的端口號,默認(rèn)會(huì)使用localhost的地址,如果是在遠(yuǎn)程服務(wù)器上運(yùn)行則必須配置,例如:         
listeners=PLAINTEXT:// 192.168.180.128:9092
并確保服務(wù)器的9092端口能夠訪問

啟動(dòng)2/3 borker

bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-3.properties &

至此,單機(jī)多broker實(shí)例的集群配置完畢。

擴(kuò)展-多機(jī)多borker集群

分別在多個(gè)節(jié)點(diǎn)按上述方式安裝 Kafka,配置啟動(dòng)多個(gè) Zookeeper 實(shí)例。

假設(shè)三臺機(jī)器 IP 地址是 : 192.168.153.135, 192.168.153.136, 192.168.153.137

分別配置多個(gè)機(jī)器上的 Kafka 服務(wù),設(shè)置不同的 broker id,zookeeper.connect 設(shè)置如下:

config/server.properties里面的 zookeeper.connect

zookeeper.connect=192.168.153.135:2181,192.168.153.136:2181,192.168.153.137:2181

使用 Kafka Connect 來導(dǎo)入/導(dǎo)出數(shù)據(jù)

從控制臺寫入數(shù)據(jù)并將其寫回控制臺是一個(gè)方便的起點(diǎn),但您可能想要使用其他來源的數(shù)據(jù)或?qū)?shù)據(jù)從 Kafka 導(dǎo)出到其他系統(tǒng)。對于許多系統(tǒng),您可以使用 Kafka Connect 來導(dǎo)入或?qū)С鰯?shù)據(jù),而不必編寫自定義集成代碼。

Kafka Connect 是 Kafka 包含的一個(gè)工具,可以將數(shù)據(jù)導(dǎo)入和導(dǎo)出到 Kafka。它是一個(gè)可擴(kuò)展的工具,運(yùn)行 連接器,實(shí)現(xiàn)與外部系統(tǒng)交互的自定義邏輯。在這個(gè)快速入門中,我們將看到如何使用簡單的連接器運(yùn)行 Kafka Connect,這些連接器將數(shù)據(jù)從文件導(dǎo)入到 Kafka topic,并將數(shù)據(jù)從 Kafka topic 導(dǎo)出到文件。

參考:

  • http://www.54tianzhisheng.cn/2018/01/04/Kafka/ 
    
  • http://kafka.apache.org/10/documentation/streams/quickstart
    
  • http://kafka.apache.org/20/documentation.html#quickstart
    

代碼測試

準(zhǔn)備測試kafka

cp config/server.properties config/server-idea.properties
vi config/server-idea.properties
 
broker.id=999

listeners = PLAINTEXT://192.168.1.177:9999

log.dirs=/data/kafka-logs-999

192.168.1.177為kafka所在機(jī)器的ip地址,9999端口號是對外提供的端口,下文會(huì)使用到

Springboot 發(fā)送消息、接受消息源碼

很簡單的一個(gè)小demo,可以直接拷貝使用。

KafkaApplication.java:


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {

        ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);

        KafkaTemplate kafkaTemplate = context.getBean(KafkaTemplate.class);

        for (int i = 0; i < 10; i++) {
            //調(diào)用消息發(fā)送類中的消息發(fā)送方法
            kafkaTemplate.send("mytopic", System.currentTimeMillis() + "發(fā)送" + i);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @KafkaListener(topics = {"mytopic"},groupId = "halburt-demo2")
    public void consumer1(String message) {
        System.out.println("consumer1收到消息:" + message);
    }

    @KafkaListener(topics = {"mytopic"} ,groupId = "halburt-demo")
    public void consumer2(ConsumerRecord<?, ?> record) {
        System.out.println("consumer2收到消息");
        System.out.println("    topic" + record.topic());
        System.out.println("    key:" + record.key());
        System.out.println("    value:"+record.value());
    }
}

application.yml:

server:
  port: 8090
spring:
  kafka:
    consumer:
      auto-commit-interval: 100
      bootstrap-servers: 192.168.1.177:9999
      enable-auto-commit: true
      group-id: halburt-demo
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      concurrency: 5
    producer:
      bootstrap-servers: 192.168.1.177:9999
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

192.168.1.177:9999即為kafka的配置文件中配置

pom.xml依賴:

依賴版本:

spring-boot.version:2.1.3.RELEASE
spring-kafka.version:2.2.0.RELEASE

【此處有坑】此處依賴版本可以不用這2個(gè)版本,但是一定要注意springboot和kafka的版本對應(yīng)

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>
    </dependencies>

啟動(dòng)kafka并run Application.java

bin/kafka-server-start.sh config/server-idea.properties &
image.png

已經(jīng)啟動(dòng)了zk,此處不用再啟動(dòng),如果未啟動(dòng),需要啟動(dòng)zk。


image.png
cd /home/hd/kafka_2.11-2.2.0/
bin/zookeeper-server-start.sh config/zookeeper.properties&

kafka啟動(dòng)成功之后,run Application,會(huì)看到日志如下:


image.png

已經(jīng)接收到消息了。

如果你是跟著本文從頭開始的,一定注意此處有個(gè)坑

如果你是從頭開始跟這個(gè)本文學(xué)習(xí)的,那么你直接啟動(dòng)的話,會(huì)發(fā)現(xiàn)消息發(fā)出去了,但是沒有接收到。
我也是查了好久,看了很多教程,別人都行我就不行。
如果你的zk有其他的topic節(jié)點(diǎn)的話,會(huì)收不到消息,直接上解決方案:刪除所有的zk節(jié)點(diǎn)。怎么刪除?

上碼:


/**
 * zookeeper znode遞歸刪除節(jié)點(diǎn)
 * @author Halburt
 *
 */
public class DeleteZkNode {
    //zookeeper的地址 
    private static final String connectString = "192.168.1.177:2181";

    private static final int sessionTimeout = 2000;

    private static ZooKeeper zookeeper = null;

    /**
     * main函數(shù)
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        //調(diào)用rmr,刪除所有目錄
        rmr("/");
    }

    /**
     * 遞歸刪除 因?yàn)閦ookeeper只允許刪除葉子節(jié)點(diǎn),如果要?jiǎng)h除非葉子節(jié)點(diǎn),只能使用遞歸
     * @param path
     * @throws IOException
     */
    public static void rmr(String path) throws Exception {
        ZooKeeper zk = getZookeeper();
        //獲取路徑下的節(jié)點(diǎn)
        List<String> children = zk.getChildren(path, false);
        for (String pathCd : children) {
            //獲取父節(jié)點(diǎn)下面的子節(jié)點(diǎn)路徑
            String newPath = "";
            //遞歸調(diào)用,判斷是否是根節(jié)點(diǎn)
            if (path.equals("/")) {
                newPath = "/" + pathCd;
            } else {
                newPath = path + "/" + pathCd;
            }
            rmr(newPath);
        }
        //刪除節(jié)點(diǎn),并過濾zookeeper節(jié)點(diǎn)和 /節(jié)點(diǎn)
        if (path != null && !path.trim().startsWith("/zookeeper") && !path.trim().equals("/")) {
            zk.delete(path, -1);
            //打印刪除的節(jié)點(diǎn)路徑
            System.out.println("被刪除的節(jié)點(diǎn)為:" + path);
        }
    }

    /**
     * 獲取Zookeeper實(shí)例
     * @return
     * @throws IOException
     */
    public static ZooKeeper getZookeeper() throws IOException {
        zookeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {

            }
        });
        return zookeeper;
    }

}

終端命令查看消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.177:9999 --topic mytopic  --from-beginning
image.png

安利一下可視化工具Kafka Tool 2

下載地址

Kafka Tool 2是一款Kafka的可視化客戶端工具,可以非常方便的查看Topic的隊(duì)列信息以及消費(fèi)者信息以及kafka節(jié)點(diǎn)信息。下載地址:http://www.kafkatool.com/download.html

使用

先創(chuàng)建連接

下載安裝之后會(huì)彈出一個(gè)配置連接的窗口,我們可以看到這個(gè)窗口左上角為Add Cluster(添加集群),但沒關(guān)系,對應(yīng)單節(jié)點(diǎn)的Kafka實(shí)例來說也是可以的,因?yàn)檫@個(gè)軟件監(jiān)控的是Zookeeper而不是Kafka,Kafka的集群搭建也是依賴Zookeeper來實(shí)現(xiàn)的,所以默認(rèn)情況下我們都是直接通過Zookeeper去完成大部分操作。


image.png

創(chuàng)建完成之后,連接

我們可以看到已經(jīng)創(chuàng)建好的Topic。這個(gè)軟件默認(rèn)顯示數(shù)據(jù)的類型為Byte,可以在設(shè)置里面找到對應(yīng)的修改選項(xiàng)


image.png

接下來就自己探索吧


image.,接下來就自己探索吧png

理論學(xué)習(xí)

kafka單節(jié)點(diǎn)的結(jié)構(gòu)如下:

image.png

單節(jié)點(diǎn)broker包含多個(gè)topic主題,而每個(gè)topic則包含多個(gè)partition副本,每個(gè)partition會(huì)有序的存儲(chǔ)消息。

kafka的總體數(shù)據(jù)流

kafka對外使用topic的概念,生產(chǎn)者往topic里寫消息,消費(fèi)者從topic讀消息。為了做到水平擴(kuò)展,一個(gè)topic實(shí)際是由多個(gè)partition組成的,遇到瓶頸時(shí),可以通過增加partition的數(shù)量來進(jìn)行橫向擴(kuò)容。單個(gè)parition內(nèi)是保證消息有序。每新寫一條消息,kafka就是在對應(yīng)的文件append寫,所以性能非常高。kafka的總體數(shù)據(jù)流是這樣的:


2835676-f378607bc841309a.png

Producers往Brokers里面的指定Topic中寫消息,Consumers從Brokers里面拉去指定Topic的消息,然后進(jìn)行業(yè)務(wù)處理。

名詞解析

Producer

消費(fèi)者: Producer將消息發(fā)布到指定的Topic中,同時(shí)Producer也能決定將此消息歸屬于哪個(gè)partition;比如基于"round-robin"方式或者通過其他的一些算法等.

Consumer

每個(gè)consumer屬于一個(gè)consumer group;反過來說,每個(gè)group中可以有多個(gè)consumer.發(fā)送到Topic的消息,只會(huì)被訂閱此Topic的每個(gè)group中的一個(gè)consumer消費(fèi)(對于一條消息來說,同一組的消費(fèi)者只會(huì)有一個(gè)消費(fèi)者去消費(fèi)).

如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會(huì)在consumers之間負(fù)載均衡.
如果所有的consumer都具有不同的group,那這就是"發(fā)布-訂閱";消息將會(huì)廣播給所有的消費(fèi)者.

在kafka中,一個(gè)partition中的消息只會(huì)被group中的一個(gè)consumer消費(fèi);每個(gè)group中consumer消息消費(fèi)互相獨(dú)立;我們可以認(rèn)為一個(gè)group是一個(gè)"訂閱"者,一個(gè)Topic中的每個(gè)partions,只會(huì)被一個(gè)"訂閱者"中的一個(gè)consumer消費(fèi),不過一個(gè)consumer可以消費(fèi)多個(gè)partitions中的消息.kafka只能保證一個(gè)partition中的消息被某個(gè)consumer消費(fèi)時(shí),消息是順序的。事實(shí)上,從Topic角度來說,消息仍不是有序的。

Topics

一個(gè)Topic可以認(rèn)為是一類消息,每個(gè)topic將被分成多個(gè)partition(區(qū)),每個(gè)partition在存儲(chǔ)層面是append log文件。任何發(fā)布到此partition的消息都會(huì)被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個(gè)long型數(shù)字,它是唯一標(biāo)記一條消息。它唯一的標(biāo)記一條消息。kafka并沒有提供其他額外的索引機(jī)制來存儲(chǔ)offset,因?yàn)樵趉afka中幾乎不允許對消息進(jìn)行“隨機(jī)讀寫”。

Partition

topic物理上的分組,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition是一個(gè)有序的隊(duì)列

以下是單個(gè)生產(chǎn)者和消費(fèi)者從兩個(gè)分區(qū)主題讀取和寫入的簡單示例。


image.png

此圖顯示了一個(gè)producer向2個(gè)partition分區(qū)寫入日志,以及消費(fèi)者從相同日志中讀取的內(nèi)容。日志中的每條記錄都有一個(gè)相關(guān)的條目號,稱之為偏移量offset。消費(fèi)者使用此偏移來記錄其在partitiond讀取日志的位置。

當(dāng)然如果存在多個(gè)消費(fèi)者的話,根據(jù)groupId分組,同一組的消費(fèi)者不會(huì)重復(fù)讀取日志。

換句話說:
訂閱topic是以一個(gè)消費(fèi)組來訂閱的,一個(gè)消費(fèi)組里面可以有多個(gè)消費(fèi)者。同一個(gè)消費(fèi)組中的兩個(gè)消費(fèi)者,不會(huì)同時(shí)消費(fèi)一個(gè)partition。換句話來說,就是一個(gè)partition,只能被消費(fèi)組里的一個(gè)消費(fèi)者消費(fèi),但是可以同時(shí)被多個(gè)消費(fèi)組消費(fèi)。因此,如果消費(fèi)組內(nèi)的消費(fèi)者如果比partition多的話,那么就會(huì)有個(gè)別消費(fèi)者一直空閑。
 

其實(shí)consumer可以使用任意順序消費(fèi)日志消息,它只需要將offset重置為任意值.(offset將會(huì)保存在zookeeper中,kafka集群幾乎不需要維護(hù)任何consumer和producer狀態(tài)信息,這些信息有zookeeper保存)

partition有多個(gè).最根本原因是kafka基于文件存儲(chǔ).通過分區(qū),可以將日志內(nèi)容分散到多個(gè)partition上,來避免文件大小達(dá)到單機(jī)磁盤的上限,每個(gè)partiton都會(huì)被當(dāng)前server(kafka實(shí)例)保存;可以將一個(gè)topic切分多任意多個(gè)partitions,來消息保存/消費(fèi)的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升并發(fā)消費(fèi)的能力.

使用場景

消息系統(tǒng)、消息隊(duì)列

對于一些常規(guī)的消息系統(tǒng),kafka是個(gè)不錯(cuò)的選擇;partitons/replication和容錯(cuò),可以使kafka具有良好的擴(kuò)展性和性能優(yōu)勢.不過到目前為止,我們應(yīng)該很清楚認(rèn)識到,kafka并沒有提供JMS中的"事務(wù)性""消息傳輸擔(dān)保(消息確認(rèn)機(jī)制)""消息分組"等企業(yè)級特性;kafka只能使用作為"常規(guī)"的消息系統(tǒng),在一定程度上,尚未確保消息的發(fā)送與接收絕對可靠(比如,消息重發(fā),消息發(fā)送丟失等)

日志聚合

kafka的特性決定它非常適合作為"日志收集中心";application可以將操作日志"批量""異步"的發(fā)送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對producer端而言,幾乎感覺不到性能的開支.此時(shí)consumer端可以使hadoop等其他系統(tǒng)化的存儲(chǔ)和分析系統(tǒng).

網(wǎng)站活動(dòng)追蹤、調(diào)用鏈系統(tǒng)、事件采集

可以將網(wǎng)頁/用戶操作等信息發(fā)送到kafka中.并實(shí)時(shí)監(jiān)控,或者離線統(tǒng)計(jì)分析等

等等其他場景

server.properties配置文件解讀

############################# Server Basics #############################
# 節(jié)點(diǎn)的ID,必須與其它節(jié)點(diǎn)不同
broker.id=0
# 選擇啟用刪除主題功能,默認(rèn)false
#delete.topic.enable=true
############################# Socket Server Settings #############################

# 套接字服務(wù)器堅(jiān)挺的地址。如果沒有配置,就使用java.net.InetAddress.getCanonicalHostName()的返回值
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# 節(jié)點(diǎn)的主機(jī)名會(huì)通知給生產(chǎn)者和消費(fèi)者。如果沒有設(shè)置,如果配置了"listeners"就使用"listeners"的值。
# 否則就使用java.net.InetAddress.getCanonicalHostName()的返回值
#advertised.listeners=PLAINTEXT://your.host.name:9092

# 將偵聽器的名稱映射到安全協(xié)議,默認(rèn)情況下它們是相同的。有關(guān)詳細(xì)信息,請參閱配置文檔
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# 服務(wù)器用來接受請求或者發(fā)送響應(yīng)的線程數(shù)
num.network.threads=3

# 服務(wù)器用來處理請求的線程數(shù),可能包括磁盤IO
num.io.threads=8

# 套接字服務(wù)器使用的發(fā)送緩沖區(qū)大小
socket.send.buffer.bytes=102400

# 套接字服務(wù)器使用的接收緩沖區(qū)大小
socket.receive.buffer.bytes=102400

# 單個(gè)請求最大能接收的數(shù)據(jù)量
socket.request.max.bytes=104857600


############################# Log Basics #############################

# 一個(gè)逗號分隔的目錄列表,用來存儲(chǔ)日志文件
log.dirs=/tmp/kafka-logs

# 每個(gè)主題的日志分區(qū)的默認(rèn)數(shù)量。更多的分區(qū)允許更大的并行操作,但是它會(huì)導(dǎo)致節(jié)點(diǎn)產(chǎn)生更多的文件
num.partitions=1

# 每個(gè)數(shù)據(jù)目錄中的線程數(shù),用于在啟動(dòng)時(shí)日志恢復(fù),并在關(guān)閉時(shí)刷新。
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# 內(nèi)部主題設(shè)置
# 對于除了開發(fā)測試之外的其他任何東西,group元數(shù)據(jù)內(nèi)部主題的復(fù)制因子“__consumer_offsets”和“__transaction_state”,建議值大于1,以確??捎眯?如3)。
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################



# 在強(qiáng)制刷新數(shù)據(jù)到磁盤之前允許接收消息的數(shù)量
#log.flush.interval.messages=10000

# 在強(qiáng)制刷新之前,消息可以在日志中停留的最長時(shí)間
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# 以下的配置控制了日志段的處理。策略可以配置為每隔一段時(shí)間刪除片段或者到達(dá)一定大小之后。
# 當(dāng)滿足這些條件時(shí),將會(huì)刪除一個(gè)片段。刪除總是發(fā)生在日志的末尾。

# 一個(gè)日志的最小存活時(shí)間,可以被刪除
log.retention.hours=168

# 一個(gè)基于大小的日志保留策略。段將被從日志中刪除只要剩下的部分段不低于log.retention.bytes。
#log.retention.bytes=1073741824

# 每一個(gè)日志段大小的最大值。當(dāng)?shù)竭_(dá)這個(gè)大小時(shí),會(huì)生成一個(gè)新的片段。
log.segment.bytes=1073741824

# 檢查日志段的時(shí)間間隔,看是否可以根據(jù)保留策略刪除它們
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

zookeeper.connect=localhost:2181

# 連接到Zookeeper的超時(shí)時(shí)間
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

group.initial.rebalance.delay.ms=0

參考文章

https://www.cnblogs.com/likehua/p/3999538.html

http://www.itdecent.cn/p/d3e963ff8b70

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容