使用場景
-
環(huán)境隔離需求
- 當需要在同一個Kafka集群上同時運行生產環(huán)境和灰度環(huán)境的消息隊列
- 避免不同環(huán)境的消息互相干擾
- 方便進行灰度測試和驗證
-
動態(tài)Topic路由
- 無需修改業(yè)務代碼
- 通過配置實現消息的自動路由
- 支持靈活切換環(huán)境
實現原理
-
生產者攔截器(KafkaProducerInterceptor)
- 實現了 ProducerInterceptor 接口
- 在消息發(fā)送前通過 onSend 方法攔截消息
- 根據配置的前綴( AppConst.KAFKA_PREFIX_KEY )動態(tài)修改目標Topic
- 例如:原始topic為"order",配置前綴為"grey_",最終發(fā)送到"grey_order"隊列
2.監(jiān)聽器配置
- 需要配合 KafkaListenerFactoryBeanPostProcessor 使用
- 用于修改消費者監(jiān)聽的隊列名稱
- 確保消費者監(jiān)聽的隊列與生產者發(fā)送的隊列相匹配
實現代碼
import cn.hutool.core.util.StrUtil;
import cn.example.common.AppConst;
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reflections.Reflections;
import org.reflections.scanners.FieldAnnotationsScanner;
import org.reflections.scanners.MethodAnnotationsScanner;
import org.reflections.scanners.MethodParameterScanner;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.util.ConfigurationBuilder;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigurationPackages;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* 需要配合cn.example.interceptor.KafkaProducerInterceptor
* KafkaListenerFactoryBeanPostProcessor:更改監(jiān)聽隊列
* KafkaProducerInterceptor:更改消息發(fā)送隊列
*/
@Component
@Slf4j
public class KafkaListenerFactoryBeanPostProcessor implements BeanFactoryPostProcessor, EnvironmentAware {
private Environment env;
@SneakyThrows
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
if (StrUtil.isNotBlank(env.getProperty(AppConst.KAFKA_PREFIX_KEY))) {
List<String> packageNames = AutoConfigurationPackages.get(beanFactory);
for (String packageName : packageNames) {
Reflections reflections = new Reflections(new ConfigurationBuilder()
// 指定路徑URL
.forPackages(packageName)
// 添加子類掃描工具
.addScanners(new SubTypesScanner())
// 添加 屬性注解掃描工具
.addScanners(new FieldAnnotationsScanner())
// 添加 方法注解掃描工具
.addScanners(new MethodAnnotationsScanner())
// 添加方法參數掃描工具
.addScanners(new MethodParameterScanner())
);
Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);
if (!CollectionUtils.isEmpty(methodSet)) {
for (Method method : methodSet) {
KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);
changeTopics(kafkaListener);
}
}
}
}
}
private void changeTopics(KafkaListener kafkaListener) throws Exception {
InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);
Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");
memberValuesField.setAccessible(true);
Map<String, Object> memberValues = (Map<String, Object>) memberValuesField.get(invocationHandler);
String[] topics = (String[]) memberValues.get("topics");
log.info("修改前topics:{}", Lists.newArrayList(topics));
for (int i = 0; i < topics.length; i++) {
topics[i] = env.getProperty(AppConst.KAFKA_PREFIX_KEY) + topics[i];
}
memberValues.put("topics", topics);
log.info("修改后topics:{}", Lists.newArrayList(kafkaListener.topics()));
}
@Override
public void setEnvironment(Environment environment) {
env = environment;
}
}
import cn.hutool.core.util.StrUtil;
import cn.example.common.AppConst;
import cn.example.common.utils.ApplicationContextUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import java.util.Map;
import java.util.Objects;
/**
* 需要配合cn.example.beanPostProcesser.KafkaListenerFactoryBeanPostProcessor
* KafkaListenerFactoryBeanPostProcessor:更改監(jiān)聽隊列
* KafkaProducerInterceptor:更改消息發(fā)送隊列
*/
@Slf4j
public class KafkaProducerInterceptor implements ProducerInterceptor<String, String> {
/**
* 運行在用戶主線程中,在消息被序列化之前調用
*
* @param record
* @return
*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
Environment environment = ApplicationContextUtil.getEnvironment();
String prefixKey = environment.getProperty(AppConst.KAFKA_PREFIX_KEY);
if (StrUtil.isNotBlank(prefixKey)) {
log.info("原始topic:{}", record.topic());
String targetTopic = prefixKey + record.topic();
log.info("修改后的topic:{}",prefixKey+record.topic());
return new ProducerRecord<String, String>( targetTopic,
record.partition(), record.timestamp(), record.key(), record.value());
}
return record;
}
/**
* 在消息被應答之前或者消息發(fā)送失敗時調用,通常在producer回調邏輯觸發(fā)之前,運行在produer的io線程中
*
* @param metadata
* @param exception
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
log.info("實際topic:{}", metadata.topic());
}
/**
* 清理工作
*/
@Override
public void close() {
}
/**
* 初始化工作
*
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {
}
}
配置方式
- 在配置文件中設置前綴
# 生產環(huán)境不設置前綴
kafka.prefix.key=
# 灰度環(huán)境設置前綴
kafka.prefix.key=grey_