1. schema 注冊表
無論是使用傳統(tǒng)的Avro API自定義序列化類和反序列化類還是使用Twitter的Bijection類庫實現(xiàn)Avro的序列化與反序列化,這兩種方法都有一個缺點:在每條Kafka記錄里都嵌入了schema,這會讓記錄的大小成倍地增加。但是不管怎樣,在讀取記錄時仍然需要用到整個 schema,所以要先找到 schema。有沒有什么方法可以讓數(shù)據(jù)共用一個schema?
我們遵循通用的結(jié)構(gòu)模式并使用"schema注冊表"來達到目的。"schema注冊表"的原理如下:

把所有寫入數(shù)據(jù)需要用到的 schema 保存在注冊表里,然后在記錄里引用 schema 的 ID。負責(zé)讀取數(shù)據(jù)的應(yīng)用程序使用 ID 從注冊表里拉取 schema 來反序列化記錄。序列化器和反序列化器分別負責(zé)處理 schema 的注冊和拉取。
schema注冊表并不屬于Kafka,現(xiàn)在已經(jīng)有一些開源的schema 注冊表實現(xiàn)。比如本文要討論的Confluent Schema Registry。
2. 案例說明
現(xiàn)有 schema 文件 user.json,其中內(nèi)容如下:
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
需求:把這個 schema 中的內(nèi)容注冊到 Confluent Schema Registry 中,Kafka Producer 和 Kafka Consumer 通過識別 Confluent Schema Registry 中的 schema 內(nèi)容來序列化和反序列化。
3. 實操步驟
(1) 啟動 Confluent Schema Registry 服務(wù)
- Confluent 下載地址:https://www.confluent.io/download/,我這里使用confluent-oss-4.1.1-2.11.tar.gz
- 下載好后上傳到服務(wù)器,解壓即可用
- 進入confluent-4.1.1/etc/schema-registry/目錄下,修改schema-registry.properties文件,內(nèi)容及注釋如下:
# Confluent Schema Registry 服務(wù)的訪問IP和端口
listeners=http://192.168.42.89:8081
# Kafka集群所使用的zookeeper地址,如果不配置,會使用Confluent內(nèi)置的Zookeeper地址(localhost:2181)
kafkastore.connection.url=192.168.42.89:2181/kafka-1.1.0-cluster
# Kafka集群的地址(上一個參數(shù)和這個參數(shù)配置一個就可以了)
# kafkastore.bootstrap.servers=192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094
# 存儲 schema 的 topic
kafkastore.topic=_schemas
# 其余保持默認即可
- 啟動 Confluent Schema Registry
[root@confluent confluent-4.1.1]# bin/schema-registry-start etc/schema-registry/schema-registry.properties
# 省略一些內(nèi)容......
[2018-06-22 16:10:26,442] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)
(2) 注冊 User 的 schema 注冊到對應(yīng)的 topic 下
- 首先把原來的 schema 文件加上 "schema" 標(biāo)記
{
"schema": "{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}"
}
- 部分
"需要轉(zhuǎn)義:
{
"schema": "{
\"type\": \"record\",
\"name\": \"User\",
\"fields\": [
{\"name\": \"id\", \"type\": \"int\"},
{\"name\": \"name\", \"type\": \"string\"},
{\"name\": \"age\", \"type\": \"int\"}
]
}"
}
- 注冊 schema 的命令如下
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '' \
http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions
說明:
<1> ''之間需要填寫schema字符串
<2> 我用來測試的 topic 為 dev3-yangyunhe-topic001,而且我只對 Kafka 的 value 進行 avro 的序列化,所以注冊的地址為http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions
<3> http://192.168.42.89:8081需要根據(jù)自己的配置進行修改
- 把轉(zhuǎn)義后 schema 填充到
--data ''的兩個單引號中
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' \
http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions
- 注冊成功會返回這個 schema 的 ID
{"id":102}
(3) 在 maven 工程中引入 Confluent Schema Registry 相關(guān)的 jar 包
這些 jar 包在 maven 倉庫中下載不到,需要自己手動添加到集群中,confluent-4.1.1 解壓后,其 share/java/目錄下有 confluent 各個組件的 jar 包:

我們需要 confluent-common 目錄下的common-config-4.1.1.jar、common-utils-4.1.1.jar和全部以jackson開頭的 jar 包以及 kafka-serde-tools 目錄下的kafka-schema-registry-client-4.1.1.jar和kafka-avro-serializer-4.1.1.jar,關(guān)于如何添加本地的 jar 包到 java 工程中,本文不再贅述。
(4) Kafka Producer 發(fā)送數(shù)據(jù)
package com.bonc.rdpe.kafka110.producer;
import java.util.Properties;
import java.util.Random;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* @Title ConfluentProducer.java
* @Description 使用Confluent實現(xiàn)的Schema Registry服務(wù)來發(fā)送Avro序列化后的對象
* @Author YangYunhe
* @Date 2018-06-25 10:49:19
*/
public class ConfluentProducer {
public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
"\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " +
"{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 使用Confluent實現(xiàn)的KafkaAvroSerializer
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
// 添加schema服務(wù)的地址,用于獲取schema
props.put("schema.registry.url", "http://192.168.42.89:8081");
Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Random rand = new Random();
int id = 0;
while(id < 100) {
id++;
String name = "name" + id;
int age = rand.nextInt(40) + 1;
GenericRecord user = new GenericData.Record(schema);
user.put("id", id);
user.put("name", name);
user.put("age", age);
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("dev3-yangyunhe-topic001", user);
producer.send(record);
Thread.sleep(1000);
}
producer.close();
}
}
(5) Kafka Consumer 消費數(shù)據(jù)
package com.bonc.rdpe.kafka110.consumer;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
* @Title ConfluentConsumer.java
* @Description 使用Confluent實現(xiàn)的Schema Registry服務(wù)來消費Avro序列化后的對象
* @Author YangYunhe
* @Date 2018-06-25 11:42:21
*/
public class ConfluentConsumer {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
props.put("group.id", "dev3-yangyunhe-group001");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 使用Confluent實現(xiàn)的KafkaAvroDeserializer
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
// 添加schema服務(wù)的地址,用于獲取schema
props.put("schema.registry.url", "http://192.168.42.89:8081");
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("dev3-yangyunhe-topic001"));
try {
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(1000);
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericRecord user = record.value();
System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
+ user.get("name") + ", " + "user.age = " + user.get("age") + "], "
+ "partition = " + record.partition() + ", " + "offset = " + record.offset());
}
}
} finally {
consumer.close();
}
}
}
(6) 測試結(jié)果
Kafka Consumer 的控制臺輸出內(nèi)容如下:
value = [user.id = 1, user.name = name1, user.age = 20], partition = 1, offset = 696
value = [user.id = 2, user.name = name2, user.age = 27], partition = 0, offset = 696
value = [user.id = 3, user.name = name3, user.age = 35], partition = 2, offset = 695
value = [user.id = 4, user.name = name4, user.age = 7], partition = 1, offset = 697
value = [user.id = 5, user.name = name5, user.age = 34], partition = 0, offset = 697
......