Kafka從入門到進階

1. Apache Kafka是一個分布式流平臺

1.1 流平臺有三個關(guān)鍵功能:

發(fā)布和訂閱流記錄,類似于一個消息隊列或企業(yè)消息系統(tǒng)

以一種容錯的持久方式存儲記錄流

在流記錄生成的時候就處理它們

1.2 Kafka通常用于兩大類應(yīng)用:

如果想學(xué)習(xí)Java工程化、高性能及分布式、深入淺出。微服務(wù)、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費分享給大家。

構(gòu)建實時流數(shù)據(jù)管道,在系統(tǒng)或應(yīng)用程序之間可靠地獲取數(shù)據(jù)

構(gòu)建對數(shù)據(jù)流進行轉(zhuǎn)換或輸出的實時流媒體應(yīng)用程序

1.3 有幾個特別重要的概念:

Kafka is run as a cluster on one or more servers that can span multiple datacenters.

The Kafka cluster stores streams of records in categories called topics.

Each record consists of a key, a value, and a timestamp.

Kafka作為集群運行在一個或多個可以跨多個數(shù)據(jù)中心的服務(wù)器上

從這句話表達了三個意思:

Kafka是以集群方式運行的

集群中可以只有一臺服務(wù)器,也有可能有多臺服務(wù)器。也就是說,一臺服務(wù)器也是一個集群,多臺服務(wù)器也可以組成一個集群

這些服務(wù)器可以跨多個數(shù)據(jù)中心

Kafka集群按分類存儲流記錄,這個分類叫做主題

這句話表達了以下幾個信息:

流記錄是分類存儲的,也就說記錄是歸類的

我們稱這種分類為主題

簡單地來講,記錄是按主題劃分歸類存儲的

每個記錄由一個鍵、一個值和一個時間戳組成

1.4 Kafka有四個核心API:

Producer API?:允許應(yīng)用發(fā)布一條流記錄到一個或多個主題

Consumer API?:允許應(yīng)用訂閱一個或多個主題,并處理流記錄

Streams API?:允許應(yīng)用作為一個流處理器,從一個或多個主題那里消費輸入流,并將輸出流輸出到一個或多個輸出主題,從而有效地講輸入流轉(zhuǎn)換為輸出流

Connector API?:允許將主題連接到已經(jīng)存在的應(yīng)用或者數(shù)據(jù)系統(tǒng),以構(gòu)建并允許可重用的生產(chǎn)者或消費者。例如,一個關(guān)系型數(shù)據(jù)庫的連接器可能捕獲到一張表的每一次變更

(畫外音:我理解這四個核心API其實就是:發(fā)布、訂閱、轉(zhuǎn)換處理、從第三方采集數(shù)據(jù)。)

在Kafka中,客戶端和服務(wù)器之間的通信是使用簡單的、高性能的、與語言無關(guān)的TCP協(xié)議完成的。

2. Topics and Logs(主題和日志)

一個topic是一個分類,或者說是記錄被發(fā)布的時候的一個名字(畫外音:可以理解為記錄要被發(fā)到哪兒去)。

在Kafka中,topic總是有多個訂閱者,因此,一個topic可能有0個,1個或多個訂閱該數(shù)據(jù)的消費者。

對于每個主題,Kafka集群維護一個分區(qū)日志,如下圖所示:

每個分區(qū)都是一個有序的、不可變的記錄序列,而且記錄會不斷的被追加,一條記錄就是一個結(jié)構(gòu)化的提交日志(a structured commit log)。

分區(qū)中的每條記錄都被分配了一個連續(xù)的id號,這個id號被叫做offset(偏移量),這個偏移量唯一的標識出分區(qū)中的每條記錄。(PS:如果把分區(qū)比作數(shù)據(jù)庫表的話,那么偏移量就是主鍵)

Kafka集群持久化所有已發(fā)布的記錄,無論它們有沒有被消費,記錄被保留的時間是可以配置的。例如,如果保留策略被設(shè)置為兩天,那么在記錄發(fā)布后的兩天內(nèi),可以使用它,之后將其丟棄以釋放空間。在對數(shù)據(jù)大小方面,Kafka的性能是高效的,恒定常量級的,因此長時間存儲數(shù)據(jù)不是問題。

