精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

三劍客 RocketMQ 事務消息 + 本地消息表 + XXL-Job 對賬,實現分布式事務 1w-10wqps 高并發

開發 架構
下游服務消費消息時,需通過 “訂單 ID” 實現冪等(避免重復扣庫存),同時記錄消費狀態,為后續對賬提供查詢依據;消費失敗時返回RECONSUME_LATER,觸發 RocketMQ 重試,重試耗盡后進入死信隊列。

CP (強一致)和AP(高并發)的根本沖突

從上面的指標數據可以知道, Seata AT/TCC是 強一致,并發能力弱。

CP (強一致)和AP(高并發)是一對  根本矛盾,存在根本沖突。

10Wqps 的高并發事務,并不是CP,而是屬于AP 高并發。Seata  如果不做特殊改造,很難滿足。

CAP 定理

CAP 該定理指出一個 分布式系統 最多只能同時滿足一致性(Consistency)可用性(Availability)和分區容錯性(Partition tolerance)這三項中的兩項。

CAP定理的三個要素可以用來描述分布式系統的一致性和可用性。

如果事務要追求高并發,根據cap定理,需要放棄強一致性,只需要保證數據的最終一致性

所以,在實踐可以使用本地消息表的方案來解決分布式事務問題。

經典ebay 本地消息表方案

本地消息表方案最初是ebay提出的,其實也是BASE理論的應用,屬于可靠消息最終一致性的范疇。

消息生產方/ 消息消費方,需要額外建一個消息表,并記錄消息發送狀態。

一個簡單的本地消息表, 設計如下:

字段

類型

注釋

id

long

id

msg_type

varchar

消息類型

biz_id

varchar

業務唯一標志

content

text

消息體

state

varchar

狀態(待發送,已消費)

create_time

datetime

創建時間

update_time

datetime

更新時間

消息表和業務數據要在一個事務里提交,也就是說他們要在一個數據庫里面。

然后消息會經過MQ發送到消息的消費方。如果消息發送失敗,會進行重試發送。

消息消費方  需要處理這個消息,并完成自己的業務邏輯。

此時如果本地事務處理成功,表明已經處理成功了,如果處理失敗,那么就會重試執行。

如果是業務上面的失敗,可以給生產方發送一個業務補償消息,通知生產方進行回滾等操作。

經典 ebay 本地消息表步驟

生產方定時掃描本地消息表,把還沒處理完成的消息或者失敗的消息再發送一遍。

發送消息方:

  • 需要有一個消息表,記錄著消息狀態相關信息。
  • 業務數據和消息表在同一個數據庫,要保證它倆在同一個本地事務。直接利用本地事務,將業務數據和事務消息直接寫入數據庫。
  • 在本地事務中處理完業務數據和寫消息表操作后,通過寫消息到 MQ 消息隊列。使用專門的投遞工作線程進行事務消息投遞到MQ,根據投遞ACK去刪除事務消息表記錄。
  • 消息會發到消息消費方,如果發送失敗,即進行重試。

消息消費方:

  • 處理消息隊列中的消息,完成自己的業務邏輯。
  • 如果本地事務處理成功,則表明已經處理成功了。
  • 如果本地事務處理失敗,那么就會重試執行。
  • 如果是業務層面的失敗,給消息生產方發送一個業務補償消息,通知進行回滾等操作。

生產方和消費方定時掃描本地消息表,把還沒處理完成的消息或者失敗的消息再發送一遍。

圖片圖片

經典ebay本地消息表方案中,還設計了靠譜的自動對賬補賬邏輯,確保數據的最終一致性。

經典ebay本地消息表的注意事項

使用本地消息表實現分布式事務可以確保消息在分布式環境中的可靠傳遞和一致性。

然而,需要注意以下幾點:

  • 消息的冪等性: 消費者一定需要保證接口的冪等性,消息的冪等性非常重要,以防止消息重復處理導致的數據不一致。
  • 本地消息表的設計: 本地消息表的設計需要考慮到消息狀態、重試次數、創建時間等字段,以便實現消息的跟蹤和管理。
  • 定時任務和重試機制: 需要實現定時任務或者重試機制來確保消息的可靠發送和處理。

經典ebay本地消息表訪問的優點和缺點:

優點:

  • 本地消息表建設成本比較低,實現了可靠消息的傳遞確保了分布式事務的最終一致性。
  • 無需提供回查方法,進一步減少的業務的侵入。
  • 在某些場景下,還可以進一步利用注解等形式進行解耦,有可能實現無業務代碼侵入式的實現。

缺點:

  • 本地消息表與業務耦合在一起,難于做成通用性,不可獨立伸縮。
  • 本地消息表是基于數據庫來做的,而數據庫是要讀寫磁盤IO的,因此在高并發下是有性能瓶頸的。
  • 數據大時,消息積壓問題,掃表效率慢。
  • 數據大時,事務表數據爆炸,定時掃表存在延遲問題。

使用定時任務(如 XXL-Job )實現分布式事務最終一致性方案

通過 XXL-Job 定時任務替代延遲消息,定期查詢 “待對賬” 業務數據,對比 Service A 與 Service B 的業務狀態,通過 “主動核查 + 差異修復” 確保最終一致性。

核心是 “本地事務保初始一致 + 定時任務查狀態差異 + 人工 / 自動補單修偏差”

1. Service A(發起方):本地消息表設計與事務保障

Service A 在執行核心業務(如創建訂單)時,需在本地數據庫事務中同時完成兩件事:

  • 執行核心業務邏輯(如插入訂單表,狀態標記為 “已創建”);
  • 插入 “本地消息對賬表”(字段含:對賬 ID、業務 ID(如訂單 ID)、業務類型(如 “訂單扣庫存”)、Service A 狀態(如 “訂單已創建”)、Service B 狀態(初始為 “未確認”)、對賬狀態(初始為 “待對賬”)、創建時間、最后對賬時間),確保 “業務成功則對賬記錄必存在”,避免初始數據缺失。

