精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

分布式場景下的事務(wù)機(jī)制

開發(fā) 前端
首先客戶端Producer通過sendMessageInTransaction方法發(fā)送事務(wù)消息,Broker判斷是事務(wù)消息就將消息topic存入到RMQ_SYS_TRANS_HALF_TOPIC返回給客戶端,客戶端繼續(xù)執(zhí)行邏輯。

事務(wù)消息是RocketMQ的一個(gè)非常特色的高級(jí)特性,它的基礎(chǔ)訴求是通過RocketMQ的事務(wù)機(jī)制,來保證上下游的數(shù)據(jù)?致性。

我們在單機(jī)版本下面只需要在業(yè)務(wù)方法上加上對(duì)應(yīng)的事務(wù)就可以達(dá)到效果,但是分布式的場景下,多個(gè)系統(tǒng)之間的協(xié)調(diào)配合,你無法知道到底是那個(gè)先執(zhí)行那個(gè)后執(zhí)行,當(dāng)然在微服務(wù)里面存在Seate框架來保證事務(wù),但是這事務(wù)的保證始終是心頭大患,只能用一句話形容魚和熊掌不可兼得。

而RocketMq的事務(wù)消息能夠在提升性能的情況下滿足要求,其主要實(shí)現(xiàn)是支持分布式情況下保障消息生產(chǎn)和本地事務(wù)的最終一致性,消息生產(chǎn)我們可以使用順序消息去執(zhí)行,這樣我們只需要滿足這兩個(gè)的事務(wù)即可。

 實(shí)現(xiàn)過程

圖片圖片

準(zhǔn)備階段:生產(chǎn)者將消息發(fā)送到Broker,Broker向生產(chǎn)者發(fā)送ack表示消息發(fā)送成功,但是此時(shí)的消息為一個(gè)等待狀態(tài),不會(huì)被消費(fèi)者去消費(fèi)。(生產(chǎn)者繼續(xù)執(zhí)行接下來的代碼)

確認(rèn)階段:當(dāng)我們執(zhí)行完所有的代碼后,本地事務(wù)要么回滾要么提交,此時(shí)當(dāng)我們了解本地事務(wù)的狀態(tài)后,將結(jié)果推送給Broker做二次確認(rèn)結(jié)果,如果為Commit則將修改激活準(zhǔn)備推送給消費(fèi)者,如果為Rollback則將消息進(jìn)行回滾。

補(bǔ)償機(jī)制:當(dāng)出現(xiàn)異常情況沒有發(fā)生二次確認(rèn),此時(shí)我們在固定時(shí)間后將會(huì)進(jìn)行回查,檢查回查消息對(duì)應(yīng)的本地事務(wù)的狀態(tài),重寫Commit或者Rollback。

 涉及狀態(tài)以及注意點(diǎn)

事務(wù)消息存在三種狀態(tài):

CommitTransaction:提交事務(wù)狀態(tài),此狀態(tài)下允許消費(fèi)者消費(fèi)。

RollbackTransaction:回滾事務(wù)狀態(tài),此狀態(tài)下消息會(huì)被刪除。

Unknown:中間狀態(tài),此狀態(tài)下會(huì)等待本地事務(wù)處理結(jié)果進(jìn)行對(duì)應(yīng)操作。

注意點(diǎn):

本消息狀態(tài)是一種對(duì)消費(fèi)者不可見的狀態(tài),將消息的內(nèi)容放到系統(tǒng)Topic的RMQ_SYS_TRANS_HALF_TOPIC隊(duì)列里面去。

事務(wù)消息中的相關(guān)參數(shù)可以進(jìn)行設(shè)置,比如:本地事務(wù)回查次數(shù)transactionCheckMax默認(rèn)15次,本地事務(wù)回查的間隙transactionCheckInterval默認(rèn)60s,超出后會(huì)直接將消息丟棄。

RocketMQ的事務(wù)消息是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以定義到全局事務(wù)中,要么同時(shí)成功,要么同時(shí)失敗,通過RocketMQ的事務(wù)信息可以實(shí)現(xiàn)可靠消息的最終一致性方案。

 源碼解析

Producer端通過構(gòu)建TransactionMQProducer對(duì)象綁定事務(wù)監(jiān)聽。

