Kafka多環(huán)境隔離

使用場景

  1. 環(huán)境隔離需求

    • 當需要在同一個Kafka集群上同時運行生產環(huán)境和灰度環(huán)境的消息隊列
    • 避免不同環(huán)境的消息互相干擾
    • 方便進行灰度測試和驗證
  2. 動態(tài)Topic路由

    • 無需修改業(yè)務代碼
    • 通過配置實現消息的自動路由
    • 支持靈活切換環(huán)境

實現原理

  1. 生產者攔截器(KafkaProducerInterceptor)

    • 實現了 ProducerInterceptor 接口
    • 在消息發(fā)送前通過 onSend 方法攔截消息
    • 根據配置的前綴( AppConst.KAFKA_PREFIX_KEY )動態(tài)修改目標Topic
    • 例如:原始topic為"order",配置前綴為"grey_",最終發(fā)送到"grey_order"隊列

2.監(jiān)聽器配置

  • 需要配合 KafkaListenerFactoryBeanPostProcessor 使用
  • 用于修改消費者監(jiān)聽的隊列名稱
  • 確保消費者監(jiān)聽的隊列與生產者發(fā)送的隊列相匹配

實現代碼

import cn.hutool.core.util.StrUtil;
import cn.example.common.AppConst;
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reflections.Reflections;
import org.reflections.scanners.FieldAnnotationsScanner;
import org.reflections.scanners.MethodAnnotationsScanner;
import org.reflections.scanners.MethodParameterScanner;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.util.ConfigurationBuilder;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigurationPackages;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.Set;


/**
 * 需要配合cn.example.interceptor.KafkaProducerInterceptor
 * KafkaListenerFactoryBeanPostProcessor:更改監(jiān)聽隊列
 * KafkaProducerInterceptor:更改消息發(fā)送隊列
 */
@Component
@Slf4j
public class KafkaListenerFactoryBeanPostProcessor implements BeanFactoryPostProcessor, EnvironmentAware {


    private Environment env;



    @SneakyThrows
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

        if (StrUtil.isNotBlank(env.getProperty(AppConst.KAFKA_PREFIX_KEY))) {
            List<String> packageNames = AutoConfigurationPackages.get(beanFactory);

            for (String packageName : packageNames) {
                Reflections reflections = new Reflections(new ConfigurationBuilder()
                        // 指定路徑URL
                        .forPackages(packageName)
                        // 添加子類掃描工具
                        .addScanners(new SubTypesScanner())
                        // 添加 屬性注解掃描工具
                        .addScanners(new FieldAnnotationsScanner())
                        // 添加 方法注解掃描工具
                        .addScanners(new MethodAnnotationsScanner())
                        // 添加方法參數掃描工具
                        .addScanners(new MethodParameterScanner())
                );

                Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);
                if (!CollectionUtils.isEmpty(methodSet)) {
                    for (Method method : methodSet) {
                        KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);
                        changeTopics(kafkaListener);
                    }
                }
            }
        }

    }


    private void changeTopics(KafkaListener kafkaListener) throws Exception {
        InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);
        Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");
        memberValuesField.setAccessible(true);
        Map<String, Object> memberValues = (Map<String, Object>) memberValuesField.get(invocationHandler);
        String[] topics = (String[]) memberValues.get("topics");
        log.info("修改前topics:{}", Lists.newArrayList(topics));
        for (int i = 0; i < topics.length; i++) {
            topics[i] = env.getProperty(AppConst.KAFKA_PREFIX_KEY) + topics[i];
        }
        memberValues.put("topics", topics);
        log.info("修改后topics:{}", Lists.newArrayList(kafkaListener.topics()));

    }

    @Override
    public void setEnvironment(Environment environment) {
        env = environment;
    }
}
import cn.hutool.core.util.StrUtil;
import cn.example.common.AppConst;
import cn.example.common.utils.ApplicationContextUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;

import java.util.Map;
import java.util.Objects;

/**
 * 需要配合cn.example.beanPostProcesser.KafkaListenerFactoryBeanPostProcessor
 * KafkaListenerFactoryBeanPostProcessor:更改監(jiān)聽隊列
 * KafkaProducerInterceptor:更改消息發(fā)送隊列
 */
@Slf4j
public class KafkaProducerInterceptor implements ProducerInterceptor<String, String> {


    /**
     * 運行在用戶主線程中,在消息被序列化之前調用
     *
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        Environment environment = ApplicationContextUtil.getEnvironment();
        String prefixKey = environment.getProperty(AppConst.KAFKA_PREFIX_KEY);
        if (StrUtil.isNotBlank(prefixKey)) {
            log.info("原始topic:{}", record.topic());
            String targetTopic = prefixKey + record.topic();
            log.info("修改后的topic:{}",prefixKey+record.topic());
            return new ProducerRecord<String, String>( targetTopic,
                    record.partition(), record.timestamp(), record.key(), record.value());
        }
        return record;
    }


    /**
     * 在消息被應答之前或者消息發(fā)送失敗時調用,通常在producer回調邏輯觸發(fā)之前,運行在produer的io線程中
     *
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        log.info("實際topic:{}", metadata.topic());
    }


    /**
     * 清理工作
     */
    @Override
    public void close() {
    }


    /**
     * 初始化工作
     *
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }
}

配置方式

  1. 在配置文件中設置前綴
# 生產環(huán)境不設置前綴
kafka.prefix.key=

# 灰度環(huán)境設置前綴
kafka.prefix.key=grey_
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容