2. Service B(依賴方):業務狀態可查與結果反饋

Service B 執行依賴業務(如扣減庫存)時,需:

  • 執行核心業務邏輯(如扣減庫存表,標記 “已扣減”,并關聯 Service A 的業務 ID(訂單 ID));
  • 提供狀態查詢接口(如 “根據訂單 ID 查詢庫存扣減狀態”),返回 “已成功”“已失敗”“處理中” 三種明確狀態,方便定時任務核查;
  • 若 Service B 執行成功 / 失敗,可主動調用 Service A 的 “狀態回調接口” 更新本地消息對賬表的 “Service B 狀態”(非強制,定時任務會兜底核查)。

3、XXL-Job 定時任務設計:對賬與修復

定時任務執行時,按以下步驟完成對賬:

1)篩選待對賬數據:查詢 Service A 本地消息對賬表中 “對賬狀態 = 待對賬” 且 “創建時間超過 5 分鐘”(避免業務未執行完就對賬)的記錄,按分片范圍批量獲取(如每次查 1000 條,避免一次性查太多導致 OOM);

2) 雙端狀態查詢:對每條待對賬記錄,分別調用 Service A 的 “業務狀態接口”(確認訂單是否真的已創建)、Service B 的 “狀態查詢接口”(確認庫存是否已扣減);

3)狀態一致性判斷與處理:

  • 若 “Service A 成功 + Service B 成功”:更新本地消息對賬表 “對賬狀態 = 已一致”“Service B 狀態 = 已成功”,完成對賬;
  • 若 “Service A 失敗 + Service B 失敗”:更新 “對賬狀態 = 已一致”“Service B 狀態 = 已失敗”,無需額外處理;
  • 若 “Service A 成功 + Service B 失敗 / 處理中”:觸發自動重試(調用 Service B 的 “重試執行接口”,如重新扣減庫存),重試 3 次仍失敗則標記 “對賬狀態 = 不一致”,生成業務工單;
  • 若 “Service A 失敗 + Service B 成功”:屬于異常數據(Service A 業務失敗但 Service B 執行成功),直接標記 “對賬狀態 = 不一致”,生成業務工單;

4)異常兜底:若調用 Service A/Service B 接口超時,標記該記錄 “對賬狀態 = 待重試”,下次定時任務重新核查,避免因臨時網絡問題誤判不一致。

RocketMQ 事務消息 + 本地消息表 + XXL-Job 對賬  分布式式事務方案實操

本方案將規整為標準 Markdown 格式,同時補充 XXL-Job 定時任務事務對賬機制,通過 “事務消息保初始一致性 + 定時對賬兜底差異” 的雙層保障,確保電商下單(生成訂單→扣減庫存→發送通知)場景的分布式事務最終一致。

1. 系統架構

以電商下單場景為核心,涉及 3 個服務與 1 個中間件,職責分工明確:

  • 訂單服務(發起方):核心服務,負責生成訂單、記錄本地消息表、發送 RocketMQ 事務消息,同時提供訂單狀態查詢接口。
  • 庫存服務(依賴方 1):訂閱訂單消息,執行庫存扣減,提供庫存扣減狀態查詢接口,支持重試執行。
  • 通知服務(依賴方 2):訂閱訂單消息,發送短信 / APP 通知,提供通知發送狀態查詢接口。
  • RocketMQ:作為事務協調中間件,接收訂單服務的事務消息,確認本地事務成功后投遞消息至下游服務。
  • XXL-Job:定時任務調度中心,部署對賬任務,定期核查訂單服務與下游服務的業務狀態差異,兜底修復不一致數據。

2. 數據庫表設計

訂單服務的 本地消息表message_log)包括對賬相關字段(如對賬狀態、重試次數、下次對賬時間),支撐定時對賬邏輯;

message_log 同時 包含 核心業務字段,確保消息與訂單的關聯可追溯。

-- 訂單服務:本地消息表(含對賬字段)
CREATE TABLE message_log (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_id BIGINT NOT NULL COMMENT '訂單ID(關聯t_order表,唯一)',
    rocketmq_msg_id VARCHAR(64) DEFAULT NULL COMMENT 'RocketMQ消息唯一ID(關聯消息中間件)',
    message_content TEXT NOT NULL COMMENT '消息內容(JSON格式,含orderId、skuId、quantity等)',
    business_type VARCHAR(32) NOT NULL COMMENT '業務類型:ORDER_CREATE(創建訂單)、INVENTORY_DEDUCT(扣庫存)、NOTICE_SEND(發通知)',
    msg_status ENUM('INIT','SENT','CONSUMED','FAIL') DEFAULT 'INIT' COMMENT '消息狀態:INIT=初始,SENT=已投遞,CONSUMED=已消費,FAIL=失敗',
    reconcile_status ENUM('PENDING','SUCCESS','FAIL','RETRY') DEFAULT 'PENDING' COMMENT '對賬狀態:PENDING=待對賬,SUCCESS=對賬一致,FAIL=對賬不一致,RETRY=待重試',
    retry_count TINYINT DEFAULT 0 COMMENT '對賬重試次數(最大5次)',
    next_reconcile_time DATETIME NOT NULL COMMENT '下次對賬時間(定時任務篩選依據)',
    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '消息創建時間',
    update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '狀態更新時間',
    UNIQUE KEY uk_order_id_business_type (order_id, business_type) COMMENT '避免同一訂單同一業務類型重復發消息'
);
-- 訂單服務:訂單表(簡化,僅保留核心字段)
CREATE TABLE t_order (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_id VARCHAR(64) NOT NULL COMMENT '訂單唯一編號',
    user_id BIGINT NOT NULL COMMENT '用戶ID',
    sku_id BIGINT NOT NULL COMMENT '商品SKU ID',
    quantity INT NOT NULL COMMENT '購買數量',
    order_status ENUM('CREATED','PAID','SHIPPED','FINISHED','CANCELED') DEFAULT 'CREATED' COMMENT '訂單狀態',
    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '訂單創建時間',
    UNIQUE KEY uk_order_id (order_id)
);
-- 庫存服務:庫存扣減記錄表(支撐狀態查詢與冪等)
CREATE TABLE inventory_deduct_log (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_id VARCHAR(64) NOT NULL COMMENT '訂單ID(關聯訂單服務)',
    sku_id BIGINT NOT NULL COMMENT '商品SKU ID',
    deduct_quantity INT NOT NULL COMMENT '扣減數量',
    deduct_status ENUM('SUCCESS','FAIL','PROCESSING') DEFAULT 'PROCESSING' COMMENT '扣減狀態',
    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_order_id (order_id) COMMENT '按訂單ID冪等,避免重復扣減'
);

