Table of Contents
- Simple usage example
- Kafka producer overall architecture
- Configuration module
- Interceptor module
- Serialization module
- Partitioning module
- RecordAccumulator module
- Sender module
- Producer configuration mapped to the source code
- Design patterns worth learning
Simple usage example
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Test {
    private static String topicName;
    private static int msgNum;

    public static void main(String[] args) {
        // Configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*props.put("interceptor.classes", "kafka.productor.ProducerInterceptorDemo,kafka.productor.ProducerInterceptorDemo");*/
        topicName = "test1";
        // number of messages to send
        msgNum = 10;
        // KafkaProducer is a thread-safe class
        Producer<String, String> producer = new KafkaProducer<>(props);
        // send messages with a callback
        for (int i = 0; i < msgNum; i++) {
            String msg = i + " This is seeger's msg." + System.currentTimeMillis();
            producer.send(new ProducerRecord<>(topicName, msg), (metadata, exception) -> {
                if (exception != null) {
                    System.out.println("handle the exception: " + exception.getMessage());
                } else {
                    System.out.printf("topic=%s, partition=%d, offset=%s \n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        }
        producer.close();
    }
}
Kafka producer overall architecture
- The diagram below is borrowed from the book *Apache Kafka 源码剖析* (Apache Kafka Source Code Analysis).
(figure: Kafka producer overall architecture)
- What we can learn from the diagram:
- Calling KafkaProducer.send() only appends the message to the RecordAccumulator cache.
- The Sender thread is what actually ships messages to the Kafka server; it plays a role similar to an NIO selector.
- The cached queues in RecordAccumulator plus the Sender thread absorb the rate mismatch between the main thread issuing send requests and the actual sends to the server, which raises throughput and also decouples the two. The same pattern appears in the article "Migrating 20 million rows: from days to hours".
- Meaning of each numbered step (depending on the configuration loaded in step 1, steps 1-11 behave differently):
  1. Load the configuration
  2. Interceptors process the record
  3. Serialize the key and value
  4. Pick a suitable partition
  5. RecordAccumulator collects messages for batched sending
  6. The Sender thread fetches messages from the RecordAccumulator
  7. Construct a ClientRequest
  8. Hand the ClientRequest to the NetworkClient
  9. The NetworkClient puts the request into a KafkaChannel
  10. Perform the actual network I/O
  11. On receiving the response, invoke the ClientRequest callback
  12. Invoke the RecordBatch callback, which in turn invokes the callback registered on each message
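The buffer-then-send decoupling described above can be sketched with plain JDK classes. This is a hypothetical minimal model, not Kafka's code: the main thread only enqueues records, while a separate sender thread drains the queue, mirroring the RecordAccumulator + Sender split.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

public class AccumulatorSketch {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
    final List<String> sent = new ArrayList<>();

    public void send(String record) throws InterruptedException {
        // the "main thread" returns as soon as the record is buffered
        queue.put(record);
    }

    public Thread startSender(int expected, CountDownLatch done) {
        Thread sender = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    sent.add(queue.take()); // a real Sender would do network I/O here
                    done.countDown();
                }
            } catch (InterruptedException ignored) { }
        });
        sender.start();
        return sender;
    }

    public static void main(String[] args) throws InterruptedException {
        AccumulatorSketch acc = new AccumulatorSketch();
        CountDownLatch done = new CountDownLatch(3);
        acc.startSender(3, done);
        for (int i = 0; i < 3; i++) acc.send("msg-" + i);
        done.await();
        System.out.println(acc.sent.size()); // 3
    }
}
```

The queue smooths out bursts from the producer side, which is exactly the rate-mismatch argument above.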
Configuration module
- Following Kafka's approach, let's hand-write a mock of the configuration module. Sequence diagram:
(figure: sequence diagram)
The main method:
public class Test {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(MockProducerConfig.RETRIES_CONFIG, "2");
        props.put(MockProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9091");
        props.put("key.serializer", "java.util.HashMap");
        MockKafkaProducer mockKafkaProducer = new MockKafkaProducer(props);
    }
}
// kept deliberately simple, for testing only
public class MockKafkaProducer {
    private final Integer retries;
    private final List<String> addresses;
    // a HashMap stands in for a real serializer, again just for testing
    private final HashMap keySerializer;

    public MockKafkaProducer(Properties properties) {
        this(new MockProducerConfig(properties));
    }

    @SuppressWarnings("unchecked")
    private MockKafkaProducer(MockProducerConfig config) {
        this.retries = config.getInt(MockProducerConfig.RETRIES_CONFIG);
        System.out.println("retries config applied: " + retries);
        addresses = config.getList(MockProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
        addresses.forEach(s -> System.out.println("bootstrap.servers config applied: " + s));
        this.keySerializer = config.getConfiguredInstance(MockProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                HashMap.class);
        System.out.println("key.serializer config applied: " + keySerializer);
    }
}
import java.util.Map;
import kafka.productor.config.learn.MockConfigDef.MockType;

/**
 * Mock of the Kafka producer configuration
 */
public class MockProducerConfig extends MockAbstractConfig {
    private static final MockConfigDef CONFIG;
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    public static final String RETRIES_CONFIG = "retries";
    public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";

    static {
        CONFIG = new MockConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, MockType.LIST, null)
                .define(RETRIES_CONFIG, MockType.INT, 0)
                .define(KEY_SERIALIZER_CLASS_CONFIG, MockType.CLASS, null);
    }

    public MockProducerConfig(Map<?, ?> props) {
        super(CONFIG, props);
    }
}
public class MockAbstractConfig {
    /**
     * Original user-supplied configuration
     */
    private final Map<String, ?> originals;
    /**
     * Parsed configuration
     */
    private final Map<String, Object> values;

    @SuppressWarnings("unchecked")
    public MockAbstractConfig(MockConfigDef definition, Map<?, ?> originals) {
        // validate that every key is a String
        Optional<?> keyString = originals.keySet().stream().filter(key -> !(key instanceof String)).findFirst();
        if (keyString.isPresent()) {
            throw new MockConfigException(keyString.get().toString(), originals.get(keyString.get()), "Key must be a string.");
        }
        this.originals = (Map<String, ?>) originals;
        this.values = definition.parse(this.originals);
    }

    public Integer getInt(String key) {
        return (Integer) get(key);
    }

    @SuppressWarnings("unchecked")
    public List<String> getList(String key) {
        return (List<String>) get(key);
    }

    protected Object get(String key) {
        if (!values.containsKey(key)) {
            throw new MockConfigException(String.format("Unknown configuration '%s'", key));
        }
        return values.get(key);
    }

    /**
     * Instantiation via reflection -- a nice design worth borrowing:
     * key.serializer and value.serializer both reuse this single method.
     */
    public <T> T getConfiguredInstance(String key, Class<T> t) {
        Class<?> c = getClass(key);
        if (c == null) {
            return null;
        }
        Object o = MockUtils.newInstance(c);
        if (!t.isInstance(o)) {
            throw new MockKafkaException(c.getName() + " is not an instance of " + t.getName());
        }
        return t.cast(o);
    }

    public Class<?> getClass(String key) {
        return (Class<?>) get(key);
    }
}
public class MockConfigDef {
    public static final String NO_DEFAULT_VALUE = "";
    private final Map<String, MockConfigKey> configKeys = new HashMap<>();

    public static class MockConfigKey {
        public final String name;
        public final MockType type;
        public final Object defaultValue;

        public MockConfigKey(String name, MockType type, Object defaultValue) {
            this.name = name;
            this.type = type;
            this.defaultValue = defaultValue;
        }

        public boolean hasDefault() {
            return this.defaultValue != NO_DEFAULT_VALUE;
        }
    }

    /**
     * Configuration value types
     */
    public enum MockType {
        BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD
    }

    public MockConfigDef define(String name, MockType type, Object defaultValue) {
        if (configKeys.containsKey(name)) {
            throw new MockConfigException("Configuration " + name + " is defined twice.");
        }
        Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
        configKeys.put(name, new MockConfigKey(name, type, parsedDefault));
        return this;
    }

    public Map<String, Object> parse(Map<?, ?> props) {
        Map<String, Object> values = new HashMap<>();
        for (MockConfigKey key : configKeys.values()) {
            Object value;
            // props map contains the setting - assign the ConfigKey value
            if (props.containsKey(key.name)) {
                value = parseType(key.name, props.get(key.name), key.type);
            // props map doesn't contain the setting and no default value was specified - that's an error
            } else if (key.defaultValue == NO_DEFAULT_VALUE) {
                throw new MockConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
            } else {
                // otherwise assign the setting its default value
                value = key.defaultValue;
            }
            // real code would also validate the value here; omitted for brevity
            values.put(key.name, value);
        }
        return values;
    }

    private Object parseType(String name, Object value, MockType type) {
        try {
            if (value == null) {
                return null;
            }
            String trimmed = null;
            if (value instanceof String) {
                trimmed = ((String) value).trim();
            }
            switch (type) {
                case STRING:
                    if (value instanceof String) {
                        return trimmed;
                    } else {
                        throw new MockConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
                    }
                case INT:
                    if (value instanceof Integer) {
                        return (Integer) value;
                    } else if (value instanceof String) {
                        return Integer.parseInt(trimmed);
                    } else {
                        throw new MockConfigException(name, value, "Expected value to be a number.");
                    }
                case LIST:
                    if (value instanceof List) {
                        return (List<?>) value;
                    } else if (value instanceof String) {
                        if (trimmed.isEmpty()) {
                            return Collections.emptyList();
                        } else {
                            return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
                        }
                    } else {
                        throw new MockConfigException(name, value, "Expected a comma separated list.");
                    }
                case CLASS:
                    if (value instanceof Class) {
                        return (Class<?>) value;
                    } else if (value instanceof String) {
                        return Class.forName(trimmed, true, MockUtils.getContextOrMockKafkaClassLoader());
                    } else {
                        throw new MockConfigException(name, value, "Expected a Class instance or class name.");
                    }
                default:
                    throw new IllegalStateException("Unknown type.");
            }
        } catch (NumberFormatException e) {
            throw new MockConfigException(name, value, "Not a number of type " + type);
        } catch (ClassNotFoundException e) {
            throw new MockConfigException(name, value, "Class " + value + " could not be found.");
        }
    }
}
public class MockConfigException extends MockKafkaException {
    private static final long serialVersionUID = 1L;

    public MockConfigException(String message) {
        super(message);
    }

    public MockConfigException(String name, Object value) {
        this(name, value, null);
    }

    public MockConfigException(String name, Object value, String message) {
        super("Invalid value " + value + " for configuration " + name + (message == null ? "" : ": " + message));
    }
}

public class MockKafkaException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    public MockKafkaException(String message, Throwable cause) {
        super(message, cause);
    }

    public MockKafkaException(String message) {
        super(message);
    }

    public MockKafkaException(Throwable cause) {
        super(cause);
    }

    public MockKafkaException() {
        super();
    }
}
public class MockUtils {
    public static ClassLoader getContextOrMockKafkaClassLoader() {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        return cl == null ? getMockKafkaClassLoader() : cl;
    }

    public static ClassLoader getMockKafkaClassLoader() {
        return MockUtils.class.getClassLoader();
    }

    public static <T> T newInstance(Class<T> c) {
        try {
            return c.newInstance();
        } catch (IllegalAccessException e) {
            throw new MockKafkaException("Could not instantiate class " + c.getName(), e);
        } catch (InstantiationException e) {
            throw new MockKafkaException("Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?", e);
        } catch (NullPointerException e) {
            throw new MockKafkaException("Requested class was null", e);
        }
    }
}
Interceptor module
- TODO: diagram
- Add interceptor.classes to the configuration to plug in custom interceptors that implement ProducerInterceptor.
- When KafkaProducer is initialized after configuration, it automatically builds the interceptors field; that field is a ProducerInterceptors instance holding the collection of our custom ProducerInterceptor implementations.
- The interceptors are invoked on every send.
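A minimal sketch of that chaining, using hypothetical MockInterceptor/MockInterceptors names in place of Kafka's ProducerInterceptor/ProducerInterceptors; each interceptor sees the output of the previous one, as with onSend in the real producer:

```java
import java.util.Arrays;
import java.util.List;

interface MockInterceptor {
    String onSend(String record);
}

class MockInterceptors {
    private final List<MockInterceptor> interceptors;

    MockInterceptors(List<MockInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    // apply every interceptor in registration order
    String onSend(String record) {
        String r = record;
        for (MockInterceptor i : interceptors) {
            r = i.onSend(r);
        }
        return r;
    }
}

public class InterceptorDemo {
    public static void main(String[] args) {
        MockInterceptors chain = new MockInterceptors(Arrays.asList(
                r -> "[ts]" + r,       // e.g. prepend a timestamp marker
                r -> r + "[audit]"));  // e.g. append an audit tag
        System.out.println(chain.onSend("msg")); // [ts]msg[audit]
    }
}
```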
Serialization module
- TODO: diagram
- The configuration names the serializer implementation classes for the key and value; custom implementations are also allowed.
- Different configurations dispatch to different implementations -- essentially the strategy pattern, with the client choosing the serialization strategy.
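The strategy dispatch can be sketched like this. The registry and class names are hypothetical; they stand in for the reflection-based lookup the real producer performs on key.serializer / value.serializer:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

interface MockSerializer {
    byte[] serialize(String data);
}

class StringSer implements MockSerializer {
    public byte[] serialize(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

class UpperSer implements MockSerializer {
    public byte[] serialize(String data) {
        return data.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }
}

public class SerializerDemo {
    // the "configuration" maps a name to a strategy; producer code never changes
    static final Map<String, MockSerializer> REGISTRY = new HashMap<>();
    static {
        REGISTRY.put("string", new StringSer());
        REGISTRY.put("upper", new UpperSer());
    }

    public static void main(String[] args) {
        MockSerializer s = REGISTRY.get("upper");
        System.out.println(new String(s.serialize("abc"), StandardCharsets.UTF_8)); // ABC
    }
}
```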
Partitioning module
- If a partition number is specified on the record, it takes priority.
- Otherwise the default Partitioner implementation, DefaultPartitioner, decides. If the message has no key, the target partition is an ever-increasing counter (an AtomicInteger, for thread safety) modulo the partition count. If the message has a key, the key is hashed with murmur (a low-collision hash) and the result is taken modulo the partition count, balancing the load. Both paths depend on the partition count, so the partition count must stay fixed; changing it makes the hash-to-partition mapping inconsistent.
- A small trick: use an interface to hold the configuration-key constants, so the internal configuration keys and the publicly exposed ones stay unified.
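A rough sketch of the two DefaultPartitioner paths just described. The class is hypothetical, and plain Arrays.hashCode stands in for the murmur hash; the sign-bit trick mirrors Kafka's Utils.toPositive:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionerSketch {
    // AtomicInteger keeps the round-robin counter thread-safe
    private final AtomicInteger counter = new AtomicInteger(0);

    public int partition(byte[] key, int numPartitions) {
        if (key == null) {
            // no key: round-robin over the partitions
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // key present: hash modulo partition count
        return toPositive(Arrays.hashCode(key)) % numPartitions;
    }

    // clear the sign bit so the modulo result is never negative
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    public static void main(String[] args) {
        PartitionerSketch p = new PartitionerSketch();
        System.out.println(p.partition(null, 3)); // 0
        System.out.println(p.partition(null, 3)); // 1
    }
}
```

Note how changing numPartitions changes the result for the same key, which is exactly why the partition count must not change.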
RecordAccumulator module
Overview of the concurrency-safety design
- The queue map uses copy-on-write because it is read-mostly: its key is TopicPartition, so writes (new partitions) are rare.
- How synchronous send is implemented: just append .get() to the result of send(). Under the hood, synchronous send calls FutureRecordMetadata.get(), which is implemented with a CountDownLatch.
- NIO needs ByteBuffers, and creating and destroying them is expensive, so Kafka maintains a BufferPool; its thread-safety design is modeled on AQS.
- Several classes are designed to be immutable, which guarantees thread safety.
- The overall append flow (key points only; details follow below):
  5.1 Step 2 takes a lock so that records can be inserted into the non-thread-safe deque safely.
  5.2 Step 5: if the append fails, request a buffer from the buffer pool; the BufferPool is designed to be thread-safe.
  5.3 After obtaining a buffer from the pool, lock again and retry the insert. The retry is mandatory; skipping it could cause memory fragmentation.
Data structure
(figure: data structure)
- batches: the queue map, a copy-on-write type
- batchSize: the size of the ByteBuffer underlying each RecordBatch in the queue
- compression: the compression type
- incomplete: the set of RecordBatches that have not finished sending
- free: the BufferPool
- KafkaProducer.send() eventually calls RecordAccumulator.append() to add the record to the RecordAccumulator.
Prerequisite: the queue map
- RecordAccumulator holds a map of queues that is read-mostly, so it uses a copy-on-write map (COW) rather than ConcurrentHashMap. TopicPartition, the map key, is designed as an immutable class: only with an immutable key can a value that was put be reliably looked up again; with a mutable key the lookup might fail. And whenever equals is overridden, hashCode must be overridden too: otherwise two equal objects with different hash codes could land in different buckets when put into a map, violating the single-copy principle. (The reverse is fine: objects with equal hash codes need not be equal, and HashMap handles that case.)
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
public final class TopicPartition implements Serializable {
    private int hash = 0;
    private final int partition;
    private final String topic;

    public TopicPartition(String topic, int partition) {
        this.partition = partition;
        this.topic = topic;
    }

    public int partition() {
        return partition;
    }

    public String topic() {
        return topic;
    }

    @Override
    public int hashCode() {
        if (hash != 0)
            return hash;
        final int prime = 31;
        int result = 1;
        result = prime * result + partition;
        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
        this.hash = result;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        TopicPartition other = (TopicPartition) obj;
        if (partition != other.partition)
            return false;
        if (topic == null) {
            if (other.topic != null)
                return false;
        } else if (!topic.equals(other.topic))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}
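The copy-on-write idea behind the batches map can be sketched as follows. This is a simplified, hypothetical version in the spirit of Kafka's CopyOnWriteMap, not the real implementation: reads are lock-free against an immutable snapshot, and every write copies the whole map and swaps the reference.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CowMap<K, V> {
    // volatile so readers always see the latest published snapshot
    private volatile Map<K, V> map = Collections.emptyMap();

    public V get(K key) {
        return map.get(key); // no locking: the snapshot is immutable
    }

    public synchronized V putIfAbsent(K key, V value) {
        V existing = map.get(key);
        if (existing != null) {
            return existing;
        }
        Map<K, V> copy = new HashMap<>(map); // copy-on-write
        copy.put(key, value);
        map = Collections.unmodifiableMap(copy);
        return null;
    }

    public static void main(String[] args) {
        CowMap<String, Integer> m = new CowMap<>();
        m.putIfAbsent("test1-0", 42);
        System.out.println(m.get("test1-0")); // 42
    }
}
```

This trade-off pays off exactly when keys (here, TopicPartitions) are added rarely but read on every send.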
Prerequisite: MemoryRecords
- Each RecordBatch in the queue map holds a reference to a MemoryRecords object; this is where the data is actually stored. MemoryRecords holds a Compressor, which handles compression and has a few important fields. As the decorator-pattern figure below shows, bufferStream decorates the ByteBuffer with automatic expansion, and appendStream decorates bufferStream with compression. When a record is inserted into a RecordBatch, the code checks whether the ByteBuffer referenced by MemoryRecords still has room; if so the record is inserted, otherwise a new RecordBatch is created.
MemoryRecords:
private final Compressor compressor;
private ByteBuffer buffer;
Compressor:
private final CompressionType type;
private final DataOutputStream appendStream;
private final ByteBufferOutputStream bufferStream;

(figure: typical application of the decorator pattern)
- Let's look at how Compressor decorates the ByteBuffer:
public Compressor(ByteBuffer buffer, CompressionType type) {
    // compression type configured on the producer
    this.type = type;
    this.bufferStream = new ByteBufferOutputStream(buffer);
    this.appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}

public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
    try {
        switch (type) {
            case NONE:
                return new DataOutputStream(buffer);
            case GZIP:
                // decorator pattern; GZIPOutputStream ships with the JDK
                return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
            case SNAPPY:
                // the snappy stream is obtained via reflection, so the snappy jar
                // is only required when snappy compression is actually used
                try {
                    OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            case LZ4:
                try {
                    OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            default:
                throw new IllegalArgumentException("Unknown compression type: " + type);
        }
    } catch (IOException e) {
        throw new KafkaException(e);
    }
}
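The same decorator stacking can be tried with JDK streams alone: DataOutputStream adds typed writes, GZIPOutputStream adds compression, and ByteArrayOutputStream stands in for Kafka's ByteBufferOutputStream as the underlying buffer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class DecoratorDemo {
    // write through DataOutputStream -> GZIPOutputStream -> ByteArrayOutputStream,
    // then read back through the mirror-image input stack
    static String roundTrip(String text) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(buffer))) {
            out.writeUTF(text); // compressed on the way down the stack
        }
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(buffer.toByteArray())))) {
            return in.readUTF();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello kafka")); // hello kafka
    }
}
```

Each layer adds one responsibility without the others knowing, which is the whole point of the pattern.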
Prerequisite: RecordBatch
- How synchronous send is implemented: appending .get() to the result of send() is enough. In other words, synchronous send boils down to calling FutureRecordMetadata.get(), which is backed by a CountDownLatch.
FutureRecordMetadata:
    private final ProduceRequestResult result;

    public RecordMetadata get() throws InterruptedException, ExecutionException {
        this.result.await();
        return valueOrError();
    }

ProduceRequestResult:
    private final CountDownLatch latch = new CountDownLatch(1);

    public void await() throws InterruptedException {
        latch.await();
    }
- The RecordBatch a record is appended to and the FutureRecordMetadata returned by the append share the same ProduceRequestResult. The mechanism is simple: the returned FutureRecordMetadata delegates to methods of the RecordBatch it belongs to, and the RecordBatch passes the relevant fields back to the FutureRecordMetadata.
(figure: relationship diagram)
- When a RecordBatch has finished sending, ProduceRequestResult.done() is called.
public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
    this.topicPartition = topicPartition;
    this.baseOffset = baseOffset;
    this.error = error;
    // this is what ends the synchronous get()
    this.latch.countDown();
}
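A stripped-down sketch of this latch-based synchronization, with a hypothetical Result class playing the combined role of ProduceRequestResult and FutureRecordMetadata:

```java
import java.util.concurrent.CountDownLatch;

public class LatchFutureDemo {
    static class Result {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile long baseOffset = -1;

        // called by the "sender" when the broker responds
        void done(long offset) {
            this.baseOffset = offset;
            latch.countDown();
        }

        // synchronous send: block until done() fires
        long get() throws InterruptedException {
            latch.await();
            return baseOffset;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Result result = new Result();
        // stand-in for the Sender thread completing the batch
        new Thread(() -> result.done(100L)).start();
        System.out.println(result.get()); // 100
    }
}
```

Every record in the same batch shares one latch, so one done() call releases all blocked get() callers at once.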
Prerequisite: BufferPool
- Allocating and releasing ByteBuffers is expensive, so Kafka implements a buffer pool, BufferPool.
(figure: BufferPool)
- The interesting parts are allocation and release. Allocation first:
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    // lock for synchronization
    this.lock.lock();
    try {
        // a request for exactly poolableSize with a free buffer available is served straight from the free list
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();
        // every buffer in the free list has the fixed poolableSize
        int freeListSize = this.free.size() * this.poolableSize;
        if (this.availableMemory + freeListSize >= size) {
            // freeUp releases ByteBuffers from the free list until availableMemory >= size
            freeUp(size);
            this.availableMemory -= size;
            lock.unlock();
            // allocate a fresh heap ByteBuffer
            return ByteBuffer.allocate(size);
        } else {
            // not enough space; we have to block
            int accumulated = 0;
            ByteBuffer buffer = null;
            Condition moreMemory = this.lock.newCondition();
            long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            // join the wait queue
            this.waiters.addLast(moreMemory);
            // loop until enough memory has accumulated
            while (accumulated < size) {
                long startWaitNs = time.nanoseconds();
                long timeNs;
                boolean waitingTimeElapsed;
                try {
                    // block; await releases the lock while waiting
                    waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    // on interruption, leave the wait queue
                    this.waiters.remove(moreMemory);
                    throw e;
                } finally {
                    // record how long we waited
                    long endWaitNs = time.nanoseconds();
                    timeNs = Math.max(0L, endWaitNs - startWaitNs);
                    this.waitTime.record(timeNs, time.milliseconds());
                }
                // fail on timeout
                if (waitingTimeElapsed) {
                    this.waiters.remove(moreMemory);
                    throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                }
                remainingTimeToBlockNs -= timeNs;
                // a pool buffer became free
                if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                    // just grab a buffer from the free list
                    buffer = this.free.pollFirst();
                    accumulated = size;
                } else {
                    // take what is available now and keep waiting for the rest
                    freeUp(size - accumulated);
                    int got = (int) Math.min(size - accumulated, this.availableMemory);
                    this.availableMemory -= got;
                    accumulated += got;
                }
            }
            // leave the wait queue
            Condition removed = this.waiters.removeFirst();
            if (removed != moreMemory)
                throw new IllegalStateException("Wrong condition: this shouldn't happen.");
            // if there is still memory or a free buffer, wake up the next waiter
            if (this.availableMemory > 0 || !this.free.isEmpty()) {
                if (!this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            }
            // unlock
            lock.unlock();
            if (buffer == null)
                return ByteBuffer.allocate(size);
            else
                return buffer;
        }
    } finally {
        if (lock.isHeldByCurrentThread())
            lock.unlock();
    }
}
- And the release path:
public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        // a buffer of exactly poolableSize goes back into the free list
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            // for other sizes, only availableMemory is adjusted; the buffer is left to the GC
            this.availableMemory += size;
        }
        // wake up one thread blocked for lack of space
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}
The append logic
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // count the threads currently appending to the RecordAccumulator
    appendsInProgress.incrementAndGet();
    try {
        // step 1: find the Deque for this TopicPartition
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        // step 2: lock. free.allocate is kept OUTSIDE this synchronized block to
        // shorten the lock hold time, because free.allocate can block. Suppose
        // thread 1 has a large message that does not fit the current RecordBatch
        // and needs a new one, while thread 2 has a small message that would fit;
        // if free.allocate were inside the block, many threads like thread 2
        // would end up blocked.
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordBatch last = dq.peekLast();
            if (last != null) {
                // step 3: try to append the record
                FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                if (future != null)
                    // step 4: on success, return immediately
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
            }
        }
        // step 5: the append failed, so request a buffer from the pool
        // (size = max(batchSize, record size); the computation is elided in this excerpt)
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        // lock again to avoid memory fragmentation: without the lock, several
        // threads could each create a RecordBatch, later appends would only go to
        // the last one, and the others would never be filled.
        synchronized (dq) {
            // need to check if producer is closed again after grabbing the dequeue lock
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordBatch last = dq.peekLast();
            if (last != null) {
                // step 7: retry the append. The retry matters because several
                // threads may have been blocked in free.allocate; once one of them
                // creates a new RecordBatch, the others can simply append to it.
                FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                if (future != null) {
                    // step 8: the append succeeded, so release the buffer we
                    // allocated; only a newly created RecordBatch needs a new buffer
                    free.deallocate(buffer);
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            // step 9: append the record to the newly created RecordBatch
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
            dq.addLast(batch);
            // step 10: track it in the incomplete set
            incomplete.add(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}
Sender module
- Kafka wraps Java NIO, so without an NIO background it is worth studying NIO first to understand the Sender module properly.
Send sequence diagram
(figure: send sequence diagram)
- The focus here is on the send and poll methods, because that is where requests are actually dispatched to a node.
Main flow of send and poll
(figure: main flow of send and poll)
- send() only registers OP_WRITE interest for the request's SelectionKey; the actual transmission happens in the poll flow.
The poll flow
// excerpt; some locals (endSelect, dataInBuffers) come from the full method body
public void poll(long timeout) throws IOException {
    /* check ready keys */
    int numReadyKeys = select(timeout);
    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
        pollSelectionKeys(readyKeys, false, endSelect);
        // clear all SelectionKeys so they are not processed again next round
        readyKeys.clear();
        // handle connections that completed immediately on connect; this usually
        // only happens when broker and client run on the same machine
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        immediatelyConnectedKeys.clear();
    }
    // move staged network responses into the completed-receives collection
    addToCompletedReceives();
}
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                       boolean isImmediatelyConnected,
                       long currentTimeNanos) {
    for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
        KafkaChannel channel = channel(key);
        boolean sendFailed = false;
        // READ event
        if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
                && !explicitlyMutedChannels.contains(channel)) {
            NetworkReceive networkReceive;
            // read() pulls data from the channel into a buffer
            // (again via the transportLayer inside KafkaChannel)
            while ((networkReceive = channel.read()) != null) {
                if (!stagedReceives.containsKey(channel))
                    stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
                // stage the receives that were read
                Deque<NetworkReceive> deque = stagedReceives.get(channel);
                deque.add(networkReceive);
            }
        }
        // WRITE event
        if (channel.ready() && key.isWritable()) {
            // write data from the buffer into the channel (KafkaChannel's transportLayer)
            Send send = channel.write();
        }
    }
}
- Why can the channel be read even though OP_READ was never registered at send time? It pays to get familiar with key.interestOps: when the connection finishes, the interest set is rewritten so that once a write receives a response it can be read:
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
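The bit arithmetic in that line can be checked in isolation. The helper below is hypothetical and works on plain ints, so no open channel is needed; note that `&` binds tighter than `|`, so the expression means "clear OP_CONNECT, then set OP_READ":

```java
import java.nio.channels.SelectionKey;

public class InterestOpsDemo {
    // same expression as in the Kafka source line above
    static int finishConnect(int ops) {
        return ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_CONNECT; // registered for connect only
        int after = finishConnect(ops);
        System.out.println((after & SelectionKey.OP_CONNECT) != 0); // false
        System.out.println((after & SelectionKey.OP_READ) != 0);    // true
    }
}
```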
- Ordering guarantees
  - On the server side, to keep messages ordered the Selector provides two methods, mute(String id) and unmute(String id), which mark a KafkaChannel so that only one request per channel is processed at a time (think of it as an exclusive lock). When the server receives a request it first puts it into the stagedReceives collection; the channel is not yet muted at that point, so the Receive moves on into completedReceives. While processing a request from completedReceives, the server first mutes the channel and unmutes it only after the response has been fully sent; only then can the channel's next request be handled.
  - On the client side, Selector.mute() and unmute() are not called; send ordering is instead guaranteed by InFlightRequests (which enforces the max.in.flight.requests.per.connection setting) together with RecordAccumulator's mutePartition. So on the client, every Receive goes straight into completedReceives to await processing.
  - In transit, TCP guarantees that messages arrive in order.
- Summary of the send flow:
  - On its first poll() call, the sender thread initializes the connection to the node;
  - on its second poll() call, it sends the Metadata request;
  - on its third poll() call, it receives the MetadataResponse and updates the metadata.
Producer configuration mapped to the source code
| Parameter | Description | Default |
|---|---|---|
| acks | When a message counts as successfully sent. acks=0: the msg is considered sent as soon as the producer puts it on the wire; acks=1: the msg is considered sent once the leader receives it and sends an ack (without waiting for the msg to replicate to other replicas); acks=all or -1: the leader sends an ack to the producer only after receiving acks from all ISR replicas -- the strongest reliability guarantee. | 1 |
| buffer.memory | Maximum memory the producer may use; when exceeded, the producer blocks for up to max.block.ms and then throws. | 32 MB |
| compression.type | Compression format for producer data: none, gzip, snappy, or lz4. Implemented with the strategy pattern. | none |
| retries | Number of retries after a msg fails to send. With retries enabled and max.in.flight.requests.per.connection set above 1, reordering is possible. Retry works by re-queueing batches that failed with a retriable exception. | 0 |
| batch.size | The producer sends data to a partition in batches; a batch is sent once it exceeds batch.size or the linger.ms deadline arrives. | 16 KB |
| linger.ms | How long a batch that has not reached batch.size may wait at most before being sent anyway. | 0 |
| max.in.flight.requests.per.connection | Maximum number of concurrent in-flight requests per connection; values above 1 forfeit the ordering guarantee. | 5 |
Design patterns worth learning
- Strategy: the TransportLayer implementation class is named in the configuration and the concrete implementation is resolved at runtime.
- Builder: used when defining and parsing the configuration, giving the chained .define().define() style.
- Decorator: new DataOutputStream(new ByteBufferOutputStream(...)).