百萬架構(gòu)師第三十三課:kafka:分布式消息通信Kafka(一)|JavaGuide

課程目標

  • Kafka 產(chǎn)生的背景
  • Kafka 的架構(gòu)
  • Kafka 的安裝部署和集群部署
  • Kafka 的基本操作
  • Kafka 的應用

Kafka 的簡介

  • 高性能
  • 高吞吐量

什么是 Kafka

Kafka 是一款分布式消息發(fā)布和訂閱系統(tǒng),具有高性能、高吞吐量的特點而被廣泛應用于大數(shù)據(jù)傳輸場景。它是由 LinkedIn公 司開發(fā),使用 Scala 語言編寫,之后成為 Apache 基金會的一個頂級項目。 kafka 提供了類似 JMS 的特性,但是在設計和實現(xiàn)上是完全不同的,而且他也不是 JMS 規(guī)范的實現(xiàn)。

Kafka 產(chǎn)生的背景

kafka作為一個消息系統(tǒng),早期設計的目的是用作 LinkedIn 的活動流(Activity Stream)和運營數(shù)據(jù)處理管道(Pipeline)?;顒恿鲾?shù)據(jù)是所有的網(wǎng)站對用戶的使用情況做分析的時候要用到的最常規(guī)的部分,活動數(shù)據(jù)包括頁面的訪問量(PV)、被查看內(nèi)容方面的信息以及搜索內(nèi)容。這種數(shù)據(jù)通常的處理方式是先把各種活動以日志的形式寫入某種文件,然后周期性地對這些文件進行統(tǒng)計分析。運營數(shù)據(jù)指的是服務器的性能數(shù)據(jù)(CPU、IO使用率、請求時間、服務日志等)。

Kafka 的應用場景

  • 內(nèi)置分區(qū)

  • 實現(xiàn)集群

      `spring cloud stream` 也有 Kafka 的實現(xiàn)。
    

    由于 kafka 具有更好的吞吐量、內(nèi)置分區(qū)、冗余及容錯性的優(yōu)點 ( kafka 每秒可以處理幾十萬消息 ) ,讓 kafka 成為了一個很好的大規(guī)模消息處理應用的解決方案。所以在企業(yè)級應用中,主要會應用于如下幾個方面:

  • 行為跟蹤:kafka可以用于跟蹤用戶瀏覽頁面、搜索及其他行為。通過發(fā)布-訂閱模式實時記錄到對應的topic中,通過后端大數(shù)據(jù)平臺接入處理分析,并做更進一步的實時處理和監(jiān)控。

  • 日志收集:在日志收集方面,有很多比較優(yōu)秀的產(chǎn)品,比如 Apache Flume,很多公司使用kafka 代理日志聚合。日志聚合表示從服務器上收集日志文件,然后放到一個集中的平臺(文件服務器)進行處理。在實際應用開發(fā)中,我們應用程序的 log 都會輸出到本地的磁盤上, 排查問題的話通過 linux 命令來搞定,如果應用程序組成了負載均衡集群,并且集群的機器有幾十臺以上,那么想通過日志快速定位到問題,就是很麻煩的事情了。所以一般都會做一個日志統(tǒng)一收集平臺管理 log 日志用來快速查詢重要應用的問題。所以很多公司的套路都是把應用日志集中到 kafka 上,然后分別導入到 es 和 hdfs 上,用來做實時檢索分析和離線統(tǒng)計數(shù)據(jù)備份等。而另一方面,kafka 本身又提供了很好的 api 來集成日志并且做日志收集。

JavaGuide_Kafka_通信1_ELK圖解.png

Kafka 本身的架構(gòu)