3. 代碼實現

3.1 Producer 端(訂單服務):事務消息發送

通過 TransactionMQProducer 發送事務消息,確保 “生成訂單” 與 “記錄本地消息” 在同一本地事務中,保證初始數據一致性;

同時初始化消息的對賬狀態(PENDING)與下次對賬時間(默認 5 分鐘后,避免業務未執行完就對賬)。

import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@Service
public class OrderServiceImpl implements OrderService {
    // 注入RocketMQ事務生產者(單例,由Spring容器初始化)
    @Resource
    private TransactionMQProducer transactionMQProducer;
    @Resource
    private OrderMapper orderMapper;
    @Resource
    private MessageLogMapper messageLogMapper;
    /
     * 創建訂單 + 發送事務消息
     */
    @Override
    public void createOrder(OrderCreateDTO dto) throws Exception {
        // 1. 構造訂單數據(生成唯一訂單號)
        String orderId = generateOrderId();
        TOrder order = TOrder.builder()
                .orderId(orderId)
                .userId(dto.getUserId())
                .skuId(dto.getSkuId())
                .quantity(dto.getQuantity())
                .orderStatus("CREATED")
                .build();
        // 2. 構造RocketMQ事務消息(主題:OrderTopic,標簽:INVENTORY_DEDUCT+NOTICE_SEND,支持多下游訂閱)
        String msgContent = JSON.toJSONString(dto);
        Message message = new Message(
                "OrderTopic",  // 主題:下游服務訂閱此主題
                "INVENTORY_DEDUCT||NOTICE_SEND",  // 標簽:區分業務類型,下游可按標簽過濾
                orderId.getBytes(StandardCharsets.UTF_8),  // 消息Key:訂單ID,便于定位
                msgContent.getBytes(StandardCharsets.UTF_8)
        );
        // 3. 發送事務消息(將訂單數據作為參數透傳給事務監聽器)
        transactionMQProducer.sendMessageInTransaction(message, order);
    }
    /
     * 事務監聽器:執行本地事務 + 事務回查
/
    @Resource
    private TransactionListener orderTransactionListener;
    // 初始化事務生產者時綁定監聽器(Spring Bean初始化方法)
    @PostConstruct
    public void initProducer() {
        transactionMQProducer.setTransactionListener(orderTransactionListener);
    }
    /
     * 本地事務邏輯(由監聽器調用,確保訂單與消息表同成功/同失敗)
/
    @Transactional(rollbackFor = Exception.class)
    public LocalTransactionState executeLocalTransaction(TOrder order, Message message) {
        try {
            // 步驟1:保存訂單到訂單表
            orderMapper.insert(order);
            // 步驟2:記錄本地消息表(對賬狀態初始為PENDING,下次對賬時間5分鐘后)
            Date nextReconcileTime = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5));
            MessageLog log = MessageLog.builder()
                    .orderId(order.getOrderId())
                    .rocketmqMsgId(message.getMsgId())
                    .messageContent(new String(message.getBody()))
                    .businessType("ORDER_CREATE")
                    .msgStatus("INIT")
                    .reconcileStatus("PENDING")
                    .retryCount(0)
                    .nextReconcileTime(nextReconcileTime)
                    .build();
            messageLogMapper.insert(log);
            // 步驟3:提交本地事務,返回COMMIT(通知RocketMQ投遞消息)
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 本地事務失敗,回滾,返回ROLLBACK(通知RocketMQ丟棄消息)
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    /*
     * 事務回查邏輯(Broker未收到Commit/Rollback時觸發)
/
    public LocalTransactionState checkLocalTransaction(String orderId) {
        // 查本地消息表,按訂單ID判斷本地事務狀態
        MessageLog log = messageLogMapper.selectByOrderId(orderId);
        if (log == null) {
            return LocalTransactionState.ROLLBACK_MESSAGE; // 本地無記錄,回滾
        }
        // 本地消息已記錄,說明本地事務成功,返回COMMIT
        if ("INIT".equals(log.getMsgStatus()) || "PENDING".equals(log.getReconcileStatus())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    // 工具方法:生成唯一訂單號
    private String generateOrderId() {
        return "ORDER_" + System.currentTimeMillis() + RandomUtils.nextInt(1000, 9999);
    }
}

3.2 Consumer 端(庫存服務):消息消費與冪等控制

下游服務消費消息時,需通過 “訂單 ID” 實現冪等(避免重復扣庫存),同時記錄消費狀態,為后續對賬提供查詢依據;消費失敗時返回RECONSUME_LATER,觸發 RocketMQ 重試,重試耗盡后進入死信隊列。

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@Component
// 訂閱訂單主題,僅消費“扣庫存”標簽的消息
@RocketMQMessageListener(topic = "OrderTopic", selectorExpression = "INVENTORY_DEDUCT", consumerGroup = "inventory_consumer_group")
public class InventoryConsumer implements RocketMQListener<MessageExt> {
    @Resource
    private InventoryMapper inventoryMapper;
    @Resource
    private InventoryDeductLogMapper deductLogMapper;
    @Resource
    private MessageLogFeignClient messageLogFeignClient; // 調用訂單服務的消息表接口
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void onMessage(MessageExt messageExt) {
        // 1. 解析消息(獲取訂單ID、商品ID、扣減數量)
        String msgContent = new String(messageExt.getBody());
        OrderCreateDTO dto = JSON.parseObject(msgContent, OrderCreateDTO.class);
        String orderId = dto.getOrderId();
        Long skuId = dto.getSkuId();
        Integer quantity = dto.getQuantity();
        // 2. 冪等控制:查詢是否已扣減(按訂單ID)
        InventoryDeductLog existLog = deductLogMapper.selectByOrderId(orderId);
        if (existLog != null && "SUCCESS".equals(existLog.getDeductStatus())) {
            // 已成功扣減,直接返回成功
            messageLogFeignClient.updateMsgStatus(orderId, "CONSUMED"); // 通知訂單服務更新消息狀態
            return;
        }
        try {
            // 3. 執行庫存扣減(先查庫存是否充足)
            Inventory inventory = inventoryMapper.selectBySkuId(skuId);
            if (inventory == null || inventory.getStock() < quantity) {
                // 庫存不足,記錄失敗狀態,返回失敗(不重試,避免無效循環)
                deductLogMapper.insert(InventoryDeductLog.builder()
                        .orderId(orderId)
                        .skuId(skuId)
                        .deductQuantity(quantity)
                        .deductStatus("FAIL")
                        .build());
                messageLogFeignClient.updateMsgStatus(orderId, "FAIL"); // 通知訂單服務更新消息狀態
                throw new RuntimeException("庫存不足,扣減失敗");
            }
            // 4. 扣減庫存并記錄日志
            inventory.setStock(inventory.getStock() - quantity);
            inventoryMapper.updateById(inventory);
            deductLogMapper.insert(InventoryDeductLog.builder()
                    .orderId(orderId)
                    .skuId(skuId)
                    .deductQuantity(quantity)
                    .deductStatus("SUCCESS")
                    .build());
            // 5. 通知訂單服務更新消息狀態為“已消費”
            messageLogFeignClient.updateMsgStatus(orderId, "CONSUMED");
            // 返回消費成功
        } catch (Exception e) {
            // 消費失敗,記錄“處理中”狀態,返回重試
            deductLogMapper.insertOrUpdate(InventoryDeductLog.builder()
                    .orderId(orderId)
                    .skuId(skuId)
                    .deductQuantity(quantity)
                    .deductStatus("PROCESSING")
                    .build());
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    /
     * 對外提供庫存扣減狀態查詢接口(供XXL-Job對賬調用)
     */
    public String queryDeductStatus(String orderId) {
        InventoryDeductLog log = deductLogMapper.selectByOrderId(orderId);
        if (log == null) {
            return "NOT_PROCESSED"; // 未處理
        }
        return log.getDeductStatus(); // SUCCESS/FAIL/PROCESSING
    }
    /
     * 對外提供庫存扣減重試接口(供XXL-Job對賬修復調用)
     */
    public boolean retryDeduct(String orderId) {
        // 邏輯同onMessage,僅針對“PROCESSING/FAIL”狀態的記錄重試,此處省略
        return true;
    }
}

