精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

深度解析:基于 RocketMQ 實現(xiàn)分布式事務(wù)的技術(shù)實踐與原理探究

開發(fā)
本文我們嘗試基于RocketMQ實現(xiàn)下單的分布式的事務(wù)。可能會有讀者會有疑問,之前我們不是基于Seata完成了分布式事務(wù),為什么我們還要用到RocketMQ呢?

在上一篇文章Spring Boot自動裝配原理以及實踐我們完成了服務(wù)通用日志監(jiān)控組件的開發(fā),確保每個服務(wù)都可以基于一個注解實現(xiàn)業(yè)務(wù)功能的監(jiān)控。 而本文我們嘗試基于RocketMQ實現(xiàn)下單的分布式的事務(wù)。可能會有讀者會有疑問,之前我們不是基于Seata完成了分布式事務(wù),為什么我們還要用到RocketMQ呢?

我們的再來回顧一下我們下單功能大抵是做以下三件事情:

  • 創(chuàng)建訂單,將訂單記錄存到數(shù)據(jù)庫中。
  • 扣款,記錄用戶扣款后錢包所剩下的額度。
  • 扣除商品庫存,并發(fā)放商品。

我們將該場景放到高并發(fā)場景下,這個功能勢必要考慮性能和可靠性問題,所以我們在業(yè)務(wù)需求清楚明了的情況下,就希望能有一種方式確保下單功能在高并發(fā)場景保證性能、可靠性。 而Seata的AT模式確實可以保證最終一致性,但是seata的AT模式本質(zhì)上是依賴于global_table、branch_table等數(shù)據(jù)表維護應(yīng)用層分布式事務(wù),在操作期間會涉及大量的更新和刪除操作,隨著時間的推移還是會出現(xiàn)大量的索引碎片,導(dǎo)致索引性能下降。

所以我們就考慮采用RocketMQ實現(xiàn)分布式事務(wù),盡管RocketMQ對于分布式事務(wù)的實現(xiàn)業(yè)務(wù)侵入性相對強一些,但它可以保證業(yè)務(wù)層面的功能解耦從而提升并發(fā)性能,且RocketMQ還對消息消費可靠性做了許多不錯的優(yōu)化,例如:失敗重試、死信隊列等,所以我們還是嘗試使用RocketMQ來改良我們的下單分布式事務(wù)問題。

一、詳解RocketMQ落地分布式事務(wù)案例

1. 需求說明

用戶下單大抵需要在三個服務(wù)中完成:

  • 訂單服務(wù)完成訂單創(chuàng)建,基于用戶傳入的產(chǎn)品編碼、用戶編碼、產(chǎn)品購買數(shù)生成訂單信息,對應(yīng)的調(diào)用參數(shù)如下:
{
    "accountCode": "0932897",
    "productCode": "P003",
    "count": 1
}
  • 基于入?yún)⒌挠脩舸a定位到用戶錢包金額,完成賬戶扣款。
  • 基于產(chǎn)品和購買數(shù)完成庫存扣減。

這其中會跨域三個服務(wù),分別是訂單服務(wù)創(chuàng)建訂單、賬戶服務(wù)扣款、商品服務(wù)扣減庫存。

2. 落地思路

以我們業(yè)務(wù)為最終目標,RocketMQ實現(xiàn)分布式事務(wù)的原理是基于2PC的,流程大抵如下:

  • 訂單服務(wù)發(fā)送一個事務(wù)消息到消息隊列,消息內(nèi)容就是我們的訂單信息,這里面包含用戶賬號、購買的產(chǎn)品代碼、購買產(chǎn)品數(shù)量等數(shù)據(jù)。
  • MQ收到half消息,并回復(fù)ack確認。
  • 生產(chǎn)者(訂單服務(wù)order-service)得知我們發(fā)送的消息已被收到,訂單服務(wù)則執(zhí)行本地事務(wù)并提交事務(wù),即將訂單信息寫入數(shù)據(jù)庫中,同時在該事務(wù)內(nèi)將訂單插入結(jié)果寫入transaction_log表中。
  • 生產(chǎn)者(訂單服務(wù)order-service)完成本地事務(wù)的提交,告知MQ將事務(wù)消息commit,此時消費者就可以消費這條消息了,注意若生產(chǎn)者消費失敗,則將消息rollback,一切就當沒有發(fā)生過。
  • 如果上述的消息是commit則將消息持久化到commitLog中,以便后續(xù)MQ宕機或者服務(wù)宕機后依然可以繼續(xù)消費這條沒有被消費的消息。
  • (非必要步驟)若MQ長時間沒有收到生產(chǎn)者的commit或者rollback的信號,則攜帶事務(wù)id找生產(chǎn)者查詢transaction_log索要當前消息狀態(tài),如果看到對應(yīng)的消息則判定生產(chǎn)者事務(wù)成功將消息commit給消費者消費,若沒看到則說明生產(chǎn)者本地事務(wù)執(zhí)行失敗,回滾該消息。
  • 消費者即我們的用戶服務(wù)或者庫存服務(wù)收到消息則執(zhí)行本地事務(wù)并提交,若失敗則會不斷重試,直到達到上限則將消息存到死信隊列并告警。
  • 人工介入查看死信隊列查看失敗消息手工補償數(shù)據(jù)。

二、實踐-基于RocketMQ實現(xiàn)分布式事務(wù)

1. 部署RocketMQ(Linux環(huán)境)

