I. Kafka's default maximum message size is 1 MB. If this limit is not raised, sending a message larger than 1 MB fails with an error like the following:
[2018-07-03 14:49:38,411] ERROR Error when sending message to topic testTopic with key: null, value: 2095476 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 2095510 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
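The numbers in that log line up with the producer default: max.request.size defaults to 1048576 bytes (1 MB), and the serialized record was 2095510 bytes. A minimal check of that arithmetic (plain Java, no Kafka dependency; class and method names are illustrative):

```java
public class DefaultLimitCheck {
    // Default producer max.request.size: 1 MB. The producer rejects any
    // record whose serialized form exceeds this limit.
    static final int DEFAULT_MAX_REQUEST_SIZE = 1_048_576;

    static boolean exceedsDefault(int serializedBytes) {
        return serializedBytes > DEFAULT_MAX_REQUEST_SIZE;
    }

    public static void main(String[] args) {
        // 2095510 is the serialized size reported in the error above.
        System.out.println("too large: " + exceedsDefault(2_095_510));
    }
}
```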
II. Configuration details
1. Topic-level setting (per-topic max.message.bytes):
bin/kafka-topics.sh --zookeeper 192.168.6.102:2181 --alter --topic testTopic --config max.message.bytes=52428800
2. In server.properties, add:
message.max.bytes=5242880 (5 MB): the largest message the broker accepts.
replica.fetch.max.bytes=6291456 (6 MB): the number of message bytes a replica attempts to fetch per partition; must be >= message.max.bytes.
3. In producer.properties, add:
max.request.size=5242880 (5 MB): the maximum size of a request in bytes; must not exceed message.max.bytes.
4. In consumer.properties, add:
fetch.message.max.bytes=6291456 (6 MB): the number of message bytes fetched per topic partition in each fetch request; must be >= message.max.bytes.
5. Sending a message from a Java producer:
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BigMessageProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "node1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        props.put("max.request.size", "52428800"); // 50 MB
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        producer.send(new KeyedMessage<String, String>("testkafka", "jack", "rose"));
        producer.close();
    }
}
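The ordering constraints among the four settings in this section can be sanity-checked in a small self-contained sketch (plain Java, no Kafka dependency; the byte values mirror the properties above, and the class name is illustrative):

```java
public class SizeConfigCheck {
    /** True when the broker, producer, and consumer size limits are mutually consistent. */
    static boolean consistent(int messageMaxBytes, int replicaFetchMaxBytes,
                              int maxRequestSize, int fetchMessageMaxBytes) {
        return replicaFetchMaxBytes >= messageMaxBytes   // replicas can fetch the largest accepted message
            && maxRequestSize <= messageMaxBytes         // producer requests fit what the broker accepts
            && fetchMessageMaxBytes >= messageMaxBytes;  // consumers can fetch the largest accepted message
    }

    public static void main(String[] args) {
        // Values mirror the properties above: 5 MB broker/producer limits, 6 MB fetch sizes.
        System.out.println(consistent(5_242_880, 6_291_456, 5_242_880, 6_291_456));
    }
}
```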
III. Restart
1. The Kafka server must be restarted for the configuration changes to take effect.
1.1. Stop Kafka first:
a. via the script: bin/kafka-server-stop.sh
b. or find the Kafka process with ps -ef | grep kafka and kill it
1.2. Start the Kafka server:
nohup bin/kafka-server-start.sh config/server.properties &
2. Re-run the producer command:
./bin/kafka-console-producer.sh --broker-list 192.168.6.102:8997 --topic testTopic < /usr/local/test.txt --producer.config /usr/local/kafka10/config/producer.properties
Note: the Linux console limits how many characters can be typed at once, which makes testing awkward, so put the large message in a text file (test.txt) and feed it to the console producer via < /usr/local/test.txt.
3. Re-run the consumer command:
./bin/kafka-console-consumer.sh --zookeeper 192.168.6.102:2181 --topic testTopic --consumer.config config/consumer.properties
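A payload larger than 1 MB, like the test.txt used above, is easiest to produce programmatically. A quick generator sketch (plain Java; the local file name and the 2 MB size are illustrative, not from the original setup):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

public class MakeBigPayload {
    public static void main(String[] args) throws IOException {
        // Write a ~2 MB single-line payload, comfortably over the 1 MB default limit.
        byte[] payload = new byte[2 * 1024 * 1024];
        Arrays.fill(payload, (byte) 'x');
        Path out = Paths.get("test.txt"); // illustrative; the doc feeds /usr/local/test.txt
        Files.write(out, payload);
        System.out.println("wrote " + Files.size(out) + " bytes");
    }
}
```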