Kafka 生產(chǎn)者線程池模式（共享單個(gè) KafkaProducer，線程池異步發(fā)送）

package com.ky.produce;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Asynchronous Kafka send facade: one shared, thread-safe {@link KafkaProducer}
 * plus a {@link ThreadPoolExecutor} that submits {@link ProducerThread} tasks.
 *
 * <p>All state is static; the producer and pool are built once in the static
 * initializer and closed by a JVM shutdown hook.
 *
 * @author xwj
 */
public class ProduceThreadPool {

    private static final Properties properties = new Properties();

    private static final KafkaProducer<String, String> producer;

    private static final ThreadPoolExecutor service;

    private static final TimeUnit timeUnit = TimeUnit.SECONDS;

    // NOTE(review): an unbounded LinkedBlockingQueue means ThreadPoolExecutor
    // never grows past corePoolSize, so maximumPoolSize (100) is dead config,
    // and a slow broker lets the queue grow without limit. Kept as-is to
    // preserve behavior; consider a bounded queue + rejection policy.
    private static final BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>();

    static {
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.1.11.110:6667,10.1.11.111:6667,10.1.11.112:6667");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 131072);
        // acks=all: wait for the full ISR before a send is considered successful.
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        producer = new KafkaProducer<>(properties);
        int corePoolSize = 40;
        int maximumPoolSize = 100;
        long keepAliveTime = 60;
        service = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, blockingQueue);
        // The original leaked both the producer and the pool on exit; close them
        // on JVM shutdown so buffered records are flushed (producer.close() flushes).
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            service.shutdown();
            producer.close();
        }));
        System.out.println("init ok....");
    }

    /**
     * Sends {@code msg} to {@code topic} asynchronously. The record key is the
     * current epoch-millis timestamp as a string.
     *
     * @param topic destination Kafka topic
     * @param msg   message payload
     */
    public static void sendData(String topic, String msg) {
        try {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topic, String.valueOf(System.currentTimeMillis()), msg);
            service.submit(new ProducerThread(producer, record));
        } catch (Exception e) {
            // e.g. RejectedExecutionException after shutdown; this class has no
            // logger configured, so the original printStackTrace is kept.
            e.printStackTrace();
        }
    }

}

package com.ky.produce;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable task that sends a single {@link ProducerRecord} through the shared
 * {@link KafkaProducer} and logs the outcome from the send callback.
 *
 * @author xwj
 */
public class ProducerThread implements Runnable {

    // Logger belongs to the class, not the instance: static final avoids a
    // LoggerFactory lookup per submitted task.
    private static final Logger log = LoggerFactory.getLogger(ProducerThread.class);

    private final KafkaProducer<String, String> producer;
    private final ProducerRecord<String, String> record;

    /**
     * @param producer shared, thread-safe producer to send through
     * @param record   record to publish
     */
    ProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
        this.producer = producer;
        this.record = record;
    }

    @Override
    public void run() {
        // send() is async; the callback runs on the producer's I/O thread.
        producer.send(record, (metadata, e) -> {
            if (null != e) {
                // Was e.printStackTrace(): failures bypassed the log stream entirely.
                log.error("消息發(fā)送失敗 topic:{}", record.topic(), e);
            }
            if (null != metadata) {
                // Parameterized logging: no String.format/concat cost when INFO is off.
                log.info("消息發(fā)送成功 : offset: {}, partition:{}, topic:{} timestamp:{}",
                        metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp());
            }
        });
    }

}

©著作權(quán)歸作者所有，轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容