Installing, Deploying, and Using a Kafka Cluster

Introduction to Kafka

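Kafka is a high-throughput, distributed publish-subscribe messaging system with the following characteristics:

  • Messages are persisted using O(1) disk data structures, which keep performance stable over long periods even with terabytes of stored messages.
  • High throughput: even on very modest hardware, Kafka can handle hundreds of thousands of messages per second.
  • Messages are partitioned across Kafka servers, and consumption can be distributed over a cluster of consumer machines.
  • Supports parallel data loading into Hadoop.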

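What a message queue is good for

  • Decoupling applications and processing in parallel
  • Ordering guarantees
  • High throughput
  • Fault tolerance and high availability
  • Scalability
  • Absorbing traffic peaks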


Figure: Kafka cluster (Kafka集群.png)

How Kafka works

A Kafka cluster consists of multiple instances, and each node is called a broker. Stored messages are grouped by topic.
A topic can be divided into multiple partitions, and each partition can have multiple replicas.

Figure: How Kafka works, part 1 (Kafka原理圖01.png)

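Messages are stored in order within a partition. New messages are appended to the end, and consumers pull messages sequentially in FIFO order.
A topic can have multiple partitions. Kafka guarantees ordering only within a single partition, not across the topic as a whole (that is, not across partitions).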
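The append-at-the-tail, read-in-order behaviour described above can be modelled in a few lines. The sketch below is a hypothetical in-memory toy (`PartitionLog` is an illustrative name, not Kafka's storage code). Every appended record receives the next sequential offset, and a reader fetches records from a given offset onward. Ordering therefore exists only within one such log, which corresponds to one partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of a single partition: an append-only list addressed by offset (hypothetical). */
final class PartitionLog {
    private final List<String> records = new ArrayList<>();

    /** Appends a record at the tail and returns the offset it was given. */
    long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    /** Returns up to maxRecords records starting at fromOffset, in the order they were written. */
    List<String> read(long fromOffset, int maxRecords) {
        if (fromOffset < 0 || fromOffset > records.size() || maxRecords < 0) {
            throw new IllegalArgumentException("bad read: offset=" + fromOffset + ", max=" + maxRecords);
        }
        int from = (int) fromOffset;
        int to = (int) Math.min(records.size(), (long) from + maxRecords);
        return Collections.unmodifiableList(new ArrayList<>(records.subList(from, to)));
    }

    /** Offset the next appended record will receive (the log-end offset). */
    long endOffset() {
        return records.size();
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        log.append("m0");
        log.append("m1");
        log.append("m2");
        System.out.println(log.read(1, 10)); // prints [m1, m2]
    }
}
```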

Figure: How Kafka works, part 2 (kafka原理圖02.png)

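Consumer group (CG): to speed up reading, several consumers can form a group and consume one topic in parallel. A topic can be subscribed to by several CGs, and the CGs are independent of one another: each group receives every message of the topic. A CG contains one or more consumers, and the consumers within a CG compete with each other, so a given message is consumed by only one consumer in that CG.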
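To make the "one consumer per message within a group" rule concrete, here is a hypothetical sketch that splits a topic's partitions among the members of one group in contiguous ranges. It is similar in spirit to Kafka's range assignor, but `RangeAssignmentSketch.assign` is an illustrative name, and the real assignor also sorts members and works per topic. Because each partition is owned by exactly one member, no two consumers in the group read the same message. A second group would compute its own, independent assignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RangeAssignmentSketch {
    /** Splits partitions 0..numPartitions-1 among the group's consumers in contiguous ranges. */
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int n = consumers.size();
        int base = numPartitions / n;   // every consumer gets at least this many
        int extra = numPartitions % n;  // the first `extra` consumers get one more
        int next = 0;
        for (int i = 0; i < n; i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(consumers.get(i), owned); // consumers beyond the partition count stay idle
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions shared by 2 consumers of the same group
        System.out.println(assign(Arrays.asList("c1", "c2"), 3)); // prints {c1=[0, 1], c2=[2]}
    }
}
```

With more consumers than partitions, the extra consumers receive an empty list, which is why adding consumers beyond the partition count does not speed up a group.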


Figure: How Kafka works, part 3 (kafka原理圖03.png)

I. Deploying the Kafka Cluster

Cluster plan

Name        Address                Role                   Hostname
Broker01    192.168.43.22          Kafka node 01          hadoop03
Broker02    192.168.43.23          Kafka node 02          hadoop04
Broker03    192.168.43.24          Kafka node 03          hadoop05
ZooKeeper   192.168.43.20/21/22    ZooKeeper ensemble     hadoop01/hadoop02/hadoop03

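1. Download the Kafka package and extract it (this guide uses kafka_2.11-0.10.2.1, i.e. Kafka 0.10.2.1 built for Scala 2.11)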

[root@hadoop03 kafka_2.11-0.10.2.1]# ll
total 52
drwxr-xr-x. 3 hadoop hadoop  4096 Apr 22  2017 bin
drwxr-xr-x. 2 hadoop hadoop  4096 Apr 22  2017 config
drwxr-xr-x. 2 root   root     152 Jan 20 18:57 kafka-logs
drwxr-xr-x. 2 hadoop hadoop  4096 Jan 20 18:43 libs
-rw-r--r--. 1 hadoop hadoop 28824 Apr 22  2017 LICENSE
drwxr-xr-x. 2 root   root    4096 Jan 20 23:07 logs
-rw-r--r--. 1 hadoop hadoop   336 Apr 22  2017 NOTICE
drwxr-xr-x. 2 hadoop hadoop    47 Apr 22  2017 site-docs

2. Create a symbolic link

[root@hadoop03 kafka_2.11-0.10.2.1]# ln -s /home/hadoop/apps/kafka_2.11-0.10.2.1 /usr/local/kafka

3. Create the data directory (Kafka calls its message storage "logs")

[root@hadoop03 kafka]# pwd
/usr/local/kafka
[root@hadoop03 kafka]# mkdir kafka-logs/

4. Configure the broker

Edit server.properties in the /usr/local/kafka/config directory as follows:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# Each broker's id must be unique, so give every broker a different id
broker.id=0

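# Port the broker listens on
# (port and host.name are deprecated in favour of listeners=PLAINTEXT://host:port, but 0.10.x still honours them when listeners is not set)
port=9092

# Address the broker binds to
host.name=192.168.43.22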

# Allow topics to be deleted
delete.topic.enable=true


# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# Where message data is stored. The default is under /tmp, so change it
log.dirs=/usr/local/kafka/kafka-logs

# Default number of partitions for newly created topics
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# How long data is kept, in hours: 168 hours = 7 days (the default)
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# ZooKeeper addresses, separated by commas
zookeeper.connect=192.168.43.20:2181,192.168.43.21:2181,192.168.43.22:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
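Here is a worked example of the retention settings in this file. log.retention.hours=168 is 168 × 3,600,000 ms = 604,800,000 ms (7 days). Every log.retention.check.interval.ms (300,000 ms, i.e. 5 minutes), the broker looks for segments that are too old. The sketch below is a simplified, hypothetical model: `RetentionSketch` and its `Segment` type are illustrative names. It treats a segment as expired once its newest record is older than the retention period, and it ignores size-based retention (log.retention.bytes) and other broker details.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

final class RetentionSketch {
    /** Hypothetical view of a log segment: its base offset and the timestamp of its newest record. */
    static final class Segment {
        final long baseOffset;
        final long newestTimestampMs;

        Segment(long baseOffset, long newestTimestampMs) {
            this.baseOffset = baseOffset;
            this.newestTimestampMs = newestTimestampMs;
        }
    }

    /** Converts a log.retention.hours value to milliseconds. */
    static long retentionMs(long retentionHours) {
        return TimeUnit.HOURS.toMillis(retentionHours);
    }

    /** Base offsets of segments whose newest record is older than the retention period. */
    static List<Long> expiredSegments(List<Segment> segments, long nowMs, long retentionMs) {
        List<Long> expired = new ArrayList<>();
        for (Segment s : segments) {
            if (nowMs - s.newestTimestampMs > retentionMs) {
                expired.add(s.baseOffset);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        long retention = retentionMs(168);
        System.out.println(retention); // prints 604800000
        long now = retention * 2;      // pretend 14 days have passed since time 0
        List<Segment> segments = new ArrayList<>();
        segments.add(new Segment(0, 0));                 // newest record written at time 0
        segments.add(new Segment(1000, now - 60_000));   // newest record written a minute ago
        System.out.println(expiredSegments(segments, now, retention)); // prints [0]
    }
}
```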

5. Copy the installation to the Broker02/Broker03 nodes

scp -r /home/hadoop/apps/kafka_2.11-0.10.2.1 hadoop@hadoop04:/home/hadoop/apps/
scp -r /home/hadoop/apps/kafka_2.11-0.10.2.1 hadoop@hadoop05:/home/hadoop/apps/

6. Adjust the settings on Broker02 and Broker03

Create the symbolic link on each of the two nodes (shown here for hadoop04):

[root@hadoop04 kafka_2.11-0.10.2.1]# ln -s /home/hadoop/apps/kafka_2.11-0.10.2.1 /usr/local/kafka

Edit server.properties on Broker02 (hadoop04):

broker.id=1
host.name=192.168.43.23

Edit server.properties on Broker03 (hadoop05):

broker.id=2
host.name=192.168.43.24
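Before starting the brokers, it can help to sanity-check each node's server.properties. The hypothetical helper below (`ServerConfigCheck`, not part of Kafka) loads the file with `java.util.Properties` and flags three mistakes this guide warns about: a missing or non-numeric broker.id (this guide assigns explicit ids), log.dirs left under /tmp, and a malformed zookeeper.connect list. It only looks at those keys.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check for the server.properties keys used in this guide. */
final class ServerConfigCheck {
    /** Returns the problems found; an empty list means the checked keys look sane. */
    static List<String> check(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> problems = new ArrayList<>();

        // This guide assigns explicit ids 0, 1, 2.
        String id = props.getProperty("broker.id");
        if (id == null || !id.trim().matches("\\d+")) {
            problems.add("broker.id should be an explicit non-negative integer, got: " + id);
        }

        // Kafka falls back to /tmp/kafka-logs, which may be wiped on reboot.
        String dirs = props.getProperty("log.dirs", "/tmp/kafka-logs");
        for (String dir : dirs.split(",")) {
            if (dir.trim().startsWith("/tmp")) {
                problems.add("log.dirs entry is under /tmp: " + dir.trim());
            }
        }

        // zookeeper.connect: host[:port] entries, optionally followed by one /chroot path.
        String zk = props.getProperty("zookeeper.connect", "").trim();
        if (zk.isEmpty()) {
            problems.add("zookeeper.connect is missing");
        } else {
            int slash = zk.indexOf('/');
            String hosts = slash >= 0 ? zk.substring(0, slash) : zk;
            for (String hostPort : hosts.split(",", -1)) {
                if (!hostPort.trim().matches("[^:\\s/]+(:\\d+)?")) {
                    problems.add("bad zookeeper.connect entry: '" + hostPort.trim() + "'");
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String broker01 = "broker.id=0\n"
                + "log.dirs=/usr/local/kafka/kafka-logs\n"
                + "zookeeper.connect=192.168.43.20:2181,192.168.43.21:2181,192.168.43.22:2181\n";
        System.out.println(check(new StringReader(broker01))); // prints []
    }
}
```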

7. Start Broker01, Broker02 and Broker03

On each node, start Kafka as a background process (-daemon):

[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
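After starting the three brokers, a quick way to confirm that each one is listening is to open a TCP connection to port 9092. The hypothetical helper below (`PortCheck`) uses plain `java.net.Socket` with a connect timeout. A successful connection only shows that something accepts connections on that port, not that the broker has joined the cluster. For that, check the broker's logs or run kafka-topics.sh --describe.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) { // refused, timed out, or host unknown
            return false;
        }
    }

    public static void main(String[] args) {
        String[] brokers = {"192.168.43.22", "192.168.43.23", "192.168.43.24"};
        for (String host : brokers) {
            System.out.println(host + ":9092 listening=" + isListening(host, 9092, 2000));
        }
    }
}
```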

II. Using Kafka in Practice

1. Create a topic

[root@hadoop03 bin]# pwd
/usr/local/kafka/bin
[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 2 --partitions 3 --topic topicnewtest1
Created topic "topicnewtest1".
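Topic names such as topicnewtest1 must follow Kafka's naming rules. To my knowledge these are: only ASCII letters, digits, '.', '_' and '-'; at most 249 characters; and not '.' or '..'. The hypothetical helper below (`TopicNames`, not Kafka's own validation code) checks exactly those rules, so a bad name can be caught before running kafka-topics.sh.

```java
import java.util.regex.Pattern;

final class TopicNames {
    private static final int MAX_LENGTH = 249;
    // Letters, digits, '.', '_' and '-' only.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    /** Returns true if the name satisfies the rules listed above. */
    static boolean isValid(String name) {
        if (name == null || name.isEmpty() || name.length() > MAX_LENGTH) {
            return false;
        }
        if (name.equals(".") || name.equals("..")) {
            return false;
        }
        return LEGAL.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("topicnewtest1")); // prints true
        System.out.println(isValid("topic new"));     // prints false (spaces are not allowed)
    }
}
```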

2. List topics

[root@hadoop03 bin]# ./kafka-topics.sh  --list --zookeeper 192.168.43.20:2181
topicnewtest1

3. Describe a topic

[root@hadoop03 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.43.20:2181 --topic topicnewtest1
Topic:topicnewtest1 PartitionCount:3    ReplicationFactor:2 Configs:
    Topic: topicnewtest1    Partition: 0    Leader: 2   Replicas: 2,0   Isr: 2,0
    Topic: topicnewtest1    Partition: 1    Leader: 0   Replicas: 0,1   Isr: 0,1
    Topic: topicnewtest1    Partition: 2    Leader: 1   Replicas: 1,2   Isr: 1,2
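The Replicas column above is consistent with a simple round-robin placement. Partition p's first replica (its preferred leader) goes to broker (start + p) mod N, and the next replica goes to the following broker. The hypothetical sketch below (`ReplicaPlacement`) implements only that rule. With start = 2 it reproduces the output shown. Kafka's real assignment additionally picks the start index and the follower shift at random (and can be rack-aware), so treat this as an illustration rather than Kafka's algorithm.

```java
import java.util.ArrayList;
import java.util.List;

final class ReplicaPlacement {
    /**
     * Simplified round-robin placement: replica j of partition p goes to broker
     * (start + p + j) mod numBrokers; the first replica listed is the preferred leader.
     */
    static List<List<Integer>> assign(int numPartitions, int replicationFactor, int numBrokers, int start) {
        if (replicationFactor > numBrokers) {
            throw new IllegalArgumentException("replication factor " + replicationFactor
                    + " is larger than the number of brokers " + numBrokers);
        }
        if (start < 0) {
            throw new IllegalArgumentException("start index must be non-negative");
        }
        List<List<Integer>> result = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int j = 0; j < replicationFactor; j++) {
                replicas.add((start + p + j) % numBrokers);
            }
            result.add(replicas);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions, replication factor 2, brokers 0..2, start index 2
        System.out.println(assign(3, 2, 3, 2)); // prints [[2, 0], [0, 1], [1, 2]]
    }
}
```

The size check mirrors why --replication-factor 2 works on this three-broker cluster while a factor of 4 would be rejected.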

4. Delete a topic

[root@hadoop03 bin]# ./kafka-topics.sh --delete --zookeeper 192.168.43.20:2181 --topic topicnewtest1
Topic topicnewtest1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

5. Add partitions (this assumes topicnewtest1 exists again, i.e. it was recreated after step 4)

[root@hadoop03 bin]# ./kafka-topics.sh --alter --zookeeper 192.168.43.20:2181 --topic topicnewtest1 --partitions 5
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@hadoop03 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.43.20:2181 --topic topicnewtest1
Topic:topicnewtest1 PartitionCount:5    ReplicationFactor:2 Configs:
    Topic: topicnewtest1    Partition: 0    Leader: 1   Replicas: 1,0   Isr: 1,0
    Topic: topicnewtest1    Partition: 1    Leader: 2   Replicas: 2,1   Isr: 2,1
    Topic: topicnewtest1    Partition: 2    Leader: 0   Replicas: 0,2   Isr: 0,2
    Topic: topicnewtest1    Partition: 3    Leader: 1   Replicas: 1,2   Isr: 1,2
    Topic: topicnewtest1    Partition: 4    Leader: 2   Replicas: 2,0   Isr: 2,0
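The warning above matters because, for a record with a key, the producer derives the partition from a hash of the key modulo the partition count. The sketch below (`KeyPartitioning`) follows the approach of the Java client's default partitioner: a murmur2 hash of the serialized key, made non-negative, modulo the number of partitions. Treat it as an illustration rather than a byte-exact copy of the client code. It shows that the same key can land on a different partition once the count goes from 3 to 5, which is why per-key ordering is not preserved across the change.

```java
import java.nio.charset.StandardCharsets;

final class KeyPartitioning {
    /** 32-bit murmur2 hash, following the variant used by the Kafka Java client. */
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Partition for a String key: non-negative hash modulo the partition count. */
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8); // StringSerializer's default encoding
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            String key = "192.168.1." + i;
            System.out.println(key + ": 3 partitions -> " + partitionFor(key, 3)
                    + ", 5 partitions -> " + partitionFor(key, 5));
        }
    }
}
```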

6. Using Kafka's bundled console producer and consumer scripts

Start the bundled console producer:

[root@hadoop03 bin]# ./kafka-console-producer.sh --broker-list 192.168.43.22:9092,192.168.43.23:9092 --topic topicnewtest1

Start the bundled console consumer:

[root@hadoop04 bin]# ./kafka-console-consumer.sh --zookeeper 192.168.43.20:2181 --from-beginning --topic topicnewtest1

Messages typed into the producer appear in the consumer.
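With --from-beginning, the console consumer starts from the earliest offset still retained. This is the same idea as auto.offset.reset=earliest in the Java consumer that follows. Roughly, the rule for one partition is: resume from the group's committed offset if it has one that is still in range, and otherwise fall back to the reset policy. The hypothetical `StartOffset.resolve` sketch encodes that rule as I understand it; it is not the client's actual code.

```java
final class StartOffset {
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * Chooses where a consumer starts reading one partition.
     * committed is null when the group has never committed an offset for it.
     */
    static long resolve(Long committed, long logStartOffset, long logEndOffset, ResetPolicy policy) {
        boolean usable = committed != null
                && committed >= logStartOffset && committed <= logEndOffset;
        if (usable) {
            return committed;          // resume where the group left off
        }
        switch (policy) {
            case EARLIEST:
                return logStartOffset; // oldest message still retained
            case LATEST:
                return logEndOffset;   // only messages produced from now on
            default:
                throw new IllegalStateException("no usable committed offset and reset policy is none");
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, 0, 42, ResetPolicy.EARLIEST)); // prints 0
        System.out.println(resolve(null, 0, 42, ResetPolicy.LATEST));   // prints 42
        System.out.println(resolve(17L, 0, 42, ResetPolicy.LATEST));    // prints 17
    }
}
```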

7. Producing and consuming messages with the Java client

  • Producer
package cn.chinahadoop.client;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
 * Kafka producer
 * @author Zhangyongliang
 */
public class ProducerClient {
    public static void main(String[] args){
        Properties props = new Properties();
        // Kafka broker list
        props.put("bootstrap.servers", "192.168.43.22:9092,192.168.43.23:9092,192.168.43.24:9092");
        // acks=1: the partition leader reports success once the message is written to its local log,
        // without waiting for the followers to replicate it
        props.put("acks", "1");
        // String serializers for the key and the value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Random number generator used to simulate message content
        Random rand = new Random();
        for(int i = 0; i < 20; i++) {
            // Generate a random IP address to send as the message key
            String ip = "192.168.1." + rand.nextInt(255);
            long runtime = new Date().getTime();
            // Build the message body
            String msg = runtime + "---" + ip;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("send to kafka->key:" + ip + " value:" + msg);
            // Send to the topicnewtest1 topic (send() is asynchronous; close() waits for pending sends)
            producer.send(new ProducerRecord<String, String>("topicnewtest1", ip, msg));
        }
        producer.close();
    }
}
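The acks comment in the producer describes one of three settings. The hypothetical sketch below (`AckRule`) states, for a single write, when the producer is told that the write succeeded under each setting. With "0" the producer does not wait for the broker at all. With "1" it waits for the partition leader's local write. With "all" (also written "-1") it waits until every replica currently in the ISR has the message. It models only the acknowledgement rule, not retries or min.insync.replicas.

```java
final class AckRule {
    /**
     * Whether a produce request counts as acknowledged.
     * leaderWritten: the leader appended the record to its log;
     * isrCaughtUp / isrSize: how many in-sync replicas (leader included) have it, out of how many.
     */
    static boolean acknowledged(String acks, boolean leaderWritten, int isrCaughtUp, int isrSize) {
        switch (acks) {
            case "0":
                return true;                                     // no broker response is awaited
            case "1":
                return leaderWritten;                            // leader's local log only
            case "all":
            case "-1":
                return leaderWritten && isrCaughtUp >= isrSize;  // every ISR member has it
            default:
                throw new IllegalArgumentException("unknown acks value: " + acks);
        }
    }

    public static void main(String[] args) {
        // The leader has written the record; 1 of 2 ISR replicas (the leader itself) has it so far.
        System.out.println(acknowledged("1", true, 1, 2));   // prints true
        System.out.println(acknowledged("all", true, 1, 2)); // prints false
    }
}
```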
  • Consumer
package com.yongliang.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Kafka consumer
 * @author Zhangyongliang
 */
public class ConsumerClient {
    /**
     * Commit offsets manually
     */
    public static void manualCommintClient(){
        Properties props = new Properties();
        // Kafka broker list
        props.put("bootstrap.servers", "192.168.43.22:9092,192.168.43.23:9092,192.168.43.24:9092");
        // Consumer group id
        props.put("group.id", "yongliang");
        // Commit offsets manually
        props.put("enable.auto.commit", "false");
        // Used only when the group has no committed offset: earliest = start from the earliest offset,
        // latest = start from the latest offset, none = throw an exception. Default: latest.
        props.put("auto.offset.reset", "earliest");
        // String deserializers for the key and the value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to topicnewtest1; to consume several topics at once, pass them all in the list
        consumer.subscribe(Arrays.asList("topicnewtest1"));
        // Commit only after at least 10 messages have been processed
        final int minBatchSize = 10;
        // Buffer for fetched messages
        List<ConsumerRecord<String, String>> bufferList = new ArrayList<ConsumerRecord<String, String>>();
        while (true) {
            System.out.println("--------------start pull message---------------" );
            long starttime = System.currentTimeMillis();
            // poll() takes a timeout in milliseconds: if no messages are available it waits up to that long,
            // then returns (possibly with no records) and the loop polls again
            ConsumerRecords<String, String> records = consumer.poll(1000);
            long endtime = System.currentTimeMillis();
            long tm = (endtime - starttime) / 1000;
            System.out.println("--------------end pull message and times=" + tm + "s -------------");

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                bufferList.add(record);
            }
            System.out.println("--------------buffer size->" + bufferList.size());
            // Once at least 10 messages are buffered, process them
            if (bufferList.size() >= minBatchSize) {
                System.out.println("******start deal message******");
                try {
                    // Sleep for 1 second to simulate processing
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("manual commit offset start...");
                // Commit once processing is done
                consumer.commitSync();
                // Clear the buffer and keep receiving
                bufferList.clear();
                System.out.println("manual commit offset end...");
            }
        }
    }

    /**
     * Commit offsets automatically
     */
    public static void autoCommintClient(){
        Properties props = new Properties();
        // Kafka broker list
        props.put("bootstrap.servers", "192.168.43.22:9092,192.168.43.23:9092,192.168.43.24:9092");
        // Consumer group id
        props.put("group.id", "newConsumerGroup");
        // Commit offsets automatically
        props.put("enable.auto.commit", "true");
        // Auto-commit interval: 1000 ms
        props.put("auto.commit.interval.ms", "1000");
        // Used only when the group has no committed offset: earliest = start from the earliest offset,
        // latest = start from the latest offset, none = throw an exception. Default: latest.
        props.put("auto.offset.reset", "earliest");
        // String deserializers for the key and the value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to topicnewtest1; to consume several topics at once, pass them all in the list
        consumer.subscribe(Arrays.asList("topicnewtest1"));
        while (true) {
            // poll() takes a timeout in milliseconds: if no messages are available it waits up to that long,
            // then returns (possibly with no records) and the loop polls again
            ConsumerRecords<String, String> records = consumer.poll(1000);
            // Process the fetched messages
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }

        }
    }
    public static void main(String[] args){
        // Automatic offset commit
//        autoCommintClient();
        // Manual offset commit
        manualCommintClient();
    }
}
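One detail behind commitSync(): the offset Kafka stores for a partition is the position of the next message to read, i.e. the last processed offset + 1. The no-argument commitSync() used above commits the offsets returned by the latest poll(). If you instead commit an explicit map with commitSync(Map<TopicPartition, OffsetAndMetadata>), the values should be computed as below. This stdlib-only sketch uses hypothetical names (`CommitOffsets`, `Processed`, with partitions as plain ints). It computes that map from a processed batch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CommitOffsets {
    /** Hypothetical stand-in for a processed ConsumerRecord: just its partition and offset. */
    static final class Processed {
        final int partition;
        final long offset;

        Processed(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Offset to commit per partition = highest processed offset + 1. */
    static Map<Integer, Long> toCommit(List<Processed> batch) {
        Map<Integer, Long> commit = new TreeMap<>();
        for (Processed r : batch) {
            commit.merge(r.partition, r.offset + 1, Long::max);
        }
        return commit;
    }

    public static void main(String[] args) {
        List<Processed> batch = Arrays.asList(
                new Processed(0, 10), new Processed(0, 11), new Processed(2, 4));
        System.out.println(toCommit(batch)); // prints {0=12, 2=5}
    }
}
```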
© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
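Platform statement: the article content (including any images or videos) was uploaded and published by the author and represents only the author's views. Jianshu is an information publishing platform and provides only information storage services.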