一個典型的 kafka 集群包含若干 Producer(可以是應用節(jié)點產(chǎn)生的消息,也可以是通過Flume 收集日志產(chǎn)生的事件),若干個 Broker(kafka 支持水平擴展)、若干個 Consumer 、Group,以及一個 zookeeper 集群。kafka 通過 zookeeper 管理集群配置及服務協(xié)同。
  • Producer 使用 push 模式將消息發(fā)布到 broker,consumer 通過監(jiān)聽使用 pull 模式從broker 訂閱并消費消息。

  • 多個 broker 協(xié)同工作, producer 和 consumer 部署在各個業(yè)務邏輯中。三者通過zookeeper 管理協(xié)調(diào)請求和轉(zhuǎn)發(fā)。這樣就組成了一個高性能的分布式消息發(fā)布和訂閱系統(tǒng)。圖上有一個細節(jié)是和其他 mq 中間件不同的點。producer 發(fā)送消息到 broker 的過程是 push,而 consumer 從 broker 消費消息的過程是 pull,主動去拉數(shù)據(jù)。而不是 broker 把數(shù)據(jù)主動發(fā)送給 consumer 。

  • Topic 主題
  • partion 數(shù)據(jù)分區(qū)
  • group
JavaGuide_Kafka_通信1_Kafka通信分區(qū).png

kafka 的安裝部署

下載安裝包

https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka_2.11-1.1.0.tgz

kafka_2.11-1.1.0.tgx: http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz

安裝過程

  1. tar -zxvf 解壓安裝包
[root@Darian1 software]# tar -zxvf kafka_2.11-2.0.0.tgz

kafka 目錄介紹

  1. /bin 操作 kafka 的可執(zhí)行腳本
  2. /config 配置文件
  3. /libs 依賴庫目錄
  4. /logs 日志數(shù)據(jù)目錄

啟動 /停止 kafka

  1. 需要先啟動 zookeeper ,如果沒有搭建 zookeeper 環(huán)境,可以直接運行kafka內(nèi)嵌的zookeeper
    啟動命令: bin/zookeeper-server-start.sh config/zookeeper.properties &
  2. 進入kafka目錄,運行 bin/kafka-server-start.sh {-daemon 后臺啟動} config/server.properties &
  3. 進入kafka目錄,運行bin/kafka-server-stop.sh config/server.properties

運行的外部的 Zookeeper 集群的 Kafka

[root@Darian1 bin]# vim ../config/server.properties 

zookeeper.connect=168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 
    
[root@Darian1 bin]# sh kafka-server-start.sh ../config/server.properties 

如果超時時間連接較長可以延長時間。

[root@Darian1 bin]# vim ../config/server.properties 

zookeeper.connect=192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=30000

Kafka 的基本操作

前提:

首先需要啟動 zookeeper

啟動 kafka

[root@Darian1 bin]# sh kafka-server-start.sh ../config/server.properties 

后臺啟動kafka

[root@Darian1 bin]# sh kafka-server-start.sh -daemon ../config/server.properties

創(chuàng)建一個 Topic

[root@Darian1 bin]# sh kafka-topics.sh --create --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 --replication-factor 1 --partitions 1 --topic darianTest

Created topic "darianTest".

Replication-factor 表示該topic需要在不同的broker中保存幾份,這里設置成1,表示在兩個broker中保存兩份

Partitions 分區(qū)數(shù)

查看所有的 Topic

[root@Darian1 bin]# sh kafka-topics.sh --list --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181
[root@Darian1 bin]# sh kafka-topics.sh --list --zookeeper localhost:2181
darianTest

查看 topic 屬性

