kafka生產(chǎn)者多種實現(xiàn)方式

本文介紹kafka生產(chǎn)者多種實現(xiàn)方式,
方式1:
編寫一個Produce類繼承kafkaProduce:
public class Producer extends KafkaProducer<String, String> {
public Producer(Properties properties) {
super(properties);
}

@Override
public Future<RecordMetadata> send(ProducerRecord<String, String> record) {
    return super.send(record);
}

@Override
public Future<RecordMetadata> send(ProducerRecord<String, String> record, Callback callback) {

    return super.send(record, callback);
}

}
編寫ProduceUtils工具類:
public class ProduceUtils {

private static ProduceUtils instance;
private static Producer producer;

//獲取實例
public static synchronized ProduceUtils getInstance() {
    if (instance == null) {
        instance = new ProduceUtils();
        System.out.println("初始化 kafka producer...");
    }
    return instance;
}


/**
 * 初始化
 */
public void init() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertyUtil.getInstance().getValueByKey("brokerList"));
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "rawMessage");// 自定義客戶端id
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// key
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// value
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);

// properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.knowyou.tt.NewPartitioner");//自定義分區(qū)函數(shù)
producer = new Producer(properties);
System.out.println("loading the properities......");
}

/**
 * 發(fā)送主題和數(shù)據(jù)
 *
 * @param topic
 * @param value
 */
public void send(String topic, String value) {
    try {
        producer.send(new ProducerRecord<String, String>(topic, value));
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * 發(fā)送主題,數(shù)據(jù),和數(shù)據(jù)
 *
 * @param topic
 * @param key
 * @param value
 */
public void send(String topic, String key, String value) {
    try {
        producer.send(new ProducerRecord<String, String>(topic, value));
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * @param topic
 * @param key
 * @param value
 */
public void sendCallBack(String topic, String key, String value) {
    try {
        producer.send(new ProducerRecord<String, String>(topic, value), new ProduceCallback());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

編寫回調(diào)函數(shù)Callback:
public class ProduceCallback implements Callback {
private static final Logger log = LoggerFactory.getLogger(ProduceCallback.class);

@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
    //send success
    if (null == exception) {
        log.info("send message success");
        return;
    }
    //send failed
    log.error("send message failed");
}

}

調(diào)用ProduceUtils即可以實現(xiàn)生產(chǎn)功能。

方式2:
編寫一個生產(chǎn)者的工廠類:
public class ProduceFactory {
private static final Logger logger = Logger.getLogger(ProduceFactory.class);

private static KafkaProducer kafkaProducer = null;

private static ProduceFactory produceFactory = null;

private ProduceFactory() {
}

/**
 * 單例
 *
 * @return
 */
public static ProduceFactory getInstance() {
    if (produceFactory == null) {
        synchronized (ProduceFactory.class) {
            if (produceFactory == null) {
                produceFactory = new ProduceFactory();
            }
        }
    }
    return produceFactory;
}

/**
 * 初始化kafka生產(chǎn)者
 *
 * @throws Exception
 */
public void init() throws Exception {
    try {
        Properties properties = new Properties();
        InputStream stream = this.getClass().getClassLoader().getResourceAsStream("kafka.properties");
        properties.load(stream);
        kafkaProducer = new KafkaProducer(properties);
    } catch (Exception e) {
        logger.error("kafka produce init error:" + e.getMessage());
    }
}

/**
 * @param topic
 * @param key
 * @param value
 */
public void send(String topic, String key, String value) {
    ProducerRecord record;
    try {
        record = new ProducerRecord<>(topic, key, value);
        kafkaProducer.send(record, new SendCallback(record, 0));
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * @param topic
 * @param value
 */
public void send(String topic, String value) {
    ProducerRecord record;
    try {
        record = new ProducerRecord<>(topic, value);
        kafkaProducer.send(record, new SendCallback(record, 0));
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * producer回調(diào)
 */
static class SendCallback implements Callback {
    ProducerRecord<String, String> record;
    int sendSeq = 0;

    public SendCallback(ProducerRecord record, int sendSeq) {
        this.record = record;
        this.sendSeq = sendSeq;
    }

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        //send success
        if (null == e) {
            String meta = "topic:" + recordMetadata.topic() + ", partition:"
                    + recordMetadata.topic() + ", offset:" + recordMetadata.offset();
            logger.info("send message success, record:" + record.toString() + ", meta:" + meta);
            return;
        }
        //send failed
        logger.error("send message failed, seq:" + sendSeq + ", record:" + record.toString() + ", errmsg:" + e.getMessage());
        if (sendSeq < 1) {
            kafkaProducer.send(record, new SendCallback(record, ++sendSeq));
        }
    }
}

3,采用spring-kafka的模式生產(chǎn)數(shù)據(jù)
先做一個工廠類:
public class ProduceFactory {

public static KafkaTemplate<String, String> getKafkaTemplate() {
    return new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory(getConfigs()), true);
}

private static Map<String, Object> getConfigs() {
    HashMap<String, Object> properties = new HashMap<String, Object>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:6667,192.168.1.11:6667,192.168.1.12:6667");
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
    return properties;
}

}

再做一個工具類:
public class KafkaTool {
private static KafkaTemplate<String, String> template = ProduceFactory.getKafkaTemplate();

public static void sendData(final String topic, final String line) {
    template.execute(new KafkaOperations.ProducerCallback() {
        public Object doInKafka(Producer producer) {
            producer.send(new ProducerRecord<String, String>(topic, line), new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        System.out.println("fail to send kafka");
                    }
                }
            });
            return null;
        }
    });
}

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容