4. 基礎流程說明

4.1 正常流程

1、用戶發起下單請求,訂單服務調用 createOrder 方法,發送事務消息。

2、RocketMQ 收到 “半消息” 后,觸發訂單服務的本地事務(executeLocalTransaction):   - 本地事務成功:保存訂單、記錄本地消息表(msg_status=INITreconcile_status=PENDING),返回 COMMIT_MESSAGE。   - RocketMQ 確認后,將消息投遞至庫存服務、通知服務。

3、庫存服務 / 通知服務消費消息,執行業務邏輯(扣庫存 / 發通知),消費成功后通知訂單服務更新消息狀態為 CONSUMED

4、5 分鐘后,XXL-Job 對賬任務觸發,核查訂單服務與下游服務狀態一致,更新對賬狀態為 SUCCESS,流程閉環。

4.2 基礎異常流程(無對賬時)

  • 本地事務失敗:訂單 / 消息表插入失敗,返回 ROLLBACK_MESSAGE,RocketMQ 不投遞消息,無下游影響。
  • Broker 超時未收狀態:RocketMQ 觸發事務回查(checkLocalTransaction),按本地消息表狀態返回 COMMIT_MESSAGE,重新投遞消息。
  • Consumer 消費失敗:返回 RECONSUME_LATER,RocketMQ 按重試策略重試(默認 16 次),重試耗盡后進入死信隊列。

5. XXL-Job 定時事務對賬機制

5.1 對賬任務核心目標

解決 “基礎流程無法覆蓋的一致性問題”,例如:

  • Consumer 消費成功但未通知訂單服務更新消息狀態;
  • RocketMQ 投遞成功但 Consumer 因網絡問題未接收,重試超時;
  • 本地事務成功、消息投遞成功,但 Consumer 業務執行一半(如庫存扣減成功但日志未記錄)。

5.2 對賬任務配置

  • 執行頻率:每 5 分鐘執行一次(與消息表 next_reconcile_time 匹配,避免高頻占用資源)。
  • 分片策略:按 order_id 尾號分片(如 10 個分片,尾號 0-9),多執行器并行對賬,支撐百萬級訂單對賬效率。
  • 超時控制:單個分片任務超時時間設為 30 秒,超時標記為 “待重試”,下次對賬重新處理。
  • 任務依賴:依賴訂單服務、庫存服務、通知服務的 “狀態查詢接口” 與 “重試執行接口”。

