模仿rocket實(shí)現(xiàn)mq

項(xiàng)目地址
使用和rocket相似

// producer
public class ProducerExample {
    static final String BROKER = "localhost:8989";
    static final String TOPIC = "test-topic";

    public static void main(String[] args) {
        DefaultProducer producer = new DefaultProducer();
        producer.start();
        producer.addBroker(TOPIC, BROKER);
        for (int i = 0; i < 10; i++) {
            Message msg = new Message();
            msg.setTopic(TOPIC);
            msg.setBody(("this is body" + i).getBytes(Charset.forName("UTF-8")));
            SendResult result = producer.send(msg);
            if (result.getSendStatus().equals(SendStatus.SEND_OK)) {
                System.out.println("success");
            } else {
                System.out.println("error");
            }
        }
    }
}
// consumer
public class ConsumerExample {
    static final String BROKER = "localhost:8989";
    static final String TOPIC = "test-topic";

    public static void main(String[] args) {
        DefaultConsumer consumer = new DefaultConsumer();

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(Message msg) {
                System.out.println(new String(msg.getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        consumer.addBroker(TOPIC, BROKER);
        consumer.subscribe(TOPIC);
    }
}

后期準(zhǔn)備做的

  • 完善客戶端channel管理。
  • broker分布式。這依賴于客戶端的負(fù)載均衡,將topic分片存儲。由于目前無持久化,因此使用同步雙寫保證數(shù)據(jù)一致性。
  • 數(shù)據(jù)持久化。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容