一 kafka官網(wǎng)
二 安裝Kafka
1 安裝zookeeper集群
2 安裝kafka
1. 解壓
[root@lihl01 software]# tar -zxvf kafka_2.11-1.1.1.tgz -C /opt/apps/
[root@lihl01 apps]# mv kafka_2.11-1.1.1/ kafka-2.11/
2. 修改$KAFKA_HOME/config/server.properties
The id of the broker. This must be set to a unique integer for each broker.
broker.id=1 ## 當(dāng)前kafka實(shí)例的id,必須為整數(shù),一個(gè)集群中不可重復(fù)
log.dirs=/opt/apps/kafka-2.11/data/kafka ## 生產(chǎn)到kafka中的數(shù)據(jù)存儲(chǔ)的目錄,目錄需要手動(dòng)創(chuàng)建
zookeeper.connect=lihl01,lihl02,lihl03/kafka ## kafka數(shù)據(jù)在zk中的存儲(chǔ)目錄
3. 配置環(huán)境變量
envrioment
export JAVA_HOME=/opt/apps/jdk1.8.0_45
export HADOOP_HOME=/opt/apps/hadoop-2.6.0-cdh5.7.6
export SCALA_HOME=/opt/apps/scala-2.11.8
export SPARK_HOME=/opt/apps/spark-2.2.0
export HIVE_HOME=/opt/apps/hive-1.1.0-cdh5.7.6
export ZOOKEEPER_HOME=/opt/apps/zookeeper-3.4.5-cdh5.7.6
export KAFKA_HOME=/opt/apps/kafka-2.11
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SCALA_HOME/bin:$HIVE_HOME/bin
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin
4. 分發(fā),并修改其他的borker.id
[root@lihl01 apps]# scp -r kafka-2.11/ lihl02:/opt/apps/
[root@lihl01 apps]# scp -r kafka-2.11/ lihl03:/opt/apps/
5. 啟動(dòng)zk
[root@lihl01 apps]# zkServer.sh start
[root@lihl02 apps]# zkServer.sh start
[root@lihl03 apps]# zkServer.sh start
6. 啟動(dòng)kafka
[root@lihl01 apps]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[root@lihl02 apps]# kafka-server-start.sh -daemon KAFKA_HOME/config/server.properties
三 Kafka基本操作-命令行
1 創(chuàng)建主題
./kafka-topics.sh \
--create \
--topic lihltest \
--if-not-exists \
--partitions 3 \
--replication-factor 2 \
--zookeeper lihl01,lihl02,lihl03/kafka
Created topic "lihltest".
tip:
副本因子的個(gè)數(shù)應(yīng)該小于等于broker的個(gè)數(shù)
2 查詢所有的主題列表
./kafka-topics.sh \
--list \
--zookeeper lihl01,lihl02,lihl03/kafka
lihltest
3 查詢主題詳情
./kafka-topics.sh
--describe
--topic lihltest
--zookeeper lihl01,lihl02,lihl03/kafka
Topic:lihltest PartitionCount:3 ReplicationFactor:2 Configs:
Topic: lihltest Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: lihltest Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: lihltest Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Partition: 當(dāng)前topic對(duì)應(yīng)的分區(qū)編號(hào)
Replicas : 副本因子,當(dāng)前kafka對(duì)應(yīng)的partition所在的broker實(shí)例的broker.id的列表
Leader : 該partition的所有副本中的leader領(lǐng)導(dǎo)者,處理所有kafka該partition讀寫請(qǐng)求
ISR : 該partition的存活的副本對(duì)應(yīng)的broker實(shí)例的broker.id的列表
4 修改主題
./kafka-topics.sh
--alter
--topic lihltest
--partitions 2
--zookeeper lihl01,lihl02,lihl03/kafka
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
tip:
修改主題的時(shí)候,主要是修改主題的分區(qū)數(shù),但是分區(qū)數(shù)只能擴(kuò)大不能減少
5 刪除主題
./kafka-topics.sh
--delete
--topic flink_test1
--zookeeper lihl01,lihl02,lihl03/kafka
6 測(cè)試生產(chǎn)與消費(fèi)
6.1 生產(chǎn)端
./kafka-console-producer.sh
--topic lihltest
--broker-list lihl01:9092,lihl02:9092,lihl03:9092
6.2 消費(fèi)端
kafka-console-consumer.sh
--topic flink_test1
--bootstrap-server lihl01:9092,lihl02:9092,lihl03:9092
6.3 消費(fèi)者組
kafka-console-consumer.sh
--topic lihltest
--bootstrap-server lihl01:9092,lihl02:9092,lihl03:9092
--group lihltest
--offset latest \ # 從什么位置(消息的偏移量)開始消費(fèi)。數(shù)字、latest、earlist
--partition 0 # 消費(fèi)者對(duì)應(yīng)的分區(qū)
tip:
當(dāng)前的消費(fèi)者屬于lihltest的消費(fèi)者組,每次從0分區(qū)的最新的位置開始消費(fèi)
我的主題的并行度由分區(qū)和消費(fèi)者決定,消費(fèi)者組內(nèi)的消費(fèi)者至多可以同時(shí)消費(fèi)數(shù)據(jù)量最多和分區(qū)數(shù)相同
我們的kafka中的主題的偏移量默認(rèn)是保存在zk中,我們需要讀取這個(gè)主題中間的消息就必須要先獲取到偏移量才可以
四 Kafka的JavaAPI
1 生產(chǎn)者生產(chǎn)數(shù)據(jù)
1.1 初版代碼
public class Demo1_Kafka_Producer {
public static void main(String[] args) {
//0. 申明連接到kafka的配置的url
Properties props = new Properties();
props.put("bootstrap.servers", "lihl01:9092, lihl02:9092, lihl03:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//1. 創(chuàng)建生產(chǎn)者對(duì)象
Producer<String, String> producer = new KafkaProducer<>(props);
//2. 創(chuàng)建你想要發(fā)送的記錄對(duì)象
ProducerRecord<String, String> record = new ProducerRecord<String, String>("lihltest", "hello");
producer.send(record);
//3. 釋放
producer.close();
}
}
1.2 優(yōu)化之后
package cn.lihl.spark.kafka.day1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.util.Properties;
public class Demo1_Kafka_Producer {
public static void main(String[] args) throws IOException {
//0. 申明連接到kafka的配置的url
Properties props = new Properties();
props.load(Demo1_Kafka_Producer.class.getClassLoader().getResourceAsStream("producer.properties"));
//1. 創(chuàng)建生產(chǎn)者對(duì)象
Producer<String, String> producer = new KafkaProducer<>(props);
//2. 創(chuàng)建你想要發(fā)送的記錄對(duì)象
ProducerRecord<String, String> record = new ProducerRecord<String, String>("hzbigdata2002", "hello");
producer.send(record);
//3. 釋放
producer.close();
}
}
1.3 producer.properties常見參數(shù)的說明
bootstrap.servers=lihl01:9092,lihl02:9092,lihl03:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer # key的序列器
value.serializer=org.apache.kafka.common.serialization.StringSerializer # value的序列器
acks=[0|-1|1|all] ## 消息確認(rèn)機(jī)制
0: 不做確認(rèn),直管發(fā)送消息即可
-1|all: 不僅leader需要將數(shù)據(jù)寫入本地磁盤,并確認(rèn),還需要同步的等待其它followers進(jìn)行確認(rèn)
1:只需要leader進(jìn)行消息確認(rèn)即可,后期follower可以從leader進(jìn)行同步
batch.size=1024 #每個(gè)分區(qū)內(nèi)的用戶緩存未發(fā)送record記錄的空間大小
如果緩存區(qū)中的數(shù)據(jù),沒有沾滿,也就是仍然有未用的空間,那么也會(huì)將請(qǐng)求發(fā)送出去,為了較少請(qǐng)求次數(shù),我們可以配置linger.ms大于0,
linger.ms=10 ## 不管緩沖區(qū)是否被占滿,延遲10ms發(fā)送request
buffer.memory=10240 #控制的是一個(gè)producer中的所有的緩存空間
retries=0 #發(fā)送消息失敗之后的重試次數(shù)
1.4 producer.properties
############################# Producer Basics #############################
bootstrap.servers=lihl01:9092,lihl02:9092,lihl03:9092
compression.type=gzip
max.block.ms=3000
linger.ms=1
batch.size=16384
buffer.memory=33554432
acks=1
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
2 消費(fèi)者消費(fèi)數(shù)據(jù)
2.1 consumer.properties
bootstrap.servers=lihl01:9092,lihl02:9092,lihl03:9092
group.id=hzbigdata2002
auto.offset.reset=earliest
enable.auto.commit=true
auto.commit.interval.ms=1000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
2.2 消費(fèi)者代碼
package cn.lihl.spark.kafka.day2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
public class Demo1_Kafka_Consumer {
public static void main(String[] args) throws IOException {
Properties props = new Properties();
props.load(Demo1_Kafka_Consumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("lihltest"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n", record.offset(), record.key(), record.value(), record.partition());
}
}
}
3 操作Topic
3.1 創(chuàng)建主題
public class Demo2_Kafka_Admin {
public static void main(String[] args) {
//1. 創(chuàng)建配置文件
Properties props = new Properties();
props.setProperty("bootstrap.servers", "lihl01:9092,lihl02:9092,lihl03:9092");
//2. 創(chuàng)建對(duì)象
AdminClient client = AdminClient.create(props);
//3. 創(chuàng)建主題
client.createTopics(Arrays.asList(new NewTopic("chengzhiyuan", 3, (short)1)));
//4. 釋放
client.close();
}
}
3.2 打印所有的主題列表
public class Demo3_Kafka_Admin_list {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1. 創(chuàng)建配置文件
Properties props = new Properties();
props.setProperty("bootstrap.servers", "lihl01:9092,lihl02:9092,lihl03:9092");
//2. 創(chuàng)建對(duì)象
AdminClient client = AdminClient.create(props);
//3. list
ListTopicsResult listTopicsResult = client.listTopics();
//4. 獲取到所有的主題名稱
KafkaFuture<Set<String>> names = listTopicsResult.names();
//5. 獲取到所有的名字字符串
Set<String> topicNames = names.get();
//6. 遍歷
for (String topicName : topicNames) {
System.out.println(topicName);
}
client.close();
}
}
4 自定義分區(qū)
4.1 默認(rèn)分區(qū)策略
每一條producerRecord有,topic名稱、可選的partition分區(qū)編號(hào),以及一對(duì)可選的key和value組成。
三種策略進(jìn)入分區(qū)
1、如果指定的partition,那么直接進(jìn)入該partition
2、如果沒有指定partition,但是指定了key,使用key的hash選擇partition
3、如果既沒有指定partition,也沒有指定key,使用輪詢的方式進(jìn)入partition
4.2 隨機(jī)分區(qū)器
4.2.1 代碼
package cn.lihl.spark.kafka.day2;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.Random;
public class Demo4_Kafka_RandomPartitioner implements Partitioner{
private Random random = new Random();
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//1. 獲取我的分區(qū)個(gè)數(shù)
int partitionCount = cluster.partitionCountForTopic(topic);
int partition = random.nextInt(partitionCount);
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
4.2.2 修改配置文件:producer.properties
partitioner.class=cn.lihl.spark.kafka.day2.Demo4_Kafka_RandomPartitioner
五 Flume整合Kafka
1 安裝Flume
[root@lihl01 software]# tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/apps/
[root@lihl01 apps]# mv apache-flume-1.9.0-bin/ flume-1.9.0
envrioment
export JAVA_HOME=/opt/apps/jdk1.8.0_45
export HADOOP_HOME=/opt/apps/hadoop-2.6.0-cdh5.7.6
export SCALA_HOME=/opt/apps/scala-2.11.8
export SPARK_HOME=/opt/apps/spark-2.2.0
export HIVE_HOME=/opt/apps/hive-1.1.0-cdh5.7.6
export ZOOKEEPER_HOME=/opt/apps/zookeeper-3.4.5-cdh5.7.6
export KAFKA_HOME=/opt/apps/kafka-2.11
export FLUME_HOME=/opt/apps/flume-1.9.0
export CLASSPATH=.:JAVA_HOME/lib/tools.jar
export PATH=JAVA_HOME/bin:
HADOOP_HOME/sbin:
HIVE_HOME/bin
export PATH=SPARK_HOME/bin:
ZOOKEEPER_HOME/bin:
FLUME_HOME/bin
2 新建一個(gè)主題
kafka-topics.sh --create
--topic flume-kafka
--zookeeper lihl01,lihl02,lihl03/kafka
--partitions 3
--replication-factor 1
Created topic "flume-kafka".
3 配置flume:netcat_kafka.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.49.111
a1.sources.r1.port = 6666
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flume-kafka
a1.sinks.k1.kafka.bootstrap.servers = lihl01:9092,lihl02:9092,lihl03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4 啟動(dòng)測(cè)試
1 start-yarn.sh
2 zkServer.sh start
3 kafka-server-start.sh
4 啟動(dòng)flume,后臺(tái)啟動(dòng)
前臺(tái)啟動(dòng)
flume-ng agent -n a1 -c /opt/apps/flume-1.9.0/conf/ -f /home/netcat_kafka.conf -Dflume.root.logger=INFO,console后臺(tái)啟動(dòng)
nohup flume-ng agent -n a1 -c /opt/apps/flume-1.9.0/conf/ -f /home/netcat_kafka.conf > /dev/null 2>&1 &
5 啟動(dòng)消費(fèi)者腳本
kafka-console-consumer.sh
--topic flume-kafka
--bootstrap-server lihl01:9092,lihl02:9092,lihl03:9092
6 安裝一個(gè)web服務(wù)器
[root@lihl01 home]# yum -y install telnet
7 啟動(dòng)telnet
telnet lihl01 6666