項目中用到kafka,應用作為生產(chǎn)者,發(fā)送消息時,報錯:
Magic v1 does not support record headers
網(wǎng)上也有很多同樣的博客記錄這個錯誤,如:https://zhuanlan.zhihu.com/p/205676507
原因是 Kafka服務端和客戶端的版本不兼容導致了報錯;
應用環(huán)境:spring boot 版本:2.1.6.RELEASE
查看客戶端kafka版本:2.2.7.RELEASE

spring-kafka-2.7.0.png
服務端版本:0.10.2

kafka-server.png
到這里確定是版本不一致導致的,如何解決呢?
方案一:
服務端升級到2.0
由于使用的是 阿里云的消息隊列kafka,2.0費用比當前版本貴很多,故該方案pass
方案二:
客戶端版本使用1.0的
使用原生的kafka,即自己寫kafka configuration,由于我們還要消費kafka的消息,故還需要寫一個監(jiān)聽器,用來消費消息,比較麻煩;換個思路:消費消息仍然使用 spring-kafka,發(fā)送消息使用kafka原生API
- maven依賴
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
沒有version,查看maven依賴,發(fā)現(xiàn)spring boot 幫我們管理了spring-kafka的版本

spring-kafka.png
Ctrl + f 查找 spring-kafka.version,如下圖:

spring-kafka-2.png
-
KafkaConfig配置類
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
import java.util.Properties;
/**
* kafka 配置
* 由于kafka服務端使用的是 0.10.2版本,而我們的框架使用的spring boot版本為2.4.5,
* kafka starter版本中kafka-clients版本為2.6.0;kafka客戶端作為生產(chǎn)者發(fā)送消息
* 時,會增加請求頭,而服務端不支持,導致無法正常生產(chǎn)消息,故kafka作為生產(chǎn)者,不能直
* 接使用boot starter,需使用kafka原生客戶端
* @author guodong.zhang
*/
@Configuration
public class KafkaConfig {
@Value("#{'${spring.kafka.consumer.bootstrap-servers}'.split(',')}")
private List<String> bootstrapServers;
@Bean
public KafkaProducer<String, String> initKafkaTemplate() {
Properties props = new Properties();
//設置接入點,請通過控制臺獲取對應Topic的接入點
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//Kafka消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//請求的最長等待時間
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
//設置客戶端內(nèi)部重試次數(shù)
props.put(ProducerConfig.RETRIES_CONFIG, 5);
//設置客戶端內(nèi)部重試間隔
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
//構造Producer對象,注意,該對象是線程安全的,一般來說,一個進程內(nèi)一個Producer對象即可;
//如果想提高性能,可以多構造幾個對象,但不要太多,最好不要超過5個
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
return producer;
}
}
- 生產(chǎn)消息類
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.Future;
/**
* 生產(chǎn)者,推送消息至kafka
*
* @author guodong.zhang
*/
@Slf4j
@Component
public class MessageSenderClient {
@Resource
private KafkaProducer kafkaProducer;
/**
* 消息發(fā)送至 kafka
*
* @param topic 路由
* @param data 消息內(nèi)容
*/
public void send(String topic, Object data) {
if (StringUtils.isEmpty(topic) || data == null) {
throw new IllegalStateException("發(fā)送消息參數(shù)不能為空");
}
try {
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, JSON.toJSONString(data));
Future<RecordMetadata> metadataFuture = kafkaProducer.send(kafkaMessage);
RecordMetadata recordMetadata = metadataFuture.get();
log.info("MessageSenderClient kafka send Produce ok:{}", recordMetadata.toString());
} catch (Exception e) {
log.info("MessageSenderClient kafka send error,", e);
}
}
/**
* 消息發(fā)送至 kafka
*
* @param topic
* @param partition 分區(qū)
* @param data
*/
public void send(String topic, Integer partition, Object data) {
if (StringUtils.isEmpty(topic) || data == null) {
throw new IllegalStateException("發(fā)送消息參數(shù)不能為空");
}
if (partition == null) {
send(topic, data);
}
try {
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, partition, null, JSON.toJSONString(data));
Future<RecordMetadata> metadataFuture = kafkaProducer.send(kafkaMessage);
RecordMetadata recordMetadata = metadataFuture.get();
log.info("MessageSenderClient kafka send Produce ok:{}", recordMetadata.toString());
} catch (Exception e) {
log.info("MessageSenderClient kafka send error,", e);
}
}
}
- 需要發(fā)送消息的地方調(diào)用該方法
@Value("${kafka.producer.job_collection_es_topic}")
private String topic;
@Autowired
private MessageSenderClient kafkaProducer;
private void sendMsg(Object target) {
if (target == null || ((JSONObject) target).get(BIZ_ID) == null) {
throw new BusinessException("發(fā)送消息參數(shù)不能為空");
}
String bizId = ((JSONObject) target).get(BIZ_ID).toString();
int partition = Integer.parseInt(bizId) % PARTITION_NUM;
// 推送到kafka
kafkaProducer.send(topic, partition, target);
}
-
測試驗證
msg.png
疑問:
kafka客戶端與服務端版本需要對應,而我這里kafka-clients使用的版本是 2.0.1,卻可以正常發(fā)送消息

kafka-client-2.0.1.jpg
