創(chuàng)建工程
一

二

三

這里需要注意一下,我們導(dǎo)入的Spring-Kafka為2.1.8版本,SpringBoot為2.0.4的正式版,請保持版本一致、

好了,已經(jīng)三秒了,真男人,你可以關(guān)閉屏幕冷靜一下了,停止你那顫抖的身體。
編寫第一個Demo
實現(xiàn)順序
- 創(chuàng)建消費者和生產(chǎn)者的Map配置
- 根據(jù)Map配置創(chuàng)建對應(yīng)的消費者工廠(consumerFactory)和生產(chǎn)者工廠(producerFactory)
- 根據(jù)consumerFactory創(chuàng)建監(jiān)聽器的監(jiān)聽器工廠
- 根據(jù)producerFactory創(chuàng)建KafkaTemplate(Kafka操作類)
- 創(chuàng)建監(jiān)聽容器
先給你們瞄一眼項目結(jié)構(gòu),記得把Kafka 啟動...

創(chuàng)建KafkaConfiguration配置類
都是一些配置參數(shù),具體的作用也在代碼中寫明了,值得注意的是,KafkaTemplate的類型為<Integer,String>,我們可以找kafkaTemplate的send方法,有多個重載方法,其中有個方法如下,key和data參數(shù)都為泛型,這其實就是對應(yīng)著KafkaTemplate<Integer,String>。那具體有什么用呢,還記得我們的Topic中可以包含多個Partition(分區(qū))嗎,那我們?nèi)绻幌胧謩又付òl(fā)送到哪個分區(qū),我們則可以利用key去實現(xiàn)。這里我們的key是Integer類型,template會根據(jù) key 路由到對應(yīng)的partition中,如果key存在對應(yīng)的partitionID則發(fā)送到該partition中,否則由算法選擇發(fā)送到哪個partition。
public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
return this.doSend(producerRecord);
}
@Configuration
@EnableKafka
public class KafkaConfiguration {
//ConcurrentKafkaListenerContainerFactory為創(chuàng)建Kafka監(jiān)聽器的工程類,這里只配置了消費者
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
//根據(jù)consumerProps填寫的參數(shù)創(chuàng)建消費者工廠
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
//根據(jù)senderProps填寫的參數(shù)創(chuàng)建生產(chǎn)者工廠
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
//kafkaTemplate實現(xiàn)了Kafka發(fā)送接收等功能
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
return template;
}
//消費者配置參數(shù)
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
//連接地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//GroupID
props.put(ConsumerConfig.GROUP_ID_CONFIG, "bootKafka");
//是否自動提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
//自動提交的頻率
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
//Session超時設(shè)置
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
//鍵的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
//值的反序列化方式
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
//生產(chǎn)者配置
private Map<String, Object> senderProps (){
Map<String, Object> props = new HashMap<>();
//連接地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//重試,0為不啟用重試機制
props.put(ProducerConfig.RETRIES_CONFIG, 1);
//控制批處理大小,單位為字節(jié)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//批量發(fā)送,延遲為1毫秒,啟用該功能能有效減少生產(chǎn)者發(fā)送消息次數(shù),從而提高并發(fā)量
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//生產(chǎn)者可以使用的總內(nèi)存字節(jié)來緩沖等待發(fā)送到服務(wù)器的記錄
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
//鍵的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
//值的序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
}
創(chuàng)建DemoListener消費者
這里的消費者其實就是一個監(jiān)聽類,指定監(jiān)聽名為topic.quick.demo的Topic,consumerID為demo。
@Component
public class DemoListener {
private static final Logger log= LoggerFactory.getLogger(DemoListener.class);
//聲明consumerID為demo,監(jiān)聽topicName為topic.quick.demo的Topic
@KafkaListener(id = "demo", topics = "topic.quick.demo")
public void listen(String msgData) {
log.info("demo receive : "+msgData);
}
}
創(chuàng)建測試類
這里的send方法第一參數(shù)為TopicName,第二個參數(shù)則是發(fā)送的數(shù)據(jù)
@SpringBootTest
@RunWith(SpringRunner.class)
public class DemoTest {
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
public void testDemo() throws InterruptedException {
kafkaTemplate.send("topic.quick.demo", "this is my first demo");
//休眠5秒,為了使監(jiān)聽器有足夠的時間監(jiān)聽到topic的數(shù)據(jù)
Thread.sleep(5000);
}
}
接下來直接運行這個測試方法,我們可以看到日志中輸出了我們發(fā)送的消息,這就代表我們成功的消費了測試方法中發(fā)送的消息。
2018-09-06 17:26:20.850 INFO 6232 --- [ demo-0-C-1] com.viu.kafka.listen.DemoListener : demo receive : this is my first demo
啟動項目
看清楚了是啟動項目,不是測試類,我們來觀察一下控制臺的輸出日志
首先這個是KafkaConsumer的配置信息,每個消費者都會輸出該配置信息,配置太多就不做講解了
2018-09-06 17:40:15.258 INFO 9944 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 100
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = demo
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 15000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2018-09-06 17:40:15.274 INFO 9944 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.2
2018-09-06 17:40:15.274 INFO 9944 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 2a121f7b1d402825
這些日志就代表我們成功的創(chuàng)建了Consumer,由于沒有做并發(fā)配置,所以現(xiàn)在為單個消費者模式,系統(tǒng)會做一個分配Partition的操作,也就是將某個Partition指定給某個消費者消費。 這里有個地方需要注意一下,
看到日志中有輸出[Consumer clientId=consumer-1, groupId=demo],我們之前在監(jiān)聽中@KafkaListener注解中配置的id=demo,怎么就變成了groupId=demo,這是因為@KafkaListener注解如果沒有指定groupId這個屬性的值,則會默認(rèn)把id作為groupId。
2018-09-06 17:40:15.287 INFO 9944 --- [ demo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=demo] Discovered group coordinator admin-PC:9092 (id: 2147483647 rack: null)
2018-09-06 17:40:15.290 INFO 9944 --- [ demo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=demo] Revoking previously assigned partitions []
2018-09-06 17:40:15.290 INFO 9944 --- [ demo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: []
2018-09-06 17:40:15.290 INFO 9944 --- [ demo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=demo] (Re-)joining group
2018-09-06 17:40:15.301 INFO 9944 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2018-09-06 17:40:15.302 INFO 9944 --- [ demo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=demo] Successfully joined group with generation 33
2018-09-06 17:40:15.303 INFO 9944 --- [ demo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=demo] Setting newly assigned partitions [topic.quick.demo-0]
結(jié)束
SpringBoot2.0已經(jīng)提供了Kafka的自動配置,可以在application.properties文件中配置,別問我為什么要寫一堆代碼來創(chuàng)建這些工廠,相對于properties方式我更喜歡java Config方法創(chuàng)建這些配置,因為很直觀,雖然是有點麻煩。