Kafka

I The Kafka website

https://kafka.apache.org/

II Installing Kafka

1 Install a ZooKeeper cluster

2 Install Kafka

1. Extract the archive

[root@lihl01 software]# tar -zxvf kafka_2.11-1.1.1.tgz -C /opt/apps/
[root@lihl01 apps]# mv kafka_2.11-1.1.1/ kafka-2.11/

2. Edit $KAFKA_HOME/config/server.properties

# The id of the broker. This must be set to a unique integer for each broker.

broker.id=1 ## id of this Kafka broker; must be an integer, unique within the cluster
log.dirs=/opt/apps/kafka-2.11/data/kafka ## directory where data produced to Kafka is stored; it must be created by hand
zookeeper.connect=lihl01,lihl02,lihl03/kafka ## where Kafka keeps its metadata in ZooKeeper (the /kafka chroot)

3. Configure the environment variables

environment

export JAVA_HOME=/opt/apps/jdk1.8.0_45
export HADOOP_HOME=/opt/apps/hadoop-2.6.0-cdh5.7.6
export SCALA_HOME=/opt/apps/scala-2.11.8
export SPARK_HOME=/opt/apps/spark-2.2.0
export HIVE_HOME=/opt/apps/hive-1.1.0-cdh5.7.6
export ZOOKEEPER_HOME=/opt/apps/zookeeper-3.4.5-cdh5.7.6
export KAFKA_HOME=/opt/apps/kafka-2.11
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SCALA_HOME/bin:$HIVE_HOME/bin
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin

4. Distribute to the other nodes and change each broker.id (e.g. broker.id=2 on lihl02 and broker.id=3 on lihl03; the log.dirs directory must also exist on every node)

[root@lihl01 apps]# scp -r kafka-2.11/ lihl02:/opt/apps/
[root@lihl01 apps]# scp -r kafka-2.11/ lihl03:/opt/apps/

5. Start ZooKeeper

[root@lihl01 apps]# zkServer.sh start
[root@lihl02 apps]# zkServer.sh start
[root@lihl03 apps]# zkServer.sh start

6. Start Kafka

[root@lihl01 apps]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

[root@lihl02 apps]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[root@lihl03 apps]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

III Basic Kafka operations - command line

1 Create a topic

./kafka-topics.sh \
--create \
--topic lihltest \
--if-not-exists \
--partitions 3 \
--replication-factor 2 \
--zookeeper lihl01,lihl02,lihl03/kafka

Created topic "lihltest".

tip:
The replication factor must be less than or equal to the number of brokers.

2 List all topics

./kafka-topics.sh \
--list \
--zookeeper lihl01,lihl02,lihl03/kafka

lihltest

3 Describe a topic

./kafka-topics.sh \
--describe \
--topic lihltest \
--zookeeper lihl01,lihl02,lihl03/kafka

Topic:lihltest PartitionCount:3 ReplicationFactor:2 Configs:
Topic: lihltest Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: lihltest Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: lihltest Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3

Partition: the partition number within the topic
Replicas: the broker.ids of the brokers that hold a replica of this partition
Leader: the replica that leads this partition and handles all of its read and write requests
ISR: the broker.ids of the replicas of this partition that are alive and in sync

4 Alter a topic

./kafka-topics.sh \
--alter \
--topic lihltest \
--partitions 4 \
--zookeeper lihl01,lihl02,lihl03/kafka

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

tip:
Altering a topic mainly means changing its partition count, and the partition count can only be increased, never decreased.

5 Delete a topic

./kafka-topics.sh \
--delete \
--topic flink_test1 \
--zookeeper lihl01,lihl02,lihl03/kafka

6 Test producing and consuming

6.1 Producer

./kafka-console-producer.sh \
--topic lihltest \
--broker-list lihl01:9092,lihl02:9092,lihl03:9092

6.2 Consumer

kafka-console-consumer.sh \
--topic lihltest \
--bootstrap-server lihl01:9092,lihl02:9092,lihl03:9092

6.3 Consumer groups

kafka-console-consumer.sh \
--topic lihltest \
--bootstrap-server lihl01:9092,lihl02:9092,lihl03:9092 \
--group lihltest \
--offset latest \ # where to start consuming: an offset number, latest, or earliest
--partition 0 # the partition this consumer reads from

tip:
This consumer belongs to the consumer group lihltest and always starts from the latest position of partition 0.
A topic's consumption parallelism is determined by its partitions and consumers: within one consumer group, at most as many consumers as there are partitions can consume at the same time.
Topic offsets are kept in ZooKeeper by default for the old consumer (newer clients store them in the internal __consumer_offsets topic); to read from a given position in a topic, the offset must be obtained first.
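
The --partition and --offset flags map onto assign() and the seek family of methods in the Java consumer introduced in the next chapter. A minimal sketch of the same semantics, assuming the topic and brokers from the examples above (the class name is invented for illustration):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class Demo_PartitionOffsetConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "lihl01:9092,lihl02:9092,lihl03:9092");
        props.put("group.id", "lihltest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // assign() pins this consumer to one partition, like --partition 0
        TopicPartition tp = new TopicPartition("lihltest", 0);
        consumer.assign(Collections.singletonList(tp));
        // seekToEnd() starts from the newest offset, like --offset latest
        consumer.seekToEnd(Collections.singletonList(tp));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("partition = %d, offset = %d, value = %s%n",
                        record.partition(), record.offset(), record.value());
        }
    }
}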

IV The Kafka Java API

1 Producing data

1.1 First version

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Demo1_Kafka_Producer {
    public static void main(String[] args) {
        //0. Declare the configuration for connecting to Kafka
        Properties props = new Properties();
        props.put("bootstrap.servers", "lihl01:9092, lihl02:9092, lihl03:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //1. Create the producer
        Producer<String, String> producer = new KafkaProducer<>(props);

        //2. Build the record you want to send
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("lihltest", "hello");
        producer.send(record);

        //3. Release the producer (this also flushes buffered records)
        producer.close();
    }
}

1.2 Improved version

package cn.lihl.spark.kafka.day1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.util.Properties;

public class Demo1_Kafka_Producer {
    public static void main(String[] args) throws IOException {
        //0. Load the Kafka connection configuration from the classpath
        Properties props = new Properties();
        props.load(Demo1_Kafka_Producer.class.getClassLoader().getResourceAsStream("producer.properties"));

        //1. Create the producer
        Producer<String, String> producer = new KafkaProducer<>(props);

        //2. Build the record you want to send
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("hzbigdata2002", "hello");
        producer.send(record);

        //3. Release the producer
        producer.close();
    }
}
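
send() is asynchronous: it only hands the record to the producer's buffer, and close() flushes whatever is still pending. When you need to know whether each record actually reached the broker, the standard two-argument send(record, callback) overload can be used. A minimal sketch along the lines of the code above (the class name is invented for illustration):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.io.IOException;
import java.util.Properties;

public class Demo_Kafka_Producer_Callback {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(Demo_Kafka_Producer_Callback.class.getClassLoader()
                .getResourceAsStream("producer.properties"));
        Producer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record =
                new ProducerRecord<>("lihltest", "hello with callback");
        // the callback fires once the broker acknowledges (or the send fails)
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();  // delivery failed after any retries
                } else {
                    System.out.printf("wrote to %s-%d at offset %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            }
        });
        producer.close();  // flushes outstanding records and waits for callbacks
    }
}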

1.3 Common producer.properties parameters

bootstrap.servers=lihl01:9092,lihl02:9092,lihl03:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer # serializer for record keys
value.serializer=org.apache.kafka.common.serialization.StringSerializer # serializer for record values
acks=[0|-1|1|all] ## acknowledgment mode
0: no acknowledgment; just send the message and move on
-1|all: the leader must write the record to its local log, and all in-sync followers must confirm it as well
1: only the leader has to acknowledge; followers replicate from the leader afterwards
batch.size=1024 # per-partition buffer size, in bytes, for records that have not been sent yet

Even when the batch buffer is not full, the request is still sent out eventually; to reduce the number of requests, set linger.ms to a value greater than 0.

linger.ms=10 ## wait 10 ms before sending a request, whether or not the batch is full
buffer.memory=10240 # total buffer memory, in bytes, for the whole producer
retries=0 # number of retries after a failed send

1.4 producer.properties

############################# Producer Basics #############################

bootstrap.servers=lihl01:9092,lihl02:9092,lihl03:9092
compression.type=gzip
max.block.ms=3000
linger.ms=1
batch.size=16384
buffer.memory=33554432
acks=1
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

2 Consuming data

2.1 consumer.properties

bootstrap.servers=lihl01:9092,lihl02:9092,lihl03:9092
group.id=hzbigdata2002 # the consumer group this consumer joins
auto.offset.reset=earliest # where to start when no committed offset exists: earliest or latest
enable.auto.commit=true # commit offsets automatically
auto.commit.interval.ms=1000 # auto-commit interval in milliseconds
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

2.2 Consumer code

package cn.lihl.spark.kafka.day2;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

public class Demo1_Kafka_Consumer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(Demo1_Kafka_Consumer.class.getClassLoader().getResourceAsStream("consumer.properties"));

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("lihltest")); // subscribe to the topic
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100); // wait up to 100 ms for records
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n", record.offset(), record.key(), record.value(), record.partition());
        }
    }
}
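
With enable.auto.commit=true and a 1-second interval, offsets can be committed for records the program has not finished processing yet. A minimal sketch of manual offset management, assuming the same consumer.properties as above with auto-commit overridden (the class name is invented for illustration):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

public class Demo_Kafka_Consumer_ManualCommit {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(Demo_Kafka_Consumer_ManualCommit.class.getClassLoader()
                .getResourceAsStream("consumer.properties"));
        props.put("enable.auto.commit", "false");  // override: commit by hand

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("lihltest"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            // commit only after the batch has been processed, so a crash
            // before this line replays the batch instead of losing it
            consumer.commitSync();
        }
    }
}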

3 Managing topics

3.1 Create a topic

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class Demo2_Kafka_Admin {
    public static void main(String[] args) {
        //1. Build the configuration
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "lihl01:9092,lihl02:9092,lihl03:9092");
        //2. Create the admin client
        AdminClient client = AdminClient.create(props);
        //3. Create the topic: name, number of partitions, replication factor
        client.createTopics(Arrays.asList(new NewTopic("chengzhiyuan", 3, (short) 1)));
        //4. Release the client (close() waits for the pending request to complete)
        client.close();
    }
}

3.2 Print all topic names

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.common.KafkaFuture;

import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class Demo3_Kafka_Admin_list {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1. Build the configuration
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "lihl01:9092,lihl02:9092,lihl03:9092");
        //2. Create the admin client
        AdminClient client = AdminClient.create(props);
        //3. Request the topic list
        ListTopicsResult listTopicsResult = client.listTopics();
        //4. Get the future holding all topic names
        KafkaFuture<Set<String>> names = listTopicsResult.names();
        //5. Block until the names are available
        Set<String> topicNames = names.get();
        //6. Print them
        for (String topicName : topicNames) {
            System.out.println(topicName);
        }
        client.close();
    }
}
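
The AdminClient can also mirror the --describe and --delete commands from chapter III. A minimal sketch using describeTopics() (the class name is invented for illustration):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Demo_Kafka_Admin_Describe {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "lihl01:9092,lihl02:9092,lihl03:9092");
        AdminClient client = AdminClient.create(props);

        // describeTopics() is the API counterpart of kafka-topics.sh --describe
        Map<String, TopicDescription> topics =
                client.describeTopics(Arrays.asList("lihltest")).all().get();
        for (TopicDescription desc : topics.values()) {
            for (TopicPartitionInfo p : desc.partitions()) {
                System.out.printf("partition = %d, leader = %s, replicas = %s, isr = %s%n",
                        p.partition(), p.leader(), p.replicas(), p.isr());
            }
        }

        // the counterpart of --delete would be:
        // client.deleteTopics(Arrays.asList("lihltest")).all().get();
        client.close();
    }
}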

4 Custom partitioning

4.1 The default partitioning strategy

Each ProducerRecord consists of a topic name, an optional partition number, and an optional key and value.
A record is routed to a partition by one of three rules (see the sketch after this list):
1. If a partition is specified, the record goes directly to that partition.
2. If no partition is specified but a key is, the partition is chosen by hashing the key.
3. If neither a partition nor a key is specified, partitions are assigned in a round-robin fashion.
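
These three rules map directly onto the ProducerRecord constructors. A minimal sketch that only builds records without sending them (the class name, keys, and values are invented for illustration):

import org.apache.kafka.clients.producer.ProducerRecord;

public class Demo_PartitionRouting {
    public static void main(String[] args) {
        // 1. Explicit partition: the record goes straight to partition 1
        ProducerRecord<String, String> byPartition =
                new ProducerRecord<>("lihltest", 1, "user-42", "clicked");

        // 2. Key but no partition: the partition is chosen by hashing the key,
        //    so records with the same key always land in the same partition
        ProducerRecord<String, String> byKey =
                new ProducerRecord<>("lihltest", "user-42", "clicked");

        // 3. Neither partition nor key: partitions are filled round-robin
        ProducerRecord<String, String> roundRobin =
                new ProducerRecord<>("lihltest", "clicked");

        System.out.println(byPartition.partition()); // 1
        System.out.println(byKey.partition());       // null: decided at send time
        System.out.println(roundRobin.partition());  // null: decided at send time
    }
}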

4.2 A random partitioner

4.2.1 Code

package cn.lihl.spark.kafka.day2;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Random;

public class Demo4_Kafka_RandomPartitioner implements Partitioner {

    private Random random = new Random();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //1. Get the number of partitions for this topic
        int partitionCount = cluster.partitionCountForTopic(topic);
        //2. Pick one of them uniformly at random
        int partition = random.nextInt(partitionCount);
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

4.2.2 Update the configuration file producer.properties

partitioner.class=cn.lihl.spark.kafka.day2.Demo4_Kafka_RandomPartitioner
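
If the producer is configured in code rather than from producer.properties, the same setting can be supplied through the Properties object. A minimal fragment, reusing the producer setup from section 1.1:

Properties props = new Properties();
props.put("bootstrap.servers", "lihl01:9092,lihl02:9092,lihl03:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// same effect as the partitioner.class line above
props.put("partitioner.class", "cn.lihl.spark.kafka.day2.Demo4_Kafka_RandomPartitioner");
Producer<String, String> producer = new KafkaProducer<>(props);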

V Integrating Flume with Kafka

1 Install Flume

[root@lihl01 software]# tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/apps/
[root@lihl01 apps]# mv apache-flume-1.9.0-bin/ flume-1.9.0

environment

export JAVA_HOME=/opt/apps/jdk1.8.0_45
export HADOOP_HOME=/opt/apps/hadoop-2.6.0-cdh5.7.6
export SCALA_HOME=/opt/apps/scala-2.11.8
export SPARK_HOME=/opt/apps/spark-2.2.0
export HIVE_HOME=/opt/apps/hive-1.1.0-cdh5.7.6
export ZOOKEEPER_HOME=/opt/apps/zookeeper-3.4.5-cdh5.7.6
export KAFKA_HOME=/opt/apps/kafka-2.11
export FLUME_HOME=/opt/apps/flume-1.9.0
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SCALA_HOME/bin:$HIVE_HOME/bin
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin:$FLUME_HOME/bin

2 Create a new topic

kafka-topics.sh --create \
--topic flume-kafka \
--zookeeper lihl01,lihl02,lihl03/kafka \
--partitions 3 \
--replication-factor 1

Created topic "flume-kafka".

3 Configure Flume: netcat_kafka.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.49.111
a1.sources.r1.port = 6666

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flume-kafka
a1.sinks.k1.kafka.bootstrap.servers = lihl01:9092,lihl02:9092,lihl03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4 Start everything and test

1 start-yarn.sh
2 zkServer.sh start
3 kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

4 Start the Flume agent (in the foreground for debugging, or in the background)

  1. Foreground
    flume-ng agent -n a1 -c /opt/apps/flume-1.9.0/conf/ -f /home/netcat_kafka.conf -Dflume.root.logger=INFO,console

  2. Background
    nohup flume-ng agent -n a1 -c /opt/apps/flume-1.9.0/conf/ -f /home/netcat_kafka.conf > /dev/null 2>&1 &

5 Start a console consumer
kafka-console-consumer.sh \
--topic flume-kafka \
--bootstrap-server lihl01:9092,lihl02:9092,lihl03:9092

6 Install a telnet client (to send test data to the netcat source)
[root@lihl01 home]# yum -y install telnet

7 Start telnet
telnet lihl01 6666

Each line typed into the telnet session goes through the netcat source to the Kafka sink and should appear in the console consumer from step 5.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容