一、Integrating Kafka with Spring Boot


Outline

  • 1、kafka-producer
  • 2、kafka-consumer
  • 3、Spring Boot integration

This project depends on psyche; the Kafka-related components are placed as modules under fast-plugins.

Runtime environment

Spring Boot + Kafka 2.11

1、Prerequisites

This article assumes you already have a basic working knowledge of Spring Boot and Kafka: you know that Kafka is a message-queue component, and you understand the producer/consumer concepts.

  • Kafka installation tutorial

The overall project structure is as follows:

(image: overall project architecture)

Under the fast-plugins module, create fast-data-kafka, which in turn contains two modules: consumer and producer.
The web project structure is shown below:
(image: web project structure)

The web module depends on the kafka and base projects.

  • pom.xml dependencies
    Since these dependencies are shared, they all go into the parent fast-data-kafka project:
   <modules>
        <module>fast-data-kafka-consumer</module>
        <module>fast-data-kafka-producer</module>
    </modules>

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>22.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
    </dependencies>

2、kafka-producer

The producer consists of two main classes:

  • KafkaProducerProperties: the bean bound to the configuration file
@Component
@ConfigurationProperties(prefix = KafkaProducerProperties.KAFKA_PRODUCER_PREFIX)
public class KafkaProducerProperties {

    public static final String KAFKA_PRODUCER_PREFIX = "kafka";

    private String brokerAddress;

    public String getBrokerAddress() {
        return brokerAddress;
    }

    public void setBrokerAddress(String brokerAddress) {
        this.brokerAddress = brokerAddress;
    }
}

This class binds to the kafka.brokerAddress property in the configuration file.
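For reference, the matching entry in application.properties might look like the following; the broker address is an assumption for a local setup (Spring's relaxed binding also accepts kafka.broker-address):

```properties
# binds to KafkaProducerProperties via the "kafka" prefix
kafka.brokerAddress=localhost:9092
```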

  • KafkaProducerAutoConfiguration: the auto-configuration class, which depends on the KafkaProducerProperties bean

@Configuration
@EnableKafka
@EnableConfigurationProperties(KafkaProducerProperties.class)
@ConditionalOnClass(value = org.apache.kafka.clients.producer.KafkaProducer.class)
public class KafkaProducerAutoConfiguration {
    private KafkaProducerProperties kafkaProducerProperties;

    public KafkaProducerAutoConfiguration(KafkaProducerProperties kafkaProducerProperties) {
        this.kafkaProducerProperties = kafkaProducerProperties;
    }

    public Map<String, Object> producerConfigs() {
        String brokers = kafkaProducerProperties.getBrokerAddress();
        if (StringUtils.isEmpty(brokers)) {
            throw new RuntimeException("kafka broker address is empty");
        }
        Map<String, Object> props = Maps.newHashMap();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerProperties
                .getBrokerAddress());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // how long send() may block before throwing a TimeoutException
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);

        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        return props;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

This class holds the basic kafka-producer configuration and creates the KafkaTemplate bean.

That completes the kafka-producer configuration.
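As an aside, the ProducerConfig constants above are just symbolic names for literal Kafka property keys. The plain-Java sketch below (no Kafka dependency; the literal key strings are the standard Kafka producer property names) builds the same map:

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerPropsSketch {

    // Mirrors producerConfigs() above, using the literal property names
    // behind the ProducerConfig constants.
    static Map<String, Object> producerConfigs(String brokers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", brokers);   // BOOTSTRAP_SERVERS_CONFIG
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", 5000);           // fail fast when the broker is unreachable
        props.put("acks", "all");                  // wait for all in-sync replicas
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        return props;
    }

    public static void main(String[] args) {
        // 9 entries in total
        System.out.println(producerConfigs("localhost:9092").size());
    }
}
```

In the real configuration class, prefer the ProducerConfig constants: they guard against typos at compile time.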

3、kafka-consumer

As above, this module also contains two classes:

  • KafkaConsumerProperties: the bean bound to the configuration file
/**
 * Describe:
 *
 * @Author sunliang
 * @Since 2019/06/10
 */
@ConfigurationProperties(prefix = KafkaConsumerProperties.KAFKA_CONSUMER_PREFIX)
public class KafkaConsumerProperties {

    public static final String KAFKA_CONSUMER_PREFIX = "kafka";

    private String brokerAddress;

    private String groupId;

    public String getBrokerAddress() {
        return brokerAddress;
    }

    public void setBrokerAddress(String brokerAddress) {
        this.brokerAddress = brokerAddress;
    }

    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }
}

  • KafkaConsumerAutoConfiguration: the auto-configuration class
/**
* Describe:
*
* @Author sunliang
* @Since 2019/06/10
*/
@EnableKafka
@Configuration
@EnableConfigurationProperties(KafkaConsumerProperties.class)
@ConditionalOnClass(value = org.apache.kafka.clients.consumer.KafkaConsumer.class)
public class KafkaConsumerAutoConfiguration {
   protected final Logger logger = LoggerFactory.getLogger(this.getClass());

   private KafkaConsumerProperties kafkaConsumerProperties;

   public KafkaConsumerAutoConfiguration(KafkaConsumerProperties kafkaConsumerProperties) {
       logger.info("KafkaConsumerAutoConfiguration kafkaConsumerProperties:{}",
               JSON.toJSONString(kafkaConsumerProperties));
       this.kafkaConsumerProperties = kafkaConsumerProperties;
   }

   @Bean
   public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
   kafkaListenerContainerFactory() {
       ConcurrentKafkaListenerContainerFactory<String, String> factory = new
               ConcurrentKafkaListenerContainerFactory<>();
       factory.setConsumerFactory(consumerFactory());
       factory.setConcurrency(3);
       factory.getContainerProperties().setPollTimeout(1000);
       return factory;
   }

   @Bean
   public ConsumerFactory<String, String> consumerFactory() {
       return new DefaultKafkaConsumerFactory<>(consumerConfigs());
   }

   @Bean
   public Map<String, Object> consumerConfigs() {
       String brokers = kafkaConsumerProperties.getBrokerAddress();
       if (StringUtils.isEmpty(brokers)) {
           throw new RuntimeException("kafka broker address is empty");
       }

       Map<String, Object> propsMap = new HashMap<>();
       propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBrokerAddress());
       propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroupId());
       propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // auto-commit offsets
       propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // auto-commit interval (ms)
       propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); // consumer session (liveness) timeout
       propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // where to start consuming: "latest" = only new messages, "earliest" = from the beginning, "none" = throw an exception; default is "latest"
       return propsMap;
   }

}

That completes the consumer configuration. Next, we build a Spring Boot application to test Kafka.
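Since both beans use the "kafka" prefix, the producer and consumer settings can live in a single application.properties. A minimal sketch, assuming a local broker and a made-up group id:

```properties
# brokerAddress binds to both KafkaProducerProperties and KafkaConsumerProperties
kafka.brokerAddress=localhost:9092
# groupId binds to KafkaConsumerProperties only
kafka.groupId=fast-demo-group
```

The group id fast-demo-group is a hypothetical name; any stable identifier shared by the consumers of one logical group works.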

4、fast-rest

  • pom.xml
    <dependencies>
       <dependency>
           <groupId>com.liangliang</groupId>
           <artifactId>fast-base</artifactId>
           <version>0.0.1-SNAPSHOT</version>
       </dependency>
       <dependency>
           <groupId>com.liangliang</groupId>
           <artifactId>fast-data-kafka-consumer</artifactId>
           <version>1.0-SNAPSHOT</version>
       </dependency>
       <dependency>
           <groupId>com.liangliang</groupId>
           <artifactId>fast-data-kafka-producer</artifactId>
           <version>1.0-SNAPSHOT</version>
       </dependency>
   </dependencies>
  • KafkaUtils: a Kafka helper class

/**
 * Describe:
 *
 * @Author sunliang
 * @Since 2019/06/11
 */
@Slf4j
@Component
public class KafkaUtils {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String data) {
        log.info("kafka sendMessage start");
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("kafka sendMessage success topic = {}, data = {}", topic, data);
            }
        });
        log.info("kafka sendMessage end");
    }
}

  • listener: the consumer listener
    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<String, String> record) {
        String json = record.value();
        log.info("kafka consumer sessionListener session json:{}", json);
    }

It listens on the test topic and logs each message.

  • controller: accepts a parameter from the web, acts as the Kafka producer, and writes the message to Kafka
/**
 * Describe:
 *
 * @Author sunliang
 * @Since 2019/06/11
 */
@Slf4j
@RestController
public class KafkaProducerController {
    @Autowired
    private KafkaUtils kafkaUtils;

    @GetMapping("/chat/{msg}")
    public RestResult area(HttpServletResponse response, @PathVariable("msg")String msg){
        response.setHeader("Access-Control-Allow-Origin", "*");
        log.info(">>>>>msg = {}",msg);
        kafkaUtils.sendMessage("test",msg);
        return RestResultBuilder.builder().data(msg).success().build();
    }
}

This completes the integration of Kafka with Spring Boot.
