六、RocketMQ案例(三)

1、Transaction example

什么是Transaction 消息?

它可以被認(rèn)為是兩階段提交消息實(shí)現(xiàn),以確保分布式系統(tǒng)中的最終一致性。事務(wù)性消息確??梢栽臃绞綀?zhí)行本地事務(wù)的執(zhí)行和消息的發(fā)送。

使用限制

(1)Transaction 消息沒有scheduler和batch支持。
(2)為了避免多次檢查單個(gè)消息并導(dǎo)致半隊(duì)列消息累積,我們默認(rèn)將單個(gè)消息的檢查次數(shù)限制為15次,但用戶可以通過更改“transactionCheckMax”來更改此限制“代理配置中的參數(shù),如果已經(jīng)通過”transactionCheckMax“檢查了一條消息,則代理將默認(rèn)丟棄此消息并同時(shí)打印錯(cuò)誤日志。用戶可以通過覆蓋“AbstractTransactionCheckListener”類來更改此行為。
(3)在broker的配置中由參數(shù)“transactionTimeout”確定的一段時(shí)間之后將檢查Transaction 消息。用戶也可以通過在發(fā)送事務(wù)消息時(shí)設(shè)置用戶屬性“CHECK_IMMUNITY_TIME_IN_SECONDS”來更改此限制,此參數(shù)優(yōu)先于“transactionMsgTimeout”參數(shù)。
(4)可以多次檢查或消費(fèi)Transaction 消息。
(5)對(duì)用戶的目標(biāo)主題的已提交消息可能會(huì)失敗。目前,它取決于日志記錄。RocketMQ本身的高可用性機(jī)制確保了高可用性。如果要確保Transaction 消息不會(huì)丟失并且保證事務(wù)完整性,建議使用同步雙寫。機(jī)制。
(6)事務(wù)消息的producer ID不能與其他類型消息的producer ID共享。與其他類型的消息不同,Transaction 消息允許后向查詢。MQ Server按其producer ID查詢客戶端。

Application

Transactional 狀態(tài)

事務(wù)性消息有三種狀態(tài):
(1)TransactionStatus.CommitTransaction:提交事務(wù),這意味著允許消費(fèi)者使用此消息。
(2)TransactionStatus.RollbackTransaction:回滾事務(wù),表示該消息將被刪除而不允許使用。
(3)TransactionStatus.Unknown:中間狀態(tài),表示需要MQ檢查以確定狀態(tài)。

發(fā)送transactional 消息

(1)創(chuàng)建transactional producer
使用TransactionMQProducer類創(chuàng)建producer client,并指定唯一的producerGroup,并且可以設(shè)置自定義線程池來處理檢查請(qǐng)求。執(zhí)行本地事務(wù)后,需要根據(jù)執(zhí)行結(jié)果回復(fù)MQ,并返回Transactional 狀態(tài)。

package example6;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;

public class TransactionProducer {


    public static void main(String[] args) throws Exception {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });


        producer.setNamesrvAddr("192.168.247.132:9876");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();


    }
}

(2)實(shí)現(xiàn)TransactionListener接口
“executeLocalTransaction”方法用于在發(fā)送半消息成功時(shí)執(zhí)行本地事務(wù)。它返回上一節(jié)中提到的三種事務(wù)狀態(tài)之一。
“checkLocalTransaction”方法用于檢查本地事務(wù)狀態(tài)并響應(yīng)MQ檢查請(qǐng)求。它還返回上一節(jié)中提到的三種事務(wù)狀態(tài)之一。


public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap();

  //這個(gè)方法會(huì)在每一條消息發(fā)出去后 執(zhí)行 保證事務(wù)的一致。
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

  //每隔一段時(shí)間  rocketMQ 會(huì)回調(diào) 這個(gè)方法 判斷 每一條消息是否提交。防止 消息狀態(tài)停滯 或者出現(xiàn)超時(shí)的情況
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

2、OpenMessaging示例

OpenMessaging,其中包括建立行業(yè)指南和消息傳遞,流媒體規(guī)范,為財(cái)務(wù),電子商務(wù),物聯(lián)網(wǎng)和大數(shù)據(jù)領(lǐng)域提供通用框架。設(shè)計(jì)原則是面向云,簡單,靈活和獨(dú)立于語言的分布式異構(gòu)環(huán)境。符合這些規(guī)范將有可能在所有主要平臺(tái)和操作系統(tǒng)上開發(fā)異構(gòu)消息傳遞應(yīng)用程序。

RocketMQ提供了OpenMessaging 0.1.0-alpha的部分實(shí)現(xiàn),以下示例演示了如何基于OpenMessaging訪問RocketMQ。

OMSProducer

以下示例說明如何在同步,異步或單向傳輸中向RocketMQ代理發(fā)送消息。

public class OMSProducer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        producer.startup();
        System.out.printf("Producer startup OK%n");

        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }

        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }

                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }

        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }

        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPullConsumer

使用OMS PullConsumer輪詢來自指定隊(duì)列的消息。

public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        consumer.startup();
        System.out.printf("Consumer startup OK%n");

        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }

        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPushConsumer

將OMS PushConsumer附加到指定的隊(duì)列并使用MessageListener消耗消息

public class OMSPushConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PushConsumer consumer = messagingAccessPoint.
            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
            @Override
            public void onMessage(final Message message, final ReceivedMessageContext context) {
                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
                context.ack();
            }
        });

    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容