事實上,唯一維護在每個消費者上的元數(shù)據(jù)是消費者在日志中的位置或者叫偏移量。偏移量是由消費者控制的:通常消費者在讀取記錄的時候會線性的增加它的偏移量,但是,事實上,由于位置(偏移量)是由消費者控制的,所有它可以按任意它喜歡的順序消費記錄。例如:一個消費者可以重置到一個較舊的偏移量來重新處理之前已經(jīng)處理過的數(shù)據(jù),或者跳轉(zhuǎn)到最近的記錄并從“現(xiàn)在”開始消費。

這種特性意味著消費者非常廉價————他們可以來來去去的消息而不會對集群或者其它消費者造成太大影響。

日志中的分區(qū)有幾個用途。首先,它們允許日志的規(guī)模超出單個服務(wù)器的大小。每個獨立分區(qū)都必須與宿主的服務(wù)器相匹配,但一個主題可能有多個分區(qū),所以它可以處理任意數(shù)量的數(shù)據(jù)。第二,它們作為并行的單位——稍后再進一步。

畫外音:簡單地來說,日志分區(qū)的作用有兩個:一、日志的規(guī)模不再受限于單個服務(wù)器;二、分區(qū)意味著可以并行。

什么意思呢?主題建立在集群之上,每個主題維護了一個分區(qū)日志,顧名思義,日志是分區(qū)的;每個分區(qū)所在的服務(wù)器的資源(比如:CPU、內(nèi)存、帶寬、磁盤等)是有限的,如果不分區(qū)(可以理解為等同于只有一個)的話,必然受限于這個分區(qū)所在的服務(wù)器,那么多個分區(qū)的話就不一樣了,就突破了這種限制,服務(wù)器可以隨便加,分區(qū)也可以隨便加。

3. Distribution(分布)

日志的分區(qū)分布在集群中的服務(wù)器上,每個服務(wù)器處理數(shù)據(jù),并且分區(qū)請求是共享的。每個分區(qū)被復(fù)制到多個服務(wù)器上以實現(xiàn)容錯,到底復(fù)制到多少個服務(wù)器上是可以配置的。

Each partition is replicated across a configurable number of servers for fault tolerance.

每個分區(qū)都有一個服務(wù)器充當“leader”角色,并且有0個或者多個服務(wù)器作為“followers”。leader處理對這個分區(qū)的所有讀和寫請求,而followers被動的從leader那里復(fù)制數(shù)據(jù)。如果leader失敗,followers中的其中一個會自動變成新的leader。每個服務(wù)器充當一些分區(qū)的“leader”的同時也是其它分區(qū)的“follower”,因此在整個集群中負載是均衡的。

也就是說,每個服務(wù)器既是“leader”也是“follower”。我們知道一個主題可能有多個分區(qū),一個分區(qū)可能在一個服務(wù)器上也可能跨多個服務(wù)器,然而這并不以為著一臺服務(wù)器上只有一個分區(qū),是可能有多個分區(qū)的。每個分區(qū)中有一個服務(wù)器充當“leader”,其余是“follower”。leader負責(zé)處理這個它作為leader所負責(zé)的分區(qū)的所有讀寫請求,而該分區(qū)中的follow只是被動復(fù)制leader的數(shù)據(jù)。這個有點兒像HDFS中的副本機制。例如:分區(qū)-1有服務(wù)器A和B組成,A是leader,B是follower,有請求要往分區(qū)-1中寫數(shù)據(jù)的時候就由A處理,然后A把剛才寫的數(shù)據(jù)同步給B,這樣的話正常請求相當于A和B的數(shù)據(jù)是一樣的,都有分區(qū)-1的全部數(shù)據(jù),如果A宕機了,B成為leader,接替A繼續(xù)處理對分區(qū)-1的讀寫請求。

需要注意的是,分區(qū)是一個虛擬的概念,是一個邏輯單元。

4. Producers(生產(chǎn)者)

如果想學(xué)習(xí)Java工程化、高性能及分布式、深入淺出。微服務(wù)、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費分享給大家。

生產(chǎn)者發(fā)布數(shù)據(jù)到它們選擇的主題中。生產(chǎn)者負責(zé)選擇將記錄投遞到哪個主題的哪個分區(qū)中。要做這件事情,可以簡單地用循環(huán)方式以到達負載均衡,或者根據(jù)一些語義分區(qū)函數(shù)(比如:基于記錄中的某些key)

5. Consumers(消費者)

消費者用一個消費者組名來標識它們自己(PS:相當于給自己貼一個標簽,標簽的名字是組名,以表明自己屬于哪個組),并且每一條發(fā)布到主題中的記錄只會投遞給每個訂閱的消費者組中的其中一個消費者實例。消費者實例可能是單獨的進程或者在單獨的機器上。

