kafka簡(jiǎn)介

簡(jiǎn)介

消息中間件
Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者規(guī)模的網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。 這種動(dòng)作(網(wǎng)頁瀏覽,搜索和其他用戶的行動(dòng))數(shù)據(jù)通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。
broker
topic
partition
producer
consumer
consumer group

安裝

  1. 下載解壓
  2. 啟動(dòng)zookeeper
  3. 修改config/server.proprties的:
log.dirs=E:\data\kafka-logs
zookeeper.connect=10.129.83.213:2181
listeners=PLAINTEXT://10.143.47.32:9092
  1. 啟動(dòng)命令
    kafka_2.12-0.10.2.1 需要jdk1.8
    kafka_2.11-0.10.0.1 需要jdk1.7
    Kafka Shell基本命令(包括topic的增刪改查)
#win
.\bin\windows\kafka-server-start.bat .\config\server.properties
#linux
 bin/kafka-server-start.sh config/server.properties
#創(chuàng)建一個(gè)名為“test”的Topic,只有一個(gè)分區(qū)和一個(gè)備份
bin/kafka-topics.sh --create --zookeeper 10.129.83.213:2181 --replication-factor 1 --partitions 1 --topic test
#查看topic
bin/kafka-topics.sh --list --zookeeper 10.129.83.213:2181
#查看consumer-groups
bin/kafka-consumer-groups.sh --list --bootstrap-server 10.143.47.32:9092
bin/kafka-consumer-groups.sh --bootstrap-server 10.143.47.32:9092 --describe --group myGroup
#查看消費(fèi)了多少數(shù)據(jù)
 bin/kafka-run-class.sh  kafka.tools.ConsumerOffsetChecker --group myGroup  --topic test  --zookeeper 10.129.83.213:2181
#查看test詳細(xì)信息
bin/kafka-topics.sh --describe --topic test --zookeeper 10.129.83.213:2181
#發(fā)送消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test 
#發(fā)送消息
bin/kafka-console-producer.sh --broker-list 10.143.47.32:9092 --topic test 
#消息內(nèi)容
This is a message
This is another message
Hello World
#消費(fèi)消息
#消費(fèi)者線程數(shù)必須是小等于topic的partition分區(qū)數(shù)
bin/kafka-console-consumer.sh --zookeeper 10.129.83.213:2181 --topic test --from-beginning
#消費(fèi)消息
bin/kafka-console-consumer.sh --bootstrap-server 10.143.47.32:9092 --topic test --from-beginning
Paste_Image.png

每個(gè) consumer 都屬于一個(gè) consumer group,每條消息只能被 consumer group 中的一個(gè) Consumer 消費(fèi),但可以被多個(gè) consumer group 消費(fèi)。
無論消息是否被消費(fèi),kafka 都會(huì)保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):

基于時(shí)間:log.retention.hours=168
基于大?。簂og.retention.bytes=1073741824

producer 發(fā)送消息到 broker 時(shí),會(huì)根據(jù)分區(qū)算法選擇將其存儲(chǔ)到哪一個(gè) partition。其路由機(jī)制為:

指定了 patition,則直接使用;
未指定 patition 但指定 key,通過對(duì) key 的 value 進(jìn)行hash 選出一個(gè) patition
patition 和 key 都未指定,使用輪詢選出一個(gè) patition。
  1. kafka_2.10-0.10.2.0 需要jdk1.7(springboot 1.5.3集成)
    pom.xml
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>1.5.3.RELEASE</version>
        <exclusions>  
                    <exclusion>  
                        <groupId>org.springframework.boot</groupId>  
                        <artifactId>spring-boot-starter-logging</artifactId>  
                    </exclusion>  
                </exclusions> 
    </dependency>
    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <version>1.5.3.RELEASE</version>
        </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.2.1.RELEASE</version>
    </dependency>
    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
            <version>1.5.3.RELEASE</version>
        </dependency>

application.properties

spring.application.name=springboot-kafka-test
#kafka
spring.kafka.bootstrap-servers=10.143.47.32:9092
spring.kafka.consumer.group-id=myGroup
#charset
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.template.default-topic=test
spring.kafka.listener.concurrency=1
spring.kafka.producer.batch-size=1000
#log4j2
logging.config=classpath:log4j2.xml

配置

@Configuration
@EnableKafka
public class KafkaConfig {
}

生產(chǎn)者

@Component
public class MsgProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void send(String value){
        System.out.println("send start-----------");
        kafkaTemplate.send("test", value+"1");
        kafkaTemplate.send("test", value+"2");
        System.out.println("send end-----------");
    }
}

消費(fèi)者

@Component
public class MsgConsumer {
    static Logger subscribelogger = LoggerFactory.getLogger("subscribelogger"); 
    
    @KafkaListener(topics="test")
    public void processMsg1(String s){
        subscribelogger.info("{}|{}","myGroup",s); 
    }
    
    /*@KafkaListener(topics="test")
    public void processMsg(ConsumerRecord<?, ?> record){
        subscribelogger.info("{}|{}","myGroup1",record.value());    
    }*/
}

參考

kafka入門經(jīng)典教程
kafka教程

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容