Redis + MQ:高并發(fā)秒殺的技術(shù)方案與實(shí)現(xiàn)
前言
在電商秒殺場(chǎng)景中,瞬間爆發(fā)的海量請(qǐng)求往往成為系統(tǒng)的生死考驗(yàn)。當(dāng)并發(fā)量達(dá)到數(shù)萬(wàn)甚至數(shù)十萬(wàn)QPS時(shí),傳統(tǒng)數(shù)據(jù)庫(kù)單表架構(gòu)難以支撐,而Redis與消息隊(duì)列(MQ)的組合憑借其高性能與可靠性,成為應(yīng)對(duì)高并發(fā)秒殺的黃金方案。
方案總覽
用戶請(qǐng)求 → 前端生成Token → Redis執(zhí)行Lua腳本(預(yù)扣減+防重+流水)→ 發(fā)送RocketMQ事務(wù)消息 →
[本地事務(wù)校驗(yàn)Redis結(jié)果] → MQ消息確認(rèn)(COMMIT/ROLLBACK)→ 消費(fèi)者消費(fèi)消息 → MySQL扣減庫(kù)存+記錄訂單秒殺系統(tǒng)的核心訴求是抗并發(fā)、防超賣、保一致。Redis+MQ 方案通過(guò) “前端攔截 - 中間緩沖 - 后端落地” 的三層架構(gòu)實(shí)現(xiàn)這一目標(biāo):
- 前端攔截:
Redis通過(guò)Lua腳本原子性處理庫(kù)存預(yù)扣減,過(guò)濾無(wú)效請(qǐng)求; - 中間緩沖:
MQ(如RocketMQ)通過(guò)事務(wù)消息削峰填谷,確保流量平穩(wěn)進(jìn)入數(shù)據(jù)庫(kù); - 后端落地:
MySQL最終存儲(chǔ)庫(kù)存與訂單數(shù)據(jù),通過(guò)事務(wù)消息保障與Redis的一致性。
流程拆解(示例代碼)
Redis 庫(kù)存預(yù)扣減
預(yù)扣減流程
開(kāi)始
│
├─ 生成Token(前端)
│
├─ 前端攜帶Token請(qǐng)求秒殺
│
├─ 執(zhí)行Lua腳本
│ │
│ ├─ 檢查T(mén)oken是否存在(Hash結(jié)構(gòu))
│ │ ├─ 存在 → 返回“重復(fù)提交”
│ │ └─ 不存在 → 繼續(xù)
│ │
│ ├─ 獲取Redis庫(kù)存(String結(jié)構(gòu))
│ │ ├─ 庫(kù)存不足 → 返回“庫(kù)存不足”
│ │ └─ 庫(kù)存充足 → 繼續(xù)
│ │
│ ├─ 扣減Redis庫(kù)存并更新
│ │
│ └─ 記錄流水到Hash結(jié)構(gòu)
│
├─ 返回扣減結(jié)果(成功/失敗)
│
結(jié)束Lua 腳本
-- 啟用Redis命令復(fù)制,確保腳本在集群環(huán)境中正確同步
redis.replicate_commands()
-- 1. 防重提交校驗(yàn):通過(guò)用戶ID+Token判斷是否重復(fù)提交
-- KEYS[2]為用戶ID(uid),ARGV[2]為本次請(qǐng)求的Token
if redis.call('hexists', KEYS[2], ARGV[2]) == 1 then
return redis.error_reply('repeat submit') -- 重復(fù)提交,返回錯(cuò)誤
end
-- 2. 庫(kù)存充足性校驗(yàn)
local product_id = KEYS[1] -- 商品ID
local stock = redis.call('get', KEYS[1]) -- 獲取當(dāng)前庫(kù)存
if not stock then -- 庫(kù)存不存在(如商品未上架)
return redis.error_reply('product not found')
end
if tonumber(stock) < tonumber(ARGV[1]) then -- 庫(kù)存不足
return redis.error_reply('stock is not enough')
end
-- 3. 執(zhí)行庫(kù)存扣減
local remaining_stock = tonumber(stock) - tonumber(ARGV[1])
redis.call('set', KEYS[1], tostring(remaining_stock)) -- 更新庫(kù)存
-- 4. 記錄交易流水(用于后續(xù)一致性校驗(yàn))
local time = redis.call('time') -- 獲取當(dāng)前時(shí)間(秒+微秒)
local currentTimeMillis = (time[1] * 1000) + math.floor(time[2] / 1000) -- 轉(zhuǎn)換為毫秒時(shí)間戳
-- 存儲(chǔ)流水到Hash結(jié)構(gòu):用戶ID → Token → 流水詳情
redis.call('hset', KEYS[2], ARGV[2],
cjson.encode({
action = '扣減庫(kù)存',
product = product_id,
from = stock, -- 扣減前庫(kù)存
to = remaining_stock, -- 扣減后庫(kù)存
change = ARGV[1], -- 扣減數(shù)量
token = ARGV[2],
timestamp = currentTimeMillis
})
)
return remaining_stock -- 返回扣減后庫(kù)存Java 調(diào)用 Lua
@Service
public class SeckillService {
@Autowired
private StringRedisTemplate redisTemplate;
// 加載Lua腳本
private DefaultRedisScript<Long> stockScript;
@PostConstruct
public void init() {
stockScript = new DefaultRedisScript<>();
stockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("seckill.lua")));
stockScript.setResultType(Long.class);
}
/**
* 執(zhí)行Redis庫(kù)存預(yù)扣減
* @param productId 商品ID
* @param uid 用戶ID
* @param quantity 購(gòu)買數(shù)量
* @param token 防重Token
* @return 扣減后庫(kù)存(-1表示失敗)
*/
public Long preDeductStock(String productId, String uid, Integer quantity, String token) {
try {
// 執(zhí)行Lua腳本:KEYS = [商品ID, 用戶ID],ARGV = [數(shù)量, Token]
return redisTemplate.execute(
stockScript,
Arrays.asList(productId, uid),
quantity.toString(),
token
);
} catch (Exception e) {
log.error("Redis預(yù)扣減失敗", e);
return -1L;
}
}
}MySQL 庫(kù)存扣減
扣減流程圖
開(kāi)始
│
├─ 發(fā)送半消息到RocketMQ
│
├─ 執(zhí)行本地事務(wù)
│ │
│ ├─ 檢查Redis流水是否存在
│ │ ├─ 存在 → 提交消息(COMMIT)
│ │ └─ 不存在 → 回滾消息(ROLLBACK)
│ │
│ └─ 未知狀態(tài) → 等待回查
│
├─ RocketMQ回查機(jī)制
│ ├─ 有流水 → 提交消息
│ └─ 無(wú)流水 → 回滾消息
│
├─ 消息被消費(fèi)
│ │
│ ├─ 查詢數(shù)據(jù)庫(kù)當(dāng)前版本號(hào)(樂(lè)觀鎖)
│ │
│ ├─ 執(zhí)行庫(kù)存扣減(WHERE version = 當(dāng)前版本)
│ │ ├─ 扣減成功 → 記錄數(shù)據(jù)庫(kù)流水
│ │ └─ 扣減失敗 → 拋出異常(觸發(fā)重試)
│ │
├─ 結(jié)束發(fā)送半消息
系統(tǒng)首先向RocketMQ發(fā)送一條半消息(Half Message)。此時(shí)消息處于不可消費(fèi)狀態(tài),需等待生產(chǎn)者確認(rèn)本地事務(wù)執(zhí)行結(jié)果后,才會(huì)被消費(fèi)者處理。
// 發(fā)送半消息
public void sendHalfMessage(String productId, String uid, String token, Integer quantity) {
// 構(gòu)建消息
Message message = new Message(
"seckill_topic", // 主題
"stock_deduct", // 標(biāo)簽
JSON.toJSONString(new SeckillMessage(productId, uid, token, quantity)).getBytes()
);
// 發(fā)送事務(wù)消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"seckill_producer_group", // 生產(chǎn)者組
message,
null // 本地事務(wù)參數(shù)(可傳遞上下文)
);
log.info("半消息發(fā)送結(jié)果:{}", result.getSendStatus());
}本地事務(wù)校驗(yàn)
本地事務(wù)的核心是判斷Redis預(yù)扣減是否成功:
- 若
Redis的Lua腳本執(zhí)行成功(即庫(kù)存預(yù)扣減完成且流水已記錄),則向RocketMQ返回 提交(COMMIT)指令,消息變?yōu)榭上M(fèi)狀態(tài); - 若
Redis預(yù)扣減失敗(如庫(kù)存不足或重復(fù)提交),則返回回滾(ROLLBACK)指令,消息被丟棄。 - 若
RocketMQ長(zhǎng)時(shí)間未收到本地事務(wù)結(jié)果(如生產(chǎn)者宕機(jī)),會(huì)觸發(fā)消息回查。此時(shí)系統(tǒng)通過(guò)檢查Redis中是否存在對(duì)應(yīng)交易流水,判斷是否需要提交消息:若流水存在,則提交;否則回滾。
@Component
public class SeckillTransactionListener implements TransactionListener {
@Autowired
private StringRedisTemplate redisTemplate;
// 執(zhí)行本地事務(wù)
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
SeckillMessage message = JSON.parseObject(new String(msg.getBody()), SeckillMessage.class);
// 檢查Redis中是否存在對(duì)應(yīng)流水(驗(yàn)證預(yù)扣減成功)
Boolean flag = redisTemplate.opsForHash().hasKey(
message.getUid(), // Hash key:用戶ID
message.getToken() // Hash field:Token
);
return flag ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
} catch (Exception e) {
return RocketMQLocalTransactionState.UNKNOWN; // 未知狀態(tài),觸發(fā)回查
}
}
// 消息回查(解決超時(shí)未確認(rèn)問(wèn)題)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
SeckillMessage message = JSON.parseObject(new String(msg.getBody()), SeckillMessage.class);
// 回查邏輯:再次檢查流水是否存在
Boolean flag = redisTemplate.opsForHash().hasKey(message.getUid(), message.getToken());
return flag ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
}消費(fèi)消息并扣減 MySQL 庫(kù)存
消費(fèi)者監(jiān)聽(tīng)消息,執(zhí)行數(shù)據(jù)庫(kù)扣減(需保證冪等性): 消費(fèi)者接收到可消費(fèi)的消息后,執(zhí)行MySQL庫(kù)存扣減操作,并同步記錄數(shù)據(jù)庫(kù)中的交易流水。為確保消費(fèi)成功,需利用MQ的重試機(jī)制:若消費(fèi)失敗(如數(shù)據(jù)庫(kù)暫時(shí)不可用),MQ會(huì)自動(dòng)重試,直至消費(fèi)成功或達(dá)到最大重試次數(shù)(此時(shí)需人工介入處理)。
@Component
@RocketMQMessageListener(
topic = "seckill_topic",
consumerGroup = "seckill_consumer_group",
messageModel = MessageModel.CLUSTERING
)
public class SeckillConsumer implements RocketMQListener<MessageExt> {
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public void onMessage(MessageExt message) {
SeckillMessage msg = JSON.parseObject(new String(message.getBody()), SeckillMessage.class);
String productId = msg.getProductId();
int quantity = msg.getQuantity();
// 數(shù)據(jù)庫(kù)扣減(使用樂(lè)觀鎖防超賣)
String sql = "UPDATE product_stock " +
"SET stock = stock - ?, version = version + 1 " +
"WHERE product_id = ? AND stock >= ? AND version = ?";
// 1. 查詢當(dāng)前版本號(hào)
Integer version = jdbcTemplate.queryForObject(
"SELECT version FROM product_stock WHERE product_id = ?",
Integer.class,
productId
);
// 2. 執(zhí)行扣減(樂(lè)觀鎖保證原子性)
int rows = jdbcTemplate.update(sql, quantity, productId, quantity, version);
if (rows > 0) {
// 扣減成功:記錄數(shù)據(jù)庫(kù)流水
jdbcTemplate.update(
"INSERT INTO stock_flow (product_id, quantity, op_type, create_time) " +
"VALUES (?, ?, 'SECKILL', NOW())",
productId, quantity
);
// 確認(rèn)消費(fèi)成功(返回ACK)
} else {
// 扣減失敗:觸發(fā)重試(MQ默認(rèn)重試機(jī)制)
throw new RuntimeException("數(shù)據(jù)庫(kù)扣減失敗,觸發(fā)重試");
}
}
}一致性保障
為防止Redis與MySQL數(shù)據(jù)不一致(如Redis扣減成功但MySQL扣減失敗),需定期對(duì)賬:
@Scheduled(cron = "0 0 */1 * * ?") // 每小時(shí)執(zhí)行一次
public void reconcileStock() {
// 1. 掃描Redis中未同步到MySQL的流水
Set<String> uids = redisTemplate.keys("uid:*"); // 假設(shè)用戶ID前綴為uid:
for (String uid : uids) {
Map<Object, Object> tokenMap = redisTemplate.opsForHash().entries(uid);
for (Map.Entry<Object, Object> entry : tokenMap.entrySet()) {
String token = (String) entry.getKey();
String flowJson = (String) entry.getValue();
SeckillFlow flow = JSON.parseObject(flowJson, SeckillFlow.class);
// 2. 檢查MySQL是否有對(duì)應(yīng)訂單
Integer count = jdbcTemplate.queryForObject(
"SELECT COUNT(1) FROM orders WHERE product_id = ? AND uid = ? AND token = ?",
Integer.class,
flow.getProduct(), flow.getUid(), token
);
if (count == 0) {
// 3. 未找到訂單 → 人工介入或自動(dòng)回滾Redis庫(kù)存
log.warn("發(fā)現(xiàn)不一致:Redis有流水但MySQL無(wú)訂單,product={}, uid={}", flow.getProduct(), uid);
// redisTemplate.opsForValue().increment(flow.getProduct(), Integer.parseInt(flow.getChange()));
}
}
}
}系統(tǒng)可通過(guò)定時(shí)任務(wù)對(duì)比Redis流水、MySQL庫(kù)存流水與訂單表數(shù)據(jù):若Redis流水存在但訂單表無(wú)對(duì)應(yīng)記錄,說(shuō)明訂單生成失敗,需人工介入補(bǔ)單或回滾Redis庫(kù)存,避免少賣;若訂單表有記錄但MySQL庫(kù)存未扣減,則需觸發(fā)庫(kù)存補(bǔ)扣,避免多賣。
總結(jié)
Redis + MQ 方案通過(guò)預(yù)扣減 + 事務(wù)消息 + 對(duì)賬三重機(jī)制,完美解決了高并發(fā)秒殺的核心痛點(diǎn):
Redis承擔(dān)高并發(fā)讀寫(xiě),通過(guò)Lua腳本確保原子性,防止超賣;MQ事務(wù)消息保障Redis與MySQL的最終一致性,避免數(shù)據(jù)斷層;- 流水對(duì)賬作為最后一道防線,及時(shí)發(fā)現(xiàn)并修復(fù)異常。




























