簡(jiǎn)介
消息中間件
Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者規(guī)模的網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。 這種動(dòng)作(網(wǎng)頁瀏覽,搜索和其他用戶的行動(dòng))數(shù)據(jù)通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。
broker
topic
partition
producer
consumer
consumer group
安裝
- 下載解壓
- 啟動(dòng)zookeeper
- 修改config/server.proprties的:
log.dirs=E:\data\kafka-logs
zookeeper.connect=10.129.83.213:2181
listeners=PLAINTEXT://10.143.47.32:9092
- 啟動(dòng)命令
kafka_2.12-0.10.2.1 需要jdk1.8
kafka_2.11-0.10.0.1 需要jdk1.7
Kafka Shell基本命令(包括topic的增刪改查)
#win
.\bin\windows\kafka-server-start.bat .\config\server.properties
#linux
bin/kafka-server-start.sh config/server.properties
#創(chuàng)建一個(gè)名為“test”的Topic,只有一個(gè)分區(qū)和一個(gè)備份
bin/kafka-topics.sh --create --zookeeper 10.129.83.213:2181 --replication-factor 1 --partitions 1 --topic test
#查看topic
bin/kafka-topics.sh --list --zookeeper 10.129.83.213:2181
#查看consumer-groups
bin/kafka-consumer-groups.sh --list --bootstrap-server 10.143.47.32:9092
bin/kafka-consumer-groups.sh --bootstrap-server 10.143.47.32:9092 --describe --group myGroup
#查看消費(fèi)了多少數(shù)據(jù)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group myGroup --topic test --zookeeper 10.129.83.213:2181
#查看test詳細(xì)信息
bin/kafka-topics.sh --describe --topic test --zookeeper 10.129.83.213:2181
#發(fā)送消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
#發(fā)送消息
bin/kafka-console-producer.sh --broker-list 10.143.47.32:9092 --topic test
#消息內(nèi)容
This is a message
This is another message
Hello World
#消費(fèi)消息
#消費(fèi)者線程數(shù)必須是小等于topic的partition分區(qū)數(shù)
bin/kafka-console-consumer.sh --zookeeper 10.129.83.213:2181 --topic test --from-beginning
#消費(fèi)消息
bin/kafka-console-consumer.sh --bootstrap-server 10.143.47.32:9092 --topic test --from-beginning

Paste_Image.png
每個(gè) consumer 都屬于一個(gè) consumer group,每條消息只能被 consumer group 中的一個(gè) Consumer 消費(fèi),但可以被多個(gè) consumer group 消費(fèi)。
無論消息是否被消費(fèi),kafka 都會(huì)保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):
基于時(shí)間:log.retention.hours=168
基于大?。簂og.retention.bytes=1073741824
producer 發(fā)送消息到 broker 時(shí),會(huì)根據(jù)分區(qū)算法選擇將其存儲(chǔ)到哪一個(gè) partition。其路由機(jī)制為:
指定了 patition,則直接使用;
未指定 patition 但指定 key,通過對(duì) key 的 value 進(jìn)行hash 選出一個(gè) patition
patition 和 key 都未指定,使用輪詢選出一個(gè) patition。
- kafka_2.10-0.10.2.0 需要jdk1.7(springboot 1.5.3集成)
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>1.5.3.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<version>1.5.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
<version>1.5.3.RELEASE</version>
</dependency>
application.properties
spring.application.name=springboot-kafka-test
#kafka
spring.kafka.bootstrap-servers=10.143.47.32:9092
spring.kafka.consumer.group-id=myGroup
#charset
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.template.default-topic=test
spring.kafka.listener.concurrency=1
spring.kafka.producer.batch-size=1000
#log4j2
logging.config=classpath:log4j2.xml
配置
@Configuration
@EnableKafka
public class KafkaConfig {
}
生產(chǎn)者
@Component
public class MsgProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String value){
System.out.println("send start-----------");
kafkaTemplate.send("test", value+"1");
kafkaTemplate.send("test", value+"2");
System.out.println("send end-----------");
}
}
消費(fèi)者
@Component
public class MsgConsumer {
static Logger subscribelogger = LoggerFactory.getLogger("subscribelogger");
@KafkaListener(topics="test")
public void processMsg1(String s){
subscribelogger.info("{}|{}","myGroup",s);
}
/*@KafkaListener(topics="test")
public void processMsg(ConsumerRecord<?, ?> record){
subscribelogger.info("{}|{}","myGroup1",record.value());
}*/
}