TransactionListener transactionListener = new TransactionListener() {    @Override    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {        return LocalTransactionState.COMMIT_MESSAGE;    }
    @Override    public LocalTransactionState checkLocalTransaction(MessageExt msg) {        return LocalTransactionState.COMMIT_MESSAGE;    }};TransactionMQProducer producer = new TransactionMQProducer(producerGroupTemp);producer.setTransactionListener(transactionListener);producer.setNamesrvAddr("127.0.0.1:9876");product.start();SendResult result = producer.sendMessageInTransaction(message, arg);

執(zhí)行sendMessageInTransaction方法來發(fā)送消息。

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
  // 檢查TransactionListener是否存在,如果不存在就直接拋異常
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // 事務(wù)消息不支持延遲等特性
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // 設(shè)置half屬性,表明是事務(wù)屬性
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // 設(shè)置所屬生成者組
    // broker向生產(chǎn)者發(fā)送回查事務(wù)請(qǐng)求根據(jù)這個(gè)producergroup找到指定的channel
    // 生產(chǎn)者能找到所有在同一個(gè)組的機(jī)器實(shí)例從而檢查事務(wù)狀態(tài)
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // 同步發(fā)送
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    // 消息返回信息
    switch (sendResult.getSendStatus()) {
            // 第一階段消息發(fā)送成功
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    // 設(shè)置事務(wù)ID屬性
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    // 執(zhí)行本地事務(wù)
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    // 發(fā)送消息成功后,執(zhí)行本地操作
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // 本地事務(wù)執(zhí)行完畢向broker提交事務(wù)或回滾事務(wù)
        this.endTransaction(msg, sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

首先發(fā)送第一階段信息直接返回半提交狀態(tài),然后執(zhí)行本地事務(wù)返回事務(wù)的三種狀態(tài),未知,回滾,提交,最后執(zhí)行endTransaction方法,把事務(wù)執(zhí)行的狀態(tài)告訴broker。

endTransaction方法

根據(jù)本地事務(wù)執(zhí)行狀態(tài)構(gòu)建requestHeader對(duì)象執(zhí)行二階段提交。

public void endTransaction(
    final Message msg,
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    // 獲取消息中的MessageId
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    // 找到broker地址
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    // 構(gòu)建EndTransactionRequestHeader對(duì)象
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    // offset是prepare消息中offsetMsgId中獲取的
    requestHeader.setCommitLogOffset(id.getOffset());
    requestHeader.setBname(sendResult.getMessageQueue().getBrokerName());
    // 社會(huì)提交/回滾狀態(tài)
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            // 提交
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            // 回滾
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            // 未知
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // 發(fā)送給broker端
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

將本地方法執(zhí)行事務(wù)的結(jié)果發(fā)送給Broker,通過endTransactionOneway方法創(chuàng)建Code為END_TRANSACTION的消息,然后在Broker就會(huì)找出對(duì)應(yīng)的Processor來處理。

    Broker端處理     

Broker總共存在兩個(gè)處理,首先針對(duì)第一個(gè)階段發(fā)送的Half消息,broker要進(jìn)行相關(guān)的操作,后面endTransaction提交進(jìn)來的事務(wù)狀態(tài),針對(duì)三種狀態(tài)進(jìn)行相關(guān)操作。

接收第一階段發(fā)送的Half消息

SendMessageProcessor的sendMessage方法中去執(zhí)行處理事務(wù)消息。

// 發(fā)送Half消息時(shí),在屬性中設(shè)置了PROPERTY_TRANSACTION_PREPARED為true,這里根據(jù)這個(gè)屬性判斷是否是事務(wù)消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(traFlag)
    && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    // 事務(wù)消息進(jìn)入這里,把消息的topic改成RMQ_SYS_TRANS_HALF_TOPIC,以同步刷盤的方式存入store
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
}

如果消息攜帶事務(wù)標(biāo)記就去執(zhí)行TransactionMessageService類的prepareMessage方法進(jìn)行相關(guān)的處理。

// 解析Half消息
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
      // 把真實(shí)的topic和真實(shí)的queueId放在消息的屬性中
     MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
     MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
         String.valueOf(msgInner.getQueueId()));
     // 設(shè)置默認(rèn)的事務(wù)狀態(tài)為TRANSACTION_NOT_TYPE=>unknow
     msgInner.setSysFlag(
         MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
     // 將消息的topic設(shè)置為RMQ_SYS_TRANS_HALF_TOPIC,這個(gè)是對(duì)消費(fèi)者不可見的
     msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
     // 設(shè)置queueId=0
     msgInner.setQueueId(0);
     msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
     return msgInner;
}

進(jìn)行topic的切換,將原來的topic存入到消息的屬性里面,將消息的topic設(shè)置為RMQ_SYS_TRANS_HALF_TOPIC。

處理endTransaction方法

在endTransaction方法中將消息同步給Broker處理的Code對(duì)應(yīng)為END_TRANSACTION,Broker就會(huì)找出對(duì)應(yīng)的Processor來處理該類即調(diào)用EndTransactionProcessor類的processRequest方法處理。

if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // 根據(jù)commitLogOffset獲取文件中的message,獲取到了返回success
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        // 檢查消息是否一致
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 生成要保存的消息
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
            // 把真實(shí)的topic消息存儲(chǔ)到CommitLog中
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // 移除prepare消息,存入opQueueMap中
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
    // 回滾
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    // 查詢到half消息則返回成功
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        // 檢查消息是否一致
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 移除prepare消息,存入opQueueMap中
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}

