接口冪等性解決方案:深入理解 “一鎖、二判、三更新”
前言
在分布式系統和高并發業務場景中,接口冪等性是保障數據一致性的關鍵環節。無論是網絡延遲導致的重試、用戶誤操作的重復提交,還是分布式服務間的異步調用重試,都可能引發重復執行問題,進而導致數據重復插入、金額重復扣減、狀態異常變更等嚴重后果。
本文將圍繞一鎖、二判、三更新核心口令,詳細拆解接口冪等性的解決方案。
一鎖、二判、三更新
解決接口冪等問題的核心邏輯可濃縮為一句口令一鎖、二判、三更新。這三個步驟環環相扣,缺一不可,嚴格遵循即可有效抵御并發重復請求,保障數據一致性。下面我們逐一拆解每個步驟的原理、實現方式及注意事項。
一鎖:加互斥鎖,阻斷并發重復請求
核心目標:通過加鎖確保同一時間內,只有一個請求能進入后續的判斷和更新流程,從源頭阻斷并發導致的重復執行。
為什么需要鎖?
在高并發場景下,即使后續有冪等判斷,若多個請求同時繞過判斷步驟(如并發查詢時均未查到歷史數據),仍會導致重復更新。例如:兩個并發的下單請求同時查詢是否已有該用戶的訂單,均返回無,隨后同時執行創建訂單操作,最終生成兩筆重復訂單。
因此,鎖的作用是強制并發請求排隊執行,確保同一業務場景下,只有一個請求能進入后續流程。
鎖的類型與選擇
- 分布式鎖:適用于分布式系統(多服務實例、多機器部署),常見實現方式包括
Redis分布式鎖、ZooKeeper分布式鎖、etcd鎖等。核心要求是跨服務、跨機器的互斥性。 - 悲觀鎖:適用于單體應用或數據庫層面的并發控制,通常通過數據庫的
SELECT ... FOR UPDATE語句實現,鎖定查詢行,阻止其他事務修改或查詢。核心要求是數據庫事務內的互斥性。
二判:冪等性判斷,識別重復請求
核心目標:在獲取鎖后,通過預先定義的規則判斷當前請求是否為重復請求,若已執行過則直接返回結果,避免重復處理,常見的冪等判斷方式:
基于狀態機判斷:
適用于業務存在明確狀態流轉的場景(如訂單狀態:待支付→已支付→已發貨→已完成)。通過判斷當前業務數據的狀態是否符合執行條件,來識別重復請求。例如:已支付狀態的訂單,若再次收到支付回調請求,則直接返回已處理,不執行后續邏輯。
基于流水表判斷:
適用于需要記錄請求軌跡的場景。通過建立冪等流水表,存儲每次請求的唯一標識(如請求ID、用戶ID + 業務ID),每次請求先查詢流水表:若已存在則為重復請求,若不存在則記錄流水并繼續執行。
基于唯一性索引判斷:
適用于數據庫層面的冪等控制,通過在業務表的關鍵字段上建立唯一索引(如訂單表的用戶ID + 商品ID + 下單時間戳聯合唯一索引),若重復請求執行插入操作,數據庫會拋出唯一鍵沖突異常,捕獲異常后即可判斷為重復請求。
關鍵注意事項:
- 唯一標識需全局唯一:例如用戶
ID+ 業務場景標識 + 請求唯一ID,避免不同業務場景的標識沖突;- 判斷邏輯需高效:若基于數據庫查詢判斷,需確保查詢字段已建立索引,避免全表掃描導致性能問題;
- 結果需緩存(可選):對于高頻重復請求,可將判斷結果緩存至
Redis,減少數據庫查詢次數。
三更新:執行數據更新,確保原子性
核心目標:在確認請求為非重復請求后,執行業務邏輯并更新數據,同時確保更新操作的原子性,避免因中間異常導致數據部分更新。
數據更新的核心要求
- 原子性:更新操作需作為一個整體執行,要么全部成功,要么全部失敗(如創建訂單時,需同時插入訂單表、扣減庫存表,兩步操作需在同一個數據庫事務內);
- 冪等性兼容:即使因極端情況(如鎖過期)導致重復進入更新步驟,也需通過業務邏輯或數據庫約束(如唯一索引)避免數據異常;
- 狀態同步:更新完成后,需同步更新相關狀態(如流水表標記已處理、業務表更新狀態字段),為后續重復請求的判斷提供依據。
常見的更新方式
- 數據庫事務:單體應用或單庫場景下,通過
@Transactional注解或手動開啟事務,確保多表操作的原子性; - 分布式事務:跨庫、跨服務場景下,需通過
TCC、SAGA、Seata等分布式事務框架,保障多節點更新的一致性(需注意分布式事務的性能開銷); - 狀態標記:更新完成后,立即更新業務數據的狀態或流水表的記錄,避免后續請求誤判。
代碼案例
數據庫表結構
-- 訂單表(t_order)
CREATE TABLE `t_order` (
`order_id` bigint NOT NULL AUTO_INCREMENT COMMENT '訂單ID',
`user_id` bigint NOT NULL COMMENT '用戶ID',
`product_id` bigint NOT NULL COMMENT '商品ID',
`quantity` int NOT NULL COMMENT '購買數量',
`total_amount` decimal(10,2) NOT NULL COMMENT '訂單總金額',
`order_status` tinyint NOT NULL DEFAULT '0' COMMENT '訂單狀態:0-待支付,1-已支付,2-已取消,3-已完成',
`identifier` varchar(64) NOT NULL COMMENT '請求唯一標識(冪等判斷)',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '邏輯刪除:0-未刪除,1-已刪除',
PRIMARY KEY (`order_id`),
UNIQUE KEY `uk_identifier` (`identifier`) COMMENT '唯一索引:確保同一請求不會重復創建訂單',
KEY `idx_user_id` (`user_id`) COMMENT '用戶ID索引:便于查詢用戶訂單'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='訂單表';
-- 冪等流水表(t_idempotent_flow)
CREATE TABLE `t_idempotent_flow` (
`flow_id` bigint NOT NULL AUTO_INCREMENT COMMENT '流水ID',
`business_scene` varchar(32) NOT NULL COMMENT '業務場景:ORDER-下單,PAY-支付',
`identifier` varchar(64) NOT NULL COMMENT '請求唯一標識',
`user_id` bigint NOT NULL COMMENT '用戶ID',
`handle_status` tinyint NOT NULL DEFAULT '0' COMMENT '處理狀態:0-待處理,1-處理成功,2-處理失敗',
`business_result` varchar(128) DEFAULT NULL COMMENT '業務結果:如訂單ID',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY (`flow_id`),
UNIQUE KEY `uk_business_identifier` (`business_scene`,`identifier`) COMMENT '唯一索引:同一業務場景下,請求標識唯一'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='冪等流水表';自定義分布式鎖實現(核心一鎖)
/**
* 自定義分布式鎖注解:用于方法級別的鎖控制
*/
@Target(ElementType.METHOD) // 注解作用于方法
@Retention(RetentionPolicy.RUNTIME) // 運行時生效
@Documented // 生成文檔
public @interface DistributeLock {
/**
* 業務場景標識(如ORDER:下單,PAY:支付)
* 用于區分不同業務的鎖,避免鎖key沖突
*/
String scene();
/**
* 鎖的key表達式(支持SpEL表達式,如#request.identifier)
* 用于動態生成鎖的key,確保同一請求/業務的鎖唯一
*/
String keyExpression();
/**
* 鎖的過期時間(單位:毫秒)
* 避免服務宕機導致鎖永久持有,默認3000ms
*/
long expire() default 3000;
/**
* 獲取鎖的超時時間(單位:毫秒)
* 超過該時間未獲取到鎖,直接返回失敗,默認1000ms
*/
long timeout() default 1000;
}分布式鎖切面
可以使用
Redisson替換案例while循環
/**
* 分布式鎖切面:實現@DistributeLock注解的核心邏輯
*/
@Aspect
@Component
@Slf4j
public class DistributeLockAspect {
// Redis鎖的前綴(避免與其他Redis key沖突)
private static final String LOCK_PREFIX = "distribute:lock:";
// RedisTemplate:操作Redis
@Resource
private StringRedisTemplate stringRedisTemplate;
// SpEL表達式解析器:用于解析keyExpression(如#request.identifier)
private final ExpressionParser parser = new SpelExpressionParser();
/**
* 環繞通知:攔截被@DistributeLock注解的方法
*/
@Around("@annotation(distributeLock)")
public Object around(ProceedingJoinPoint joinPoint, DistributeLock distributeLock) throws Throwable {
// 1. 解析鎖的key(根據scene和keyExpression動態生成)
String lockKey = generateLockKey(joinPoint, distributeLock);
// 2. 生成鎖的value(UUID:用于釋放鎖時校驗,避免誤釋放其他線程的鎖)
String lockValue = UUID.randomUUID().toString();
// 3. 嘗試獲取分布式鎖
boolean lockAcquired = false;
try {
// 計算獲取鎖的超時時間(當前時間 + timeout)
long timeout = distributeLock.timeout();
long endTime = System.currentTimeMillis() + timeout;
// 循環獲取鎖,直到超時
while (System.currentTimeMillis() < endTime) {
// Redis命令:SET lockKey lockValue NX EX expireTime
// NX:只在key不存在時設置(確保互斥);EX:設置過期時間(避免死鎖)
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, distributeLock.expire(), TimeUnit.MILLISECONDS);
if (Boolean.TRUE.equals(success)) {
lockAcquired = true;
log.info("獲取分布式鎖成功,lockKey:{},lockValue:{}", lockKey, lockValue);
// 4. 獲取鎖成功,執行目標方法(業務邏輯)
return joinPoint.proceed();
}
// 獲取鎖失敗,短暫休眠后重試(避免頻繁請求Redis)
TimeUnit.MILLISECONDS.sleep(100);
}
// 5. 超時未獲取到鎖,拋出異常
log.error("獲取分布式鎖超時,lockKey:{},超時時間:{}ms", lockKey, timeout);
throw new RuntimeException("系統繁忙,請稍后再試");
} finally {
// 6. 釋放鎖(僅釋放當前線程持有的鎖,避免誤釋放)
if (lockAcquired) {
releaseLock(lockKey, lockValue);
}
}
}
/**
* 生成分布式鎖的key:lockPrefix + scene + ":" + dynamicKey
*/
private String generateLockKey(ProceedingJoinPoint joinPoint, DistributeLock distributeLock) {
// 解析SpEL表達式,獲取動態key(如#request.identifier的值)
String dynamicKey = parseSpelExpression(joinPoint, distributeLock.keyExpression());
// 拼接完整的lockKey
return LOCK_PREFIX + distributeLock.scene() + ":" + dynamicKey;
}
/**
* 解析SpEL表達式:根據方法參數動態生成key(如#request.identifier)
*/
private String parseSpelExpression(ProceedingJoinPoint joinPoint, String keyExpression) {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Object[] args = joinPoint.getArgs(); // 方法參數數組
String[] paramNames = signature.getParameterNames(); // 方法參數名數組
// 構建SpEL上下文:將參數名和參數值綁定,便于表達式解析
EvaluationContext context = new StandardEvaluationContext();
for (int i = 0; i < paramNames.length; i++) {
context.setVariable(paramNames[i], args[i]);
}
// 解析SpEL表達式,獲取動態key
Expression expression = parser.parseExpression(keyExpression);
return expression.getValue(context, String.class);
}
/**
* 釋放分布式鎖:僅當鎖的value與當前線程的lockValue一致時才釋放(避免誤釋放)
*/
private void releaseLock(String lockKey, String lockValue) {
try {
// 獲取Redis中當前鎖的value
String currentValue = stringRedisTemplate.opsForValue().get(lockKey);
// 校驗value:只有當前線程持有該鎖時,才刪除鎖
if (lockValue.equals(currentValue)) {
stringRedisTemplate.delete(lockKey);
log.info("釋放分布式鎖成功,lockKey:{},lockValue:{}", lockKey, lockValue);
} else {
log.warn("釋放分布式鎖失敗:鎖已被其他線程持有,lockKey:{},當前value:{},期望value:{}",
lockKey, currentValue, lockValue);
}
} catch (Exception e) {
log.error("釋放分布式鎖異常,lockKey:{},lockValue:{}", lockKey, lockValue, e);
}
}
}Service 層實現(核心二判、三更新)
/**
* 訂單服務實現類
*/
@Service
@Slf4j
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {
@Resource
private OrderMapper orderMapper;
@Resource
private IdempotentFlowMapper idempotentFlowMapper;
/**
* 二判:冪等判斷(查詢訂單是否已存在)
* 邏輯:通過“商品ID + 請求標識(identifier)”查詢,確保同一請求不會重復創建訂單
*/
@Override
public OrderDTO queryOrder(Long productId, String identifier) {
// 1. 構建查詢條件(商品ID + 標識 + 未刪除)
LambdaQueryWrapper<Order> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(Order::getProductId, productId)
.eq(Order::getIdentifier, identifier)
.eq(Order::getIsDeleted, 0); // 邏輯刪除:僅查詢未刪除的訂單
// 2. 執行查詢
Order order = orderMapper.selectOne(queryWrapper);
if (order == null) {
log.info("冪等判斷:未查詢到訂單,productId:{},identifier:{}", productId, identifier);
return null;
}
// 3. 轉換為DTO返回
OrderDTO orderDTO = new OrderDTO();
BeanUtils.copyProperties(order, orderDTO);
log.info("冪等判斷:查詢到已存在訂單,orderId:{},identifier:{}", order.getOrderId(), identifier);
return orderDTO;
}
/**
* 三更新:創建訂單(事務原子性保障)
* 邏輯:1. 扣減庫存(此處簡化,實際需調用庫存服務);2. 創建訂單;3. 記錄冪等流水
*/
@Override
@Transactional(rollbackFor = Exception.class) // 事務:任何異常都回滾
public OrderDTO order(OrderRequest request) {
try {
// 1. 業務參數校驗(簡化:實際需更詳細的校驗,如庫存是否充足)
validateOrderRequest(request);
// 2. 扣減庫存(此處為簡化邏輯,實際應調用庫存服務,且需確保庫存服務的冪等性)
boolean stockDeducted = deductStock(request.getProductId(), request.getQuantity());
if (!stockDeducted) {
throw new RuntimeException("庫存不足,商品ID:" + request.getProductId());
}
// 3. 創建訂單(插入t_order表)
Order order = buildOrder(request);
orderMapper.insert(order);
log.info("訂單創建成功,orderId:{},userId:{}", order.getOrderId(), request.getUserId());
// 4. 記錄冪等流水(插入t_idempotent_flow表,便于后續追溯)
recordIdempotentFlow(request, order.getOrderId());
// 5. 轉換為DTO返回
OrderDTO orderDTO = new OrderDTO();
BeanUtils.copyProperties(order, orderDTO);
return orderDTO;
} catch (Exception e) {
log.error("創建訂單失敗,identifier:{},原因:{}", request.getIdentifier(), e.getMessage(), e);
// 拋出異常,觸發事務回滾
throw new RuntimeException("創建訂單失敗:" + e.getMessage());
}
}
/**
* 構建訂單實體(將Request轉換為Order)
*/
private Order buildOrder(OrderRequest request) {
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setQuantity(request.getQuantity());
// 計算總金額:數量 * 單價
BigDecimal totalAmount = request.getPrice().multiply(new BigDecimal(request.getQuantity()));
order.setTotalAmount(totalAmount);
order.setOrderStatus(0); // 初始狀態:待支付
order.setIdentifier(request.getIdentifier());
return order;
}
/**
* 記錄冪等流水(用于后續追溯和冪等判斷)
*/
private void recordIdempotentFlow(OrderRequest request, Long orderId) {
IdempotentFlow flow = new IdempotentFlow();
flow.setBusinessScene("ORDER"); // 業務場景:下單
flow.setIdentifier(request.getIdentifier());
flow.setUserId(request.getUserId());
flow.setHandleStatus(1); // 處理狀態:成功
flow.setBusinessResult("orderId:" + orderId); // 業務結果:訂單ID
idempotentFlowMapper.insert(flow);
log.info("冪等流水記錄成功,flowId:{},orderId:{}", flow.getFlowId(), orderId);
}
/**
* 扣減庫存(簡化邏輯:實際需調用庫存服務,且需加鎖避免超賣)
*/
private boolean deductStock(Long productId, Integer quantity) {
// 此處為模擬扣減:實際應操作庫存表(如t_stock),通過UPDATE t_stock SET stock = stock - quantity WHERE product_id = ? AND stock >= quantity
log.info("扣減庫存成功,productId:{},扣減數量:{}", productId, quantity);
returntrue;
}
/**
* 訂單請求參數校驗
*/
private void validateOrderRequest(OrderRequest request) {
if (request.getQuantity() <= 0) {
throw new IllegalArgumentException("購買數量必須大于0");
}
if (request.getPrice().compareTo(BigDecimal.ZERO) <= 0) {
throw new IllegalArgumentException("商品單價必須大于0");
}
}
}落地實踐
@Service
public class OrderApplyService {
private final OrderService orderService;
// 構造函數注入訂單服務
public OrderApplyService(OrderService orderService) {
this.orderService = orderService;
}
/**
* 下單接口:嚴格遵循“一鎖、二判、三更新”
* @param request 下單請求(包含用戶ID、商品ID、請求唯一標識identifier等)
* @return 下單響應
*/
// 一鎖:加Redis分布式鎖,鎖的場景為“ORDER”,鎖的key為請求唯一標識(確保同一請求僅一個線程執行)
@DistributeLock(
scene = "ORDER", // 鎖的業務場景標識
keyExpression = "#request.identifier", // 鎖的key:使用請求中的唯一標識(如UUID)
expire = 3000 // 鎖的過期時間:3秒(避免死鎖)
)
public OrderResponse apply(OrderRequest request) {
OrderResponse response = new OrderResponse();
// 二判:冪等性判斷(查詢是否已有該請求對應的訂單)
// 此處通過“商品ID+請求唯一標識”查詢,確保同一請求不會重復創建訂單
OrderDTO orderDTO = orderService.queryOrder(
request.getProductId(), // 商品ID
request.getIdentifier() // 請求唯一標識(如前端生成的UUID)
);
// 若查詢到訂單,說明是重復請求,直接返回“已處理”結果
if (orderDTO != null) {
response.setSuccess(true);
response.setResponseCode("DUPLICATED"); // 重復請求編碼
response.setMessage("訂單已存在,無需重復創建");
response.setOrderId(orderDTO.getOrderId()); // 返回已存在的訂單ID
return response;
}
// 三更新:執行下單業務邏輯(創建訂單、扣減庫存等,確保原子性)
// 此處orderService.order()方法內部需開啟數據庫事務,保障多表操作的原子性
OrderDTO createdOrder = orderService.order(request);
// 組裝并返回成功響應
response.setSuccess(true);
response.setResponseCode("SUCCESS");
response.setMessage("訂單創建成功");
response.setOrderId(createdOrder.getOrderId());
return response;
}
}
