[root@Darian1 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 --topic darianTest

Topic:darianTest    PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: darianTest   Partition: 0    Leader: 1   Replicas: 1 Isr: 1

創(chuàng)建一個控制臺發(fā)送端

Broker-list 不是 zookeeper

[root@Darian1 bin]# sh kafka-console-producer.sh --broker-list localhost:9092 --topic darianTest
>hello
>helloworld
>dsfsdf
>sdfsd
>

創(chuàng)建一個控制臺接收端

[root@Darian1 bin]# sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic darianTest --from-beginning

hello
helloworld
dsfsdf
sdfsd

安裝集群環(huán)境

zookeeper 能夠完成 kafka 的集群

修改 server.properties 配置

  1. 修改server.properties. broker.id=0 / 1

  2. 修改server.properties 修改成本機IP

advertised.listeners=PLAINTEXT://192.168.11.153:9092

當 Kafka broker 啟動時,它會在ZK上注冊自己的IP和端口號,客戶端就通過這個IP和端口號來連接。Kafka 的 listeners 如果需要配置集群的話,需要把自己機器的 IP 配置上去。

192.168.40.128
[root@Darian1 bin]# vim ../config/server.properties 


# 在集群里邊必須是唯一的,默認情況下 broker.id 都是 0 ,需要標識它的唯一性
broker.id=1 / 2 / 3

zookeeper.connect=192.168.40.128:2181

listeners=PLAINTEXT://192.168.40.128:9092
192.168.40.129
# 在集群里邊必須是唯一的,默認情況下 broker.id 都是 0 ,需要標識它的唯一性
broker.id= 2
zookeeper.connect=192.168.40.128:2181
listeners=PLAINTEXT://192.168.40.129:9092
192.168.40.131
# 在集群里邊必須是唯一的,默認情況下 broker.id 都是 0 ,需要標識它的唯一性
broker.id= 3
zookeeper.connect=192.168.40.128:2181
listeners=PLAINTEXT://192.168.40.131:9092
[root@Darian1 bin]# sh kafka-server-start.sh -daemon ../config/server.properties

Kafka 啟動不好的話,多啟動兩次。

啟動成功以后,看 zookeeper 節(jié)點的變化:

[root@Darian1 bin]# sh zkCli.sh 

[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 12] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 11] ls /brokers/ids
[1, 2, 3]
[zk: localhost:2181(CONNECTED) 15] get /controller
{"version":1,"brokerid":2,"timestamp":"1548340180496"}
cZxid = 0x100000147
ctime = Thu Jan 24 22:29:40 CST 2019
mZxid = 0x100000147
mtime = Thu Jan 24 22:29:40 CST 2019
pZxid = 0x100000147
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1687fbe81fe000a
dataLength = 54
numChildren = 0
每一個節(jié)點都有它相應的含義。我們的寫請求會落到 `Master` 節(jié)點上,讀請求可以走其他節(jié)點去讀。他們都是熱備可以工作的。他的選舉規(guī)則是最小的節(jié)點也就是最早注冊的節(jié)點。

中間件都是成熟的商業(yè)化的東西。

JAVA API 的使用

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>

KafKaConsumer

public class KafkaConsumerDemo extends Thread {
    private final KafkaConsumer kafkaConsumer;

    public KafkaConsumerDemo(String topic) {
        // 設置屬性
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                       "192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092");
        // 消費組
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // 間隔時間
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                       "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                       "org.apache.kafka.common.serialization.StringDeserializer");
        // 從最早的開始
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Collections.singletonList(topic));
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<Integer, String> consumerRecords = kafkaConsumer.poll(1000); // 超時時間
            consumerRecords.forEach(consumerRecord -> {
                System.err.println("[message receive]:" + consumerRecord.value());
                kafkaConsumer.commitSync();
            });
        }
    }

    public static void main(String[] args) {
        new KafkaConsumerDemo("test").start();
    }
}

KafkaProducer

public class KafkaProducerDemo extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final boolean isAsync;

    public KafkaProducerDemo(String topic, boolean isAsync) {
        // 設置屬性
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                       "192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                       "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                       "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<Integer, String>(properties);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    @Override
    public void run() {
        int num = 0;
        while (num < 50) {
            String message = "message_" + num;
            System.err.println("[producer message]:" + message);
            if (isAsync) { // 異步發(fā)送
                producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) -> {
                    if (recordMetadata != null) {
                        System.err.println("[async-offset]:" + recordMetadata.offset() +
                                           "->[partition]:" + recordMetadata.partition());
                    }
                });
            } else {  // 同步發(fā)送 future / callable
                try {
                    RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, message)).get();
                    System.err.println("[sync-offset]:" + recordMetadata.offset() +
                                       "->[partition]:" + recordMetadata.partition());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            num++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new KafkaProducerDemo("test", true).start();
    }
}
  • Kafka 有兩種 API

    • 封裝得比較好的
    • 非常靈活的
  • Kafka 的配置信息非常地詳細

JavaGuide_Kafka_通信1_Kafka備注截圖.png

配置信息分析

發(fā)送端的可選配置信息分析

ackacks

