Java 實現(xiàn)阻塞隊列 Netty發(fā)送限流

場景:項目使用Netty作為TCP客戶端發(fā)送消息給TCP服務(wù)器出現(xiàn)了消息丟失問題(發(fā)送的是文件,按照規(guī)則分成幾十個分包),奇怪的是,我每個分包都收到了服務(wù)器響應(yīng)接收成功,但是服務(wù)器的開發(fā)人員說我發(fā)的消息包接收不全,我TMD的服了,我每個分包都收到響應(yīng)的啊,由于服務(wù)器端的不配合,我只能懷疑是服務(wù)器并發(fā)不夠強(qiáng),我一下子發(fā)過去他處理不了,然后發(fā)送要做限流,用阻塞隊列實現(xiàn);

使用BlockingQueue實現(xiàn)

     /**
     * 消息發(fā)送列表
     */
    private final BlockingQueue<String> sendQueue = new LinkedBlockingQueue<>();

    private final Runnable sendHandler = () -> {
        log.info("啟動消息消費線程");
        while (true) {
            try {
                log.info("隊列阻塞ing");
                //這里take()會阻塞,直到有數(shù)據(jù)
                String message = sendQueue.take();
                log.info("發(fā)送消息:{}",message);
                //每次發(fā)送間隔1秒,不然服務(wù)器并發(fā)不行,處理不了。。。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.error(e.toString());
            }
        }
    };

   @Test
    public void test() throws InterruptedException {
        //Demo就這樣啟動一個線程,生產(chǎn)環(huán)境就不要這樣做了,不然。。。
        //啟動消息發(fā)送線程
        new Thread(sendHandler).start();

        //模擬發(fā)送20條消息
        for (int i = 1; i <= 10; i++) {
            sendQueue.offer("message1:"+i);
        }

        //休眠20秒
        Thread.sleep(20000);

        //再模擬發(fā)送20條消息
        for (int i = 1; i <= 10; i++) {
            sendQueue.offer("message2:"+i);
        }

        //休眠20秒
        Thread.sleep(20000);

        log.info("退出程序");
    }

查看日志

17:46:08.247 [Thread-0] INFO com.carrun.阻塞隊列 - 啟動消息消費線程
17:46:08.249 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:08.249 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:1
17:46:09.250 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:09.250 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:2
17:46:10.251 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:10.251 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:3
17:46:11.251 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:11.251 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:4
17:46:12.252 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:12.252 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:5
17:46:13.253 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:13.253 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:6
17:46:14.253 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:14.253 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:7
17:46:15.254 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:15.254 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:8
17:46:16.254 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:16.254 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:9
17:46:17.254 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:17.254 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:10
17:46:18.255 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:28.246 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:1
17:46:29.247 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:29.247 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:2
17:46:30.247 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:30.247 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:3
17:46:31.248 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:31.248 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:4
17:46:32.249 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:32.249 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:5
17:46:33.249 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:33.249 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:6
17:46:34.249 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:34.249 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:7
17:46:35.249 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:35.249 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:8
17:46:36.250 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:36.250 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:9
17:46:37.251 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:37.251 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:10
17:46:38.251 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:48.246 [main] INFO com.carrun.阻塞隊列 - 退出程序

由日志可以看出,先發(fā)送10條數(shù)據(jù)(message1,每條間隔1條),然后等待了10秒,接著發(fā)送后面添加的那10條數(shù)據(jù)(message2,每條間隔1條),達(dá)到了限流發(fā)送要求。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容