場景:項目使用Netty作為TCP客戶端發(fā)送消息給TCP服務(wù)器出現(xiàn)了消息丟失問題(發(fā)送的是文件,按照規(guī)則分成幾十個分包),奇怪的是,我每個分包都收到了服務(wù)器響應(yīng)接收成功,但是服務(wù)器的開發(fā)人員說我發(fā)的消息包接收不全,我TMD的服了,我每個分包都收到響應(yīng)的啊,由于服務(wù)器端的不配合,我只能懷疑是服務(wù)器并發(fā)不夠強(qiáng),我一下子發(fā)過去他處理不了,然后發(fā)送要做限流,用阻塞隊列實現(xiàn);
使用BlockingQueue實現(xiàn)
/**
* 消息發(fā)送列表
*/
private final BlockingQueue<String> sendQueue = new LinkedBlockingQueue<>();
private final Runnable sendHandler = () -> {
log.info("啟動消息消費線程");
while (true) {
try {
log.info("隊列阻塞ing");
//這里take()會阻塞,直到有數(shù)據(jù)
String message = sendQueue.take();
log.info("發(fā)送消息:{}",message);
//每次發(fā)送間隔1秒,不然服務(wù)器并發(fā)不行,處理不了。。。
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error(e.toString());
}
}
};
@Test
public void test() throws InterruptedException {
//Demo就這樣啟動一個線程,生產(chǎn)環(huán)境就不要這樣做了,不然。。。
//啟動消息發(fā)送線程
new Thread(sendHandler).start();
//模擬發(fā)送20條消息
for (int i = 1; i <= 10; i++) {
sendQueue.offer("message1:"+i);
}
//休眠20秒
Thread.sleep(20000);
//再模擬發(fā)送20條消息
for (int i = 1; i <= 10; i++) {
sendQueue.offer("message2:"+i);
}
//休眠20秒
Thread.sleep(20000);
log.info("退出程序");
}
查看日志
17:46:08.247 [Thread-0] INFO com.carrun.阻塞隊列 - 啟動消息消費線程
17:46:08.249 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:08.249 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:1
17:46:09.250 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:09.250 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:2
17:46:10.251 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:10.251 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:3
17:46:11.251 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:11.251 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:4
17:46:12.252 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:12.252 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:5
17:46:13.253 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:13.253 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:6
17:46:14.253 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:14.253 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:7
17:46:15.254 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:15.254 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:8
17:46:16.254 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:16.254 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:9
17:46:17.254 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:17.254 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message1:10
17:46:18.255 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:28.246 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:1
17:46:29.247 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:29.247 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:2
17:46:30.247 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:30.247 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:3
17:46:31.248 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:31.248 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:4
17:46:32.249 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:32.249 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:5
17:46:33.249 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:33.249 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:6
17:46:34.249 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:34.249 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:7
17:46:35.249 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:35.249 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:8
17:46:36.250 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:36.250 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:9
17:46:37.251 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:37.251 [Thread-0] INFO com.carrun.阻塞隊列 - 發(fā)送消息:message2:10
17:46:38.251 [Thread-0] INFO com.carrun.阻塞隊列 - 隊列阻塞ing
17:46:48.246 [main] INFO com.carrun.阻塞隊列 - 退出程序
由日志可以看出,先發(fā)送10條數(shù)據(jù)(message1,每條間隔1條),然后等待了10秒,接著發(fā)送后面添加的那10條數(shù)據(jù)(message2,每條間隔1條),達(dá)到了限流發(fā)送要求。