如果所有的消費者實例都使用相同的消費者組,那么記錄將會在這些消費者之間有效的負載均衡。

如果所有的消費者實例都使用不同的消費者組,那么每條記錄將會廣播給所有的消費者進程。

上圖中其實那個Kafka Cluster換成Topic會更準確一些

一個Kafka集群有2個服務(wù)器,4個分區(qū)(P0-P3),有兩個消費者組。組A中有2個消費者實例,組B中有4個消費者實例。

通常我們會發(fā)現(xiàn),主題不會有太多的消費者組,每個消費者組是一個“邏輯訂閱者”(以消費者組的名義訂閱主題,而非以消費者實例的名義去訂閱)。每個組由許多消費者實例組成,以實現(xiàn)可擴展性和容錯。這仍然是發(fā)布/訂閱,只不過訂閱者是一個消費者群體,而非單個進程。

在Kafka中,這種消費方式是通過用日志中的分區(qū)除以使用者實例來實現(xiàn)的,這樣可以保證在任意時刻每個消費者都是排它的消費,即“公平共享”。Kafka協(xié)議動態(tài)的處理維護組中的成員。如果有心的實例加入到組中,它們將從組中的其它成員那里接管一些分區(qū);如果組中有一個實例死了,那么它的分區(qū)將會被分給其它實例。

(畫外音:什么意思呢?舉個例子,在上面的圖中,4個分區(qū),組A有2個消費者,組B有4個消費者,那么對A來講組中的每個消費者負責(zé)4/2=2個分區(qū),對組B來說組中的每個消費者負責(zé)4/4=1個分區(qū),而且同一時間消息只能被組中的一個實例消費。如果組中的成員數(shù)量有變化,則重新分配。)

Kafka只提供分區(qū)下的記錄的總的順序,而不提供主題下不同分區(qū)的總的順序。每個分區(qū)結(jié)合按key劃分數(shù)據(jù)的能力排序?qū)Υ蠖鄶?shù)應(yīng)用來說是足夠的。然而,如果你需要主題下總的記錄順序,你可以只使用一個分區(qū),這樣做的做的話就意味著每個消費者組中只能有一個消費者實例。

6. 保證

在一個高級別的Kafka給出下列保證:

被一個生產(chǎn)者發(fā)送到指定主題分區(qū)的消息將會按照它們被發(fā)送的順序追加到分區(qū)中。也就是說,如果記錄M1和M2是被同一個生產(chǎn)者發(fā)送到同一個分區(qū)的,而且M1是先發(fā)送的,M2是后發(fā)送的,那么在分區(qū)中M1的偏移量一定比M2小,并且M1出現(xiàn)在日志中的位置更靠前。

一個消費者看到記錄的順序和它們在日志中存儲的順序是一樣的。

對于一個副本因子是N的主題,我們可以容忍最多N-1個服務(wù)器失敗,而不會丟失已經(jīng)提交給日志的任何記錄。

7. Spring Kafka

Spring提供了一個“模板”作為發(fā)送消息的高級抽象。它也通過使用@KafkaListener注釋和“監(jiān)聽器容器”提供對消息驅(qū)動POJOs的支持。這些庫促進了依賴注入和聲明式的使用。

7.1 純Java方式

1 package com.cjs.example.quickstart;

2

3 import org.apache.kafka.clients.consumer.ConsumerConfig;

4 import org.apache.kafka.clients.consumer.ConsumerRecord;

5 import org.apache.kafka.clients.producer.ProducerConfig;

6 import org.apache.kafka.common.serialization.IntegerDeserializer;

7 import org.apache.kafka.common.serialization.IntegerSerializer;

8 import org.apache.kafka.common.serialization.StringDeserializer;

9 import org.apache.kafka.common.serialization.StringSerializer;

10 import org.springframework.kafka.core.*;

11 import org.springframework.kafka.listener.KafkaMessageListenerContainer;

12 import org.springframework.kafka.listener.MessageListener;

13 import org.springframework.kafka.listener.config.ContainerProperties;

14

15 import java.util.HashMap;

16 import java.util.Map;

17

