Spring Boot + Redis Streams :構(gòu)建高效消息系統(tǒng)
前言
在現(xiàn)代微服務(wù)架構(gòu)中,可靠的消息處理系統(tǒng)是保證系統(tǒng)高可用性和擴(kuò)展性的關(guān)鍵。Redis Streams作為Redis 5.0引入的強(qiáng)大功能,提供了一種日志數(shù)據(jù)結(jié)構(gòu),能夠高效地處理消息隊(duì)列和流數(shù)據(jù)。
簡介
圖片
Redis Streams是一種日志數(shù)據(jù)結(jié)構(gòu),類似于Apache Kafka中的分區(qū)日志,提供了持久化、可回溯、消息分組等特性。它支持生產(chǎn)者消費(fèi)者模型,允許生產(chǎn)者將消息追加到流的末尾,消費(fèi)者從流中讀取消息進(jìn)行處理。
Redis Streams的主要特性包括:
- 消息持久化:消息存儲(chǔ)在
Redis內(nèi)存中,并可通過持久化策略(如RDB、AOF)保證數(shù)據(jù)不丟失。 - 消息分組:支持將消費(fèi)者劃分為不同的分組,每個(gè)分組可以獨(dú)立消費(fèi)消息,實(shí)現(xiàn)消息的并行處理。
- 消息確認(rèn)機(jī)制:消費(fèi)者處理完消息后,可以向流發(fā)送確認(rèn)消息,確保消息不會(huì)被重復(fù)處理。
- 消息回溯:可以從任意位置讀取消息,支持歷史消息的查詢和重放。
效果圖
圖片
消息生產(chǎn)與消費(fèi)實(shí)踐
創(chuàng)建消息實(shí)體類
@Data
public class Message implements Serializable {
private String id;
private String content;
}生產(chǎn)者服務(wù)
@Service
public class MessageProducer {
private static final String STREAM_KEY = "message-stream";
private final RedisTemplate<String, Object> redisTemplate;
public MessageProducer(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public RecordId sendMessage(Message message) {
StreamOperations<String, Object, Object> streamOps = redisTemplate.opsForStream();
Map<String, Object> messageMap = new HashMap<>();
messageMap.put("id", message.getId());
messageMap.put("content", message.getContent());
return streamOps.add(MapRecord.create(STREAM_KEY, messageMap));
}
}配置消費(fèi)者(組)
@Slf4j
@Service
public class MessageConsumer implements StreamListener<String, MapRecord<String, String, String>> {
private static final String STREAM_KEY = "message-stream";
private static final String GROUP_NAME = "message-group";
private static final String CONSUMER_NAME = "consumer-1";
private final RedisTemplate<String, Object> redisTemplate;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
public MessageConsumer(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@PostConstruct
public void init() {
String script = "if redis.call('EXISTS', KEYS[1]) == 0 then " +
" return 1 " +
"else " +
" return 0 " +
"end";
RedisScript<Long> redisScript = RedisScript.of(script, Long.class);
Long result = redisTemplate.execute(redisScript, Collections.singletonList(streamKey));
if (result != null && result == 1) {
redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.latest(), groupName);
log.info("消費(fèi)者組 {} 創(chuàng)建成功", groupName);
} else {
log.info("消費(fèi)者組 {} 已存在", groupName);
}
// 配置消息監(jiān)聽容器
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String,
MapRecord<String, String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.batchSize(10)
.pollTimeout(Duration.ofMillis(100))
.build();
container = StreamMessageListenerContainer.create(redisTemplate.getConnectionFactory(), options);
/**
* ReadOffset.latest():指定組的起始位置為 “當(dāng)前最新消息”
* ReadOffset.lastConsumed():從消費(fèi)者組的最后確認(rèn)位置開始讀取。如果是新組,默認(rèn)從$(最新位置)開始,確保消息至少被消費(fèi)一次(At Least Once)
* ReadOffset.from(String id):從指定的消息 ID 開始讀取
* id="0-0":從流的起始位置(第一條消息)開始讀取所有歷史消息
* id="$":等價(jià)于ReadOffset.latest(),從尾部開始讀取新消息
* id="具體消息ID":從指定 ID 的下一條消息開始讀取
**/
container.receive(
Consumer.from(GROUP_NAME, CONSUMER_NAME),
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),this);
container.start();
// 驗(yàn)證容器是否啟動(dòng)成功
if (container.isRunning()) {
log.info("消息監(jiān)聽容器已啟動(dòng)");
} else {
log.warn("消息監(jiān)聽容器啟動(dòng)失敗");
}
}
@Override
public void onMessage(MapRecord<String, String, String> message) {
try {
Map<String, String> value = message.getValue();
String id = value.get("id");
String content = value.get("content");
// 處理消息
System.out.println("收到消息: ID=" + id + ", 內(nèi)容=" + content);
// 業(yè)務(wù)處理邏輯...
// 確認(rèn)消息處理完成
redisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP_NAME, message.getId());
} catch (Exception e) {
// 處理異常,可以記錄日志或?qū)崿F(xiàn)重試邏輯
System.err.println("消息處理失敗: " + e.getMessage());
}
}
@PreDestroy
public void destroy() {
if (container != null) {
container.stop();
}
}
public void consumeMessages() {
StreamOperations<String, String, String> streamOps = redisTemplate.opsForStream();
List<MapRecord<String, String, String>> messages = streamOps.read(
Consumer.from(GROUP_NAME, CONSUMER_NAME),
StreamReadOptions.empty().count(10).block(Duration.ofSeconds(10)),
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed())
);
messages.forEach(message -> {
Map<String, String> value = message.getValue();
String id = value.get("id");
String content = value.get("content");
System.out.println("Received message: id=" + id + ", cnotallow=" + content);
// 確認(rèn)消息已處理
streamOps.acknowledge(STREAM_KEY, GROUP_NAME, message.getId());
});
}
}檢查流和消費(fèi)者組狀態(tài)
# 查看流信息
XLEN message-stream
# 查看消費(fèi)者組信息
XINFO GROUPS message-stream
# 查看組消費(fèi)情況
XINFO CONSUMERS message-stream message-group注意:
- 在創(chuàng)建
Redis Streams消費(fèi)者組時(shí),不能使用ReadOffset.lastConsumed(),當(dāng)你創(chuàng)建一個(gè)新的消費(fèi)者組時(shí),Redis要求你明確指定組的初始讀取位置(即從哪個(gè)消息ID開始消費(fèi))組的狀態(tài)尚未初始化:新組沒有任何消費(fèi)記錄,
lastConsumed()無法確定起始位置
Redis API設(shè)計(jì):創(chuàng)建組的命令(XGROUP CREATE)必須包含一個(gè)固定的偏移量參數(shù)(如0-0或$)
Redis Streams 和 Redis Pub-Sub 之間的主要區(qū)別
特性 | Redis Streams | Redis Pub-Sub |
消息持久性 | 支持 | 不支持 |
投遞保證 | 即使消費(fèi)者離線也能投遞 | 無(未被消費(fèi)的消息會(huì)丟失) |
重放能力 | 支持(可通過 ID 讀取歷史消息) | 不支持(僅支持實(shí)時(shí)消息) |
消息有序性 | 有保證(基于消息 ID) | 無保證 |
消費(fèi)者協(xié)調(diào) | 支持(通過消費(fèi)者組實(shí)現(xiàn)) | 不支持 |
多消費(fèi)者支持 | 支持(通過消費(fèi)者組實(shí)現(xiàn)并發(fā)消費(fèi)) | 支持(消息廣播至所有訂閱者) |

































