SpringBoot整合kafka

解釋:

Kafka是一個(gè)分布式的消息存儲(chǔ)系統(tǒng),提供了四大核心接口:
1.Producer API允許了應(yīng)用可以向Kafka中的topics發(fā)布消息;
2.Consumer API允許了應(yīng)用可以訂閱Kafka中的topics,并消費(fèi)消息;
3.Streams API允許應(yīng)用可以作為消息流的處理者,比如可以從topicA中消費(fèi)消息,處理的結(jié)果發(fā)布到topicB中;
4.Connector API提供Kafka與現(xiàn)有的應(yīng)用或系統(tǒng)適配功能,比如與數(shù)據(jù)庫(kù)連接器可以捕獲表結(jié)構(gòu)的變化;

Topic —> 每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為Topic.
Producer —> 負(fù)責(zé)發(fā)布消息到Kafka broker.
Consumer —> 消息消費(fèi)者,向Kafka broker讀取消息的客戶端.

Kafka安裝:

Kafka下載地址:(http://kafka.apache.org/downloads)

解壓下載文件目錄結(jié)構(gòu)如下:
目錄

Windows啟動(dòng)方式:

分別啟動(dòng)Zookeeper、Kafka

 \bin\windows\zookeeper-server-start.bat config\zookeeper.properties
 \bin\windows\kafka-server-start.bat config\server.properties

提供kafka服務(wù)不需要在本地安裝。

Spring Boot整合Kafaka:

非注解使用方式:
pom引入:

<!--kafka-clients發(fā)送消息所需jar包-->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

添加配置.properties文件:

#============== kafka ===================
# 指定kafka 代理地址,可以多個(gè)
spring.kafka.bootstrap-servers=localhost:9092
#=============== provider  =======================
spring.kafka.producer.retries=0 設(shè)置大于0的值,則客戶端會(huì)將發(fā)送失敗的記錄重新發(fā)送
# 每次批量發(fā)送消息的數(shù)量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息體的編解碼方式 UTF-8
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

編寫一個(gè)生產(chǎn)者者:

package com.zhongway.modules.kafka.provider;

import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.zhongway.modules.kafka.entity.MessageEntity;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.Future;

/**
 * @author Minko
 */
public class KafkaSender {
    private static Logger logger = LoggerFactory.getLogger(KafkaSender.class);
    private static KafkaProducer<String, String> producer;
    private Gson gson = new GsonBuilder().create();

    static {
        try {
            InputStream props = Resources.getResource("producer.props").openStream();
            Properties properties = new Properties();
            properties.load(props);
            producer = new KafkaProducer<>(properties);
        } catch (IOException e) {
            logger.error("初始化Kafka配置文件失敗");
        }
    }

    /**
     * 發(fā)送消息方法
     *
     * @param topic 主題
     * @param msg 消息體
     */
    public void sendMsg(String topic, String msg) {
        MessageEntity message = new MessageEntity();
        message.setMsg(msg);
        message.setSendTime(new Date());
        logger.info("sendMessage = {}", gson.toJson(message));
        try {
            Future<RecordMetadata> record = producer.send(new ProducerRecord<>(topic, gson.toJson(message)));
            record.get();
        } catch (Exception e) {
            logger.error("sendErrorMessage = {}", gson.toJson(message));
        }
    }
}

更簡(jiǎn)單的使用注解方式:

pom引入:

spring for kafka對(duì)應(yīng)版本
對(duì)應(yīng)版本

此處引入2.1.x 其對(duì)應(yīng)kafka-clients版本為所需的1.0.0
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>

添加配置文件application.yml:

  kafka:
    bootstrap-servers: localhost:9092 # 指定kafka 代理地址,可以多個(gè)
    producer: # 生產(chǎn)者
      retries: 1 # 設(shè)置大于0的值,則客戶端會(huì)將發(fā)送失敗的記錄重新發(fā)送
      # 每次批量發(fā)送消息的數(shù)量
      batch-size: 16384
      buffer-memory: 33554432
      # 指定消息key和消息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

編寫一個(gè)生產(chǎn)者:

/**
 * 生產(chǎn)者
 * @author Minko
 */
@Component
public class KafkaSender {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 發(fā)送消息到kafka
     *@param topic 主題
     *@param message 內(nèi)容體
     */
    public void sendMsg(String topic , String message){
        kafkaTemplate.send(topic ,message);
    }
}

兩種方式外部只需提供 topic 和發(fā)送的 json字符串即可 。
關(guān)于定時(shí)任務(wù),更新后的renren框架取消了job 中 method,每個(gè)定時(shí)任務(wù)需要實(shí)現(xiàn)ITask的 run方法,源碼中會(huì)獲取存入schedule_job表中的bean名稱 和 run方法根據(jù)cron表達(dá)式去執(zhí)行該方法。
示例:

@Component("KafkaSenderTask ")
public class KafkaSenderTask implements ITask {
    private Logger logger = LoggerFactory.getLogger(getClass());
       /**
     * params 可為空
     * @param params   參數(shù),多參數(shù)使用JSON數(shù)據(jù)
     */
    @Override
    public void run(String params){
        KafkaSender kafkaSender = new KafkaSender();
        kafkaSender.sendMsg("M","工單消息內(nèi)容");
    }
}

kafka葵花寶典:傳送門=>

Demo:

public class GsonTest {
    public static void main(String[] args) {
        List<Map<String, String>> mapList = new ArrayList<>();
        Map map = new HashMap();
        map.put("id", "1");
        map.put("name", "葵花寶典");
        Map map2 = new HashMap();
        map2.put("id", "2");
        map2.put("name", "九陰真經(jīng)");
        mapList.add(map);
        mapList.add(map2);
        Gson gson = new Gson();
        System.out.println(gson.toJson(mapList));
    }
}
打印結(jié)果:[{"name":"葵花寶典","id":"1"},{"name":"九陰真經(jīng)","id":"2"}]
                                   ...end
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容