18 public class PureJavaDemo {

19

20 /**

21 * 生產(chǎn)者配置

22 */

23 private static Map senderProps() {

24 Map props = new HashMap<>();

25 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093");

26 props.put(ProducerConfig.RETRIES_CONFIG, 0);

27 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

28 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

29 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

30 return props;

31 }

32

33 /**

34 * 消費者配置

35 */

36 private static Map consumerProps() {

37 Map props = new HashMap<>();

38 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093");

39 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello");

40 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

41 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");

42 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

43 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

44 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

45 return props;

46 }

47

48 /**

49 * 發(fā)送模板配置

50 */

51 private static KafkaTemplate createTemplate() {

52 Map senderProps = senderProps();

53 ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(senderProps);

54 KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory);

55 return kafkaTemplate;

56 }

57

58 /**

59 * 消息監(jiān)聽器容器配置

60 */

61 private static KafkaMessageListenerContainer createContainer() {

62 Map consumerProps = consumerProps();

63 ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);

64 ContainerProperties containerProperties = new ContainerProperties("test");

65 KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

66 return container;

67 }

68

69

70 public static void main(String[] args) throws InterruptedException {

71 String topic1 = "test"; // 主題

72

73 KafkaMessageListenerContainer container = createContainer();

74 ContainerProperties containerProperties = container.getContainerProperties();

75 containerProperties.setMessageListener(new MessageListener() {

76 @Override

77 public void onMessage(ConsumerRecord record) {

78 System.out.println("Received: " + record);

79 }

80 });

81 container.setBeanName("testAuto");

82

83 container.start();

84

85 KafkaTemplate kafkaTemplate = createTemplate();

86 kafkaTemplate.setDefaultTopic(topic1);

87

88 kafkaTemplate.sendDefault(0, "foo");

89 kafkaTemplate.sendDefault(2, "bar");

90 kafkaTemplate.sendDefault(0, "baz");

91 kafkaTemplate.sendDefault(2, "qux");

92

93 kafkaTemplate.flush();

94 container.stop();

95

96 System.out.println("結(jié)束");

97 }

98

99 }

運行結(jié)果:

Received: ConsumerRecord(topic = test, partition = 0, offset = 67, CreateTime = 1533300970788, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = foo)

Received: ConsumerRecord(topic = test, partition = 0, offset = 68, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = bar)

Received: ConsumerRecord(topic = test, partition = 0, offset = 69, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = baz)

Received: ConsumerRecord(topic = test, partition = 0, offset = 70, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = qux)

7.2 更簡單一點兒,用SpringBoot

1 package com.cjs.example.quickstart;

2

3 import org.apache.kafka.clients.consumer.ConsumerRecord;

4 import org.springframework.beans.factory.annotation.Autowired;

5 import org.springframework.boot.CommandLineRunner;

6 import org.springframework.context.annotation.Bean;

7 import org.springframework.context.annotation.Configuration;

8 import org.springframework.kafka.annotation.KafkaListener;

9 import org.springframework.kafka.core.KafkaTemplate;

10

11 @Configuration

12 public class JavaConfigurationDemo {

13

14 @KafkaListener(topics = "test")

15 public void listen(ConsumerRecord record) {

16 System.out.println("收到消息: " + record);

17 }

18

19 @Bean

20 public CommandLineRunner commandLineRunner() {

21 return new MyRunner();

22 }

23

24 class MyRunner implements CommandLineRunner {

25

26 @Autowired

27 private KafkaTemplate kafkaTemplate;

28

29 @Override

30 public void run(String... args) throws Exception {

31 kafkaTemplate.send("test", "foo1");

32 kafkaTemplate.send("test", "foo2");

33 kafkaTemplate.send("test", "foo3");

34 kafkaTemplate.send("test", "foo4");

35 }

36 }

37 }

application.properties配置

spring.kafka.bootstrap-servers=192.168.101.5:9092

spring.kafka.consumer.group-id=world

8. 生產(chǎn)者

1 package com.cjs.example.send;

2

3 import org.apache.kafka.clients.producer.ProducerConfig;

4 import org.apache.kafka.common.serialization.IntegerSerializer;

5 import org.apache.kafka.common.serialization.StringSerializer;

6 import org.springframework.context.annotation.Bean;

7 import org.springframework.context.annotation.Configuration;

8 import org.springframework.kafka.core.DefaultKafkaProducerFactory;

9 import org.springframework.kafka.core.KafkaTemplate;

10 import org.springframework.kafka.core.ProducerFactory;

11

12 import java.util.HashMap;

13 import java.util.Map;

14

15 @Configuration

