Source: shenyifengtk.github.io. Please credit the original when reposting.
The background for this article: there was a requirement to enter a Kafka topic and consumer group in a browser and have consumption start automatically once submitted. That is easy enough, and a colleague knocked it out quickly with the Kafka client library. It made me wonder whether the same thing could be done with the Spring Kafka framework itself, without dropping down to the raw client. That question led to this walkthrough of how Spring Kafka implements annotation-driven message consumption and method invocation, and at the end the small requirement above is implemented with a few simple pieces of code.
Source Code Analysis
The @EnableKafka entry point
The Kafka module starts at @EnableKafka, which carries @Import(KafkaListenerConfigurationSelector.class):
@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
return new String[] { KafkaBootstrapConfiguration.class.getName() };
}
Next, look at the KafkaBootstrapConfiguration class:
public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
if (!registry.containsBeanDefinition(
KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
}
if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
}
}
}
BeanDefinitionRegistry turns the classes into BeanDefinitions and registers them in the beanDefinitionMap container; Spring later instantiates everything in that map. In effect, the two classes are simply handed over to Spring for initialization.

Parsing in KafkaListenerAnnotationBeanPostProcessor
Now let's look at how the core processing class, KafkaListenerAnnotationBeanPostProcessor, parses the @KafkaListener annotation. postProcessAfterInitialization is called after a bean has been instantiated and is used to enhance it.
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
        // If the bean is a proxy at this point, get the original class; otherwise use its class directly
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        // Look for @KafkaListener on the class itself. The class hierarchy can be complex, so this helper
        // wraps a series of lookups to reliably find the annotation (it may appear on both a subclass and
        // a superclass; finding one is enough to proceed with the corresponding handling).
        Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
        final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
        final List<Method> multiMethods = new ArrayList<>();
        // Look for the annotation on methods; annotated methods are collected into a map keyed by Method
        Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                    Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                    return (!listenerMethods.isEmpty() ? listenerMethods : null);
                });
        if (hasClassLevelListeners) { // A class-level @KafkaListener is used together with @KafkaHandler, so look for that annotation on the methods
            Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                    (ReflectionUtils.MethodFilter) method ->
                            AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
            multiMethods.addAll(methodsWithHandler);
        }
        if (annotatedMethods.isEmpty()) { // cache classes that have already been parsed
            this.nonAnnotatedClasses.add(bean.getClass());
        }
        else {
            // Non-empty set of methods
            for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                Method method = entry.getKey();
                for (KafkaListener listener : entry.getValue()) {
                    processKafkaListener(listener, method, bean, beanName); // handling for method-level listeners
                }
            }
            this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                    + beanName + "': " + annotatedMethods);
        }
        if (hasClassLevelListeners) {
            processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName); // @KafkaHandler handling
        }
    }
    return bean;
}
@KafkaListener can in fact be placed on a class, used together with @KafkaHandler. How does that work? A simple example:
@KafkaListener(topics = "${topic-name.lists}", groupId = "${group}", concurrency = "4")
public class Kddk {
@KafkaHandler
public void user(User user){
}
@KafkaHandler
public void std(Dog dog){
}
}
Messages are dispatched to different methods based on the payload type, which saves you from converting objects yourself. That is about the only scenario I can think of for it, and it rarely comes up in practice, so I will not dig into how it is implemented here.
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    // If the method happens to have been proxied, return the original class method
    Method methodToUse = checkProxy(method, bean);
    MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
    endpoint.setMethod(methodToUse);
    String beanRef = kafkaListener.beanRef();
    this.listenerScope.addListener(beanRef, bean);
    String[] topics = resolveTopics(kafkaListener);
    TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
    // Checks whether the method carries @RetryableTopic; if it does, this call performs the registration itself and returns true
    if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
        // Parse the @KafkaListener attributes into the endpoint and register it via the KafkaListenerEndpointRegistrar
        processListener(endpoint, kafkaListener, bean, beanName, topics, tps);
    }
    this.listenerScope.removeListener(beanRef);
}
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
        Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
    processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean, topics, tps);
    String containerFactory = resolve(kafkaListener.containerFactory());
    KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener, containerFactory, beanName);
    // The core part: once parsing is done, the endpoint is handed to the KafkaListenerEndpointRegistrar, waiting for the next step
    this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
    processKafkaListenerEndpointAfterRegistration(endpoint, kafkaListener);
}
The class name MethodKafkaListenerEndpoint can be read literally as an endpoint object. Simply put, an endpoint is one end of a communication channel; here it is the endpoint that connects the business method with the Kafka messages.
@RetryableTopic is an annotation introduced in Spring Kafka 2.7. Its main job is handling failures during consumption: when processing a record throws, it is retried, and because Kafka itself has no dead-letter queue or dead-letter message concept, Spring invented DLT topics (Dead-Letter Topic). Once a record has failed a configured number of times, it is forwarded to the designated DLT topic. The annotation lets you configure the number of attempts, the retry backoff, which exceptions count as failures, the failure strategy, and so on.
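As a quick illustration, here is a minimal sketch of how the annotation is typically used (the topic name, group id, backoff values and listener class are invented for the example):

import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
public class RetryableOrderListener {

    // Retry up to 4 attempts with exponential backoff; records that keep failing are forwarded to the DLT topic
    @RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000, multiplier = 2.0))
    @KafkaListener(topics = "orders", groupId = "order-group")
    public void onOrder(String payload) {
        process(payload); // throwing here triggers a retry on the next retry topic
    }

    // Invoked for records that still fail after all attempts (by default they end up on the "orders-dlt" topic)
    @DltHandler
    public void onDlt(String payload) {
        // log or persist the poison message
    }

    private void process(String payload) {
        // business logic
    }
}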
processMainAndRetryListeners does much the same as the processListener shown above: it parses the annotation content and then calls registerEndpoint on the KafkaListenerEndpointRegistrar.
KafkaListenerEndpointRegistry is created by the Spring container and is responsible for instantiating the MessageListenerContainers.
KafkaListenerEndpointRegistrar is created with plain new in code and is not managed by the Spring container; its job is to help register endpoints into the KafkaListenerEndpointRegistry.
The two class names are confusingly similar and had me going in circles while reading the source; once you keep them apart it is all quite simple, so it is worth not losing time over the naming.
Registering the endpoint
public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaListenerContainerFactory<?> factory) {
    // Factory may be null, we defer the resolution right before actually creating the container
    // KafkaListenerEndpointDescriptor is just an inner holder class pairing the two objects; because the
    // factory may be null, its resolution is deferred until the container is actually created
    KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        // startImmediately has not been initialized here, so it is definitely false at this point; once it is set
        // to true (after the Spring context has finished starting), the listener container is created right away
        if (this.startImmediately) { // Register and start immediately
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                    resolveContainerFactory(descriptor), true);
        }
        else {
            this.endpointDescriptors.add(descriptor);
        }
    }
}
Why is there a startImmediately switch? At this point the endpoints are only collected into the list; once all of them have been added, registration and startup are driven from afterPropertiesSet of Spring's InitializingBean interface, i.e. the bean lifecycle does the triggering. After the Spring context has fully started, that lifecycle callback never fires again, so any endpoint added later could not be started that way. The flag therefore works as a threshold switch: once it is flipped, newly added endpoints are started immediately.
Now look at KafkaListenerEndpointRegistrar.afterPropertiesSet, which brings all the endpoints to life.
@Override
public void afterPropertiesSet() {
    registerAllEndpoints();
}

protected void registerAllEndpoints() {
    synchronized (this.endpointDescriptors) {
        for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint // only created when @KafkaHandler is used
                    && this.validator != null) {
                ((MultiMethodKafkaListenerEndpoint) descriptor.endpoint).setValidator(this.validator);
            }
            // Create the MessageListenerContainer from the endpoint and containerFactory
            this.endpointRegistry.registerListenerContainer(
                    descriptor.endpoint, resolveContainerFactory(descriptor));
        }
        // Everything has been processed, so flip the switch: endpoints added from now on start immediately
        this.startImmediately = true; // trigger immediate startup
    }
}
// Resolve the concrete KafkaListenerContainerFactory. Because resolution is deferred, the descriptor's factory
// may be null; if the annotation named a containerFactory, that custom one is used, otherwise the default
// (typically a ConcurrentKafkaListenerContainerFactory) is looked up from the BeanFactory.
private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListenerEndpointDescriptor descriptor) {
    if (descriptor.containerFactory != null) {
        return descriptor.containerFactory;
    }
    else if (this.containerFactory != null) {
        return this.containerFactory;
    }
    else if (this.containerFactoryBeanName != null) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        this.containerFactory = this.beanFactory.getBean(
                this.containerFactoryBeanName, KafkaListenerContainerFactory.class);
        return this.containerFactory; // Consider changing this if live change of the factory is required
    }
    else {
        //.....
    }
}
MessageListenerContainer
Let's see how KafkaListenerEndpointRegistry.registerListenerContainer produces the message listener container.
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    registerListenerContainer(endpoint, factory, false);
}

public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
        boolean startImmediately) {
    String id = endpoint.getId();
    Assert.hasText(id, "Endpoint id must not be empty");
    synchronized (this.listenerContainers) {
        Assert.state(!this.listenerContainers.containsKey(id),
                "Another endpoint is already registered with id '" + id + "'");
        // Create the listener container
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        // Keep the instantiated container in a map whose key is the @KafkaListener id (effectively the bean name)
        this.listenerContainers.put(id, container);
        ConfigurableApplicationContext appContext = this.applicationContext;
        String groupName = endpoint.getGroup();
        // If the annotation declared a custom listener group, look up the group instance and add this container to it
        if (StringUtils.hasText(groupName) && appContext != null) {
            // part of the content omitted
        }
        if (startImmediately) { // When starting immediately, the container's start method has to be called by hand
            startIfNecessary(container);
        }
    }
}
protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
        KafkaListenerContainerFactory<?> factory) {
    // The listener container is created here
    MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
    if (listenerContainer instanceof InitializingBean) { // The Spring context has already finished initializing,
        // so lifecycle callbacks will not run again; call afterPropertiesSet explicitly
        try {
            ((InitializingBean) listenerContainer).afterPropertiesSet();
        }
        catch (Exception ex) {
            throw new BeanInitializationException("Failed to initialize message listener container", ex);
        }
    }
    int containerPhase = listenerContainer.getPhase();
    if (listenerContainer.isAutoStartup() &&
            containerPhase != AbstractMessageListenerContainer.DEFAULT_PHASE) { // a custom phase value
        if (this.phase != AbstractMessageListenerContainer.DEFAULT_PHASE && this.phase != containerPhase) {
            throw new IllegalStateException("Encountered phase mismatch between container "
                    + "factory definitions: " + this.phase + " vs " + containerPhase);
        }
        this.phase = listenerContainer.getPhase();
    }
    return listenerContainer;
}
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    // contextRefreshed is true once the Spring context has fully started
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
        listenerContainer.start();
    }
}
In short, the KafkaListenerContainerFactory creates the MessageListenerContainer, and the container implements SmartLifecycle. SmartLifecycle is a Spring interface: once the context has finished initializing, Spring calls start() on every SmartLifecycle bean whose isAutoStartup() returns true. After the context is fully started, Spring will not invoke that callback for containers created later, so start has to be called manually; that is exactly why startIfNecessary checks whether the context has already been refreshed.
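To make the lifecycle hook concrete, here is a minimal sketch of the SmartLifecycle contract (the bean and its log output are invented; it only shows when Spring calls back):

import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;

@Component
public class DemoLifecycleBean implements SmartLifecycle {

    private volatile boolean running;

    @Override
    public void start() {
        // Called automatically on context refresh because isAutoStartup() returns true
        running = true;
        System.out.println("started");
    }

    @Override
    public void stop() {
        running = false;
        System.out.println("stopped");
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public boolean isAutoStartup() {
        return true; // return false and Spring will not call start() for you on refresh
    }
}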
Creating the container in KafkaListenerContainerFactory
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
    C instance = createContainerInstance(endpoint);
    JavaUtils.INSTANCE
            .acceptIfNotNull(endpoint.getId(), instance::setBeanName);
    if (endpoint instanceof AbstractKafkaListenerEndpoint) {
        // Apply the Kafka configuration: settings such as the ack mode for offset commits and batch consumption
        // come from configuration and are held by the factory, so they are copied onto the endpoint here
        configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
    }
    // The core step: the annotated bean method is wrapped into a MessagingMessageListenerAdapter, and that
    // adapter is then used to create the message listener that is handed to the container instance
    endpoint.setupListenerContainer(instance, this.messageConverter);
    // Apply the concurrency setting
    initializeContainer(instance, endpoint);
    // Apply any custom configuration
    customizeContainer(instance);
    return instance;
}
At this point the Kafka configuration, the @KafkaListener attributes, the consuming method and the bean have all been wired into the created listener container, so the container can start pulling records from Kafka and invoking the method to process them.
Start directly from the start method of the ConcurrentMessageListenerContainer.
public final void start() {
    checkGroupId();
    synchronized (this.lifecycleMonitor) {
        if (!isRunning()) { // listening has not started yet, so the running flag is still false here
            Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
                    () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
            // abstract method, implemented by the subclasses
            doStart();
        }
    }
}
@Override
protected void doStart() {
    if (!isRunning()) {
        // Topic pattern matching: check the declared topics against the broker; throw if none match
        checkTopics();
        ContainerProperties containerProperties = getContainerProperties();
        // The consumer group's partitions and offsets have already been resolved
        TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (topicPartitions != null && this.concurrency > topicPartitions.length) {
            // When the concurrency exceeds the number of assigned partitions, a warning is logged
            this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or "
                    + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                    + topicPartitions.length);
            // Note: the concurrency is forcibly reduced to the partition count, so you never end up with
            // more concurrent consumers than assigned partitions
            this.concurrency = topicPartitions.length;
        }
        setRunning(true); // start listening
        // concurrency is the value parsed from @KafkaListener when the container was created;
        // it controls how many KafkaMessageListenerContainer instances are produced
        for (int i = 0; i < this.concurrency; i++) {
            // Create a KafkaMessageListenerContainer
            KafkaMessageListenerContainer<K, V> container =
                    constructContainer(containerProperties, topicPartitions, i);
            // Configure interceptors, advice and so on for the child container; all null if not configured
            configureChildContainer(i, container);
            if (isPaused()) {
                container.pause();
            }
            container.start(); // start the task
            // All consumers are created by this same parent container, so stopping consumption of a topic
            // later is done through this containers list
            this.containers.add(container);
        }
    }
}

private KafkaMessageListenerContainer<K, V> constructContainer(ContainerProperties containerProperties,
        @Nullable TopicPartitionOffset[] topicPartitions, int i) {
    KafkaMessageListenerContainer<K, V> container;
    if (topicPartitions == null) {
        container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties); // NOSONAR
    }
    else { // If explicit partitions were given, each consumer gets its share of them
        container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, // NOSONAR
                containerProperties, partitionSubset(containerProperties, i));
    }
    return container;
}
That is how the @KafkaListener concurrency is implemented, and why the concurrency cannot exceed the number of partitions. If the concurrency is lower than the partition count, the partitions are shared out, so one consumer may own several partitions. The KafkaMessageListenerContainers created here are what actually consume the Kafka topic.
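For example, with explicit partition assignments the code above caps the effective concurrency; a small hypothetical listener (topic, group and partition numbers are made up):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class PartitionBoundListener {

    // Asks for 4 concurrent consumers but assigns only 2 partitions, so doStart()
    // logs a warning and reduces the effective concurrency to 2.
    @KafkaListener(groupId = "order-group", concurrency = "4",
            topicPartitions = @TopicPartition(topic = "orders", partitions = { "0", "1" }))
    public void onOrder(ConsumerRecord<String, String> record) {
        // handle the record
    }
}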
KafkaMessageListenerContainer
KafkaMessageListenerContainer and ConcurrentMessageListenerContainer both extend AbstractMessageListenerContainer and override doStart() to kick off their work, so doStart is again the entry point to read.
protected void doStart() {
    if (isRunning()) {
        return;
    }
    if (this.clientIdSuffix == null) { // stand-alone container
        checkTopics();
    }
    ContainerProperties containerProperties = getContainerProperties();
    // Check the ack mode when not auto-committing; see the modes in org.springframework.kafka.listener.ContainerProperties.AckMode
    checkAckMode(containerProperties);
    Object messageListener = containerProperties.getMessageListener();
    // The task executor looks like a thread pool Executor, but it essentially starts the task on a plain Thread
    AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
    if (consumerExecutor == null) {
        consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }
    GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
    // ListenerType is an enum; the resolved type determines how records are processed:
    // batch or single record, manual or automatic acknowledgment
    ListenerType listenerType = determineListenerType(listener);
    // ListenerConsumer is an inner class from which anything related to the Kafka consumer can be reached
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    setRunning(true); // set the running flag
    this.startLatch = new CountDownLatch(1);
    this.listenerConsumerFuture = consumerExecutor
            .submitListenable(this.listenerConsumer); // start the thread
    try {
        if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
            this.logger.error("Consumer thread failed to start - does the configured task executor "
                    + "have enough threads to support all containers and concurrency?");
            publishConsumerFailedToStart();
        }
    }
    catch (@SuppressWarnings(UNUSED) InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
The main logic here is to start a thread that pulls and processes Kafka records, so we can go straight to ListenerConsumer's run().
@Override // NOSONAR complexity
public void run() {
    ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
    // Publish an event to the Spring container
    publishConsumerStartingEvent();
    this.consumerThread = Thread.currentThread();
    setupSeeks();
    KafkaUtils.setConsumerGroupId(this.consumerGroupId);
    this.count = 0;
    this.last = System.currentTimeMillis();
    // Fetch the consumer group's partitions and offsets from Kafka and keep them
    initAssignedPartitions();
    // Publish another event
    publishConsumerStartedEvent();
    Throwable exitThrowable = null;
    while (isRunning()) {
        try {
            // The core: poll records and invoke the method that processes them
            pollAndInvoke();
        }
        // omitted
pollAndInvoke is where records are polled and processed. The method is long-winded, but it boils down to invoking the message handler generated from the endpoint and injecting the arguments into the target method.
Summary

Combined with the diagram above, here is a short summary of how Spring Kafka turns a simple annotation into message-consuming method calls. First, via Spring's bean post-processor mechanism, KafkaListenerAnnotationBeanPostProcessor scans every instantiated bean, finds the beans and methods carrying @KafkaListener, parses the annotation attributes into a MethodKafkaListenerEndpoint, and registers the endpoint with the KafkaListenerEndpointRegistrar, which collects them all. When the registrar's afterPropertiesSet runs, every collected endpoint is registered with the KafkaListenerEndpointRegistry, which uses the container factory to create a ConcurrentMessageListenerContainer from the endpoint; that container in turn creates as many KafkaMessageListenerContainers as the configured concurrency, and each of those finally starts a thread that pulls records from Kafka and invokes the bean method to process them.
Along the way we also saw how topic partitions relate to the concurrency, that consumption can be controlled at runtime, that a listener method's return value can be pushed to another topic, and, new to me, that there is an @RetryableTopic retry mechanism plus a DLT dead-letter topic. Without reading the source, these rarely come up in day-to-day work. The more source code I read, the more I find it deepens my understanding of a framework.
Dynamic subscription
After reading all that code, a simple dynamic listener can be put together largely by copying what the processor does:
@Component
public class ListenerMessageCommand<K,V> implements CommandLineRunner {
@Autowired
private Cusmotd cusmotd;
@Autowired
private KafkaListenerEndpointRegistry endpointRegistry;
@Autowired
private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;
private Logger logger = LoggerFactory.getLogger(ListenerMessageCommand.class);
@Override
public void run(String... args) throws Exception {
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setBean(cusmotd);
Method method = ReflectionUtils.findMethod(cusmotd.getClass(), "dis", ConsumerRecord.class);
endpoint.setMethod(method);
endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
endpoint.setId("tk.shengyifeng.custom#1");
endpoint.setGroupId("test");
endpoint.setTopicPartitions(new TopicPartitionOffset[0]);
endpoint.setTopics("skdsk");
endpoint.setClientIdPrefix("comuserd_");
endpoint.setConcurrency(1);
endpointRegistry.registerListenerContainer(endpoint,kafkaListenerContainerFactory,true);
logger.info("register...............");
}
}
Having walked through the full code, we know that listening starts when the KafkaListenerContainerFactory creates the container and its start method is called. Since we also hold the listener container object, we can call its various APIs, for example to stop consuming a topic dynamically.
@RestController
@RequestMapping("kafka")
public class KafkaController<K,V> {
@Autowired
private Cusmotd cusmotd;
@Autowired
private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;
private Map<String,MessageListenerContainer> containerMap = new ConcurrentReferenceHashMap<>();
@GetMapping("start/topic")
public void startTopic(String topicName,String groupName){
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setBean(cusmotd);
Method method = ReflectionUtils.findMethod(cusmotd.getClass(), "dis", ConsumerRecord.class);
endpoint.setMethod(method);
endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
endpoint.setId("tk.shengyifeng.custom#1");
endpoint.setGroupId(groupName);
endpoint.setTopicPartitions(new TopicPartitionOffset[0]);
endpoint.setTopics(topicName);
endpoint.setClientIdPrefix("comuserd_");
endpoint.setConcurrency(1);
MessageListenerContainer listenerContainer = kafkaListenerContainerFactory.createListenerContainer(endpoint);
listenerContainer.start();
containerMap.put(topicName,listenerContainer);
}
@GetMapping("stop/topic")
public void stopTopic(String topicName){
if (containerMap.containsKey(topicName))
containerMap.get(topicName).stop();
}
}
This simple HTTP interface supports dynamically subscribing to topics from the outside, and stopping consumption of topics that are already subscribed.
If you declare consumers with @KafkaListener there is no need to be envious: Spring provides a way to get hold of the MessageListenerContainer. From the analysis above we know that listenerContainers inside KafkaListenerEndpointRegistry keeps every container instance and exposes a method to look one up by id, and KafkaListenerEndpointRegistry itself is instantiated by Spring, so....
To make the id easy to obtain, set it explicitly on the annotation; if no id is given, the default is generated as org.springframework.kafka.KafkaListenerEndpointContainer# plus an incrementing counter.
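For instance, a sketch along these lines (the listener id "demo-listener" and the request mappings are invented) pauses and resumes a container that was declared with @KafkaListener:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("kafka/declared")
public class DeclaredListenerController {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // Assumes a listener declared elsewhere as @KafkaListener(id = "demo-listener", topics = "...")
    @GetMapping("pause")
    public void pause() {
        MessageListenerContainer container = registry.getListenerContainer("demo-listener");
        if (container != null) {
            container.pause();
        }
    }

    @GetMapping("resume")
    public void resume() {
        MessageListenerContainer container = registry.getListenerContainer("demo-listener");
        if (container != null) {
            container.resume();
        }
    }
}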
Spring Boot auto-configuration
You may wonder how the Kafka settings in Spring Boot reach the kafkaListenerContainerFactory, since it is initialized by the Spring container and no constructor parameter injection shows up in the source above. To see the details, look at KafkaAnnotationDrivenConfiguration and ConcurrentKafkaListenerContainerFactoryConfigurer.
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
private final KafkaProperties properties;
private final RecordMessageConverter messageConverter;
private final RecordFilterStrategy<Object, Object> recordFilterStrategy;
private final BatchMessageConverter batchMessageConverter;
private final KafkaTemplate<Object, Object> kafkaTemplate;
private final KafkaAwareTransactionManager<Object, Object> transactionManager;
private final ConsumerAwareRebalanceListener rebalanceListener;
private final ErrorHandler errorHandler;
private final BatchErrorHandler batchErrorHandler;
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
private final RecordInterceptor<Object, Object> recordInterceptor;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
ObjectProvider<BatchMessageConverter> batchMessageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler,
ObjectProvider<BatchErrorHandler> batchErrorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
this.batchMessageConverter = batchMessageConverter
.getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));
this.kafkaTemplate = kafkaTemplate.getIfUnique();
this.transactionManager = kafkaTransactionManager.getIfUnique();
this.rebalanceListener = rebalanceListener.getIfUnique();
this.errorHandler = errorHandler.getIfUnique();
this.batchErrorHandler = batchErrorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
this.recordInterceptor = recordInterceptor.getIfUnique();
}
Spring Boot auto-configuration is implemented in the spring-boot-autoconfigure module, and the @ConditionalOnClass annotation decides whether a configuration class is activated. So as soon as you add the corresponding dependency to your pom, the configuration class kicks in: the configuration values are bound into a KafkaProperties object, the properties are applied to the factory objects, and the resulting instances are handed to the Spring container. You will find that most auto-configuration follows this same pattern.
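As a minimal sketch of that pattern (this is not the real KafkaAutoConfiguration, only an invented illustration of @ConditionalOnClass plus properties binding):

import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class) // only active when spring-kafka is on the classpath
@EnableConfigurationProperties(KafkaProperties.class) // binds spring.kafka.* into KafkaProperties
class DemoKafkaAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(KafkaProperties properties) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Hand the externalized configuration to the consumer factory backing every listener container
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties()));
        return factory;
    }
}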