消息隊(duì)列 CMQ 七大功能實(shí)踐案例
背景
消息隊(duì)列,在業(yè)務(wù)解耦、削峰填谷、流量控制、廣播消息等場(chǎng)景下都有很好的應(yīng)用,已經(jīng)成為很多企業(yè)IT系統(tǒng)內(nèi)部通信重要手段。
現(xiàn)有常用的開源消息中間件有RabbitMQ、Kafka、RocketMQ等,但各自有著不同的應(yīng)用場(chǎng)景和特點(diǎn),例如,Kafka注重的是消息的吞吐量,不保證消息存儲(chǔ)的可靠性以及一致性,因此多用于日志系統(tǒng)數(shù)據(jù)的上報(bào);RabbitMQ能保證消息可靠存儲(chǔ)投遞,但性能較差。
CMQ(Cloud Message Queue)是騰訊云開發(fā)的一款高可靠、高可用、高性能的分布式消息隊(duì)列服務(wù),具有低耦合、消息可靠、強(qiáng)一致性、可擴(kuò)展性等特點(diǎn),支持Push/Pull消費(fèi)模型、消息回溯、延時(shí)消息、發(fā)布訂閱、路由廣播、消息加密等一系列功能,以滿足更多的mq應(yīng)用場(chǎng)景。
相對(duì)Kafka,CMQ更多注重消息高可靠的應(yīng)用場(chǎng)景,例如金融、交易、訂單等業(yè)務(wù);相比RabbitMQ,CMQ在可用性和性能上做了很大的優(yōu)化和提升。更詳細(xì)的對(duì)比,請(qǐng)參考官網(wǎng)介紹。
本文先簡(jiǎn)單介紹CMQ底層的架構(gòu)實(shí)現(xiàn),然后著重結(jié)合CMQ的功能特點(diǎn)來介紹CMQ的實(shí)踐案例,讓大家快速理解和上手CMQ的開發(fā)。
底層架構(gòu)
CMQ整體架構(gòu)如上圖所示,每個(gè)set由三個(gè)broker節(jié)點(diǎn)副本組成,保證消息的可靠存儲(chǔ)以及高可用性,且基于raft算法保證數(shù)據(jù)的一致性。CMQ單個(gè)set 在CAP理論中優(yōu)先保證了CP,當(dāng)SET中過半數(shù)節(jié)點(diǎn)都正常工作時(shí),才能進(jìn)行消息的生產(chǎn)消費(fèi)。
實(shí)踐案例
一、廣播拉取消息模型
CMQ支持隊(duì)列(queue)和主題(topic)兩種模型,如下所示:
其中,queue模型是一對(duì)一的消息拉取(pull)模式,client端主動(dòng)pull消息;而topic模型,也稱發(fā)布/訂閱模型,是一對(duì)多的消息推送(push)模式,CMQ服務(wù)端廣播消息時(shí),根據(jù)各個(gè)訂閱地址主動(dòng)推送消息給client。兩種模型基本能滿足大部分應(yīng)用場(chǎng)景了,對(duì)比如下:
- queue模型,client端可以靈活根據(jù)自身能力去消費(fèi)pull消息,消息實(shí)時(shí)性依賴client的消費(fèi)速度,如果消費(fèi)速度比生產(chǎn)速度慢,會(huì)引起大量消息堆積。
- topic模型,服務(wù)端主動(dòng)推送消息,消息實(shí)時(shí)性比較高,但要求client性能上能及時(shí)處理大量推送過來的消息,并且在client發(fā)生故障的時(shí)候可能會(huì)導(dǎo)致丟消息(有消息重發(fā)策略做基本保障)。
對(duì)于topic模型,有以下特殊場(chǎng)景需求:
- client端想根據(jù)自身能力去pull消息
- 創(chuàng)建訂閱的時(shí)候需要暴露client端的接收消息的地址,但在一些企業(yè)內(nèi)網(wǎng)、vpc網(wǎng)絡(luò)等特殊情況下,CMQ無法推送到,只能用pull方式獲取消息。
針對(duì)以上特殊場(chǎng)景,CMQ結(jié)合queue和topic兩種模型實(shí)現(xiàn)了一對(duì)多的廣播拉取消息模型,如下所示:
topic的訂閱者可以是一個(gè)queue實(shí)例,topic發(fā)布消息后,會(huì)自動(dòng)將消息推送到queue,然后client和使用queue模型一樣去消費(fèi)消息即可。
- # python sdk demo code: create subscription of queue protocal
- my_sub = my_account.get_subscription(topic_name, subscription_name)
- subscription_meta = SubscriptionMeta()
- subscription_meta.Endpoint = "queue1"
- subscription_meta.Protocal = "queue"
- my_sub.create(subscription_meta)
二、Pull長(zhǎng)輪詢
對(duì)于Queue模型,消費(fèi)者需要pull獲取消息,但問題是:消費(fèi)者不知道隊(duì)列什么時(shí)候有消息,只能不停輪詢請(qǐng)求去pull,如果輪詢間隔時(shí)間短,在隊(duì)列長(zhǎng)時(shí)間沒有消息時(shí)會(huì)耗費(fèi)消費(fèi)者請(qǐng)求資源且效率低,如果輪詢間隔時(shí)間長(zhǎng),則消費(fèi)速度慢,消息實(shí)時(shí)性低,且造成消息大量堆積。
針對(duì)以上問題,CMQ解決方案是設(shè)計(jì)了長(zhǎng)輪詢功能。例如,假設(shè)設(shè)置隊(duì)列長(zhǎng)輪詢時(shí)間為10s
- 當(dāng)消費(fèi)者pull消息時(shí),如果隊(duì)列中有消息則馬上返回
- 如果隊(duì)列暫時(shí)沒有消息,消費(fèi)者pull請(qǐng)求不會(huì)馬上返回,而是會(huì)等待阻塞10s:當(dāng)10s內(nèi)有新的生產(chǎn)消息到達(dá)隊(duì)列,CMQ會(huì)馬上將消息投遞給正在阻塞等待的消費(fèi)者,消費(fèi)者端感知就是阻塞的pull請(qǐng)求被喚醒并且收到消息返回;當(dāng)10s內(nèi)隊(duì)列都沒有消息,則請(qǐng)求返回告訴消費(fèi)者當(dāng)前隊(duì)列沒有消息。
- # python sdk demo code: receive message through long polling
- pollingWaitSeconds = 3
- recv_msg = my_queue.receive_message(pollingWaitSeconds)
三、延時(shí)消息
CMQ提供延時(shí)消息功能:消息發(fā)送到隊(duì)列后,從入隊(duì)時(shí)間算起,消息在設(shè)置的延時(shí)時(shí)間后才對(duì)消費(fèi)者可見,即才能被消費(fèi)者消費(fèi)到。延時(shí)消息功能可以很輕松實(shí)現(xiàn)一些定時(shí)任務(wù)的應(yīng)用場(chǎng)景。
如上圖所示,根據(jù)CMQ延遲消息功能實(shí)現(xiàn)的定時(shí)任務(wù)檢查告警系統(tǒng)。
- # python sdk demo code: send delayed message
- msg_body = "I am delay message"
- msg = Message(msg_body)
- delaySeconds = 3
- my_queue.send_message(msg, delaySeconds)
四、消息回溯
CMQ提供類似于Kafka的消息回溯能力,已經(jīng)消費(fèi)刪除的消息是可以通過回溯來重新消費(fèi)的。目前支持指定回溯時(shí)間點(diǎn),在這個(gè)時(shí)間點(diǎn)開始被刪除的消息可以重新消費(fèi)到。此功能在一些金融業(yè)務(wù)對(duì)賬、業(yè)務(wù)系統(tǒng)重試等場(chǎng)景下有很好的實(shí)用性。
***可回溯時(shí)間點(diǎn) = 當(dāng)前時(shí)間 - 設(shè)置的可回溯時(shí)長(zhǎng)。消息生產(chǎn)時(shí)間在這個(gè)值之前的不可回溯,之后的可回溯,如下圖所示:
- # python sdk demo code: rewind the queue
- # backtrack one hour
- backTrackingTime = int(time.time()) - 3600
- my_queue.rewindQueue(backTrackingTime)
五、Topic路由匹配
CMQ topic模型提供類似于RabbitMQ的消息路由匹配功能,在消息廣播基礎(chǔ)上實(shí)現(xiàn)了消息的自動(dòng)分發(fā)。
訂閱者可以指定bindingKey,即路由規(guī)則,如上所示,*(星號(hào))可以匹配一個(gè)單詞,#(井號(hào))可以匹配一個(gè)或多個(gè)單詞。例如,生產(chǎn)者發(fā)布一個(gè)消息,且消息的路由鍵(routingKey)是”quick.orange.elephant”,那么該消息只會(huì)推送給消費(fèi)者C1;如果routingKey=”quick.orange.rabbit”,則消息會(huì)推送給C1和C2;如果routingKey=”lazy.brown.fox”,則消息只會(huì)推送給C2。
- # python sdk demo code: set topic-subscription route-rule
- my_sub = my_account.get_subscription(topic_name, subscription_name)
- subscription_meta = SubscriptionMeta()
- subscription_meta.Endpoint = "http://test.com"
- subscription_meta.Protocal = "http"
- subscription_meta.bindingKey = ['*.*.rabbit','lazy.#']
- my_sub.create(subscription_meta)
- message = Message()
- message.msgBody = "route msg test"
- my_topic.publish_message(message, 'quick.orange.rabbit')
六、超大消息傳輸
目前CMQ的隊(duì)列消息大小***限制為1MB,而當(dāng)消息大小不超過64KB時(shí),收發(fā)消息的***QPS限制分別為正常的5k(有特殊需求可調(diào)整),當(dāng)消息大小超過64KB而小于1MB時(shí),CMQ不保證收發(fā)消息的QPS性能。因此,支持大于64KB的消息只是為了考慮業(yè)務(wù)偶爾傳輸少量大消息且不想做消息分片的應(yīng)用場(chǎng)景。
一般來說,64KB的消息限制大小基本能滿足大部分業(yè)務(wù)場(chǎng)景需求了,但在某些特殊場(chǎng)景下,消息數(shù)據(jù)大于64KB甚至大于1MB時(shí),業(yè)務(wù)和CMQ如何支持這種超大消息的傳輸呢?這里有兩種解決方案:
1.消息分片。類似IP數(shù)據(jù)包分片傳輸原理,生產(chǎn)者對(duì)消息分片標(biāo)記后分別發(fā)送到隊(duì)列,消費(fèi)者從隊(duì)列取出所有分片消息進(jìn)行組裝。個(gè)人方案如下:
- 每個(gè)消息body分為header和data兩部分。其中,data就是原消息分片后的內(nèi)容,header包含三個(gè)標(biāo)記:業(yè)務(wù)指定消息的ID號(hào),唯一記錄一個(gè)消息的ID值,具有同一個(gè)ID號(hào)的消息分片才會(huì)在消費(fèi)端重新組裝;分片序號(hào)(從1開始),記錄一個(gè)消息分片的次序編號(hào),消費(fèi)端依據(jù)分片序號(hào)依次組裝消息;下一分片是否存在的標(biāo)記,如果是,說明消息包還不完整,否則消息組裝完畢。
- 由于可能存在多個(gè)消費(fèi)者client,不同分片可能被不同client接收到,為了能夠組裝分片,需要一個(gè)集中式的地方存儲(chǔ)所有分片并最終組裝成完整的消息包,但無疑大大增加了系統(tǒng)設(shè)計(jì)的復(fù)雜度。
2.COS代理存儲(chǔ)(COS是騰訊云的對(duì)象存儲(chǔ)服務(wù))。類似編程中的指針原理,方案如下(具體代碼實(shí)現(xiàn)參考附件):
- 生產(chǎn)者先把超大消息的數(shù)據(jù)以文件形式上傳到COS,并返回消息文件的COS URL地址;
- 生產(chǎn)者將URL地址作為消息發(fā)送到CMQ隊(duì)列中;
- 消費(fèi)者從CMQ隊(duì)列中讀取消息,判斷消息內(nèi)容是否是COS的URL地址信息,如果是,則根據(jù)URL地址從COS下載相應(yīng)的消息文件,并從文件中讀取出超大消息的數(shù)據(jù)。
七、消息加密傳輸
騰訊云提供秘鑰管理服務(wù)KMS,能對(duì)數(shù)據(jù)進(jìn)行安全加密。CMQ消息加密功能有以下兩種方案:
1.CMQ SDK客戶端加密方案。客戶端發(fā)送消息時(shí),根據(jù)設(shè)置的CMK(KMS的秘鑰ID)調(diào)用KMS生成數(shù)據(jù)秘鑰接口,會(huì)返回?cái)?shù)據(jù)秘鑰的明文key以及加密后的密文key,使用明文key對(duì)消息進(jìn)行本地加密,然后將加密的數(shù)據(jù)和密文key作為消息 發(fā)送給CMQ;消費(fèi)者接收消息時(shí),先獲取消息中的密文key,調(diào)用KMS接口解密(不必每次均調(diào)用,可做緩存)得到對(duì)應(yīng)的明文key,***根據(jù)明文key本地解密密文數(shù)據(jù)即可。具體代碼實(shí)現(xiàn)參考附件。
2.CMQ服務(wù)端加密方案。該方案,由CMQ服務(wù)端和KMS服務(wù)打通,CMQ自動(dòng)對(duì)消息加解密,用戶無感知,例如,用戶通過https接口發(fā)送消息,由CMQ自動(dòng)加密后存儲(chǔ),通過https接口接收消息時(shí),CMQ對(duì)消息自動(dòng)解密后返回給用戶。此功能正在開發(fā)中。
結(jié)語
CMQ更多功能正在開發(fā)中,例如,死信隊(duì)列、FIFO順序消息等,歡迎體驗(yàn):)
原文鏈接:https://cloud.tencent.com/community/article/211497,作者:莊秋濤
【本文是51CTO專欄作者“騰訊云技術(shù)社區(qū)”的原創(chuàng)稿件,轉(zhuǎn)載請(qǐng)通過51CTO聯(lián)系原作者獲取授權(quán)】
