acks 配置表示 producer 發(fā)送消息到 broker 上以后的確認值。有三個可選項

  • 0:表示 producer 不需要等待 broker 的消息確認。這個選項時延最小但同時風險最大(因為當 server 宕機時,數(shù)據(jù)將會丟失)。
  • 1:表示 producer 只需要獲得 kafka 集群中的 leader 節(jié)點確認即可,這個選擇時延較小同時確保了 leader 節(jié)點確認接收成功。
  • all(-1):需要 ISR 中所有的 Replica 給予接收確認(需要集群中的所有節(jié)點確認),速度最慢,安全性最高,但是由于 ISR 可能會縮小到僅包含一個 Replica ,所以設置參數(shù)為 all 并不能一定避免數(shù)據(jù)丟失。
batch.size
生產(chǎn)者發(fā)送多個消息到 `broker` 上的同一個 **分區(qū)** 時,為了減少網(wǎng)絡請求帶來的性能開銷,通過批量的方式來提交消息,可以通過這個參數(shù)來控制批量提交的字節(jié)數(shù)大小,默認大小是 16384byte ,  也就是 16kb ,意味著當一批消息大小達到指定的 `batch.size` 的時候會統(tǒng)一發(fā)送。
linger.ms
**Producer** 默認會把兩次發(fā)送時間間隔內(nèi)收集到的所有 Requests 進行一次聚合然后再發(fā)送,以此提高吞吐量,而 `linger.ms` 就是為每次發(fā)送到 broker 的請求增加一些 delay,以此來聚合更多的 Message 請求。 這個有點像TCP里面的 Nagle 算法,在 TCP 協(xié)議的傳輸中,為了減少大量小數(shù)據(jù)包的發(fā)送,采用了 Nagle 算法,也就是基于小包的等-停協(xié)議。
  • batch.sizelinger.ms 這兩個參數(shù)是kafka性能優(yōu)化的關鍵參數(shù),很多同學會發(fā)現(xiàn) batch.size 和 linger.ms 這兩者的作用是一樣的,如果兩個都配置了,那么怎么工作的呢?實際上,當二者都配置的時候,只要滿足其中一個要求,就會發(fā)送請求到 broker 上。
max.request.size
設置請求的數(shù)據(jù)的最大字節(jié)數(shù),為了防止發(fā)生較大的數(shù)據(jù)包影響到吞吐量,默認值為1MB。

還有重試次數(shù)等

消息的同步發(fā)送和異步發(fā)送:

Kafka 1.0 以后,默認的發(fā)送方式都是異步發(fā)送消息。

我們的消息通過 **Kafka producer** 去 `send` 以后,這個消息實際上是放到一個后臺的發(fā)送隊列里邊,然后通過一個后臺的線程,通過不斷地從后代的發(fā)送隊列中不斷地取出消息進行發(fā)送。發(fā)送以后,進行調(diào)用回調(diào)方法。就是 `Callback` 方法進行執(zhí)行。

同步發(fā)送就是用的 future  的時候去阻塞。獲得發(fā)送信息是進行阻塞。

消費端的可選配置分析

group.id

 `consumer group` 是 kafka 提供的可擴展且具有容錯性的消費者機制。既然是一個組,那么組內(nèi)必然可以有多個消費者或消費者實例 ( consumer instance ) ,它們共享一個公共的 ID ,即 group ID 。組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題 ( subscribed topics ) 的所有分區(qū) ( partition ) 。當然,每個分區(qū)只能由同一個消費組內(nèi)的一個 consumer 來消費。如下圖所示,分別有三個消費者,屬于兩個不同的 group ,那么對于 firstTopic 這個 topic 來說,這兩個組的消費者都能同時消費這個 topic 中的消息,對于此時的架構(gòu)來說,這個 firstTopic 就類似于 ActiveMQ 中的 topic 概念。如右圖所示,如果3個消費者都屬于同一個 group ,那么此時 firstTopic 就是一個 Queue 的概念。
  • 不同的組可以同時消費同一條消息。
JavaGuide_Kafka_通信1_不同的組可以消費同一條消息.png
  • 同一個組只能有一個 Consumer 能夠拿到消息
JavaGuide_Kafka_通信1_同一個組只能有一個消費者消費消息.png

enable.auto.commit

ENABLE_AUTO_COMMIT_CONFIG 可以設置成 false 。