在編寫業(yè)務(wù)代碼之前,我們必須完成一下RocketMQ的部署,首先我們自然要下載一下RocketMQ,下載地址如下,筆者下載的是rocketmq-all-4.8.0-bin-release這個版本:https://rocketmq.apache.org/download/。

完成完成后,我們將其解壓到自定義的路徑,鍵入sudo vim /etc/profile配置MQ環(huán)境變量,完成后鍵入source /etc/profile使之生效,對應(yīng)的配置內(nèi)容如下所示:

export ROCKETMQ_HOME=/home/sharkchili/rocketmq-all-4.8.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin

需要注意的是筆者本次采用WSL的Ubuntu子系統(tǒng)時啟動時腳本會拋出runserver.sh: 70: [[: Exec format error錯誤,嘗試格式化和指令配置后都沒有很好的解決,于是循著報錯找到runserver.sh這行對應(yīng)的腳本內(nèi)容,該括弧本質(zhì)上就是基于JDK內(nèi)容配置對應(yīng)的GC算法:

以筆者為里系統(tǒng)是jdk8,所以直接去掉判斷用走JDK8的配置即可:

choose_gc_options()
{

      JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFractinotallow=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
      JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
 
}

完成后鍵入./mqnamesrv &將MQ啟動,如果彈窗輸出下面這條結(jié)果,則說明mq的NameServer啟動成功。

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

然后我們再鍵入./mqbroker -n 127.0.0.1:9876啟動broker,需要注意的是默認情況下broker占用堆內(nèi)存差不多是4g,所以讀者本地部署時建議修改一下runbroker.sh的堆內(nèi)存,如下圖所示:

若彈窗輸出下面所示的文字,則說明broker啟動成功,自此mq就在windows環(huán)境部署成功了。我們就可以開始編碼工作了。

The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

2. 服務(wù)引入MQ完成下單功能開發(fā)

(1) 服務(wù)引入RocketMQ依賴

完成RocketMQ部署之后,我們就可以著手編碼工作了,首先我們要在在三個服務(wù)中引入RocketMQ的依賴,由于筆者的spring-boot版本比較老,所以這里筆者為了統(tǒng)一管理在父pom中指定了mq較新的版本號:

<!--rocketmq-->
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

然后我們分別對order、account、product三個服務(wù)中引入依賴:

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
           
        </dependency>

(2) 注冊中心配置RocketMQ信息

由于我們的分布式事務(wù)涉及3個服務(wù),而且mq的消費模式采用的是發(fā)布訂閱模式,所以我們的生產(chǎn)者(order-service)和消費者(account-serivce)都配置為cloud-group

rocketmq.name-server=172.29.193.12:9876
# 指定消費者組
rocketmq.producer.group=cloud-group

之所以沒有沒將消費者2(product-service)也配置到cloud-group中的原因也很簡單,同一個消息只能被同一個消費者組中的一個成員消費,假如我們的將product-service配置到同一個消費者組中就會出現(xiàn)因一條消息只能被一個服務(wù)消費而導(dǎo)致product-service收不到消息。

對此我們實現(xiàn)思路有兩種:

  • 將服務(wù)都放到同一個消費者組,消費模式改為廣播模式。
  • 將product-service設(shè)置到別的消費者組中。

考慮后續(xù)擴展筆者選擇方案2,將產(chǎn)品服務(wù)的訂閱者放到消費者組2中:

rocketmq.name-server=172.29.193.12:9876
rocketmq.producer.group=cloud-group2

(3) 創(chuàng)建消息日志表

我們在上文進行需求梳理時有提到一個MQServer沒收到生產(chǎn)者本地事務(wù)執(zhí)行狀態(tài)進行回查的操作,所以我們在生產(chǎn)者在執(zhí)行本地事務(wù)時,需要創(chuàng)建一張表記錄生產(chǎn)者本地事務(wù)執(zhí)行狀態(tài),建表SQL如下:

DROP TABLE IF EXISTS `rocketmq_transaction_log`;
CREATE TABLE `rocketmq_transaction_log` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `transaction_id` varchar(50) DEFAULT NULL,
  `log` varchar(500) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

(4) 完成order服務(wù)half消息發(fā)送、監(jiān)聽、回查回調(diào)邏輯

我們的訂單服務(wù)需要做以下三件事:

  • 發(fā)送half消息給MQ。
  • half消息發(fā)送成功執(zhí)行本地事務(wù)并記錄日志。
  • 告知MQ可以提交事務(wù)消息。

所以我們需要定義一下消息格式,對象類中必須包含訂單號、產(chǎn)品編碼、用戶編碼、購買產(chǎn)品數(shù)量等信息。

@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class OrderDto {

    private static final long serialVersionUID = 1L;

 //設(shè)置主鍵自增,避免插入時沒必要的報錯
    @TableId(value = "ID", type = IdType.AUTO)
    private Integer id;

    /**
     * 訂單號
     */
    private String orderNo;

    /**
     * 用戶編碼
     */
    private String accountCode;

    /**
     * 產(chǎn)品編碼
     */
    private String productCode;

    /**
     * 產(chǎn)品扣減數(shù)量
     */
    private Integer count;

    /**
     * 余額
     */
    private BigDecimal amount;

    /**
     * 本次扣減金額
     */
    private BigDecimal price;
}

然后我們就可以編寫控制層的代碼了,通過獲取前端傳輸?shù)膮?shù)調(diào)用orderService完成half消息發(fā)送。