僅僅展示相關(guān)核心代碼,其主要邏輯:首先去判斷請(qǐng)求的方式是commit還是rollback,如果是commit查詢到消息還原消息原來的topic,然后刪除half topic上的消息轉(zhuǎn)存到opQueueMap中,如果是rollback直接進(jìn)行刪除half topic上的消息并轉(zhuǎn)存到opQueueMap中去。

注意:opQueueMap的引入為了解決有可能出現(xiàn)網(wǎng)絡(luò)、進(jìn)程、線程等各種因素導(dǎo)致消費(fèi)端未能成功處理消息的情況,該機(jī)制的作用是在消費(fèi)者端將未成功處理的消息重新發(fā)送到服務(wù)端進(jìn)行重試,直到確認(rèn)消息已經(jīng)被成功處理或者達(dá)到最大重試次數(shù)后進(jìn)行回滾操作。而 Op 消息本身則是通過修改消息狀態(tài)來實(shí)現(xiàn)的。

消息回查

當(dāng)網(wǎng)絡(luò)中斷或者響應(yīng)超時(shí)等各種異常信息導(dǎo)致消息并沒有傳送到broker端去,為了解決這一問題在Broker就開啟一個(gè)回查線程每隔一分鐘執(zhí)行一次處理超過6s未回查的消息,當(dāng)超過15次回查后直接將消息丟棄。

在啟動(dòng)BrokerController類時(shí),會(huì)去調(diào)用startProcessorByHa方法如果是Master節(jié)點(diǎn)就會(huì)去啟動(dòng)一個(gè)線程每隔6s處理未回查的消息,檢查最大次數(shù)為15次。

public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // 檢查回查消息 timeout = 6s checkMax=15
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

在check方法里面去調(diào)用listener.resolveHalfMsg(msgExt)方法去處理事務(wù)消息。

public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

執(zhí)行sendCheckMessage方法發(fā)送一個(gè)檢查事務(wù)狀態(tài)的Code為CHECK_TRANSACTION_STATE的消息,在客戶端MQClientAPIImpl初始化的時(shí)候就會(huì)去注冊一個(gè)Code對(duì)應(yīng)的Processor,最終就會(huì)去執(zhí)行checkTransactionState方法,判斷本地事務(wù)的狀態(tài),然后再去執(zhí)行endTransactionOneway發(fā)起END_TRANSACTION處理。