16 public class Config {

17

18 public Map producerConfigs() {

19 Map props = new HashMap<>();

20 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092");

21 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

22 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

23 return props;

24 }

25

26 public ProducerFactory producerFactory() {

27 return new DefaultKafkaProducerFactory<>(producerConfigs());

28 }

29

30 @Bean

31 public KafkaTemplate kafkaTemplate() {

32 return new KafkaTemplate(producerFactory());

33 }

34

35 }

1 package com.cjs.example.send;

2

3 import org.springframework.beans.factory.annotation.Autowired;

4 import org.springframework.boot.CommandLineRunner;

5 import org.springframework.kafka.core.KafkaTemplate;

6 import org.springframework.kafka.support.SendResult;

7 import org.springframework.stereotype.Component;

8 import org.springframework.util.concurrent.ListenableFuture;

9 import org.springframework.util.concurrent.ListenableFutureCallback;

10

11 @Component

12 public class MyCommandLineRunner implements CommandLineRunner {

13

14 @Autowired

15 private KafkaTemplate kafkaTemplate;

16

17 public void sendTo(Integer key, String value) {

18 ListenableFuture> listenableFuture = kafkaTemplate.send("test", key, value);

19 listenableFuture.addCallback(new ListenableFutureCallback>() {

20 @Override

21 public void onFailure(Throwable throwable) {

22 System.out.println("發(fā)送失敗啦");

23 throwable.printStackTrace();

24 }

25

26 @Override

27 public void onSuccess(SendResult sendResult) {

28 System.out.println("發(fā)送成功," + sendResult);

29 }

30 });

31 }

32

33 @Override

34 public void run(String... args) throws Exception {

35 sendTo(1, "aaa");

36 sendTo(2, "bbb");

37 sendTo(3, "ccc");

38 }

39

40

41 }

運行結(jié)果:

發(fā)送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=aaa, timestamp=null), recordMetadata=test-0@37]

發(fā)送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=2, value=bbb, timestamp=null), recordMetadata=test-0@38]

發(fā)送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=3, value=ccc, timestamp=null), recordMetadata=test-0@39]

9. 消費者@KafkaListener

1 package com.cjs.example.receive;

2

3 import org.apache.kafka.clients.consumer.ConsumerConfig;

4 import org.apache.kafka.clients.consumer.ConsumerRecord;

5 import org.apache.kafka.common.serialization.IntegerDeserializer;

6 import org.apache.kafka.common.serialization.StringDeserializer;

7 import org.springframework.context.annotation.Bean;

8 import org.springframework.context.annotation.Configuration;

9 import org.springframework.kafka.annotation.KafkaListener;

10 import org.springframework.kafka.annotation.TopicPartition;

11 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

12 import org.springframework.kafka.config.KafkaListenerContainerFactory;

13 import org.springframework.kafka.core.ConsumerFactory;

14 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

15 import org.springframework.kafka.listener.AbstractMessageListenerContainer;

16 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

17 import org.springframework.kafka.listener.config.ContainerProperties;

18 import org.springframework.kafka.support.Acknowledgment;

19 import org.springframework.kafka.support.KafkaHeaders;

20 import org.springframework.messaging.handler.annotation.Header;

21 import org.springframework.messaging.handler.annotation.Payload;

22

23 import java.util.HashMap;

24 import java.util.List;

25 import java.util.Map;

26

27 @Configuration

28 public class Config2 {

29

30 @Bean

31 public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {

32 ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();

33 factory.setConsumerFactory(consumerFactory());

34 factory.setConcurrency(3);

35 ContainerProperties containerProperties = factory.getContainerProperties();

36 containerProperties.setPollTimeout(2000);

37 // containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

38 return factory;

39 }

40

41 private ConsumerFactory consumerFactory() {

42 return new DefaultKafkaConsumerFactory<>(consumerProps());

43 }

44

45 private Map consumerProps() {

46 Map props = new HashMap<>();

47 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092");

48 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hahaha");

49 // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

50 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

51 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

52 return props;

53 }

54

55

56 @KafkaListener(topics = "test")

57 public void listen(String data) {

58 System.out.println("listen 收到: " + data);

59 }

60

61

62 @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")

63 public void listen2(String data, Acknowledgment ack) {

64 System.out.println("listen2 收到: " + data);

65 ack.acknowledge();

66 }

67

68 @KafkaListener(topicPartitions = {@TopicPartition(topic = "test", partitions = "0")})

69 public void listen3(ConsumerRecord record) {

70 System.out.println("listen3 收到: " + record.value());

71 }

72

73

74 @KafkaListener(id = "xyz", topics = "test")

75 public void listen4(@Payload String foo,

76 @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,

77 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,

78 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,

79 @Header(KafkaHeaders.OFFSET) List offsets) {

80 System.out.println("listen4 收到: ");

81 System.out.println(foo);

82 System.out.println(key);

83 System.out.println(partition);

84 System.out.println(topic);

85 System.out.println(offsets);

86 }

87

88 }

