Redis Stream 數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)原理真的很強(qiáng)

你好,我是碼哥,一個(gè)擁抱硬核技術(shù)和對(duì)象,面向人民幣編程的男人,設(shè)置星標(biāo)不迷路。
我在【Redis 使用 List 實(shí)現(xiàn)消息隊(duì)列的利與弊】說過使用 List 實(shí)現(xiàn)消息隊(duì)列有很多局限性。
- 沒有 ACK 機(jī)制。
- 沒有類似 Kafka 的 ConsumerGroup 消費(fèi)組概念。
- 消息堆積。
- List 是線性結(jié)構(gòu),查詢指定數(shù)據(jù)需要遍歷整個(gè)列表。
1、是什么
Stream 是 Redis 5.0 版本專門為消息隊(duì)列設(shè)計(jì)的數(shù)據(jù)類型,借鑒了 Kafka 的 Consume Group 設(shè)計(jì)思路,提供了消費(fèi)組概念。
同時(shí)提供了消息的持久化和主從復(fù)制機(jī)制,客戶端可以訪問任何時(shí)刻的數(shù)據(jù),并且能記住每一個(gè)客戶端的訪問位置,從而保證消息不丟失。
以下幾個(gè)是 Stream 類型的主要特性。
- 使用 Radix Tree 和 listpack 結(jié)構(gòu)來存儲(chǔ)消息。
- 消息 ID 序列化生成。
- 借鑒 Kafka Consume Group 的概念,多個(gè)消費(fèi)者劃分到不同的 Consume Group 中,消費(fèi)同一個(gè) Streams,同一個(gè) Consume Group 的多個(gè)消費(fèi)者可以一起并行但不重復(fù)消費(fèi),提升消費(fèi)能力。
- 支持多播(多對(duì)多),阻塞和非阻塞讀取。
- ACK 確認(rèn)機(jī)制,保證了消息至少被消費(fèi)一次。
- 可設(shè)置消息保存上限閾值,我會(huì)把歷史消息丟棄,防止內(nèi)存占用過大。
需要注意的是,Redis Stream 是一種超輕量級(jí)的 MQ,并沒有完全實(shí)現(xiàn)消息隊(duì)列的所有設(shè)計(jì)要點(diǎn),所以它的使用場(chǎng)景需要考慮業(yè)務(wù)的數(shù)據(jù)量和對(duì)性能、可靠性的需求。
適合系統(tǒng)消息量不大,容忍數(shù)據(jù)丟失,使用 Redis Stream 作為消息隊(duì)列就能享受高性能快速讀寫消息的優(yōu)勢(shì)。
2、修煉心法
每個(gè) Stream 都有一個(gè)唯一的名稱,作為 Stream 在 Redis 的 key,在首次使用 xadd 指令添加消息的時(shí)候會(huì)自動(dòng)創(chuàng)建。
可以看到 Stream 在一個(gè) Redix Tree 樹上,樹上存儲(chǔ)的是消息 ID,每個(gè)消息 ID 對(duì)應(yīng)的消息通過一個(gè)指針指向 listpack。
Stream 流就像是一個(gè)僅追加內(nèi)容的消息鏈表,把消息一個(gè)個(gè)串起來,每個(gè)消息都有一個(gè)唯一的 ID 和消息內(nèi)容,消息內(nèi)容則由多個(gè) field/value 鍵值對(duì)組成。底層使用 Radix Tree 和 listpack 數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)數(shù)據(jù)。
為了便于理解,我畫了一張圖,并對(duì) Radix Tree 的存儲(chǔ)數(shù)據(jù)做了下變形,使用列表來體現(xiàn) Stream 中消息的邏輯有序性。

