在前一篇文章爬蟲架構(gòu)|利用Kafka處理數(shù)據(jù)推送問題(1)中對(duì)Kafka做了一個(gè)介紹,以及環(huán)境搭建,最后是選擇使用阿里云的Kafka,這一篇文章繼續(xù)說使用阿里云的Kafka的一些知識(shí)。
一、發(fā)布者最佳實(shí)踐
發(fā)布的完整代碼(根據(jù)自己的業(yè)務(wù)做相應(yīng)處理):
package com.yimian.controller.kafka;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.alibaba.fastjson.JSON;
import com.yimian.model.SpiderData;
/**
* 生產(chǎn)者
*
* @author huangtao
*
*/
@Controller
@RequestMapping(value = "kafka/producer")
public class KafkaProducerController {
private static Producer<String, String> producer;
private static Properties kafkaProperties;
static {
// 設(shè)置sasl文件的路徑
JavaKafkaConfigurer.configureSasl();
// 加載kafka.properties
kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// 設(shè)置接入點(diǎn),請(qǐng)通過控制臺(tái)獲取對(duì)應(yīng)Topic的接入點(diǎn)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
// 設(shè)置SSL根證書的路徑,請(qǐng)記得將XXX修改為自己的路徑
// 與sasl路徑類似,該文件也不能被打包到j(luò)ar中
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
// 根證書store的密碼,保持不變
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
// 接入?yún)f(xié)議,目前支持使用SASL_SSL協(xié)議接入
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
// SASL鑒權(quán)方式,保持不變
props.put(SaslConfigs.SASL_MECHANISM, "ONS");
// Kafka消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 請(qǐng)求的最長等待時(shí)間
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
// 構(gòu)造Producer對(duì)象,注意,該對(duì)象是線程安全的,一般來說,一個(gè)進(jìn)程內(nèi)一個(gè)Producer對(duì)象即可;
// 如果想提高性能,可以多構(gòu)造幾個(gè)對(duì)象,但不要太多,最好不要超過5個(gè)
producer = new KafkaProducer<String, String>(props);
}
/**
* 發(fā)送消息給kafka
* @param topic
* @param msg
*/
public static void sendMsgToKafka(String topic, SpiderData msg) {
try {
// 發(fā)送消息,并獲得一個(gè)Future對(duì)象
Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(topic, String.valueOf(new Date().getTime()),
JSON.toJSONString(msg)));
// 同步獲得Future對(duì)象的結(jié)果
RecordMetadata recordMetadata = metadataFuture.get();
System.out.println("Produce ok:" + recordMetadata.toString());
} catch (Exception e) {
/**
* 要考慮重試~
* 在分布式環(huán)境下,由于網(wǎng)絡(luò)等原因,偶爾的發(fā)送失敗是常見的。這種失敗有可能是消息已經(jīng)發(fā)送成功,但是 Ack 失敗,也有可能是確實(shí)沒發(fā)送成功。
* 消息隊(duì)列 Kafka 是 VIP 網(wǎng)絡(luò)架構(gòu),會(huì)主動(dòng)掐掉空閑連接(一般 30 秒沒活動(dòng)),也就是說,不是一直活躍的客戶端會(huì)經(jīng)常收到”connection rest by peer”這樣的錯(cuò)誤,因此建議都考慮重試。
*/
// 參考常見報(bào)錯(cuò):
// https://help.aliyun.com/document_detail/68168.html?spm=a2c4g.11186623.6.567.2OMgCB
System.out.println("error occurred");
e.printStackTrace();
}
}
@RequestMapping(value = "init", produces = "text/html;charset=UTF-8")
@ResponseBody
public void init() {
// 構(gòu)造一個(gè)Kafka消息
String topic = kafkaProperties.getProperty("topic"); // 消息所屬的Topic,請(qǐng)?jiān)诳刂婆_(tái)申請(qǐng)之后,填寫在這里
SpiderData data = new SpiderData();
data.setDescUrl("www.baidu.com");
data.setTitle("百度");
sendMsgToKafka(topic, data);
}
}
Kafka的發(fā)送非常簡單,代碼片段如下:
Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
topic, \\ topic
null, \\ 分區(qū)編號(hào),這里最好為 null,交給 producer 去分配
System.currentTimeMillis(), \\時(shí)間戳
String.valueOf(message.hashCode()), \\ key,可以在控制臺(tái)通過這個(gè) Key 查找消息,這個(gè) key 最好唯一;
message)); \\ value,消息內(nèi)容
message可以是一個(gè)JSON類型的對(duì)象,如上面例子中的JSON.toJSONString(new SpiderData())
1.1、Key 和 Value
Kafka 0.10.0.0 的消息字段只有兩個(gè):Key 和 Value。為了便于追蹤,重要消息最好都設(shè)置一個(gè)唯一的 Key。通過 Key 追蹤某消息,打印發(fā)送日志和消費(fèi)日志,了解該消息的發(fā)送和消費(fèi)情況;更重要的是,您可以在控制臺(tái)可以根據(jù) Key 查詢消息的內(nèi)容。
1.2、失敗重試
在分布式環(huán)境下,由于網(wǎng)絡(luò)等原因,偶爾的發(fā)送失敗是常見的。這種失敗有可能是消息已經(jīng)發(fā)送成功,但是 Ack 失敗,也有可能是確實(shí)沒發(fā)送成功。
消息隊(duì)列 Kafka 是 VIP 網(wǎng)絡(luò)架構(gòu),會(huì)主動(dòng)掐掉空閑連接(一般 30 秒沒活動(dòng)),也就是說,不是一直活躍的客戶端會(huì)經(jīng)常收到”connection rest by peer”這樣的錯(cuò)誤,因此建議都考慮重試。
1.3、異步發(fā)送
需要注意的是這個(gè)接口是異步發(fā)送的;如果你想得到發(fā)送的結(jié)果,可以調(diào)用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)。
1.4、線程安全
Producer 是線程安全的,且可以往任何 Topic 發(fā)送消息。一般一個(gè)應(yīng)用,對(duì)應(yīng)一個(gè) Producer 就足夠了。
1.5、Ack
消息隊(duì)列 Kafka 沒有考慮這個(gè)參數(shù),都認(rèn)為是“all”,即所有消息同步到 Slave 節(jié)點(diǎn)后才會(huì)返回成功的確認(rèn)消息給客戶端。
1.6、Batch
Batch 的基本思路是:把消息緩存在內(nèi)存中,并進(jìn)行打包發(fā)送。Kafka 通過 Batch 來提高吞吐,但同時(shí)也會(huì)增加延遲,生產(chǎn)時(shí)應(yīng)該對(duì)兩者予以權(quán)衡。
在構(gòu)建 Producer 時(shí),需要考慮以下兩個(gè)參數(shù):
-
batch.size: 發(fā)往每個(gè) Partition 的消息個(gè)數(shù)緩存量達(dá)到這個(gè)數(shù)值時(shí),就會(huì)觸發(fā)一次網(wǎng)絡(luò)請(qǐng)求,把消息真正發(fā)往服務(wù)器; -
linger.ms: 每個(gè)消息待在緩存中的最大時(shí)間,超過這個(gè)時(shí)間,就會(huì)忽略batch.size的限制,立即把消息發(fā)往服務(wù)器。
由此可見,Kafka 什么時(shí)候把消息真正發(fā)往服務(wù)器,是通過上面兩個(gè)參數(shù)共同決定的;
batch.size 有助于提高吞吐,linger.ms 有助于控制延遲。您可以根據(jù)具體業(yè)務(wù)進(jìn)行調(diào)整。
1.7、OOM
結(jié)合 Kafka Batch 的設(shè)計(jì)思路,Kafka 會(huì)緩存消息并打包發(fā)送,如果緩存太多,則有可能造成 OOM。
-
buffer.memory: 所有緩存消息的總體大小超過這個(gè)數(shù)值后,就會(huì)觸發(fā)把消息發(fā)往服務(wù)器。此時(shí)會(huì)忽略batch.size和linger.ms的限制。 -
buffer.memory的默認(rèn)數(shù)值是 32M,對(duì)于單個(gè) Producer 來說,可以保證足夠的性能。需要注意的是,如果你在同一個(gè) JVM 中啟動(dòng)多個(gè) Producer,那么每個(gè) Producer 都有可能占用32M 緩存空間,此時(shí)便有可能觸發(fā) OOM。 - 在生產(chǎn)時(shí),一般沒有必要啟動(dòng)多個(gè) Producer;如果特殊情況需要,則需要考慮
buffer.memory的大小,避免觸發(fā) OOM。
1.8、分區(qū)順序
單個(gè)分區(qū)內(nèi),消息是按照發(fā)送順序儲(chǔ)存的,是基本有序的。
但消息隊(duì)列 Kafka 并不保證單個(gè)分區(qū)內(nèi)絕對(duì)有序,所以在某些情況下,會(huì)發(fā)生少量消息亂序。比如:消息隊(duì)列 Kafka 為了提高可用性,某個(gè)分區(qū)掛掉后把消息 Failover 到其它分區(qū)。
二、訂閱者最佳實(shí)踐
消費(fèi)的完整代碼(根據(jù)自己的業(yè)務(wù)做相應(yīng)處理):
package com.yimian.controller.kafka;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yimian.model.SpiderData;
/**
* 消費(fèi)者
*
* @author huangtao
*
*/
@Controller
@RequestMapping(value = "kafka/consumer")
public class KafkaConsumerController {
private static Consumer<String, String> consumer;
static {
// 設(shè)置sasl文件的路徑
JavaKafkaConfigurer.configureSasl();
// 加載kafka.properties
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// 設(shè)置接入點(diǎn),請(qǐng)通過控制臺(tái)獲取對(duì)應(yīng)Topic的接入點(diǎn)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
// 設(shè)置SSL根證書的路徑,請(qǐng)記得將XXX修改為自己的路徑
// 與sasl路徑類似,該文件也不能被打包到j(luò)ar中
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
// 根證書store的密碼,保持不變
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
// 接入?yún)f(xié)議,目前支持使用SASL_SSL協(xié)議接入
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
// SASL鑒權(quán)方式,保持不變
props.put(SaslConfigs.SASL_MECHANISM, "ONS");
// 兩次poll之間的最大允許間隔
// 請(qǐng)不要改得太大,服務(wù)器會(huì)掐掉空閑連接,不要超過30000
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 25000);
// 每次poll的最大數(shù)量
// 注意該值不要改得太大,如果poll太多數(shù)據(jù),而不能在下次poll之前消費(fèi)完,則會(huì)觸發(fā)一次負(fù)載均衡,產(chǎn)生卡頓
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
// 消息的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// 當(dāng)前消費(fèi)實(shí)例所屬的消費(fèi)組,請(qǐng)?jiān)诳刂婆_(tái)申請(qǐng)之后填寫
// 屬于同一個(gè)組的消費(fèi)實(shí)例,會(huì)負(fù)載消費(fèi)消息
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
// 構(gòu)造消息對(duì)象,也即生成一個(gè)消費(fèi)實(shí)例
consumer = new KafkaConsumer<String, String>(props);
// 設(shè)置消費(fèi)組訂閱的Topic,可以訂閱多個(gè)
// 如果GROUP_ID_CONFIG是一樣,則訂閱的Topic也建議設(shè)置成一樣
List<String> subscribedTopics = new ArrayList<String>();
// 如果需要訂閱多個(gè)Topic,則在這里add進(jìn)去即可
// 每個(gè)Topic需要先在控制臺(tái)進(jìn)行創(chuàng)建
subscribedTopics.add(kafkaProperties.getProperty("topic"));
consumer.subscribe(subscribedTopics);
}
@RequestMapping(value = "init", produces = "text/html;charset=UTF-8")
@ResponseBody
public void init() {
// 循環(huán)消費(fèi)消息
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
// 必須在下次poll之前消費(fèi)完這些數(shù)據(jù), 且總耗時(shí)不得超過SESSION_TIMEOUT_MS_CONFIG
// 建議開一個(gè)單獨(dú)的線程池來消費(fèi)消息,然后異步返回結(jié)果
for (ConsumerRecord<String, String> record : records) {
JSONObject jsonMsg = JSON.parseObject(record.value());
SpiderData spiderData = JSONObject.toJavaObject(jsonMsg, SpiderData.class);
System.out.println(spiderData.toString());
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
// 參考常見報(bào)錯(cuò):
// https://help.aliyun.com/document_detail/68168.html?spm=a2c4g.11186623.6.567.2OMgCB
e.printStackTrace();
}
}
}
}
消費(fèi)時(shí)把JSON數(shù)據(jù)反序列化:
for (ConsumerRecord<String, String> record : records) {
JSONObject jsonMsg = JSON.parseObject(record.value());
SpiderData spiderData = JSONObject.toJavaObject(jsonMsg, SpiderData.class);
}
2.1、消費(fèi)消息基本流程
Kafka 訂閱者在訂閱消息時(shí)的基本流程是:
- Poll 數(shù)據(jù)
- 執(zhí)行消費(fèi)邏輯
- 再次 poll 數(shù)據(jù)
2.2、負(fù)載消費(fèi)
每個(gè) Consumer Group 可以包含多個(gè)消費(fèi)實(shí)例,也即可以啟動(dòng)多個(gè) Kafka Consumer,并把參數(shù) group.id 設(shè)置成相同的值。屬于同一個(gè) Consumer Group 的消費(fèi)實(shí)例會(huì)負(fù)載消費(fèi)訂閱的 topic。
示例1:Consumer Group A 訂閱了 Topic A,并開啟三個(gè)消費(fèi)實(shí)例 C1、C2、C3,則發(fā)送到 Topic A 的每條消息最終只會(huì)傳給 C1、C2、C3 的某一個(gè)。Kafka 默認(rèn)會(huì)均勻地把消息傳給各個(gè)消息實(shí)例,以做到消費(fèi)負(fù)載均衡。
Kafka 負(fù)載消費(fèi)的內(nèi)部原理是,把訂閱的 Topic 的分區(qū),平均分配給各個(gè)消費(fèi)實(shí)例。因此,消費(fèi)實(shí)例的個(gè)數(shù)不要大于分區(qū)的數(shù)量,否則會(huì)有實(shí)例分配不到任何分區(qū)而處于空跑狀態(tài)。這個(gè)負(fù)載均衡發(fā)生的時(shí)間,除了第一次啟動(dòng)上線之外,后續(xù)消費(fèi)實(shí)例發(fā)生重啟、增加、減少等變更時(shí),都會(huì)觸發(fā)一次負(fù)載均衡。
消息隊(duì)列 Kafka 分區(qū)的數(shù)量至少是 16 個(gè),已經(jīng)足夠滿足大部分用戶的需求,且云上服務(wù)會(huì)根據(jù)容量調(diào)整分區(qū)數(shù)。
2.3、多個(gè)訂閱
一個(gè) Consumer Group 可以訂閱多個(gè) Topic。一個(gè) Topic 也可以被多個(gè) Consumer Group 訂閱,且各個(gè) Consumer Group 獨(dú)立消費(fèi) Topic 下的所有消息。
示例1:Consumer Group A 訂閱了 Topic A,Consumer Group B 也訂閱了 Topic A,則發(fā)送到 Topic A 的每條消息,不僅會(huì)傳一份給 Consumer Group A 的消費(fèi)實(shí)例,也會(huì)傳一份給 Consumer Group B 的消費(fèi)實(shí)例,且這兩個(gè)過程相互獨(dú)立,相互沒有任何影響。
2.4、消費(fèi)位點(diǎn)
每個(gè) Topic 會(huì)有多個(gè)分區(qū),每個(gè)分區(qū)會(huì)統(tǒng)計(jì)當(dāng)前消息的總條數(shù),這個(gè)稱為最大位點(diǎn) MaxOffset。Kafka Consumer 會(huì)按順序依次消費(fèi)分區(qū)內(nèi)的每條消息,記錄已經(jīng)消費(fèi)了的消息條數(shù),稱為ConsumerOffset。
剩余的未消費(fèi)的條數(shù)(也稱為消息堆積量) = MaxOffset - ConsumerOffset
2.5、位點(diǎn)提交
Kafka 消費(fèi)者有兩個(gè)相關(guān)參數(shù):
-
enable.auto.commit:默認(rèn)值為 true。 -
auto.commit.interval.ms: 默認(rèn)值為 1000,也即 1s。
這兩個(gè)參數(shù)組合的結(jié)果就是,每次 poll 時(shí),再拉取數(shù)據(jù)前會(huì)預(yù)先做下面這件事:
- 檢查上次提交位點(diǎn)的時(shí)間,如果距離當(dāng)前時(shí)間已經(jīng)超過 auto.commit.interval.ms,則啟動(dòng)位點(diǎn)提交動(dòng)作;
因此,如果 enable.auto.commit 設(shè)置為 true,需要在每次 poll 時(shí),確保前一次 poll 出來的數(shù)據(jù)已經(jīng)消費(fèi)完畢,否則可能導(dǎo)致位點(diǎn)跳躍;
如果想自己控制位點(diǎn)提交,則把 enable.auto.commit 設(shè)為 false,并調(diào)用 commit(offsets)函數(shù)自行控制位點(diǎn)提交。
2.6、消息重復(fù)以及消費(fèi)冪等
Kafka 消費(fèi)的語義是 “At Lease Once”, 也就是至少投遞一次,保證消息不丟,但是不會(huì)保證消息不重復(fù)。在出現(xiàn)網(wǎng)絡(luò)問題、客戶端重啟時(shí)均有可能出現(xiàn)少量重復(fù)消息,此時(shí)應(yīng)用消費(fèi)端,如果對(duì)消息重復(fù)比較敏感(比如說訂單交易類),則應(yīng)該做到消息冪等。
以數(shù)據(jù)庫類應(yīng)用為例,常用做法是:
- 發(fā)送消息時(shí),傳入 key 作為唯一流水號(hào)ID;
- 消費(fèi)消息時(shí),判斷 key 是否已經(jīng)消費(fèi)過,如果已經(jīng)消費(fèi)過了,則忽略,如果沒消費(fèi)過,則消費(fèi)一次;
當(dāng)然,如果應(yīng)用本身對(duì)少量消息重復(fù)不敏感,則不需要做此類冪等檢查。
2.7、消費(fèi)失敗
Kafka 是按分區(qū)一條一條消息順序向前消費(fèi)推進(jìn)的,如果消費(fèi)端拿到某條消息后消費(fèi)邏輯失敗,比如應(yīng)用服務(wù)器出現(xiàn)了臟數(shù)據(jù),導(dǎo)致某條消息處理失敗,等待人工干預(yù),該怎么辦呢?
- 如果失敗后一直嘗試再次執(zhí)行消費(fèi)邏輯,則有可能造成消費(fèi)線程阻塞在當(dāng)前消息,無法向前推進(jìn),造成消息堆積;
- 由于 Kafka 自身沒有處理失敗消息的設(shè)計(jì),實(shí)踐中通常會(huì)打印失敗的消息、或者存儲(chǔ)到某個(gè)服務(wù)(比如創(chuàng)建一個(gè) Topic 專門用來放失敗的消息),然后定時(shí) check 失敗消息的情況,分析失敗原因,根據(jù)情況處理。
2.8、消費(fèi)阻塞以及堆積
消費(fèi)端最常見的問題就是消費(fèi)堆積,最常造成堆積的原因是:
- 消費(fèi)速度跟不上生產(chǎn)速度,此時(shí)應(yīng)該提高消費(fèi)速度,詳情見本文下一節(jié)<提高消費(fèi)速度>;
- 消費(fèi)端產(chǎn)生了阻塞;
消費(fèi)端拿到消息后,執(zhí)行消費(fèi)邏輯,通常會(huì)執(zhí)行一些遠(yuǎn)程調(diào)用,如果這個(gè)時(shí)候同步等待結(jié)果,則有可能造成一直等待,消費(fèi)進(jìn)程無法向前推進(jìn)。
消費(fèi)端應(yīng)該竭力避免堵塞消費(fèi)線程,如果存在等待調(diào)用結(jié)果的情況,設(shè)置等待的超時(shí)時(shí)間,超過時(shí)間后,作消費(fèi)失敗處理。
2.9、提高消費(fèi)速度
提高消費(fèi)速度有兩個(gè)辦法:
- 增加 Consumer 實(shí)例個(gè)數(shù);
- 增加消費(fèi)線程;
增加 Consumer 實(shí)例,可以在進(jìn)程內(nèi)直接增加(需要保證每個(gè)實(shí)例一個(gè)線程,否則沒有太大意義),也可以部署多個(gè)消費(fèi)實(shí)例進(jìn)程;需要注意的是,實(shí)例個(gè)數(shù)超過分區(qū)數(shù)量后就不再能提高速度,將會(huì)有消費(fèi)實(shí)例不工作;
增加 Consumer 實(shí)例本質(zhì)上也是增加線程的方式來提升速度,因此更加重要的性能提升方式是增加消費(fèi)線程,最基本的方式如下:
- 定義一個(gè)線程池;
- poll 數(shù)據(jù);
- 把數(shù)據(jù)提交到線程池進(jìn)行并發(fā)處理;
- 等并發(fā)結(jié)果返回成功再次poll數(shù)據(jù)執(zhí)行。
2.10消息過濾
Kafka 自身沒有消息過濾的語義。實(shí)踐中可以采取以下兩個(gè)辦法:
- 如果過濾的種類不多,可以采取多個(gè) Topic 的方式達(dá)到過濾的目的;
- 如果過濾的種類多,則最好在客戶端業(yè)務(wù)層面自行過濾。
實(shí)踐中根據(jù)業(yè)務(wù)具體情況進(jìn)行選擇,可以綜合運(yùn)用上面兩種辦法。
2.11、消息廣播
Kafka 自身沒有消息廣播的語義,可以通過創(chuàng)建不同的 Consumer Group來模擬實(shí)現(xiàn)。
2.12、訂閱關(guān)系
同一個(gè) Consumer Group 內(nèi),各個(gè)消費(fèi)實(shí)例訂閱的 Topic 最好保持一致,避免給排查問題帶來干擾。
參考資料:阿里云消息隊(duì)列Kafka的幫助文檔