kafka設置發(fā)送消息的大小

一、kafka默認的發(fā)送一條消息的大小是1M,如果不配置,當發(fā)送的消息大于1M是,就會報錯

[2018-07-03 14:49:38,411] ERROR Error when sending message to topic testTopic with key: null, value: 2095476 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 2095510 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

二、具體配置

1、 kafka topic

/bin/kafka-topics.sh --zookeeper 192.168.6.102:2181 --alter --topic testTopic --config max.message.bytes=52428800

2、server.properties中添加

message.max.bytes=5242880(5M)
replica.fetch.max.bytes=6291456(6M)每個分區(qū)試圖獲取的消息字節(jié)數(shù)。要大于等于message.max.bytes

3、producer.properties中添加

max.request.size = 5242880 (5M)請求的最大大小為字節(jié)。要小于 message.max.bytes

4、consumer.properties中添加

fetch.message.max.bytes=6291456(6M)每個提取請求中為每個主題分區(qū)提取的消息字節(jié)數(shù)。要大于等于message.max.bytes

5、在生產(chǎn)端使用java發(fā)送消息

public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "node1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class默認為serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        props.put("max.request.size", "52428800");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
            producer.send(new KeyedMessage<String, String>(
                    "testkafka", "jack",
                    "rose"));
        producer.close();
    }

三、重啟

1、更改完配置要重啟kafka server才能生效

1.1、先停止kafka.
a、通過命令:bin/kafka-server-stop.sh
b、找到kafka進程,命令:ps -ef | grep kafka,然后kill掉
1.2、啟動kafka server:
nohup bin/kafka-server-start.sh config/server.properties&

2、重新執(zhí)行生產(chǎn)端的命令

./bin/kafka-console-producer.sh  --broker-list 192.168.6.102:8997 --topic testTopic < /usr/local/test.txt --producer.config /usr/local/kafka10/config/producer.properties

注:在Linux控制臺發(fā)送消息時,控制臺有輸入字數(shù)限制,不利于測試,所以將大的消息放在文本文件里test.txt,通過< /usr/local/test.txt追加到控制臺

3、重新執(zhí)行消費斷的命令

./bin/kafka-console-consumer.sh --zookeeper 192.168.6.102:2181 --topic testTopic --consumer.config config/consumer.properties
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容