9.1 Committing Offsets

如果想學(xué)習(xí)Java工程化、高性能及分布式、深入淺出。微服務(wù)、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費分享給大家。

如果enable.auto.commit設(shè)置為true,那么kafka將自動提交offset。如果設(shè)置為false,則支持下列AckMode(確認模式)。

消費者poll()方法將返回一個或多個ConsumerRecords

RECORD :處理完記錄以后,當監(jiān)聽器返回時,提交offset

BATCH :當對poll()返回的所有記錄進行處理完以后,提交偏offset

TIME :當對poll()返回的所有記錄進行處理完以后,只要距離上一次提交已經(jīng)過了ackTime時間后就提交

COUNT :當poll()返回的所有記錄都被處理時,只要從上次提交以來收到了ackCount條記錄,就可以提交

COUNT_TIME :和TIME以及COUNT類似,只要這兩個中有一個為true,則提交

MANUAL :消息監(jiān)聽器負責(zé)調(diào)用Acknowledgment.acknowledge()方法,此后和BATCH是一樣的

MANUAL_IMMEDIATE :當監(jiān)聽器調(diào)用Acknowledgment.acknowledge()方法后立即提交

10. Spring Boot Kafka

10.1 application.properties

spring.kafka.bootstrap-servers=192.168.101.5:9092

10.2 發(fā)送消息

1 package com.cjs.example;

2

3 import org.springframework.beans.factory.annotation.Autowired;

4 import org.springframework.kafka.core.KafkaTemplate;

5 import org.springframework.web.bind.annotation.RequestMapping;

6 import org.springframework.web.bind.annotation.RestController;

7

8 import javax.annotation.Resource;

9

10 @RestController

11 @RequestMapping("/msg")

12 public class MessageController {

13

14 @Resource

15 private KafkaTemplate kafkaTemplate;

16

17 @RequestMapping("/send")

18 public String send(String topic, String key, String value) {

19 kafkaTemplate.send(topic, key, value);

20 return "ok";

21 }

22

23 }

10.3 接收消息

1 package com.cjs.example;

2

3 import org.apache.kafka.clients.consumer.ConsumerRecord;

4 import org.springframework.kafka.annotation.KafkaListener;

5 import org.springframework.kafka.annotation.KafkaListeners;

6 import org.springframework.stereotype.Component;

7

8 @Component

9 public class MessageListener {

10

11 /**

12 * 監(jiān)聽訂單消息

13 */

14 @KafkaListener(topics = "ORDER", groupId = "OrderGroup")

15 public void listenToOrder(String data) {

16 System.out.println("收到訂單消息:" + data);

17 }

18

19 /**

20 * 監(jiān)聽會員消息

21 */

22 @KafkaListener(topics = "MEMBER", groupId = "MemberGroup")

23 public void listenToMember(ConsumerRecord record) {

24 System.out.println("收到會員消息:" + record);

25 }

26

27 /**

28 * 監(jiān)聽所有消息

29 *

30 * 任意時刻,一條消息只會發(fā)給組中的一個消費者

31 *

32 * 消費者組中的成員數(shù)量不能超過分區(qū)數(shù),這里分區(qū)數(shù)是1,因此訂閱該主題的消費者組成員不能超過1

33 */

34 // @KafkaListeners({@KafkaListener(topics = "ORDER", groupId = "OrderGroup"),

35 // @KafkaListener(topics = "MEMBER", groupId = "MemberGroup")})

36 // public void listenToAll(String data) {

37 // System.out.println("啊啊啊");

38 // }

39

40 }

11. pom.xml

如果想學(xué)習(xí)Java工程化、高性能及分布式、深入淺出。微服務(wù)、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費分享給大家。


xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.cjs.example

cjs-kafka-example

0.0.1-SNAPSHOT

jar

cjs-kafka-example

org.springframework.boot

spring-boot-starter-parent

2.0.4.RELEASE

UTF-8

UTF-8

1.8

org.springframework.boot

spring-boot-starter-web

org.springframework.kafka

spring-kafka

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-maven-plugin

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容