@PostMapping("/order/createOrderByMQ")
    public ResultData<String> createOrderByMQ(@RequestBody OrderDto orderDTO) {
        log.info("基于mq完成用戶下單流程,請求參數(shù): " + JSON.toJSONString(orderDTO));
        orderService.createOrderByMQ(orderDTO);
        return ResultData.success("基于mq完成用戶下單完成");

    }

orderService的實現(xiàn)邏輯很簡單,定義好消息設(shè)置消息頭內(nèi)容和消息載體的對象,通過sendMessageInTransaction方法完成半消息發(fā)送,需要了解一下消息的主題(topic)為ORDER_MSG_TOPIC,只有訂閱這個主題的消費者才能消費這條消息:

@Autowired
    private RocketMQTemplate rocketMQTemplate;

@Override
    public void createOrderByMQ(OrderDto orderDto) {


        //創(chuàng)建half消息對應(yīng)的事務(wù)日志的id
        String transactionId = UUID.randomUUID().toString();

        //調(diào)用產(chǎn)品服務(wù)獲取商品詳情
        ResultData<ProductDTO> productInfo = productFeign.getByCode(orderDto.getProductCode());
        //計算總售價
        BigDecimal amount = productInfo.getData().getPrice().multiply(new BigDecimal(orderDto.getCount()));
        orderDto.setAmount(amount);

        //將訂單信息作為載體
        Message<OrderDto> message = MessageBuilder.withPayload(orderDto)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                //下單用戶編碼
                .setHeader("accountCode", orderDto.getAccountCode())
                //產(chǎn)品編碼
                .setHeader("productCode", orderDto.getProductCode())
                //產(chǎn)品購買數(shù)
                .setHeader("count", orderDto.getCount())
                //下單金額
                .setHeader("amount", amount)
                .build();

        //發(fā)送half消息
        rocketMQTemplate.sendMessageInTransaction("ORDER_MSG_TOPIC", message, orderDto);


    }

完成half消息發(fā)送之后,我們就必須知曉消息發(fā)送結(jié)果才能確定是否執(zhí)行本地事務(wù)并提交,所以我們的訂單服務(wù)必須創(chuàng)建一個監(jiān)聽器了解half消息的發(fā)送情況,executeLocalTransaction方法就是mq成功收到半消息后的回調(diào)函數(shù),一旦我們得知消息成功發(fā)送之后,MQ就會執(zhí)行這個方法,筆者通過這個方法獲取消息頭的參數(shù)創(chuàng)建訂單對象,調(diào)用createOrderWithRocketMqLog完成訂單的創(chuàng)建的本地事務(wù)成功的日志記錄。