5.3 核心對賬邏輯(XXL-Job 任務代碼)

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Component
public class OrderReconcileJob {
    @Resource
    private MessageLogMapper messageLogMapper;
    @Resource
    private OrderService orderService;
    @Resource
    private InventoryFeignClient inventoryFeignClient; // 調用庫存服務接口
    @Resource
    private NoticeFeignClient noticeFeignClient; // 調用通知服務接口
    @Resource
    private ReconcileWorkOrderMapper workOrderMapper; // 對賬工單表
    /*
     * XXL-Job 對賬任務(分片執行)
/
    @XxlJob("orderReconcileJob")
    public void execute() throws Exception {
        // 1. 獲取分片參數(當前分片號、總分片數)
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        // 2. 篩選待對賬數據:
        // - 對賬狀態為PENDING/RETRY
        // - 下次對賬時間 <= 當前時間
        // - 重試次數 < 5次
        // - 按order_id尾號分片查詢(避免重復)
        List<MessageLog> pendingLogs = messageLogMapper.selectPendingReconcile(
                shardIndex, shardTotal, 5, new Date()
        );
        if (pendingLogs.isEmpty()) {
            XxlJobHelper.log("當前分片無待對賬數據,分片號:{}", shardIndex);
            return;
        }
        // 3. 遍歷待對賬記錄,逐一對賬
        for (MessageLog log : pendingLogs) {
            String orderId = log.getOrderId();
            try {
                XxlJobHelper.log("開始對賬訂單:{}", orderId);
                // 步驟1:查詢各服務狀態
                // - 訂單服務狀態:是否已創建(CREATED)
                String orderStatus = orderService.queryOrderStatus(orderId);
                // - 庫存服務狀態:是否已扣減(SUCCESS/FAIL/PROCESSING)
                String inventoryStatus = inventoryFeignClient.queryDeductStatus(orderId);
                // - 通知服務狀態:是否已發送(SUCCESS/FAIL/PROCESSING)
                String noticeStatus = noticeFeignClient.queryNoticeStatus(orderId);
                // 步驟2:狀態一致性判斷與處理
                // 場景1:訂單已創建,庫存/通知均成功 → 對賬一致
                if ("CREATED".equals(orderStatus) 
                        && "SUCCESS".equals(inventoryStatus) 
                        && "SUCCESS".equals(noticeStatus)) {
                    messageLogMapper.updateReconcileStatus(orderId, "SUCCESS");
                    XxlJobHelper.log("訂單{}對賬一致,狀態更新為SUCCESS", orderId);
                    continue;
                }
                // 場景2:訂單已創建,庫存/通知存在PROCESSING → 待重試(下次對賬再查)
                if ("CREATED".equals(orderStatus) 
                        && ("PROCESSING".equals(inventoryStatus) || "PROCESSING".equals(noticeStatus))) {
                    // 更新下次對賬時間(10分鐘后)和重試次數
                    Date nextTime = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10));
                    messageLogMapper.updateRetryInfo(orderId, log.getRetryCount() + 1, nextTime, "RETRY");
                    XxlJobHelper.log("訂單{}存在處理中狀態,下次對賬時間:{}", orderId, nextTime);
                    continue;
                }
                // 場景3:訂單已創建,庫存/通知存在FAIL → 觸發自動重試
                if ("CREATED".equals(orderStatus) 
                        && ("FAIL".equals(inventoryStatus) || "FAIL".equals(noticeStatus))) {
                    // 重試次數未超5次,調用下游重試接口
                    if (log.getRetryCount() < 5) {
                        boolean inventoryRetry = "FAIL".equals(inventoryStatus) 
                                ? inventoryFeignClient.retryDeduct(orderId) : true;
                        boolean noticeRetry = "FAIL".equals(noticeStatus) 
                                ? noticeFeignClient.retrySend(orderId) : true;
                        if (inventoryRetry && noticeRetry) {
                            // 重試成功,更新下次對賬時間(5分鐘后)
                            Date nextTime = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5));
                            messageLogMapper.updateRetryInfo(orderId, log.getRetryCount() + 1, nextTime, "RETRY");
                            XxlJobHelper.log("訂單{}重試下游服務成功,下次對賬時間:{}", orderId, nextTime);
                        } else {
                            // 重試失敗,更新重試次數,下次繼續
                            Date nextTime = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(30));
                            messageLogMapper.updateRetryInfo(orderId, log.getRetryCount() + 1, nextTime, "RETRY");
                            XxlJobHelper.log("訂單{}重試下游服務失敗,下次對賬時間:{}", orderId, nextTime);
                        }
                        continue;
                    }
                    // 重試次數超5次,生成人工工單
                    workOrderMapper.insert(ReconcileWorkOrder.builder()
                            .orderId(orderId)
                            .workOrderNo("RECONCILE_" + System.currentTimeMillis())
                            .faultDesc("訂單" + orderId + ":庫存狀態=" + inventoryStatus + ",通知狀態=" + noticeStatus + ",重試5次失敗")
                            .workOrderStatus("PENDING")
                            .createTime(new Date())
                            .build());
                    // 更新對賬狀態為FAIL
                    messageLogMapper.updateReconcileStatus(orderId, "FAIL");
                    XxlJobHelper.log("訂單{}對賬失敗,生成人工工單", orderId);
                    continue;
                }
                // 場景4:訂單狀態異常(如CANCELED)→ 對賬一致(無需下游處理)
                if ("CANCELED".equals(orderStatus)) {
                    messageLogMapper.updateReconcileStatus(orderId, "SUCCESS");
                    XxlJobHelper.log("訂單{}已取消,對賬狀態更新為SUCCESS", orderId);
                    continue;
                }
            } catch (Exception e) {
                // 對賬過程異常(如接口超時),標記為RETRY,下次再查
                Date nextTime = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10));
                messageLogMapper.updateRetryInfo(orderId, log.getRetryCount() + 1, nextTime, "RETRY");
                XxlJobHelper.log("訂單{}對賬異常,原因:{},下次對賬時間:{}", orderId, e.getMessage(), nextTime);
            }
        }
        XxlJobHelper.handleSuccess("當前分片對賬完成,分片號:{},處理記錄數:{}", shardIndex, pendingLogs.size());
    }
}

5.4 對賬關鍵保障

1、冪等性:對賬任務按 order_id 與 reconcile_status 篩選,僅處理 “待對賬 / 待重試” 記錄,避免重復對賬。

2、重試策略:采用 “指數退避” 重試(5 分鐘→10 分鐘→30 分鐘),減少無效重試對服務的壓力。

3、人工兜底:重試 5 次仍失敗后生成工單,由運維 / 業務人員介入(如手動補扣庫存、重發通知),確保無數據遺漏。

4、數據清理:對賬狀態為 SUCCESS 且超過 30 天的記錄,定期歸檔至歷史表(如 message_log_hist),避免主表數據量過大影響查詢效率。

6. 完整異常場景處理

異常場景

現象

處理機制

本地事務失敗

訂單 / 消息表未插入,返回 ROLLBACK

RocketMQ 不投遞消息,無下游影響

Broker 回查

未收到 Commit/Rollback,觸發回查

查本地消息表,存在則返回 Commit,重新投遞

Consumer 消費超時

消息未被消費,RocketMQ 重試

重試 16 次后進入死信隊列,對賬任務發現后重試

消費成功未更狀態

Consumer 成功但未通知訂單服務,消息表仍為 INIT

對賬任務查下游狀態為 SUCCESS,更新消息表狀態為 CONSUMED

下游業務失敗(庫存不足)

庫存扣減失敗,狀態為 FAIL

對賬任務重試 5 次后生成人工工單,手動處理(如補充庫存)

經典ebay本地消息表 事務表數據爆炸 問題

經典ebay本地消息表 事務表數據爆炸, 定時任務掃表會很慢,存在巨大的延遲問題

解決的方案如下:

1、索引優化:在消息表中對狀態字段增加索引,以加速掃表操作。索引可以加速消息的檢索和篩選,從而提高操作效率。

2、分頁查詢:將掃表操作劃分為多次分頁查詢,避免一次性查詢大量數據造成的性能問題。

3、多線程 +  分段查詢:

  • 如果有業務標識,可以通過業務標識進行多線程分段掃表查詢。
  • 如果沒有業務標識可以按區間查詢比如線程1查詢0-1000的數據,線程2查詢1001-2000的數據。

4、表較大時進行分庫分表:如果表較大可以進行分庫分表操作。

10Wqps 本地消息表事務架構方案大總結

最終,通過引入一個中間的Rocketmq承擔本地消息表的職責,除了解決事務的一致性外,同樣可以解決消息的丟失與冪等性問題,一舉多得。

而且從業務的健壯性與數據一致性來看,一般都會增加一個補償機制, 實現數據的 最終一致性。這也是BASE理論所支持的。

如何設計 10Wqps高并發分布式事務? 如果能講 到尼恩答案 的 水平 , 面試官一定口水直流,  大廠 offer 就到手啦。

責任編輯:武曉燕 來源: 技術自由圈
相關推薦

2025-11-05 01:45:00

2023-09-18 08:27:20

RabbitMQRocketMQKafka

2024-06-07 08:06:36

2024-06-13 09:25:14

2024-06-26 11:55:44

2025-06-04 01:00:00

2025-04-29 04:00:00

分布式事務事務消息

2023-01-04 09:23:58

2011-03-28 16:04:44

nagios

2021-12-16 13:04:41

消息隊列緩存

2024-10-29 08:34:27

RocketMQ消息類型事務消息

2022-06-27 08:21:05

Seata分布式事務微服務

2023-07-17 08:34:03

RocketMQ消息初體驗

2019-06-27 10:06:54

Linux 性能工具

2010-02-04 16:22:21

2022-01-27 08:44:58

調度系統開源

2024-01-26 13:17:00

rollbackMQ訂單系統

2023-10-04 00:20:31

grepLinux

2009-02-26 18:22:49

桌面虛擬化Linux

2017-07-25 08:53:14

CorrectLinkCCA-SD算法
點贊
收藏

51CTO技術棧公眾號

日韩精品视频网站| 国产欧美日韩在线一区二区| 亚洲午夜日本在线观看| 久久伦理网站| 国产免费福利视频| 一本久道久久久| 中文字幕亚洲一区| 白嫩情侣偷拍呻吟刺激 | 亚洲一区在线观看视频| 欧美日韩在线高清| 成人av免费播放| 日本成人在线视频网站| 国语自产精品视频在线看抢先版图片| 99久久99久久精品免费看小说.| 亚洲大奶少妇| 欧美三级在线播放| 女性女同性aⅴ免费观女性恋 | 国内精久久久久久久久久人| 国产一区二区三区四区在线| 老司机精品在线| 69成人精品免费视频| 日本成人在线免费视频| 国语对白在线刺激| 中文字幕一区二区视频| 欧洲一区二区在线| 无码精品人妻一区二区三区影院| 国内精品伊人久久久久影院对白| 国产精品99久久久久久www| 久久午夜无码鲁丝片午夜精品| 欧美韩日一区| 亚洲性无码av在线| 亚洲av无码国产精品久久| 91蝌蚪精品视频| 欧美一级xxx| 91国内在线播放| 97人人做人人爽香蕉精品| 精品欧美aⅴ在线网站| www插插插无码视频网站| 18+视频在线观看| 中文字幕综合网| 伊人久久av导航| 97电影在线观看| 国产亚洲视频系列| 神马欧美一区二区| 国产高清视频免费最新在线| 久久精品一级爱片| 欧美日韩综合精品| 蝌蚪视频在线播放| 久久久久亚洲蜜桃| 神马影院我不卡| 国产在线观看免费| 国产女主播视频一区二区| 日韩高清在线播放| 最新97超碰在线| 国产精品成人免费精品自在线观看| 深夜福利成人| 成人日韩欧美| 樱花影视一区二区| 97超碰在线人人| 性欧美18xxxhd| 色综合久久综合网97色综合 | av动漫免费观看| 国产原创在线观看| 亚洲一区二区五区| 国产伦精品一区二区三区四区视频_ | 一级一级黄色片| 免费人成精品欧美精品| 成人中文字幕在线观看| 精品国产va久久久久久久| 成人综合婷婷国产精品久久免费| 狠狠干一区二区| 番号集在线观看| 亚洲视频在线一区| www.国产在线视频| 日韩欧美看国产| 欧美日韩视频在线观看一区二区三区| 国产成人美女视频| 成人午夜网址| 亚洲欧美综合精品久久成人| 欧美a在线播放| 欧美一区二区三区另类| 5566成人精品视频免费| 中文字幕男人天堂| 高潮精品一区videoshd| 免费精品视频一区二区三区| 五月婷婷在线视频| 亚洲一区二区三区不卡国产欧美| 狠狠爱免费视频| 成人污版视频| 日韩精品亚洲元码| 亚洲熟女少妇一区二区| 日韩午夜一区| 成人亚洲欧美一区二区三区| 亚洲 欧美 自拍偷拍| 亚洲欧洲国产日韩| 无码人妻丰满熟妇区96| 白嫩亚洲一区二区三区| 日韩精品免费在线观看| 国产黄色片在线免费观看| 亚洲欧美日韩专区| 亚洲综合色av| 国产区视频在线| 亚洲高清视频在线| 美女在线视频一区二区| 色狼人综合干| 欧美黑人巨大xxx极品| 黄色污污视频软件| av电影在线观看一区| 一区二区三区观看| 春暖花开亚洲一区二区三区| 精品国产髙清在线看国产毛片| 能直接看的av| 午夜在线精品偷拍| 999国内精品视频在线| 欧美性天天影视| 色av综合在线| 亚洲av无码一区二区三区网址| 婷婷综合网站| 国产精品99久久久久久人| 天堂中文在线观看视频| 国产精品美女久久久久久久| 国产又大又硬又粗| 开心激情综合| 久久久久久免费精品| 国产视频第一页| 中文天堂在线一区| 粉嫩虎白女毛片人体| 欧美精品中文字幕亚洲专区| 久久99热精品| 国产精品美女一区| 国产精品电影一区二区三区| 男人透女人免费视频| 香蕉人人精品| 欧美诱惑福利视频| 午夜18视频在线观看| 亚洲国产综合人成综合网站| 国产吃瓜黑料一区二区| 午夜精品久久99蜜桃的功能介绍| 成人欧美一区二区三区在线湿哒哒| av网站在线免费观看| 欧洲精品中文字幕| 一级黄色录像毛片| 免费在线观看精品| 亚洲最新免费视频| 四虎国产精品免费久久5151| 日韩在线中文字| 国产精品自产拍| 亚洲美女免费在线| 在线观看欧美一区二区| 激情婷婷亚洲| 精品午夜一区二区| 韩国成人动漫| 亚洲一区999| 一本色道久久综合精品婷婷| 国产精品乱子久久久久| 亚洲一区二区偷拍| 欧美精品99| 国产精品美女诱惑| 美女精品视频一区| 好吊视频一区二区三区| 亚洲www啪成人一区二区麻豆| 国产精品入口麻豆| 快she精品国产999| 一区二区三区我不卡| 久久综合给合| 欧美激情欧美狂野欧美精品| 欧美视频综合| 欧美日韩精品欧美日韩精品| 永久看片925tv| 不卡一卡二卡三乱码免费网站| 无码人妻精品一区二区三区在线| 精品国产午夜| 99视频免费观看蜜桃视频| 超黄网站在线观看| 伊人久久五月天| 国产极品999| 岛国精品视频在线播放| 人成免费在线视频| 成人夜色视频网站在线观看| 久久精品国产精品亚洲色婷婷| 国产一区二区在线| 92看片淫黄大片欧美看国产片| www.色在线| 中文日韩电影网站| 亚洲精品国产精品国| 一本大道久久a久久精品综合| 调教驯服丰满美艳麻麻在线视频| 国产精品一二三| 免费av网址在线| 一二三区不卡| 欧美日韩无遮挡| 日韩精品三级| 国产精品久久久av久久久| 成人video亚洲精品| 国产亚洲精品久久久| 成人福利小视频| 欧美亚洲国产bt| 国产成人一区二区三区影院在线| 国产女主播视频一区二区| 黄色激情在线观看| 国产一区激情在线| 久久久久久久久久久久久国产精品| 亚洲一区二区三区无吗| 欧洲一区二区在线观看| 国产精品一线| 亚洲mm色国产网站| 日本在线中文字幕一区二区三区| 欧美精品久久久久| 久久99精品久久| 国产一区二区三区视频 | 成人激情校园春色| 中日韩av在线播放| 视频一区欧美日韩| 少妇大叫太大太粗太爽了a片小说| 成人动漫免费在线观看| 久久涩涩网站| 国产精品45p| 99在线高清视频在线播放| 国产第一亚洲| 国产99久久久欧美黑人| 美女的胸无遮挡在线观看| 欧美激情第99页| 午夜激情在线| 久久精品这里热有精品| 香蕉视频在线播放| 亚洲香蕉在线观看| 欧洲天堂在线观看| 日韩精品极品在线观看| 污视频在线免费观看| 精品欧美一区二区久久| a级片在线视频| 91精品国产综合久久久久| 在线观看亚洲国产| 欧美日产在线观看| 一级黄色大片免费观看| 欧美日韩在线播放三区四区| 国产日韩久久久| 在线视频国产一区| 啪啪小视频网站| 欧美亚洲综合另类| 中文在线免费看视频| 欧美视频一区二区三区四区| 精品久久久久久久久久久久久久久久久久| 福利精品视频在线| 久久久久久少妇| 91黄视频在线| 老熟妇一区二区三区啪啪| 欧美自拍偷拍午夜视频| 中文字幕制服诱惑| 欧美蜜桃一区二区三区| 国产麻豆91视频| 日韩写真欧美这视频| 亚洲精品视频网| 亚洲国产天堂网精品网站| 五月婷婷丁香花| 亚洲欧美国产日韩中文字幕| 福利在线视频导航| 久久精品免费播放| 污视频在线看网站| 96精品视频在线| 色婷婷综合久久久中字幕精品久久| 国产精品久久久久久久久免费| 欧美大陆国产| 亚洲在线免费看| 久久婷婷国产| 日本精品国语自产拍在线观看| 日韩欧美大片| 成人午夜视频免费观看| 亚洲精品乱码久久久久久蜜桃麻豆| 无码人妻丰满熟妇区毛片18| 秋霞午夜av一区二区三区| 欧美体内she精高潮| 91首页免费视频| 蜜桃av.com| 亚洲图片有声小说| 免费av中文字幕| 欧美一区二区三区婷婷月色| 天天射天天操天天干| 一区二区三区www| 色www永久免费视频首页在线| 欧美一级片在线播放| 国产福利一区二区三区在线播放| 99久久自偷自偷国产精品不卡| 日本在线中文字幕一区| 在线综合视频网站| 亚洲毛片播放| 欧美性受xxxxxx黑人xyx性爽| 国产 日韩 欧美大片| 日本乱子伦xxxx| 亚洲精品国产a久久久久久| 日韩免费av网站| 日韩欧美一区中文| 二区三区在线| 高清欧美性猛交xxxx黑人猛交| 日本精品网站| 国产在线一区二| 国产精品不卡| 人妻丰满熟妇av无码区app| 国产成a人亚洲精| 午夜黄色福利视频| 欧美日韩国产一中文字不卡| 99国产精品久久久久久久成人| 国产婷婷97碰碰久久人人蜜臀| 亚洲精品白浆| 国产啪精品视频| 国产免费久久| 国模无码视频一区二区三区| 国产91综合一区在线观看| 殴美一级黄色片| 日韩欧美亚洲国产一区| 嫩草影院一区二区| 蜜臀久久99精品久久久无需会员 | 玖玖在线精品| 国产黑丝一区二区| 亚洲精品伦理在线| 一本色道久久综合亚洲| 夜夜嗨av一区二区三区四区| 亚洲v.com| 国产综合第一页| 狠狠久久婷婷| 国产又黄又嫩又滑又白| 亚洲欧美色图小说| 国产又粗又大又爽视频| 亚洲视频在线观看视频| 中老年在线免费视频| 国产精品国产精品国产专区蜜臀ah| 综合色一区二区| xxxx在线免费观看| 国产精品色噜噜| 一级二级三级视频| 日韩最新av在线| 4438五月综合| 久久精品国产精品亚洲精品色| 久久精品国产成人一区二区三区| 9.1片黄在线观看| 欧美午夜免费电影| 在线观看完整版免费| 国产精品久久久久久中文字| 欧美日韩国产在线观看网站| www.99av.com| 国产精品私房写真福利视频| 曰批又黄又爽免费视频| 色偷偷88888欧美精品久久久| 欧美一级免费| 肉大捧一出免费观看网站在线播放 | 日本sm极度另类视频| 亚洲自拍都市欧美小说| 男人天堂999| 国产嫩草影院久久久久| 天堂av免费在线观看| 中文字幕亚洲综合久久| 一区在线不卡| 青青青青在线视频| av高清久久久| 亚洲av无码乱码国产精品fc2| 中文字幕少妇一区二区三区| 国产一区精品二区| 欧美黑人在线观看| 99国产精品99久久久久久| 久久久免费高清视频| 国产一区二区三区网站| 亚洲国产aⅴ精品一区二区三区| 老司机午夜网站| 成人av第一页| 国产男人搡女人免费视频| 久久激情视频久久| 成人动态视频| 成人黄色一区二区| 亚洲女同ⅹxx女同tv| 四虎永久在线观看| 国产精品日日摸夜夜添夜夜av| 91精品国产麻豆国产在线观看| 日本一区二区免费视频| 一本一本大道香蕉久在线精品| 91大神xh98hx在线播放| 97视频资源在线观看| 久久精品导航| 精品国产欧美日韩不卡在线观看| 亚洲激情视频网| 国产黄色精品| 免费在线观看亚洲视频 | 久久久久无码国产精品| 国产网站欧美日韩免费精品在线观看| 国语自产精品视频在线看抢先版结局| 日韩一区二区高清视频| 久久精品一区二区三区不卡| www.五月婷婷| 国产精品jvid在线观看蜜臀 | 成人黄色一区二区| 亚洲一区日韩精品中文字幕| 国产高清视频在线观看| 国产免费高清一区| 久久精品国产亚洲一区二区三区| 国产精品99精品| 日韩最新在线视频| 亚洲人成伊人成综合图片| 中文字幕av一区二区三区人妻少妇 | 免费一级欧美片在线观看网站| 农村妇女精品一二区| 亚洲高清视频在线|