三、Spring Cloud Stream+Kafka重復(fù)消費(fèi)問題

在 [二、Spring Cloud Stream整合Kafka](http://www.itdecent.cn/p/eed59e87e45a)
的基礎(chǔ)上再創(chuàng)建一個(gè)module , kafka-consumer2 (創(chuàng)建過程可參考http://www.itdecent.cn/p/d7771682688b)
子級(jí) 生產(chǎn)者(kafka-producer) application.yml 更新后如下:
server:
  port: 8181

spring:
  application:
    name: kafka_producer
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092   #Kafka的消息中間件服務(wù)器
          zk-nodes: localhost:2181  #Zookeeper的節(jié)點(diǎn),如果集群,后面加,號(hào)分隔
          auto-create-topics: true  #如果設(shè)置為false,就不會(huì)自動(dòng)創(chuàng)建Topic 有可能你Topic還沒創(chuàng)建就直接調(diào)用了。
          auto-add-partitions: true     # 當(dāng)partition-count設(shè)置的值超過原來設(shè)置的值,true=自動(dòng)創(chuàng)建分區(qū)
      bindings:
        stream-demo:                          #這里可以任意寫,消費(fèi)者應(yīng)與之一致
          destination: custom-message-topic   #這里可以任意寫,消費(fèi)者應(yīng)與之一致,消息發(fā)往的目的地
          content-type: application/json      #消息發(fā)送的格式,接收端不用指定格式,但是發(fā)送端要; 文本則為 text/plain
          producer:
            # 分區(qū)的數(shù)量(默認(rèn)為1)
            partition-count: 2
主要目的是設(shè)置分區(qū)的值

auto-add-partitions: true

producer:
  # 分區(qū)的數(shù)量(默認(rèn)為1)
   partition-count: 2
子級(jí) 消費(fèi)者(kafka-consumer) application.yml 更新后如下:
server:
  port: 8081

kafka:
  group: kafka_g1
spring:
  application:
    name: kafka_consumer
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092   #Kafka的消息中間件服務(wù)器
          zk-nodes: localhost:2181  #Zookeeper的節(jié)點(diǎn),如果集群,后面加,號(hào)分隔
          auto-create-topics: true  #如果設(shè)置為false,就不會(huì)自動(dòng)創(chuàng)建Topic 有可能你Topic還沒創(chuàng)建就直接調(diào)用了。
      bindings:
        stream-demo:                          #這里可以任意寫,生產(chǎn)者應(yīng)與之一致
          destination: custom-message-topic   #這里可以任意寫,生產(chǎn)者應(yīng)與之一致,消息發(fā)往的目的地
          content-type: application/json      #消息發(fā)送的格式,接收端不用指定格式,但是發(fā)送端要; 文本則為 text/plain
          group: ${kafka.group}

主要目的是分組

kafka:
  group: kafka_g1

group: ${kafka.group}
子級(jí) 消費(fèi)者(kafka-consumer2) application.yml 同kafka-consumer基本一致
server:
  port: 8082

kafka:
  group: kafka_g1
spring:
  application:
    name: kafka_consumer2
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092   #Kafka的消息中間件服務(wù)器
          zk-nodes: localhost:2181  #Zookeeper的節(jié)點(diǎn),如果集群,后面加,號(hào)分隔
          auto-create-topics: true  #如果設(shè)置為false,就不會(huì)自動(dòng)創(chuàng)建Topic 有可能你Topic還沒創(chuàng)建就直接調(diào)用了。
      bindings:
        stream-demo:                          #這里可以任意寫,生產(chǎn)者應(yīng)與之一致
          destination: custom-message-topic   #這里可以任意寫,生產(chǎn)者應(yīng)與之一致,消息發(fā)往的目的地
          content-type: application/json      #消息發(fā)送的格式,接收端不用指定格式,但是發(fā)送端要; 文本則為 text/plain
          group: ${kafka.group}

測(cè)試  http://localhost:8181/produce
消費(fèi)者kafka-consumer 打印如下:
2022-04-07 15:40:56.561  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息26 
2022-04-07 15:40:56.562  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息27 
2022-04-07 15:40:56.564  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息28 
2022-04-07 15:40:56.565  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息29 
2022-04-07 15:40:56.565  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息30 
2022-04-07 15:40:56.566  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息31 
2022-04-07 15:40:56.566  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息32 
2022-04-07 15:40:56.580  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息46 
2022-04-07 15:40:56.580  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息47 
2022-04-07 15:40:56.581  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息49 
2022-04-07 15:40:56.581  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息55 
2022-04-07 15:40:56.581  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息56 
2022-04-07 15:40:56.581  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息59 
2022-04-07 15:40:56.582  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息60 
2022-04-07 15:40:56.582  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息64 
2022-04-07 15:40:56.583  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息68 
2022-04-07 15:40:56.583  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息74 
2022-04-07 15:40:56.583  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息76 
2022-04-07 15:40:56.584  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息77 
2022-04-07 15:40:56.587  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息84 
2022-04-07 15:40:56.588  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息86 
2022-04-07 15:40:56.588  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息87 
2022-04-07 15:40:56.589  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息93 

消費(fèi)者kafka-consumer2 打印如下:
2022-04-07 15:40:56.599  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息0 
2022-04-07 15:40:56.600  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息1 
2022-04-07 15:40:56.600  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息2 
2022-04-07 15:40:56.600  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息3 
2022-04-07 15:40:56.601  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息4 
2022-04-07 15:40:56.601  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息5 
2022-04-07 15:40:56.601  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息6 
2022-04-07 15:40:56.602  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息7 
2022-04-07 15:40:56.602  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息8 
2022-04-07 15:40:56.602  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息9 
2022-04-07 15:40:56.602  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息10 
2022-04-07 15:40:56.603  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息11 
2022-04-07 15:40:56.603  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息12 
2022-04-07 15:40:56.603  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息13 
2022-04-07 15:40:56.603  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息14 
2022-04-07 15:40:56.604  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息15 
2022-04-07 15:40:56.604  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息16 
2022-04-07 15:40:56.605  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息17 
2022-04-07 15:40:56.605  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息18 
2022-04-07 15:40:56.605  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息19 
2022-04-07 15:40:56.606  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息20 
2022-04-07 15:40:56.606  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息21 
2022-04-07 15:40:56.606  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息22 
2022-04-07 15:40:56.607  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息23 
2022-04-07 15:40:56.607  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息24 
2022-04-07 15:40:56.607  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息25 
2022-04-07 15:40:56.627  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息33 
2022-04-07 15:40:56.628  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息34 
2022-04-07 15:40:56.628  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息35 
2022-04-07 15:40:56.629  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息36 
2022-04-07 15:40:56.629  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息37 
2022-04-07 15:40:56.629  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息38 
2022-04-07 15:40:56.629  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息39 
2022-04-07 15:40:56.630  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息40 
2022-04-07 15:40:56.630  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息41 
2022-04-07 15:40:56.630  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息42 
2022-04-07 15:40:56.631  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息48 
2022-04-07 15:40:56.632  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息53 
2022-04-07 15:40:56.632  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息54 
2022-04-07 15:40:56.633  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息57 
2022-04-07 15:40:56.633  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息58 
2022-04-07 15:40:56.634  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息63 
2022-04-07 15:40:56.634  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息65 
2022-04-07 15:40:56.635  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息66 
2022-04-07 15:40:56.635  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息67 
2022-04-07 15:40:56.636  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息70 
2022-04-07 15:40:56.636  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息73 
2022-04-07 15:40:56.636  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息75 
2022-04-07 15:40:56.637  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息78 
2022-04-07 15:40:56.637  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息79 
2022-04-07 15:40:56.637  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息82 
2022-04-07 15:40:56.637  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息43 
2022-04-07 15:40:56.638  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息44 
2022-04-07 15:40:56.638  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息45 
2022-04-07 15:40:56.638  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息50 
2022-04-07 15:40:56.639  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息51 
2022-04-07 15:40:56.639  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息52 
2022-04-07 15:40:56.639  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息61 
2022-04-07 15:40:56.640  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息62 
2022-04-07 15:40:56.641  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息69 
2022-04-07 15:40:56.641  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息71 
2022-04-07 15:40:56.642  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息72 
2022-04-07 15:40:56.642  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息80 
2022-04-07 15:40:56.643  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息81 
2022-04-07 15:40:56.656  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息96 
2022-04-07 15:40:56.657  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息97 
2022-04-07 15:40:56.657  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息98 
2022-04-07 15:40:56.658  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息83 
2022-04-07 15:40:56.659  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息85 
2022-04-07 15:40:56.659  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息88 
2022-04-07 15:40:56.660  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息89 
2022-04-07 15:40:56.660  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息90 
2022-04-07 15:40:56.660  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息91 
2022-04-07 15:40:56.661  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息92 
2022-04-07 15:40:56.661  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息94 
2022-04-07 15:40:56.662  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息95 
2022-04-07 15:40:56.662  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息99 


遇到問題2:
Caused by: org.springframework.cloud.stream.provisioning.ProvisioningException: 
The number of expected partitions was: 2, but 1 have been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`

解決:
生產(chǎn)者yml文件中 spring.cloud.stream.kafka.binder 后面加入
auto-add-partitions: true     
# 當(dāng)partition-count設(shè)置的值超過原來設(shè)置的值,true=自動(dòng)創(chuàng)建分區(qū)
# partition-count: 2 這里的值超過原來設(shè)置的值,如果不是自動(dòng)創(chuàng)建分區(qū)會(huì)拋上面的異常
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容