Integrating Kafka with Spring Boot for Publish/Subscribe
Create a new Spring Boot project
Based on JDK 1.8 and Spring Boot 1.5.9.RELEASE.
1. Add the dependencies to pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sunlong</groupId>
    <artifactId>spring-boot-kafka-demo</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>spring-boot-kafka-demo</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
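2. Configure application.yml
spring:
  kafka:
    # Kafka broker addresses as host:port; several may be listed, separated by commas
    bootstrap-servers: kafkahost:9092
    consumer:
      # default consumer group id
      group-id: myGroup
    template:
      # default topic used by KafkaTemplate
      default-topic: tsc_dsc_newMsg
    listener:
      # number of threads in the listener container, used to raise concurrency
      concurrency: 5
    producer:
      # upper bound on the size of one batch of records, in bytes (not a message count)
      batch-size: 1000
server:
  port: 8888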
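To make the mapping concrete, here is a minimal plain-Java sketch of the Kafka client property names that the spring.kafka.* settings above correspond to. The class and method names (KafkaClientProps, consumerProps, producerProps) are hypothetical and exist only for illustration; only the property keys are Kafka's own. Spring Boot assembles maps like these internally, so this is not code you need to add to the project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch only: shows which Kafka client properties the YAML keys map to.
 * The class and method names are hypothetical, not part of Spring or Kafka.
 */
final class KafkaClientProps {

    /** Client properties a consumer would be given. */
    static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers); // spring.kafka.bootstrap-servers
        props.put("group.id", groupId);                   // spring.kafka.consumer.group-id
        return props;
    }

    /** Client properties a producer would be given. */
    static Map<String, Object> producerProps(String bootstrapServers, int batchSizeBytes) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers); // spring.kafka.bootstrap-servers
        props.put("batch.size", batchSizeBytes);          // spring.kafka.producer.batch-size, in bytes
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("kafkahost:9092", "myGroup"));
        System.out.println(producerProps("kafkahost:9092", 1000));
    }
}
```

The remaining two settings, template.default-topic and listener.concurrency, have no Kafka client equivalent: they configure Spring's KafkaTemplate and listener container respectively.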
3. Simulate a producer
package com.sunlong.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/*
 * kafkaDemo
 *
 * @Author 孫龍
 * @Date 2018/1/19
 */
@RestController
public class SampleController {

    // KafkaTemplate that Spring Boot auto-configures from the spring.kafka.* settings
    @Autowired
    private KafkaTemplate<String, String> template;

    // topic, key and data are bound from the request's query parameters
    @GetMapping("/send")
    String send(String topic, String key, String data) {
        // send() is asynchronous: "success" means the record was handed to the producer,
        // not that the broker has acknowledged it
        template.send(topic, key, data);
        return "success";
    }
}
4. Message listener (consumer)
You can create multiple consumers.
package com.sunlong.listenner;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/*
 * kafkaDemo
 *
 * @Author 孫龍
 * @Date 2018/1/19
 */
@Component
public class Listenner {

    // Called for every record published to topic1
    @KafkaListener(topics = "topic1")
    public void listenT1(ConsumerRecord<?, ?> cr) throws Exception {
        System.out.println("listenT1 received a message!! topic:>>> " + cr.topic() + " key:>> " + cr.key() + " value:>> " + cr.value());
    }

    // Called for every record published to topic2
    @KafkaListener(topics = "topic2")
    public void listenT2(ConsumerRecord<?, ?> cr) throws Exception {
        System.out.println("listenT2 received a message!! topic:>>> " + cr.topic() + " key:>> " + cr.key() + " value:>> " + cr.value());
    }
}
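When several consumers listen to the same topic, who receives a given message depends on their consumer groups: every group gets each message, but inside one group only a single member does. The toy model below illustrates that rule in plain Java with no Kafka involved. Everything in it (GroupDeliveryModel, the group and member names, and the round-robin choice standing in for Kafka's partition assignment) is an assumption made for illustration, not how the broker is implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy in-memory model of consumer-group delivery (hypothetical, no Kafka).
 * Each group sees every message once; within a group one member receives it.
 * Member names are assumed to be unique across groups.
 */
final class GroupDeliveryModel {

    // group id -> member names, in subscription order
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // member name -> messages delivered to that member
    private final Map<String, List<String>> inbox = new LinkedHashMap<>();
    // number of messages published so far
    private int sequence = 0;

    void subscribe(String groupId, String member) {
        groups.computeIfAbsent(groupId, g -> new ArrayList<>()).add(member);
        inbox.put(member, new ArrayList<>());
    }

    void publish(String message) {
        for (List<String> members : groups.values()) {
            // Round-robin stands in for Kafka's partition-to-member assignment.
            String chosen = members.get(sequence % members.size());
            inbox.get(chosen).add(message);
        }
        sequence++;
    }

    List<String> received(String member) {
        return inbox.get(member);
    }

    public static void main(String[] args) {
        GroupDeliveryModel model = new GroupDeliveryModel();
        model.subscribe("audit", "a1");   // a group with one member
        model.subscribe("billing", "b1"); // a group with two members
        model.subscribe("billing", "b2");
        model.publish("m0");
        model.publish("m1");
        System.out.println("a1=" + model.received("a1"));
        System.out.println("b1=" + model.received("b1"));
        System.out.println("b2=" + model.received("b2"));
    }
}
```

In this model a1 receives both messages while b1 and b2 split them, which is the difference between publish/subscribe across groups and load sharing within a group. In the listener above, both methods use the default group myGroup from application.yml but subscribe to different topics.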
5. Test
5.1 Start the project.
5.2 Open a browser and enter:
http://localhost:8888/send?topic=topic1&key=msg&data=testmessage
5.3 The console prints:
listenT1 received a message!! topic:>>> topic1 key:>> msg value:>> testmessage
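The test URL works as typed because its values are plain ASCII without reserved characters. For messages containing spaces, "&" or non-ASCII text, the query parameters need URL encoding first. The following is a minimal sketch using only the JDK; the SendUrlBuilder class and its sendUrl method are hypothetical helpers, and the base URL assumes the port 8888 configured above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/**
 * Sketch only: builds a request URL for the /send endpoint with
 * URL-encoded query parameters. The class name is hypothetical.
 */
final class SendUrlBuilder {

    static String sendUrl(String baseUrl, String topic, String key, String data) {
        return baseUrl + "/send"
                + "?topic=" + encode(topic)
                + "&key=" + encode(key)
                + "&data=" + encode(data);
    }

    // Encodes one query parameter value as UTF-8 form data.
    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is required on every Java platform, so this is not expected.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendUrl("http://localhost:8888", "topic1", "msg", "hello kafka & co"));
    }
}
```

With the values from step 5.2 the helper returns the same URL shown there, since nothing in them needs escaping.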