解釋:
Kafka是一個(gè)分布式的消息存儲(chǔ)系統(tǒng),提供了四大核心接口:
1.Producer API允許了應(yīng)用可以向Kafka中的topics發(fā)布消息;
2.Consumer API允許了應(yīng)用可以訂閱Kafka中的topics,并消費(fèi)消息;
3.Streams API允許應(yīng)用可以作為消息流的處理者,比如可以從topicA中消費(fèi)消息,處理的結(jié)果發(fā)布到topicB中;
4.Connector API提供Kafka與現(xiàn)有的應(yīng)用或系統(tǒng)適配功能,比如與數(shù)據(jù)庫(kù)連接器可以捕獲表結(jié)構(gòu)的變化;
Topic —> 每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為Topic.
Producer —> 負(fù)責(zé)發(fā)布消息到Kafka broker.
Consumer —> 消息消費(fèi)者,向Kafka broker讀取消息的客戶端.
Kafka安裝:
Kafka下載地址:(http://kafka.apache.org/downloads)
解壓下載文件目錄結(jié)構(gòu)如下:
Windows啟動(dòng)方式:
分別啟動(dòng)Zookeeper、Kafka
\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
\bin\windows\kafka-server-start.bat config\server.properties
提供kafka服務(wù)不需要在本地安裝。
Spring Boot整合Kafaka:
非注解使用方式:
pom引入:
<!--kafka-clients發(fā)送消息所需jar包-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
添加配置.properties文件:
#============== kafka ===================
# 指定kafka 代理地址,可以多個(gè)
spring.kafka.bootstrap-servers=localhost:9092
#=============== provider =======================
spring.kafka.producer.retries=0 設(shè)置大于0的值,則客戶端會(huì)將發(fā)送失敗的記錄重新發(fā)送
# 每次批量發(fā)送消息的數(shù)量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息體的編解碼方式 UTF-8
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
編寫一個(gè)生產(chǎn)者者:
package com.zhongway.modules.kafka.provider;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.zhongway.modules.kafka.entity.MessageEntity;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.Future;
/**
* @author Minko
*/
public class KafkaSender {
private static Logger logger = LoggerFactory.getLogger(KafkaSender.class);
private static KafkaProducer<String, String> producer;
private Gson gson = new GsonBuilder().create();
static {
try {
InputStream props = Resources.getResource("producer.props").openStream();
Properties properties = new Properties();
properties.load(props);
producer = new KafkaProducer<>(properties);
} catch (IOException e) {
logger.error("初始化Kafka配置文件失敗");
}
}
/**
* 發(fā)送消息方法
*
* @param topic 主題
* @param msg 消息體
*/
public void sendMsg(String topic, String msg) {
MessageEntity message = new MessageEntity();
message.setMsg(msg);
message.setSendTime(new Date());
logger.info("sendMessage = {}", gson.toJson(message));
try {
Future<RecordMetadata> record = producer.send(new ProducerRecord<>(topic, gson.toJson(message)));
record.get();
} catch (Exception e) {
logger.error("sendErrorMessage = {}", gson.toJson(message));
}
}
}
更簡(jiǎn)單的使用注解方式:
pom引入:

此處引入2.1.x 其對(duì)應(yīng)kafka-clients版本為所需的1.0.0
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
添加配置文件application.yml:
kafka:
bootstrap-servers: localhost:9092 # 指定kafka 代理地址,可以多個(gè)
producer: # 生產(chǎn)者
retries: 1 # 設(shè)置大于0的值,則客戶端會(huì)將發(fā)送失敗的記錄重新發(fā)送
# 每次批量發(fā)送消息的數(shù)量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
編寫一個(gè)生產(chǎn)者:
/**
* 生產(chǎn)者
* @author Minko
*/
@Component
public class KafkaSender {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
* 發(fā)送消息到kafka
*@param topic 主題
*@param message 內(nèi)容體
*/
public void sendMsg(String topic , String message){
kafkaTemplate.send(topic ,message);
}
}
兩種方式外部只需提供 topic 和發(fā)送的 json字符串即可 。
關(guān)于定時(shí)任務(wù),更新后的renren框架取消了job 中 method,每個(gè)定時(shí)任務(wù)需要實(shí)現(xiàn)ITask的 run方法,源碼中會(huì)獲取存入schedule_job表中的bean名稱 和 run方法根據(jù)cron表達(dá)式去執(zhí)行該方法。
示例:
@Component("KafkaSenderTask ")
public class KafkaSenderTask implements ITask {
private Logger logger = LoggerFactory.getLogger(getClass());
/**
* params 可為空
* @param params 參數(shù),多參數(shù)使用JSON數(shù)據(jù)
*/
@Override
public void run(String params){
KafkaSender kafkaSender = new KafkaSender();
kafkaSender.sendMsg("M","工單消息內(nèi)容");
}
}
kafka葵花寶典:傳送門=>
Demo:
public class GsonTest {
public static void main(String[] args) {
List<Map<String, String>> mapList = new ArrayList<>();
Map map = new HashMap();
map.put("id", "1");
map.put("name", "葵花寶典");
Map map2 = new HashMap();
map2.put("id", "2");
map2.put("name", "九陰真經(jīng)");
mapList.add(map);
mapList.add(map2);
Gson gson = new Gson();
System.out.println(gson.toJson(mapList));
}
}
打印結(jié)果:[{"name":"葵花寶典","id":"1"},{"name":"九陰真經(jīng)","id":"2"}]
...end