SpringBoot 集成 Kafka 教程

版本

  • SpringBoot:2.7.2

pom.xml

集成 kafka 相關依賴:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yaml

spring:
  kafka:
    template:
      default-topic: myTest
    bootstrap-servers: <Kafka地址>
    jaas:
      enabled: true
      loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
      options:
        username: <Username>
        password: <Password>
    consumer:
      ssl:
        truststoreLocation: file:/Users/cary/Documents/java/SpringBootTutorial/New/kafka-demo/src/main/resources/only.4096.client.truststore.jks
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        ssl.endpoint.identification.algorithm:
      group-id: myTestGroup
      max-poll-records: 2
    producer:
      ssl:
        truststoreLocation: file:/Users/cary/Documents/java/SpringBootTutorial/New/kafka-demo/src/main/resources/only.4096.client.truststore.jks
      retries: 3
      acks: 1
      compression-type: lz4
      buffer-memory: 33554432
      batch-size: 51200
      properties:
        send.buffer.bytes: 262144
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        ssl.endpoint.identification.algorithm:

其中,default-topic 表示 topic 默認名字,bootstrap-servers 表示 kafka 連接地址,usernamepassword 是訪問 kafka 的賬號信息(注:作者使用的是阿里云 kafka 實例,可在阿里云后臺查看該信息),truststoreLocation 表示證書所在路徑。

Receiver 監(jiān)聽

@Component
public class Receiver {

    private Logger log = LoggerFactory.getLogger(Receiver.class);

    @KafkaListener(topics = { "myTest" })
    public void receiveMessage(ConsumerRecord<String, String> record) {
        log.info("Receive Message, key = {}, value = {}", record.key(), record.value());
    }
}

Sender 發(fā)送

@Component
public class Sender {

    private Logger log = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> template;

    public void send(String msg) {
        final String key = "my_msg";

        this.template.send("myTest", key, msg);
        log.info("send message, key: {}, dada: {}", key, msg);
    }
}

使用

@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private Sender sender;

    @GetMapping("/send")
    public String send() {
        sender.send("Hello world from xxxx");

        return "Send success";
    }
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容