這張圖涉及很多概念,但是你不要慌。我一步步拆開說,最后你再回頭看就懂了。
先帶你屢下全局思路。
- Consumer Group:消費(fèi)組,每個(gè)消費(fèi)組可以有一個(gè)或者多個(gè)消費(fèi)者,消費(fèi)者之間是競(jìng)爭關(guān)系。不同消費(fèi)組的消費(fèi)者之間無任何關(guān)系。
- *pel,全稱是 Pending Entries List,記錄了當(dāng)前被客戶端讀取但是還沒有 ack(Acknowledge character 確認(rèn)字符)的消息。如果客戶端沒有 ack,這個(gè)變量的消息 ID 會(huì)越來越多。這是一個(gè)核心數(shù)據(jù)結(jié)構(gòu),用來確保客戶端至少消費(fèi)消息一次。
Stream 結(jié)構(gòu)
Streams 結(jié)構(gòu)的源碼定義在 stream.h 源碼中的 stream 結(jié)構(gòu)體中。
typedef struct stream {
rax *rax;
uint64_t length;
streamID last_id;
streamID first_id;
streamID max_deleted_entry_id;
uint64_t entries_added;
rax *cgroups;
} stream;
typedef struct streamID {
uint64_t ms;
uint64_t seq;
} streamID;- *rax,是一個(gè) rax 的指針,指向一個(gè) Radix Tree,key 存儲(chǔ)消息 ID,value 實(shí)際上指向一個(gè) listpack 數(shù)據(jù)結(jié)構(gòu),存儲(chǔ)了多條消息,每條消息的 ID 都大于等于 這個(gè) key 的消息 ID。
- length,該 Stream 的消息條數(shù)。
- streamID結(jié)構(gòu)體,消息 ID 抽象,一共占 128 位,內(nèi)部維護(hù)了毫秒時(shí)間戳(字段 ms);一個(gè)毫秒內(nèi)的自增序號(hào)(字段 seq),用于區(qū)分同一毫秒內(nèi)插入多條消息。
- last_id,當(dāng)前 Stream 最后一條消息的 ID。
- first_id,當(dāng)前 Stream 第一條消息的 ID。
- max_deleted_entry_id,當(dāng)前 Stream 被刪除的最大的消息 ID。
- entries_added,總共有多少條消息添加到 Stream 中,entries_added = 已刪除消息條數(shù) + 未刪除消息條數(shù)。
- *cgroups,rax 指針,也指向一個(gè) Radix Tree ,記錄當(dāng)前 Stream 的所有 Consume Group,每個(gè) Consume Group 的名稱都是唯一標(biāo)識(shí),作為 Radix Tree 的 key,Consumer Group 實(shí)例作為 value。
Consumer Group
Consumer Group 由 streamCG 結(jié)構(gòu)體定義,每個(gè) Stream 可以有多個(gè) Consumer Group,一個(gè)消費(fèi)組可以有多個(gè)消費(fèi)者同時(shí)對(duì)組內(nèi)消息進(jìn)行消費(fèi)。
/* Consumer group. */
typedef struct streamCG {
streamID last_id;
long long entries_read;
rax *pel;
rax *consumers;
} streamCG;- last_id,表示該消費(fèi)組的消費(fèi)者已經(jīng)讀取但還未 ACK 的最后一條消息 ID。
- *pel,是 pending entries list 簡寫,指向一個(gè) Radix Tree 的指針,保存著 Consumer group 中所有消費(fèi)者讀取但還未 ACK 確認(rèn)的消息,就是這玩意實(shí)現(xiàn)了 ACK 機(jī)制。該樹的 key 是消息 ID,value 關(guān)聯(lián)一個(gè) streamNACK 實(shí)例。
- *consumers, Radix Tree 指針,表示消費(fèi)組中的所有消費(fèi)者,key 是消費(fèi)者名稱,value 指向一個(gè) streamConsumer 實(shí)例。
streamNACK
streamCG -> *pel 對(duì)應(yīng)的 value 是一個(gè) streamNACK 實(shí)例,用于抽象消費(fèi)者已經(jīng)讀取,但是未 ACK 的消息 ID 相關(guān)信息。
/* Pending (yet not acknowledged) message in a consumer group. */
typedef struct streamNACK {
mstime_t delivery_time;
uint64_t delivery_count;
streamConsumer *consumer;
} streamNACK;- delivery_time,該消息最后一次推送給 Consumer 的時(shí)間戳。
- delivery_count,消息被推送次數(shù)。
- *consumer,消息推送的 Consumer 客戶端。
streamConsumer
Consumer Group 中對(duì) Consumer 的抽象。
/* A specific consumer in a consumer group. */
typedef struct streamConsumer {
mstime_t seen_time;
sds name;
rax *pel;
} streamConsumer;- seen_time,消費(fèi)者最近一次被激活的時(shí)間戳。
- name,消費(fèi)者名稱。
- *pel, Radix Tree 指針,對(duì)于同一個(gè)消息而言,``streamCG -> pel與streamConsumer -> pel的streamNACK` 實(shí)例是同一個(gè)。
最后來一張圖,便于你理解。

肖材積:“Redis 你好,Stream 如何結(jié)合 Radix Tree 和 listpack 結(jié)構(gòu)來存儲(chǔ)消息?為什么不使用散列表來存儲(chǔ),消息 ID 作為散列表的 key,散列表的 value 存儲(chǔ)消息鍵值對(duì)內(nèi)容。’”
在回答之前,先插入幾條消息到 Stream,讓你對(duì) Stream 消息的存儲(chǔ)格式有個(gè)大體認(rèn)知。
該命令的語法如下。
XADD key id field value [field value ...]Stream 中的每個(gè)消息可以包含不同數(shù)量的多個(gè)鍵值對(duì),寫入消息成功后,我會(huì)把消息的 ID 返回給客戶端。
執(zhí)行如下指令把用戶購買書籍的下單消息存放到 hotlist:books隊(duì)列,消息內(nèi)容主要由 payerID、amount 和 orderID。
> XADD hotlist:books * payerID 1 amount 69.00 orderID 9
1679218539571-0
> XADD hotlist:books * payerID 1 amount 36.00 orderID 15
1679218572182-0
> XADD hotlist:books * payerID 2 amount 99.00 orderID 88
1679218588426-0
> XADD hotlist:books * payerID 3 amount 68.00 orderID 80
1679218604492-0hotlist:books 是 Stream 的名稱,后面的 “*” 表示讓 Redis 為插入的消息自動(dòng)生成一個(gè)唯一 ID,你也可以自定義。
消息 ID 由兩部分組成。
- 當(dāng)前毫秒內(nèi)的時(shí)間戳。
- 順序編號(hào)。從 0 為起始值,用于區(qū)分同一時(shí)間內(nèi)產(chǎn)生的多個(gè)命令。
肖材積:“如何理解 Stream 是一種只執(zhí)行追加操作(append only)的數(shù)據(jù)結(jié)構(gòu)?”
通過將元素 ID 與時(shí)間進(jìn)行關(guān)聯(lián),并強(qiáng)制要求新元素的 ID 必須大于舊元素的 ID, Redis 從邏輯上將 Stream 變成了一種只執(zhí)行追加操作(append only)的數(shù)據(jù)結(jié)構(gòu)。
用戶可以確信,新的消息和事件只會(huì)出現(xiàn)在已有消息和事件之后,就像現(xiàn)實(shí)世界里新事件總是發(fā)生在已有事件之后一樣,一切都是有序進(jìn)行的。
?
肖材積:“插入的消息 ID 大部分相同,比如這四條消息的 ID 都是 1679218 前綴。另外,每條消息鍵值對(duì)的鍵通常都是一樣的,比如這四條消息的鍵都是 payerID、amount 和 orderID。使用散列表存儲(chǔ)的話會(huì)很多冗余數(shù)據(jù),你這么摳門,所以不使用散列表對(duì)不對(duì)?”
沒毛病,小老弟很聰明。為了節(jié)省內(nèi)存,我使用了 Radix Tree 和 listpack。Radix Tree 的 key 存儲(chǔ)消息 ID,value 使用 listpack 數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)多個(gè)消息, listapck 中的消息 ID 都大于等于 key 存儲(chǔ)的消息 ID。
我在前面已經(jīng)講過 listpack,這是一個(gè)緊湊型列表,非常節(jié)省內(nèi)存。而 Radix Tree 數(shù)據(jù)結(jié)構(gòu)的最大特點(diǎn)是適合保存具有相同前綴的數(shù)據(jù),從而達(dá)到節(jié)省內(nèi)存。
到底 Radix Tree 是怎樣的數(shù)據(jù)結(jié)構(gòu),繼續(xù)往下看。
Radix Tree
Radix Tree,也被稱為 Radix Trie,或者 Compact Prefix Tree),用于高效地存儲(chǔ)和查找字符串集合。它將字符串按照前綴拆分成一個(gè)個(gè)字符,并將每個(gè)字符作為一個(gè)節(jié)點(diǎn)存儲(chǔ)在樹中。
當(dāng)插入一個(gè)鍵值對(duì)時(shí),Redis 會(huì)將鍵按照字符拆分成一個(gè)個(gè)字符,并根據(jù)字符在 Radix tree 中的位置找到合適的節(jié)點(diǎn),如果該節(jié)點(diǎn)不存在,則創(chuàng)建新節(jié)點(diǎn)并添加到 Radix tree 中。
當(dāng)所有字符都添加完畢后,將值對(duì)象指針保存到最后一個(gè)節(jié)點(diǎn)中。當(dāng)查詢一個(gè)鍵時(shí),Redis 按照字符順序遍歷 Radix tree,如果發(fā)現(xiàn)某個(gè)字符不存在于樹中,則鍵不存在;否則,如果最后一個(gè)節(jié)點(diǎn)表示一個(gè)完整的鍵,則返回對(duì)應(yīng)的值對(duì)象。
如下圖展示一個(gè)簡單的前綴樹,將根節(jié)點(diǎn)到葉子節(jié)點(diǎn)的路徑對(duì)應(yīng)字符拼接起來,就得到了兩個(gè) key(“他說碉堡了”、“他說碉炸了”)。

你應(yīng)該發(fā)現(xiàn)了,這兩個(gè) key 擁有公共前綴(他說碉),前綴樹實(shí)現(xiàn)了共享使用,這樣就可以避免相同字符串重復(fù)存儲(chǔ)。如果采用散列表的保存方式,那個(gè) key 的相同前綴就會(huì)被多次存儲(chǔ),導(dǎo)致內(nèi)存浪費(fèi)。
Radix Tree 改進(jìn)
每個(gè)節(jié)點(diǎn)只保存一個(gè)字符,一是會(huì)浪費(fèi)內(nèi)存空間,二是在進(jìn)行查詢時(shí),還需要逐一匹配每個(gè)節(jié)點(diǎn)表示的字符,對(duì)查詢性能也會(huì)造成影響。
所以,Redis 并沒有直接使用標(biāo)準(zhǔn)前綴樹,而是做了一次變種——Compact Prefix Tree(壓縮前綴樹)。通俗來說,當(dāng)多個(gè) key 具有相同的前綴時(shí),那就將相同前綴的字符串合并在一個(gè)共享節(jié)點(diǎn)中,從而減少存儲(chǔ)空間。
如下幾個(gè) key(test、toaster、toasting、slow、slowly)在 Radix Tree 上的布局。

由于 Compact Prefix Tree 可以共享相同前綴的節(jié)點(diǎn),所以在存儲(chǔ)一組具有相同前綴的鍵時(shí),Redis 的 Radix tree 比其他數(shù)據(jù)結(jié)構(gòu)(如哈希表)具有更低的空間消耗和更快的查詢速度。
Radix Tree 節(jié)點(diǎn)的數(shù)據(jù)結(jié)構(gòu)由 rax.h文件中的 raxNode 定義。
typedef struct raxNode {
uint32_t iskey:1;
uint32_t isnull:1;
uint32_t iscompr:1;
uint32_t size:29;
unsigned char data[];
} raxNode;- iskey:從 Radix Tree 根節(jié)點(diǎn)到當(dāng)前節(jié)點(diǎn)組成的字符串是否是一個(gè)完整的 key。是的話 iskey 的值為 1。
- isnull:當(dāng)前節(jié)點(diǎn)是否為空節(jié)點(diǎn),如果當(dāng)前節(jié)點(diǎn)是空節(jié)點(diǎn)的話,就不需要為該節(jié)點(diǎn)分配指向 value 的指針內(nèi)存。
- iscompr,是否為壓縮節(jié)點(diǎn)。
- size,當(dāng)前節(jié)點(diǎn)的大小,具體指會(huì)根據(jù)節(jié)點(diǎn)類型而改變。如果是壓縮節(jié)點(diǎn),該值表示壓縮數(shù)據(jù)的長度;如果是非壓縮節(jié)點(diǎn),該值表示節(jié)點(diǎn)的子節(jié)點(diǎn)個(gè)數(shù)。
- data[],實(shí)際存儲(chǔ)的數(shù)據(jù),根據(jù)節(jié)點(diǎn)類型不同而有所不同。
- 壓縮節(jié)點(diǎn),data 數(shù)據(jù)包括子節(jié)點(diǎn)對(duì)應(yīng)的字符、指向子節(jié)點(diǎn)的指針,節(jié)點(diǎn)為最終 key 對(duì)應(yīng)的 value 指針。
- 壓縮節(jié)點(diǎn),data 數(shù)據(jù)包含子節(jié)點(diǎn)對(duì)應(yīng)的合并字符串、指向子節(jié)點(diǎn)的指針,以及節(jié)點(diǎn)為最終 key 的 value 指針。
- value 指針指向一個(gè) listpack 實(shí)例,里面保存了消息實(shí)際內(nèi)容。
Radix Tree 最大的特點(diǎn)就是適合保存具有相同前綴的數(shù)據(jù),實(shí)現(xiàn)節(jié)省內(nèi)存的目標(biāo),以及支持范圍查找。而這個(gè)就是 Stream 采用 Radix Tree 作為底層數(shù)據(jù)結(jié)構(gòu)的原因。




