消費者消費消息以后自動提交,只有當消息提交以后,該消息才不會被再次接收到,還可以配合 `auto.commit.interval.ms` 控制自動提交的頻率。 每一段時間內(nèi)的消息批量地確認提交。

當然,我們也可以通過 `consumer.commitSync()` 的方式實現(xiàn)手動提交
{
    // 異步 Commit
    kafkaConsumer.commitAsync();
    // 同步 commit
    kafkaConsumer.commitSync();
    // 
}

// 可以選擇不同的 Topic 
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeoutMs) {
    // ...
}

// 可以選擇不同的回調(diào)接口 callback 接口
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    // ...
}

auto.offset.reset

這個參數(shù)是針對新的 `groupid` 中的消費者而言的,當有新 `groupid` 的消費者來消費指定的 `topic` 時,對于該參數(shù)的配置,會有不同的語義
  • auto.offset.reset=latest 情況下,新的消費者將會從其他消費者最后消費的 offset 處開始消費 Topic 下的消息。
  • auto.offset.reset= earliest 情況下,新的消費者會從該 topic 最早的消息開始消費,(對于新的 groupId 來說,重置 offset )。
  • auto.offset.reset=none 情況下,新的消費者加入以后,由于之前不存在offset,則會直接拋出異常。

沒有消費組,就會拋出異常。

max.poll.records

此設置限制每次調(diào)用 poll 返回的消息數(shù),這樣可以更容易地預測每次 poll 間隔要處理的最大值。通過調(diào)整此值,可以減少 poll 間隔。

Kafka 工具:

JavaGuide_Kafka_通信1_Kafka_Client連接工具.png
消息都會寫到磁盤上,只要磁盤上這條消息可以存在,那么我換不同的 `GroupId` ,那么就可以一直去消費這條消息。

Spring -kafka 集成

Spring 整合 Kafka 實現(xiàn)注冊成功以后去設置抽獎名額(贈送一個抽獎機會)。
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
# 定義應用的名稱
spring.application.name=spring-cloud-stream-kafka
# 配置 Web 服務端口
server.port=8080
# 失效管理安全
management.security.enabled=false
# 配置需要的 kafka 主題
kafka.topic.test=test
kafka.topic.darian=darian

# 配置 kafka 的 zookeeper 的節(jié)點
#spring.cloud.stream.kafka.binder.zk-nodes=192.168.136.128:2181
spring.cloud.stream.kafka.streams.binder.configuration.zk-nodes=192.168.40.128:2181
# 配置 Spring Kafka 配置信息
spring.kafka.bootstrap-servers=192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092
# Kafka 生產(chǎn)者配置
spring.kafka.producer.bootstrap-servers=192.168.40.128:9092
spring.kafka.producer.key-deserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# kafka 消費者配置
spring.kafka.consumer.group-id=darian-1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

KafkaConsumerListener
@Component
public class KafkaConsumerListener {

    @KafkaListener(topics = "${kafka.topic.test}")
    public void onMessageTest(String message) {
        System.out.println("Kafka test 消費者監(jiān)聽器,接收到消息:" + message);
    }

    @KafkaListener(topics = "${kafka.topic.darian}")
    public void onMessageDarian(String message) {
        System.out.println("Kafka darian消費者監(jiān)聽器,接收到消息:" + message);
    }

}
KafkaProducerController
@RequiredArgsConstructor
@RestController
public class KafkaProducerController {
    private final KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/message/send/{topic}")
    public TopicMessage sendMessage(
            @PathVariable String topic,
            @RequestParam String message) {
        if ((!"darian".equals(topic)) && (!"test".equals(topic))) {
            return new TopicMessage(topic, message, false, "topic【" + topic + "】 不存在");
        }
        ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, message);

        return new TopicMessage(topic, message, true, "success");
    }


    @AllArgsConstructor
    @Data
    public static class TopicMessage {
        private String topic;
        private String message;
        private boolean send;
        private String errorMessage;
    }

}

JavaGuide_Kafka_通信1_Kafka消息發(fā)送截圖.png
JavaGuide_Kafka_通信1_Kafka發(fā)送消息Topic不存在.png

來源于: https://javaguide.net

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容