@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class OrderListener implements RocketMQLocalTransactionListener {
    private final IOrderService orderService;
    private final RocketmqTransactionLogMapper rocketMqTransactionLogMapper;

    /**
     * 監(jiān)聽到發(fā)送half消息,執(zhí)行本地事務(wù)
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("order執(zhí)行本地事務(wù)");


        try {
            //解析消息頭
            MessageHeaders headers = message.getHeaders();
            //獲取購買金額
            BigDecimal amount = new BigDecimal(String.valueOf(headers.get("amount")));
            //獲取訂單信息
            Order order = Order.builder()
                    .accountCode((String) headers.get("accountCode"))
                    .amount(amount)
                    .productCode((String) headers.get("productCode"))
                    .count(Integer.valueOf(String.valueOf(headers.get("count"))))
                    .build();
            //獲取事務(wù)id
            String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
            //執(zhí)行本地事務(wù)和記錄事務(wù)日志
            orderService.createOrderWithRocketMqLog(order, transactionId);

            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("創(chuàng)建訂單失敗,失敗原因: {}", e.getMessage(), e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }

        
    }

    /**
     * 本地事務(wù)的檢查,檢查本地事務(wù)是否成功
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

        MessageHeaders headers = message.getHeaders();
        //獲取事務(wù)ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("檢查本地事務(wù),事務(wù)ID:{}", transactionId);
        //根據(jù)事務(wù)id從日志表檢索
        QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("transaction_id", transactionId);
        RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
        //如果消息表存在,則說明生產(chǎn)者事務(wù)執(zhí)行完成,回復(fù)commit
        if (null != rocketmqTransactionLog) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        //回復(fù)rollback
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

createOrderWithRocketMqLog做了兩件事,分別是插入訂單信息和創(chuàng)建消息日志,這里筆者用到了事務(wù)注解確保了兩個操作的原子性。 這樣一來,MQserver后續(xù)的回查邏輯完全可以基于RocketmqTransactionLog 進行判斷,如果消息的事務(wù)id在表中存在,則說明生產(chǎn)者本地事務(wù)成功,反之就是失敗。

@Transactional(rollbackFor = Exception.class)
    @Override
    public void createOrderWithRocketMqLog(Order order, String transactionId) {
        //創(chuàng)建訂單編號
        order.setOrderNo(UUID.randomUUID().toString());
        //插入訂單信息
        orderMapper.insert(order);
        //事務(wù)日志
        RocketmqTransactionLog log = RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .log("執(zhí)行創(chuàng)建訂單操作")
                .build();
        rocketmqTransactionLogMapper.insert(log);
    }

補充一下基于MP生成的RocketmqTransactionLog 類代碼:

@TableName("rocketmq_transaction_log")
@ApiModel(value = "RocketmqTransactionLog對象", description = "")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RocketmqTransactionLog implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(value = "ID", type = IdType.AUTO)
    private Integer id;

    private String transactionId;

    private String log;


}

(5) 完成account、product監(jiān)聽事件

然后我們就可以實現(xiàn)用戶服務(wù)和商品服務(wù)的監(jiān)聽事件了,一旦生產(chǎn)者提交事務(wù)消息之后,這幾個消費者都會收到這個topic(主題)的消息,進而完成當前服務(wù)的業(yè)務(wù)邏輯。

先來看看實現(xiàn)扣款的用戶服務(wù),我們的監(jiān)聽器繼承了RocketMQListener,基于@RocketMQMessageListener注解設(shè)置它訂閱的主題為createByRocketMQ,一旦收到這個主題的消息時這個監(jiān)聽器就會執(zhí)行onMessage方法,我們的邏輯很簡單,就是獲取消息的內(nèi)容完成扣款,唯一需要注意的就是線程安全問題。我們的壓測的情況下,單用戶可能會頻繁創(chuàng)建訂單,在并發(fā)期間同一個用戶的扣款消息可能同時到達扣款服務(wù)中,這就導(dǎo)致單位時間內(nèi)扣款服務(wù)從數(shù)據(jù)庫中查詢到相同的余額,執(zhí)行相同的扣款邏輯,導(dǎo)致金額少扣了。

所以我們必須保證扣款操作互斥和原子化,考慮到筆者當前項目環(huán)境是單體,所以就用簡單的synchronized 關(guān)鍵字解決問題。

@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SubtracAmountListener implements RocketMQListener<OrderDto> {

    @Resource
    private AccountMapper accountMapper;

    //強制轉(zhuǎn)為runTimeException
    @SneakyThrows
    @Override
    public void onMessage(OrderDto orderDto) {
        log.info("賬戶服務(wù)收到消息,開始消費");
        QueryWrapper<Account> query = new QueryWrapper<>();
        query.eq("account_code", orderDto.getAccountCode());
        //解決單體服務(wù)下線程安全問題
        synchronized (this){
            Account account = accountMapper.selectOne(query);
            BigDecimal subtract = account.getAmount().subtract(orderDto.getAmount());
            if (subtract.compareTo(BigDecimal.ZERO)<0){
                throw new Exception("用戶余額不足");
            }
            account.setAmount(subtract);
            log.info("更新賬戶服務(wù),請求參數(shù):{}", JSON.toJSONString(account));
            accountMapper.updateById(account);
        }


    }
}

然后就說商品服務(wù),邏輯也很簡單,也同樣要注意一下線程安全問題:

@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group2")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ProductSubtractListener implements RocketMQListener<OrderDto> {
    @Resource
    private ProductMapper productMapper;

    @Override
    public void onMessage(OrderDto orderDto) {
        log.info(" 產(chǎn)品服務(wù)收到消息,開始消費");
        QueryWrapper<Product> queryWrapper=new QueryWrapper<>();
        queryWrapper.eq("product_code",orderDto.getProductCode());
        synchronized (this){
            Product product = productMapper.selectOne(queryWrapper);
            if (product.getCount()<orderDto.getCount()){
                throw new RuntimeException("庫存不足");
            }

            product.setCount(product.getCount()-orderDto.getCount());
            log.info("更新產(chǎn)品庫存信息,請求參數(shù):{}", JSON.toJSONString(product));
            productMapper.updateById(product);
        }



    }
}

三、基于幾個測試用例驗證MQ半消息事務(wù)

1. 前置準備與說明

完整編碼工作后,自測是非常有必要的,我們?nèi)粘M瓿砷_發(fā)任務(wù)后,都會結(jié)合需求場景以及功能編排一些自測用例查看最終結(jié)果是否與預(yù)期一致。 需要注意的是由于訂單業(yè)務(wù)邏輯較為復(fù)雜,很多業(yè)務(wù)場景一篇博客是不可能全部覆蓋,所以這里我們就測試一下基于RocketMQ實現(xiàn)分布式事務(wù)常見的幾個問題場景是否和預(yù)期一致。

在測試前我們必須做好前置準備工作,準備功能測試時涉及到的SQL語句,以本次用戶購買產(chǎn)品的業(yè)務(wù)為例,涉及到訂單表、用戶賬戶信息表、產(chǎn)品表、以及生產(chǎn)者本地事務(wù)日志表。

SELECT * FROM t_order to2 ;
SELECT * from account a ;
SELECT * from product p ;
SELECT * FROM rocketmq_transaction_log rtl ;

在每次測試完成之后,我們希望數(shù)據(jù)能夠還原,所以這里也需要準備一下每次測試結(jié)束后的更新語句,由于訂單表和消息日志表都是主鍵自增,考慮到這兩張表只涉及插入,所以筆者為了重置主鍵的值采取的是truncate語句。

truncate  table  t_order;
truncate rocketmq_transaction_log ;
UPDATE account set amount=10000 ;
UPDATE product set count=10000;

2. 測試正常消費

第一個用例是查看所有服務(wù)都正常的情況下,訂單表是否有數(shù)據(jù),用戶表的用戶是否會正常扣款,以及商品表庫存是否會扣減。

測試前,我們先查看訂單表,確認沒有數(shù)據(jù)

查看我們的測試用戶,錢包額度為10000

再查看庫存表,可以看到數(shù)量為1000

確認完數(shù)據(jù)之后,我們就可以測試服務(wù)是否按照預(yù)期的方式執(zhí)行,將所有服務(wù)啟動。

我們通過網(wǎng)關(guān)發(fā)起調(diào)用,請求地址如下:

http://localhost:8090/order/order/createOrderByMQ

請求參數(shù)如下,從參數(shù)可以看出這個請求意為用戶代碼(accountCode)為demoData這個用戶希望購買1個(count)產(chǎn)品代碼(productCode)為P001的產(chǎn)品,該產(chǎn)品當前售價(price)為1元。

{
    "accountCode": "0932897",
    "productCode": "P003",
    "count": 1
}

調(diào)用完成后,查看訂單表,訂單數(shù)據(jù)生成無誤:

圖片圖片

查看用戶服務(wù)是否完成用戶扣款,扣款無誤:

查看產(chǎn)品表,可以看到產(chǎn)品數(shù)量也準確扣減:

3. 測試生產(chǎn)者commit提交失敗

我們希望測試一下發(fā)送完half消息之后,執(zhí)行本地事務(wù)完成,但是未提交commit請求時,MQServer是否會調(diào)用回查邏輯。

為了完成這一點我們必須按照以下兩個步驟執(zhí)行:

  • 在訂單服務(wù)提交事務(wù)消息處打個斷點。

  • 發(fā)起請求,當代碼執(zhí)行到這里的時候通過jps定位到進程號,將其強制殺死。如下所示,我們的代碼執(zhí)行到了提交事務(wù)消息這一步:

我們通過jps定位并將其殺死::

完成這些步驟后,我們再次將服務(wù)啟動,等待片刻之后可以發(fā)現(xiàn),MQServer會調(diào)用checkLocalTransaction回查生產(chǎn)者本地事務(wù)的情況。我們放行這塊代碼讓程序執(zhí)行下去,最后再查看數(shù)據(jù)庫中的數(shù)據(jù)結(jié)果是否符合預(yù)期。

4. 測試消費者消費失敗

測試消費者執(zhí)行報錯后是否會進行重試,這一點就比較好測試了,我們在消費者監(jiān)聽器中插入隨便插入一個報錯查看其是否會不斷重試。這里筆者就不多做演示,實驗結(jié)果是會進行不斷重試,當重試次數(shù)達到閾值時會將結(jié)果存到死信隊列中。

四、壓測MQ和Seata的性能

由于MQ是采用異步消費的形式解耦了服務(wù)間的業(yè)務(wù),而我們的Seata采用默認的AT模式每次執(zhí)行分布式事務(wù)時都會需要借助undo-log、全局鎖等的方式保證最終一致性。所以理論上RocketMQ的性能肯定是高于Seata的,對此我們不妨使用Jmeter進行壓測來驗證一下。

本次壓測只用了1000個并發(fā),MQ和seata的壓測結(jié)果如下,可以看到MQ無論從執(zhí)行時間還是成功率都遠遠優(yōu)秀于Seata的。

MQ的壓測結(jié)果:

Seata的壓測結(jié)果,可以看到大量的數(shù)據(jù)因為lock_table鎖超時而導(dǎo)致失敗,所以整體性能表現(xiàn)非常差勁:

五、詳解RocketMQ落地分布式事務(wù)常見問題

1. RocketMQ 如何保證事務(wù)的最終一致性

最終一致性是一種允許軟狀態(tài)存在的分布式事務(wù)解決方案,RocketMQ 保證事務(wù)最終一致性的方式主要是依賴生產(chǎn)者本地事務(wù)和消息可靠發(fā)送的原子性來最大努力保證最終一致性,注意這里筆者所強調(diào)的盡最大努力交付。

之所以說是最大努力交付是說RocketMQ是通過保證生產(chǎn)者事務(wù)和消息發(fā)送可靠性的原子性和一致性,由此保證消費者一定能夠消費到消息,理想情況下,只要消費者能夠正確消費消息,事務(wù)結(jié)果最終是可以保證一致性的,但是復(fù)雜的系統(tǒng)因素消費者可能會存在消費失敗的情況,此時事務(wù)最終一致性就無法保證,業(yè)界的做法是通過手動操作或者腳本等方式完成數(shù)據(jù)補償。

2. 什么是half消息

half消息即半消息,和普通消息的區(qū)別是該消息不會立馬被消費者消費,原因是half消息的存在是為了保證生產(chǎn)者本地事務(wù)和消費者的原子性和一致性,其過程如上文所介紹,初始發(fā)送的half消息是存儲在MQ一個內(nèi)存隊列中(并未投遞到topic中),只有生產(chǎn)者本地事務(wù)成功并發(fā)送commit通知后,這個消息才會被持久化到commitLog同時提交到topic隊列中,此時消費者才能夠消費該消息并執(zhí)行本地事務(wù)。

3. 為什么要先發(fā)送half消息再執(zhí)行本地事務(wù)?先執(zhí)行本地事務(wù),成功后在發(fā)送不行嗎?

先發(fā)送half消息的原因是為了盡可能確保生產(chǎn)者和消息隊列通信正常,只有通信正常了才能確保生產(chǎn)者本地事務(wù)和消息發(fā)送的原子性和一致性,由此保證分布式事務(wù)的可靠性。

先執(zhí)行本地事務(wù),執(zhí)行成功后再發(fā)送存在一個問題,試想一下,假設(shè)我們本地事務(wù)執(zhí)行成功,但是發(fā)送的消息因為網(wǎng)絡(luò)波動等諸多原因?qū)е翸Q沒有收到消息,此時生產(chǎn)者和消費者的分布式事務(wù)就會出現(xiàn)數(shù)據(jù)不一致問題。

而half消息則不同,它會優(yōu)先發(fā)送一個消費者感知不到的half消息確認通信可達,然后執(zhí)行本地事務(wù)后降消息設(shè)置未commit讓消費者消費,即使說commit消息未收到,因為half消息的存在,MQ在指定超時先限制后也可以通過回查的方式到生產(chǎn)者事務(wù)表查詢執(zhí)行情況。

4. 如果mq收到half消息,準備發(fā)送success信號的消息給生產(chǎn)者,但因為網(wǎng)絡(luò)波動導(dǎo)致生產(chǎn)者沒有收到這個消息要怎么辦?

此時生產(chǎn)者就會認為half消息發(fā)送失敗,本地事務(wù)不執(zhí)行,隨著時間推移MQ長時間沒收到commit或者rollback消息就會回查生產(chǎn)者消息日志表,明確沒看到數(shù)據(jù)則知曉生產(chǎn)者本地事務(wù)執(zhí)行失敗,直接rollback掉half消息,而消費者全程無感知,業(yè)務(wù)上的一致性也是可以保證。

5. MQ沒有收到生產(chǎn)者(訂單服務(wù))的commit或者rollback信號怎么保證事務(wù)最終一致性?

常規(guī)的做法就是建立一張表記錄消息狀態(tài),只要我們訂單信息插入成功就需要日志一下這條數(shù)據(jù),所以我們必須保證訂單數(shù)據(jù)插入和日志插入表中的原子性,確保生產(chǎn)者的事務(wù)和消息日志的ACID:

6. 如果生產(chǎn)者執(zhí)行本地事務(wù)失敗了怎么辦?

這一點前面的部分也已經(jīng)說明,首先將本地會事務(wù)回滾,并向消息隊列提交一個rollback的請求不提交half消息,消息就不會被消費者消費,保證最終一致性。

7. 前面說的都是事務(wù)流程?這和事務(wù)消息如何保證數(shù)據(jù)最終一致性有什么關(guān)系?

生產(chǎn)者和消息隊列事務(wù)流程可以確保生產(chǎn)者和消息隊列發(fā)送的一致性,確保寫操作都是同時成功或者失敗。只有保證兩者正常通信,才能確保消費者可以消費MQ中的消息從而完成數(shù)據(jù)最終一致性。

8. 消費者提交本地事務(wù)失敗了怎么辦?

我們都知道消息隊列只能保證消息可靠性,而無法保證分布式事務(wù)的強一致性,出現(xiàn)這種情況,消費者 不向 MQ 提交本次消息的 offset 即可。如果不提交 offset,那么 MQ 會在一定時間后,繼續(xù)將這條消息推送給消費者,消費者就可以繼續(xù)執(zhí)行本地事務(wù)并提交了,直到成功消息隊列會進行N次重試,如果還是失敗,則可以到死信隊列中查看失敗消息,然后通過補償機制實現(xiàn)分布式事務(wù)最終一致性。

責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2024-01-26 13:17:00

rollbackMQ訂單系統(tǒng)

2024-07-08 07:30:47

2022-06-21 08:27:22

Seata分布式事務(wù)

2025-03-25 10:29:52

2023-05-12 08:02:43

分布式事務(wù)應(yīng)用

2022-08-26 00:02:03

RocketMQ單體架構(gòu)MQ

2024-01-05 07:28:50

分布式事務(wù)框架

2024-01-08 08:05:08

分開部署數(shù)據(jù)體系系統(tǒng)拆分

2019-10-10 09:16:34

Zookeeper架構(gòu)分布式

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)

2024-01-09 08:00:58

2023-01-06 09:19:12

Seata分布式事務(wù)

2021-04-23 08:15:51

Seata XA AT

2024-08-15 08:03:52

2019-08-19 10:24:33

分布式事務(wù)數(shù)據(jù)庫

2024-06-13 09:25:14

2022-03-01 16:26:09

鏈路監(jiān)控日志監(jiān)控分布式系統(tǒng)

2024-11-28 15:11:28

2022-07-10 20:24:48

Seata分布式事務(wù)

2019-11-19 08:32:26

數(shù)據(jù)庫HLC事務(wù)
點贊
收藏

51CTO技術(shù)棧公眾號

免费在线视频你懂得| 久久精品性爱视频| 日韩成人一区| 一区二区视频免费在线观看| 国产伦精品一区二区三区视频孕妇 | 激情一区二区三区| 性色av一区二区三区四区| 亚洲国产精品日韩专区av有中文 | 九色精品美女在线| 色诱av手机版| 亚洲综合在线电影| 亚洲欧美成人一区二区三区| 韩国精品一区二区三区六区色诱| 日日骚一区二区三区| 亚洲日产av中文字幕| 日韩亚洲电影在线| 青青在线视频免费| 97在线视频免费观看完整版| 中文字幕精品一区| 激情小说综合网| av中文在线观看| 视频一区二区不卡| 97婷婷涩涩精品一区| 欧美丰满熟妇bbbbbb| 欧美亚洲高清| 亚洲精品综合精品自拍| 中国老熟女重囗味hdxx| 久久青草视频| 日本精品视频一区二区三区| 免费av观看网址| 超黄网站在线观看| 亚洲欧美激情插| 一区二区视频国产| av在线二区| 国产欧美一区二区精品婷婷| 久久天天狠狠| 无码国产精品高潮久久99| 国产精品18久久久久| 91精品视频在线看| 一区精品在线观看| 麻豆成人91精品二区三区| 秋霞午夜一区二区| 亚洲不卡在线视频| 国产欧美丝祙| 97在线免费观看视频| 国产精品9191| 最新成人av网站| 久久久久国产一区二区三区| 久久午夜无码鲁丝片| 国内一区二区三区| 欧美激情a∨在线视频播放| 久草视频免费在线播放| 欧美特黄视频| 久久久久久久一区二区| 国产一级视频在线播放| 尤物网精品视频| 国产69精品久久久| 亚洲精品www久久久久久| 亚洲美女视频在线免费观看| 91av视频在线观看| 一级片免费在线播放| 日日欢夜夜爽一区| 国产色视频一区| av av片在线看| 成人综合婷婷国产精品久久蜜臀 | 日韩av三区| 精品日韩av一区二区| 日韩精品国产一区| 久久婷婷国产| 亚洲日本成人女熟在线观看| 韩国女同性做爰三级| 久久综合电影| 色综合男人天堂| 欧美精品韩国精品| 日韩福利视频导航| 亚洲qvod图片区电影| 欧美一级淫片免费视频魅影视频| 久久99久久99| 91久久精品www人人做人人爽| 岛国av中文字幕| 男男视频亚洲欧美| 18成人免费观看网站下载| 亚洲男女视频在线观看| 91偷拍与自偷拍精品| 亚洲欧美日韩精品综合在线观看| 污视频在线免费观看| 97久久久精品综合88久久| 日韩av大全| free性欧美hd另类精品| 精品久久久久久久久久久久久| 免费的一级黄色片| 性xxxxfreexxxxx欧美丶| 欧美综合一区二区| 日本中文字幕有码| 国产午夜一区| 美女扒开尿口让男人操亚洲视频网站| 精品人妻中文无码av在线 | 美女爽到高潮91| 91国产在线播放| 国产在线视频你懂得| 亚洲同性gay激情无套| www.爱色av.com| 9999精品视频| 亚洲色图激情小说| 久久久久久久久99| 免费高清在线视频一区·| 国产精品有限公司| 免费在线观看av网站| 婷婷开心久久网| 日本中文字幕在线不卡| 欧美日韩xxxx| 欧美激情小视频| 97人妻人人澡人人爽人人精品| 日韩1区2区日韩1区2区| 国产精品10p综合二区| 天天在线视频色| 黑人巨大精品欧美一区二区| 热久久久久久久久| 国产精品嫩模av在线| 色综合五月天导航| 91国内精品视频| 91在线精品一区二区| 8x8ⅹ国产精品一区二区二区| 免费不卡av| 91麻豆精品国产91久久久久久久久 | 日韩女优制服丝袜电影| www.涩涩爱| 久久综合影视| 欧美精品亚洲精品| 一区二区精品伦理...| 亚洲国产成人在线播放| 黄色一级视频免费| 国产精品亚洲一区二区三区妖精| 成人欧美一区二区| 91精品久久| 5858s免费视频成人| 天天操天天摸天天舔| 蜜臀av亚洲一区中文字幕| 欧美最大成人综合网| 黄色污网站在线观看| 精品99一区二区三区| 高h视频免费观看| 国产一区二区在线影院| 国产a级片免费看| 亚洲欧洲一二区| 久久久精品国产亚洲| 国产欧美一级片| 亚洲精品成人精品456| 杨幂一区二区国产精品| 欧美区日韩区| 国产麻豆乱码精品一区二区三区| 九一在线视频| 在线观看中文字幕不卡| 免费网站在线高清观看| 日韩成人一级大片| 亚洲午夜精品一区二区| 青青在线精品| 久久久精品影院| 精品国产伦一区二区三| 一区二区不卡在线视频 午夜欧美不卡在| 无码精品a∨在线观看中文| 欧美a一欧美| 日本成人在线视频网址| 成年人视频在线观看免费| 欧美日韩在线播放三区| 性爱在线免费视频| 国产在线精品一区二区不卡了| 欧美一级二级三级| 国产第一亚洲| 久久精品一本久久99精品| 国产av无码专区亚洲av| 亚洲午夜三级在线| av在线网站观看| 免费观看日韩av| 欧美中文字幕在线观看视频| 日韩欧美影院| 国产精品视频专区| 18av在线播放| 亚洲欧洲国产精品| 一级淫片免费看| 香港成人在线视频| 亚洲精品91在线| 国产91精品欧美| 99精品视频播放| 婷婷综合在线| 精品国产乱码久久久久久久软件 | a视频网址在线观看| 欧美一区永久视频免费观看| 国产做受高潮漫动| 中文字幕一区二区三区色视频| 97在线免费公开视频| 我不卡神马影院| 久久99精品久久久久久秒播放器| 羞羞视频在线免费国产| 亚洲精品美女久久久| 亚洲精品一区二区二区| 亚洲福中文字幕伊人影院| av电影网站在线观看| 韩国v欧美v亚洲v日本v| 国产精品宾馆在线精品酒店| 9191国语精品高清在线| 欧美高清性xxxxhd | 欧美性猛交xxxx偷拍洗澡| 黄色香蕉视频在线观看| av亚洲产国偷v产偷v自拍| 污污的网站免费| 免费亚洲一区| 国产天堂视频在线观看| 婷婷久久国产对白刺激五月99| 91免费电影网站| 26uuu亚洲电影| 欧美激情精品久久久久久变态| 亚洲国产精品成人久久蜜臀| 一本色道久久综合精品竹菊| 精品小视频在线观看| 国产精品第四页| 国产ts在线播放| 成人精品电影在线观看| aaa一级黄色片| 日韩精品成人一区二区三区| 欧美 日韩 国产在线观看| 欧美粗暴jizz性欧美20| 视频一区二区精品| 免费精品国产| 久久一区二区精品| 欧美交a欧美精品喷水| 99久久精品无码一区二区毛片 | 国产人妻黑人一区二区三区| 卡一卡二国产精品 | 爱情岛亚洲播放路线| 最近2019好看的中文字幕免费 | 国产美女视频一区| 一路向西2在线观看| 一区二区三区午夜视频| 亚洲一区二区在线免费观看| 国内精品视频在线观看| 久久久久久欧美精品色一二三四| 成人免费福利| 国产不卡av在线免费观看| 在线能看的av网址| 555www成人网| 成av人片在线观看www| 国产做受高潮69| а√天堂中文在线资源8| 久久久最新网址| 678在线观看视频| 97视频在线看| 成人av三级| 国产精品精品久久久久久| 一区二区视频免费完整版观看| 欧美黑人xxxx| 视频在线这里都是精品| 欧美精品videosex性欧美| 国产va在线视频| 欧美一区二区视频97| 写真福利精品福利在线观看| 国产精品免费视频xxxx| 成人在线视频区| 国产精品久久久久久久天堂第1集| 国产一区一一区高清不卡| 国产精品夜间视频香蕉| 国产999精品在线观看| 成人激情av在线| baoyu135国产精品免费| 精品视频一区二区三区四区| 日韩影视高清在线观看| 欧美一区国产一区| 日韩久久电影| 菠萝蜜视频在线观看入口| 亚洲国产mv| 一区二区三区 欧美| 国产一区二区h| 国产三级国产精品| 国产精品毛片大码女人| 亚洲色图综合区| 欧美日韩中文字幕综合视频| 中文字幕人妻一区二区三区视频| 欧美性黄网官网| 一区二区视频网| 精品国产电影一区二区| 国产日韩精品在线看| 精品自在线视频| 国产高潮在线| 国产美女久久久| 福利片一区二区| 色吧亚洲视频| 黑人一区二区三区四区五区| 成人av一级片| 国产在线日韩欧美| 四虎永久免费在线观看| 亚洲欧美国产毛片在线| 欧美成人一区二区三区四区| 日韩欧美一区二区视频| 国产色在线 com| 欧美国产精品日韩| 欧美天堂一区二区| 久久久久网址| 中文字幕一区二区三区欧美日韩 | 欧美区高清在线| 91成人免费| 日本va中文字幕| 国产成人欧美日韩在线电影| 国产精品无码久久久久一区二区| 国产色产综合产在线视频| 欧美日韩大片在线观看| 欧美日韩在线综合| 久久米奇亚洲| 97激碰免费视频| 日韩三级久久| 在线看视频不卡| 噜噜噜在线观看免费视频日韩 | 自拍视频国产精品| 韩日毛片在线观看| 97人人模人人爽人人喊38tv| 精品国产123区| 老太脱裤子让老头玩xxxxx| 国产一区二区0| 色偷偷www8888| 欧美日韩一级大片网址| 久久精品a一级国产免视看成人| 国产性色av一区二区| 国产后进白嫩翘臀在线观看视频| 欧美最猛性xxxx| 精品欠久久久中文字幕加勒比| 精品欧美一区二区在线观看视频 | 精品一区二区成人免费视频| 日韩极品在线观看| 蜜桃传媒一区二区亚洲av| 午夜精品一区二区三区三上悠亚| www.中文字幕在线观看| 精品国产一区二区三区忘忧草| 天天操天天干天天插| 欧美极品少妇xxxxⅹ裸体艺术| 国产福利片在线观看| 国产精品国产精品| 欧美精品午夜| 国产精品果冻传媒| 亚洲国产精品一区二区尤物区| 日本久久综合网| 亚洲午夜女主播在线直播| 欧美理论影院| 视频一区视频二区视频| 久久精品观看| 香蕉视频久久久| 在线精品视频一区二区| 欧美一区二区三区在线观看免费| 欧美黑人性视频| 综合久久成人| 激情伊人五月天| 久久这里只精品最新地址| 国产91精品看黄网站在线观看| 精品日韩成人av| 精品人人视频| 欧洲视频一区二区三区| 奇米精品一区二区三区在线观看 | 亚洲美女爱爱视频| 欧美高清一级片在线观看| 亚洲一卡二卡在线| 中文字幕日韩在线视频| 在线视频成人| 欧美 亚洲 视频| 成av人片一区二区| 亚洲黄色免费观看| 中文字幕亚洲激情| 亚洲成人偷拍| 欧美 日韩 国产 高清| 久久久蜜桃精品| 91在线公开视频| 久久久天堂国产精品女人| 日韩成人av在线资源| 黄色手机在线视频| 一级日本不卡的影视| 你懂的免费在线观看视频网站| 欧美激情视频播放| 美女亚洲一区| 亚洲欧美手机在线| 精品国产户外野外| 毛片在线播放a| 国产精品三区在线| 日本怡春院一区二区| 久热这里有精品| 亚洲欧洲av一区二区| 国产激情综合| av天堂永久资源网| 国产精品久久久久久久久果冻传媒| 日韩在线 中文字幕| 久久久精品一区二区| 国产欧美啪啪| 日本人69视频| 精品成人av一区| 成人影欧美片| 日本成人三级电影网站| 国产精品88888| 中文在线免费观看| 国内精品免费午夜毛片| 久久精品亚洲人成影院| 一区二区不卡免费视频| 91精品国产欧美一区二区| 国产一区二区主播在线| 国产一区二区三区小说|