public void checkTransactionState(final String addr, final MessageExt msg,
    final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        private final String brokerAddr = addr;
        private final MessageExt message = msg;
        private final CheckTransactionStateRequestHeader checkRequestHeader = header;
        private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();


        // 執(zhí)行線程方法
        @Override
        public void run() {
            TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
            TransactionListener transactionListener = getCheckListener();
            if (transactionCheckListener != null || transactionListener != null) {
                LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                Throwable exception = null;
                try {
                    if (transactionCheckListener != null) {
                        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
                    } else if (transactionListener != null) {
                        log.debug("Used new check API in transaction message");
                        // 檢查本地事務(wù)
                        localTransactionState = transactionListener.checkLocalTransaction(message);
                    } else {
                        log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
                    }
                } catch (Throwable e) {
                    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                    exception = e;
                }
                // 處理事務(wù)狀態(tài)
                this.processTransactionState(
                    localTransactionState,
                    group,
                    exception);
            } else {
                log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
            }
        }
      // 
        private void processTransactionState(
            final LocalTransactionState localTransactionState,
            final String producerGroup,
            final Throwable exception) {
            final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
            thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
            thisHeader.setProducerGroup(producerGroup);
            thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
            thisHeader.setFromTransactionCheck(true);
            thisHeader.setBname(checkRequestHeader.getBname());


            String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (uniqueKey == null) {
                uniqueKey = message.getMsgId();
            }
            thisHeader.setMsgId(uniqueKey);
            thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
            switch (localTransactionState) {
                // 提交狀態(tài)
                case COMMIT_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                    break;
                // 回滾狀態(tài)
                case ROLLBACK_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                    log.warn("when broker check, client rollback this transaction, {}", thisHeader);
                    break;
                // 未知狀態(tài)
                case UNKNOW:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                    log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
                    break;
                default:
                    break;
            }


            String remark = null;
            if (exception != null) {
                remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
            }
            doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);


            try {
                // 再次執(zhí)行endTransactionOneway發(fā)起END_TRANSACTION
                DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                    3000);
            } catch (Exception e) {
                log.error("endTransactionOneway exception", e);
            }
        }
    };


    this.checkExecutor.submit(request);
}

總結(jié)

首先客戶端Producer通過sendMessageInTransaction方法發(fā)送事務(wù)消息,Broker判斷是事務(wù)消息就將消息topic存入到RMQ_SYS_TRANS_HALF_TOPIC返回給客戶端,客戶端繼續(xù)執(zhí)行邏輯。

然后調(diào)用endTransaction方法去提交本地事務(wù)通過endTransactionOneway將消息提交給Broker端,Broker端通過Code為END_TRANSACTION的處理器去處理消息調(diào)用processRequest方法來處理對(duì)應(yīng)的消息,

如果由于各種原因?qū)е孪⒌氖鬏?,為了防止這些現(xiàn)象的出現(xiàn)所以在BrokerController啟動(dòng)時(shí)就啟動(dòng)一個(gè)線程每隔6s處理未回查的消息(檢查最大次數(shù)為15次)的任務(wù)來進(jìn)行消息的回查,簡單來說就是通過sendCheckMessage方法去注冊一個(gè)Code為CHECK_TRANSACTION_STATE的消息將內(nèi)容發(fā)送給客戶端,然后客戶端在啟動(dòng)時(shí)也注冊對(duì)應(yīng)Code的處理邏輯,通過processTransactionState方法去處理事務(wù)的狀態(tài),如果正常最后還是會(huì)去執(zhí)行endTransactionOneway方法,完成事務(wù)消息。

責(zé)任編輯:武曉燕 來源: java從零到壹
相關(guān)推薦

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)

2022-06-21 08:27:22

Seata分布式事務(wù)

2017-07-26 15:08:05

大數(shù)據(jù)分布式事務(wù)

2019-10-10 09:16:34

Zookeeper架構(gòu)分布式

2024-06-13 08:04:23

2009-06-19 15:28:31

JDBC分布式事務(wù)

2009-09-18 15:10:13

分布式事務(wù)LINQ TO SQL

2021-09-29 09:07:37

分布式架構(gòu)系統(tǒng)

2022-12-08 08:13:11

分布式數(shù)據(jù)庫CAP

2023-09-11 15:40:43

鍵值存儲(chǔ)云服務(wù)

2021-09-28 09:43:11

微服務(wù)架構(gòu)技術(shù)

2021-02-01 09:35:53

關(guān)系型數(shù)據(jù)庫模型

2021-12-01 10:13:48

場景分布式并發(fā)

2019-06-26 09:41:44

分布式事務(wù)微服務(wù)

2025-04-29 04:00:00

分布式事務(wù)事務(wù)消息

2022-03-24 07:51:27

seata分布式事務(wù)Java

2025-05-15 08:05:00

2020-02-25 15:00:42

數(shù)據(jù)分布式架構(gòu)

2014-01-22 13:37:53

2023-02-21 16:41:41

分布式相機(jī)鴻蒙
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

成人xxxxx色| 久久精品中文字幕免费mv| 可以在线看的av网站| 深夜视频在线免费| 蜜桃精品视频在线| 欧美高清第一页| 黄色在线观看av| 国产精品一区二区三区www| 亚洲成人精品影院| 日韩欧美亚洲v片| 亚洲黄色小说网址| 免费在线一区观看| 91精品国产91久久久久久吃药 | 亚洲综合资源| 午夜影院久久久| 亚洲自拍三区| 日本午夜在线| 成人一区二区三区在线观看| 国产精品美女久久| 青青青国产在线| 欧美在线免费| 久久精品99无色码中文字幕| 成人片黄网站色大片免费毛片| 欧洲大片精品免费永久看nba| 欧美艳星brazzers| 日韩中文字幕三区| 98色花堂精品视频在线观看| 国产精品传媒视频| 欧美午夜视频在线| 天堂在线免费av| 成人午夜av电影| 亚洲xxx大片| 一级久久久久久久| 天堂资源在线中文精品| 午夜精品久久久久久久99热| 真实国产乱子伦对白在线| 日本一本不卡| 亚洲午夜激情免费视频| 免费成人深夜夜行p站| 99这里只有精品视频| 欧美一区二区在线看| 国产亚洲视频一区| 日本成人在线网站| 欧美精品777| 污色网站在线观看| 自拍偷自拍亚洲精品被多人伦好爽 | 欧美日韩免费做爰视频| 国产精品福利在线观看播放| 最近2019中文字幕大全第二页 | 国产精品美女无圣光视频| 五月激情六月丁香| 国产精品久久久久久久免费软件| 午夜精品久久久久久99热| 成年人午夜视频| 亚洲毛片一区| 国产91精品久久久久久久| 日韩成人免费观看| 亚洲欧美网站| 国产精品高潮在线| 最近中文字幕在线免费观看 | 狠狠综合久久av一区二区小说| 香港三级韩国三级日本三级| 成人观看网址| 色老综合老女人久久久| 黄色免费网址大全| 国产麻豆一区| 91精品国产色综合久久| 免费看三级黄色片| 免费看久久久| 亚洲性xxxx| 免费在线观看a级片| 中文不卡在线| 久久久之久亚州精品露出| 日韩av电影网址| 老司机免费视频久久| 国产精品一区二区三区在线播放| 97超碰中文字幕| 国产成人自拍网| 久久精品国产综合精品| 阿v免费在线观看| 亚洲欧美一区二区三区国产精品 | 日韩中文一区| 日p在线观看| 亚洲尤物在线视频观看| 妺妺窝人体色www在线小说| 高清不卡av| 91麻豆精品国产无毒不卡在线观看| 日本一区二区三区在线免费观看| 国产精品白浆| 国产亚洲欧美一区| 久久久久久天堂| 免费日韩视频| 成人性生交大片免费看视频直播| 亚洲精品网站在线| 久久精子c满五个校花| 日韩国产精品毛片| 中文字幕在线高清| 日韩一区二区三区免费观看| 亚洲精品成人无码熟妇在线| 久久久久久久久久久久久久久久久久| 久久免费精品日本久久中文字幕| 中日韩精品视频在线观看| 久久99九九99精品| 久久国产主播精品| 先锋影音在线资源站91| 欧洲精品一区二区三区在线观看| 欧美性猛交乱大交| 欧美日韩中文一区二区| 91国产美女在线观看| 国产精品热久久| 久久久午夜精品| 国产自产在线视频| 亚洲青青久久| 在线观看国产精品日韩av| 国产无遮挡又黄又爽又色| 精品一区二区免费在线观看| 欧美午夜免费| 伊人网在线播放| 欧美zozo另类异族| 91香蕉一区二区三区在线观看| 亚洲制服少妇| 精品日本一区二区三区| 在线看女人毛片| 欧美精选一区二区| 懂色av蜜桃av| 亚洲制服少妇| 久久99影院| www在线观看黄色| 欧美一区二区三区日韩| 欧美成人短视频| 日韩精品亚洲专区| 欧美xxxx黑人又粗又长密月| 福利成人导航| 日韩精品最新网址| 欧美成人黄色网| 狠狠色丁香婷综合久久| 色综合电影网| 亚洲mmav| 亚洲视频在线免费看| 国产又黄又爽又色| 91年精品国产| 人妻有码中文字幕| 日韩精品免费一区二区夜夜嗨| 午夜精品久久久久久久男人的天堂| 99热这里只有精品99| 亚洲精品一二三四区| 中文字幕色网站| 影音先锋成人在线电影| 亚洲一区二区三区乱码aⅴ蜜桃女| 色大18成网站www在线观看| 精品视频在线视频| 欧美精品日韩在线| 另类综合日韩欧美亚洲| 亚洲综合网中心| 精品国产亚洲一区二区在线观看| 久久久精品在线观看| 国产毛片久久久久| 亚洲精品乱码久久久久久| 熟妇女人妻丰满少妇中文字幕| 综合天天久久| 国产偷久久久精品专区| 日韩伦理在线| 亚洲天堂影视av| 在线观看免费高清视频| 亚洲欧美日韩久久| 中国极品少妇xxxx| 亚洲专区一区二区三区| 天堂社区 天堂综合网 天堂资源最新版| 一呦二呦三呦精品国产| 日韩在线中文视频| 亚洲精品网站在线| 日韩欧美在线观看视频| 欧美aaa级片| 国产91综合一区在线观看| 青青艹视频在线| 日韩在线观看| 风间由美久久久| 伊人久久国产| 日韩在线高清视频| 亚洲精品综合网| 在线一区二区视频| 极品颜值美女露脸啪啪| 久久婷婷一区二区三区| 天天综合成人网| 国产亚洲毛片| 9l视频自拍9l视频自拍| 香蕉久久夜色精品国产使用方法 | 国产亚洲视频系列| 久久婷婷中文字幕| aa国产精品| 一区二区三区观看| 麻豆一区二区| 91久久嫩草影院一区二区| 高清精品在线| 久久亚洲影音av资源网| 欧美色图另类| 日韩午夜在线影院| 午夜一区二区三区四区| 一区二区三区高清在线| 日本爱爱爱视频| 不卡一区在线观看| 特级西西444www| 午夜一区二区三区不卡视频| 热久久最新地址| 精品国产一区一区二区三亚瑟| 99三级在线| 欧美日韩尤物久久| 2018日韩中文字幕| 2024最新电影在线免费观看| 亚洲天堂成人在线视频| 日韩中文字幕免费观看| 91精品国产色综合久久ai换脸 | 国产乱码精品一品二品| 人人爽人人av| 国产美女诱惑一区二区| 欧美一区二区三区综合| 久久综合99| 欧美亚洲另类久久综合| 久久精品国产亚洲5555| 99国产视频| 欧美天堂一区| 国产精品精品久久久| 乡村艳史在线观看| 性视频1819p久久| 亚洲wwwww| 麻豆国产精品va在线观看不卡| 国产免费永久在线观看| 精品一区二区三区三区| 天堂av资源网| 精品国产乱码久久久久久浪潮| 国产农村老头老太视频| 欧美久久久一区| 96日本xxxxxⅹxxx17| 欧美日韩国产欧美日美国产精品| 无码人妻aⅴ一区二区三区有奶水 无码免费一区二区三区 | 欧美日韩在线不卡一区| 日韩在线黄色| 国产在线资源一区| 国产亚洲精品美女久久| 国产精品一区二| 成人在线视频你懂的| 999视频在线观看| 免费一区二区三区在线视频| 91欧美精品成人综合在线观看| 四虎永久精品在线| 成人激情视频免费在线| 亚洲美女色播| 亚洲va国产va天堂va久久| 二区三区精品| a级国产乱理论片在线观看99| 精品视频在线观看网站| 97超碰资源| 韩国女主播一区二区三区| 精品麻豆av| 一区三区在线欧| 日韩精品最新在线观看| 国产亚洲一区| 亚洲人成影视在线观看| 在线精品国产| 可以看毛片的网址| 性久久久久久| 久热精品在线播放| 国产一区欧美日韩| 欧美图片自拍偷拍| xfplay精品久久| 夜夜春很很躁夜夜躁| 亚洲视频一区二区在线观看| 欧美国产日韩综合| 日韩欧美成人区| 亚洲在线视频播放| 日韩欧美在线观看一区二区三区| 亚洲精品无码久久久| 日韩精品免费综合视频在线播放 | 激情网站五月天| 久久se这里有精品| 稀缺小u女呦精品呦| 久久久蜜桃精品| 免费精品在线视频| 性做久久久久久免费观看| 69亚洲精品久久久蜜桃小说| 制服丝袜av成人在线看| 欧美一级在线免费观看| 在线观看日韩视频| 羞羞视频在线观看不卡| 国产97在线亚洲| 国产电影一区| 精品麻豆av| 一区二区三区四区在线观看国产日韩| 亚洲国产精品无码av| 日本免费新一区视频| 国产一精品一aⅴ一免费| 国产精品素人一区二区| 精品无码久久久久久久| 欧美体内she精视频| 亚洲第一色视频| 国产亚洲欧美aaaa| 国产啊啊啊视频在线观看| 国产精品高潮视频| 久久动漫网址| 蜜臀在线免费观看| 久久精品人人| 欧美激情 亚洲| 中文字幕一区二区三区在线不卡 | 51精品视频一区二区三区| 婷婷色在线观看| 久久在线视频在线| 成人福利一区二区| 九九99久久| 欧美日韩一视频区二区| 91极品尤物在线播放国产| 91色porny| 久一区二区三区| 91精品国产综合久久国产大片| 国产一区精品| 98视频在线噜噜噜国产| 日韩一区二区三区精品视频第3页| 日韩免费中文专区| 亚洲在线网站| 一级国产黄色片| 亚洲国产精品综合小说图片区| 97人妻精品一区二区三区动漫| 亚洲色图偷窥自拍| 亚洲黄色免费看| 国内精品久久国产| 亚洲视频精品| 97超碰免费在线观看| 亚洲欧美综合色| 97人妻精品一区二区三区软件 | 成年人视频免费在线播放| 成人亚洲欧美一区二区三区| 久久国产电影| 色悠悠久久综合网| 亚洲国产精品99久久久久久久久| 成年人免费高清视频| 亚洲成人中文字幕| 欧美人与动牲性行为| 91精品久久香蕉国产线看观看| 手机亚洲手机国产手机日韩| 91日韩视频在线观看| 国产日韩影视精品| 中文永久免费观看| 国产一区二区三区在线视频| 亚洲日本网址| 亚洲春色在线视频| 久久成人久久爱| 在线免费看av网站| 日韩视频免费观看高清完整版在线观看| 一区二区高清不卡| 91精品久久久久久久久久另类 | 欧美富婆性猛交| 伊人久久影院| 日韩av高清在线看片| 99久久免费国产| 中文字幕精品无码一区二区| 亚洲欧美另类中文字幕| 日韩免费va| 亚洲一区综合| 国产麻豆视频精品| 久久这里只有精品免费| 亚洲激情视频网| 日韩高清成人| 男女啪啪的视频| 成人中文字幕在线| 欧美h在线观看| 亚洲最新在线视频| 成人污污视频| 国产美女主播在线播放| 久久久久国产免费免费| 中文字幕精品无码亚| 久久伊人精品天天| 国产一级成人av| www.色就是色| 伊人一区二区三区| 天堂av网在线| 国产情人节一区| 亚洲国产裸拍裸体视频在线观看乱了中文 | 国产精品高潮呻吟久久久久| 免费无遮挡无码永久视频| 国产日韩欧美在线一区| av免费在线观看不卡| 97视频在线免费观看| 成人在线丰满少妇av| 免费看三级黄色片| 91极品美女在线| 欧美大片黄色| 日韩偷拍一区二区| 国产成人综合在线| а中文在线天堂| 欧美另类在线播放| 国产精品欧美三级在线观看| 人妻精品久久久久中文字幕69| 欧美视频中文字幕在线| 91cn在线观看| 日韩av高清| gogo大胆日本视频一区| 在线播放一级片| 欧美亚州一区二区三区| 女同性一区二区三区人了人一| 91网站免费视频| 亚洲福利影片在线|