精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

4 張圖,9 個維度告訴你怎么做能確保 RocketMQ 不丟失消息

開發 架構
引入消息隊列可以方便地實現系統解耦、削峰填谷等作用。但是消息隊列使用不當,可能會引起消息丟失,在一些消息敏感的業務場景下,這是不允許的。今天我們來聊一聊 RocketMQ 怎么做能確保消息不丟失。

大家好,我是君哥。

引入消息隊列可以方便地實現系統解耦、削峰填谷等作用。但是消息隊列使用不當,可能會引起消息丟失,在一些消息敏感的業務場景下,這是不允許的。今天我們來聊一聊 RocketMQ 怎么做能確保消息不丟失。

1 RocketMQ 簡介

RocketMQ 是阿里巴巴開源的分布式消息中間件,整體架構如下圖:

RocketMQ 主要包括 Producer、Consumer 和 Broker,同時 Name Server 進行集群注冊管理和保存元數據。

2 消息不丟失

要想保證消息不丟失,需要從以下幾個方面考慮:

  • Producer 發送消息
  • Broker 保存消息
  • Consumer 消費消息
  • Broker 主從切換

維度 1:同步發送,代碼如下:

public void send() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");
SendResult sendResult = null;

DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
try {
sendResult = producer.send(sendMessage);
} catch (Exception e) {
e.printStackTrace();
}
if (sendResult != null) {
System.out.println(sendResult.getSendStatus());
}
}

同步發送會返回 4 個狀態碼:

  • SEND_OK:消息發送成功。需要注意的是,消息發送到 broker 后,還有兩個操作:消息刷盤和消息同步到 slave 節點,默認這兩個操作都是異步的,只有把這兩個操作都改為同步,SEND_OK 這個狀態才能真正表示發送成功。
  • FLUSH_DISK_TIMEOUT:消息發送成功但是消息刷盤超時。
  • FLUSH_SLAVE_TIMEOUT:消息發送成功但是消息同步到 slave 節點時超時。
  • SLAVE_NOT_AVAILABLE:消息發送成功但是 broker 的 slave 節點不可用。

根據返回的狀態碼,可以做消息重試,這里設置的重試次數是 3。

消息重試時,消費端一定要做好冪等處理。

維度 2:異步發送,代碼如下:

public void sendAsync() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");

DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
producer.send(sendMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {

}

@Override
public void onException(Throwable e) {
// TODO 可以在這里加入重試邏輯
}
});
}

異步發送,可以重寫回調函數,回調函數捕獲到 Exception 時表示發送失敗,這時可以進行重試,這里設置的重試次數是 3。

維度 3:刷盤策略

  • 異步刷盤:默認。消息寫入 CommitLog 時,并不會直接寫入磁盤,而是先寫入 PageCache 緩存后返回成功,然后用后臺線程異步把消息刷入磁盤。異步刷盤提高了消息吞吐量,但是可能會有消息丟失的情況,比如斷點導致機器停機,PageCache 中沒來得及刷盤的消息就會丟失。
  • 同步刷盤:消息寫入內存后,立刻請求刷盤線程進行刷盤,如果消息未在約定的時間內(默認 5 s)刷盤成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到這個響應后,可以進行重試。同步刷盤策略保證了消息的可靠性,同時降低了吞吐量,增加了延遲。要開啟同步刷盤,需要增加下面配置:
flushDiskType=SYNC_FLUSH

維度 4:Broker 多副本和高可用

Broker 為了保證高可用,采用一主多從的方式部署。如下圖:

消息發送到 master 節點后,slave 節點會從 master 拉取消息保持跟 master 的一致。這個過程默認是異步的,即 master 收到消息后,不等 slave 節點復制消息就直接給 Producer 返回成功。

這樣會有一個問題,如果 slave 節點還沒有完成消息復制,這時 master 宕機了,進行主備切換后就會有消息丟失。為了避免這個問題,可以采用 slave 節點同步復制消息,即等 slave 節點復制消息成功后再給 Producer 返回發送成功。只需要增加下面的配置:

brokerRole=SYNC_MASTER

改為同步復制后,消息復制流程如下:

  • slave 初始化后,跟 master 建立連接并向 master 發送自己的 offset;
  • master 收到 slave 發送的 offset 后,將 offset 后面的消息批量發送給 slave;
  • slave 把收到的消息寫入 commitLog 文件,并給 master 發送新的 offset;
  • master 收到新的 offset 后,如果 offset >= producer 發送消息后的 offset,給 Producer 返回 SEND_OK。

維度 5:消息確認

Consumer 消費消息的代碼如下:

public void consume() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("topic1", "tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try{
System.out.printf("Receive New Messages: %s", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}

如果 Consumer 消費成功,返回 CONSUME_SUCCESS,提交 offset 并從 Broker 拉取下一批消息。

維度 6:Consumer 重試

Consumer 消費失敗,這里有 3 種情況:

  • 返回 RECONSUME_LATER
  • 返回 null
  • 拋出異常

Broker 收到這個響應后,會把這條消息放入重試隊列,重新發送給 Consumer。

注意:

  • Broker 默認最多重試 16 次,如果重試 16 次都失敗,就把這條消息放入死信隊列,Consumer 可以訂閱死信隊列進行消費。
  • 重試只有在集群模式(MessageModel.CLUSTERING)下生效,在廣播模式(MessageModel.BROADCASTING)下是不生效的。
  • Consumer 端一定要做好冪等處理。

其實重試 3 次都失敗就可以說明代碼有問題,這時 Consumer 可以把消息存入本地,給 Broker 返回CONSUME_SUCCESS 來結束重試。代碼如下:

int count = ((MessageExt) msgs).getReconsumeTimes();
if (count > 2) {
//TODO 把消息寫入本地存儲
System.out.println("重試次數超過3次");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

維度7:事務消息

RocketMQ支持事務消息,整體流程如下圖:

  • Producer 發送 half 消息;
  • Broker 先把消息寫入 topic 是 RMQ_SYS_TRANS_HALF_TOPIC 的隊列,之后給 Producer 返回成功;
  • Producer 執行本地事務,成功后給 Broker 發送 commit 命令(本地事務執行失敗則發送 rollback);
  • Broker 收到 commit 請求后把消息狀態更改為成功并把消息推到真正的 topic;
  • Consumer 拉取消息進行消費。

代碼如下:

public class ProducerTransactionListenerImpl implements TransactionListener {

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
/**
* 這里執行本地事務,執行成功返回LocalTransactionState.COMMIT_MESSAGE,執行失敗返回
* LocalTransactionState.ROLLBACK_MESSAGE,如果返回LocalTransactionState.UNKNOW,
* Broker會回來查詢,所以需要記錄事務執行狀態
*/
return LocalTransactionState.COMMIT_MESSAGE;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
/**
* 這里查詢事務執行狀態,根據事務狀態返回LocalTransactionState.COMMIT_MESSAGE或
* LocalTransactionState.ROLLBACK_MESSAGE,如果沒有查詢到返回LocalTransactionState.UNKNOW,
* Broker會再次查詢,可以記錄查詢次數,超過次數后返回ROLLBACK_MESSAGE
*/
return LocalTransactionState.UNKNOW;
}
}

維度 8:消息索引

我們知道,RocketMQ 核心的數據文件有 3 個:CommitLog、ConsumeQueue 和 Index。其中Index 文件就是一個索引文件,結構如下圖:

查找消息時,首先根據消息 key 的 hashcode 計算出 Hash 槽的位置,然后讀取 Hash 槽的值計算 Index 條目的位置,從Index 條目位置讀取到消息在 CommitLog 文件中的 offset,從而查找到消息。

在 Producer 發送消息時,可以指定一個 key,代碼如下:

Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.setKeys("weiyiid");

這樣可以通過 RocketMQ 提供的命令或者管理控制臺來查詢消息是否發送成功。

維度 9:極端情況

如果對消息丟失零容忍,我們必須要考慮極端情況,比如整個 RocketMQ 集群掛了,這時 Producer 端發送消息一定會失敗,可以考慮在 Producer 端做降級,把要發送的消息保存到本地數據庫或磁盤,等 RocketMQ 恢復以后再把本地消息推送出去。

3 總結

在一些特殊的業務場景,比如支付、銀行核算等,需要確保消息不丟失,但是同時也要看到,消息不丟失的方案會大大降低 RocketMQ 的吞吐量,需要綜合考慮。


責任編輯:武曉燕 來源: 君哥聊技術
相關推薦

2022-09-26 10:43:13

RocketMQ保存消息

2022-08-01 10:43:11

RocketMQZookeeper注冊中心

2022-08-15 10:45:34

RocketMQ消息隊列

2024-08-06 09:55:25

2020-10-09 06:55:23

監控告警日志

2022-09-26 11:32:14

用戶分層服務業務

2021-03-18 12:16:44

用戶分層業務

2022-03-31 08:26:44

RocketMQ消息排查

2022-07-11 11:06:11

RocketMQ函數.消費端

2022-06-13 11:05:35

RocketMQ消費者線程

2022-12-16 17:15:33

MQRabbitMQ

2022-12-19 17:44:25

MQ技術RabbitMQ

2022-04-25 15:01:07

系統程序員調度

2022-07-04 11:06:02

RocketMQ事務消息實現

2022-06-27 11:04:24

RocketMQ順序消息

2022-09-16 15:42:00

數據Kafka

2020-04-06 14:53:05

MySQL數據庫字符串

2021-04-13 15:51:46

服務治理流量

2021-04-13 18:16:07

多線程安全代碼

2012-07-20 17:24:51

HTML5
點贊
收藏

51CTO技術棧公眾號

二区三区在线| av图片在线观看| 日韩免费成人| 亚洲一区二区精品久久av| 国产精品一区二区三区不卡| 日韩不卡在线播放| 日韩中文欧美| 亚洲精品一线二线三线无人区| 91视频最新入口| 在线观看a视频| 成人黄色av电影| 国产精品精品国产| 精品无码人妻一区二区三区| 国产伦一区二区三区| 欧美一级夜夜爽| 岳毛多又紧做起爽| 操你啦在线视频| 91女厕偷拍女厕偷拍高清| 国产精品999| 久久久久无码国产精品| av亚洲免费| 精品999在线播放| 91制片厂毛片| 欧亚av在线| 一区二区三区蜜桃网| 日本一区二区三区四区在线观看 | 欧美一区久久| 亚洲欧洲黄色网| 亚洲天堂小视频| 成人国产在线| 色香色香欲天天天影视综合网| 中国一级大黄大黄大色毛片| 精品av中文字幕在线毛片| 国产不卡免费视频| 91精品国产综合久久久久久久久| 精品欧美一区二区三区免费观看 | 久久视频免费在线播放| 亚洲第一成人网站| 国产成人av毛片| 欧美一级视频精品观看| 精品免费国产一区二区| av最新在线| 亚洲综合自拍偷拍| 中文字幕の友人北条麻妃| h网站在线免费观看| 91在线播放网址| 国产美女精品在线观看| 国产乱子伦精品无码码专区| 免费成人av在线播放| 日本精品中文字幕| 天堂在线免费观看视频| 99国产精品自拍| 久久久久久久香蕉网| 妺妺窝人体色www在线下载| 亚洲精品99| 久久视频这里只有精品| 国产精品白丝喷水在线观看| 国产韩日影视精品| 日韩视频在线观看免费| 貂蝉被到爽流白浆在线观看 | 国产一区二区不卡在线| 国产九九精品视频| 一区二区三区黄色片| 免费在线观看成人| 国产人妖伪娘一区91| 一级黄色大毛片| 韩国欧美一区二区| 亚洲一区精品电影| 国产99999| 成人丝袜视频网| 国内精品国语自产拍在线观看| 欧美 日韩 中文字幕| av一二三不卡影片| 欧美一区二区福利| 91精品专区| 亚洲精品一二三区| 成人性生活视频免费看| 无遮挡在线观看| 欧洲中文字幕精品| 亚洲综合激情视频| 亚洲精品午夜| 日韩国产在线看| 一级二级黄色片| 欧美黄色一区| 欧美亚洲另类在线| 在线观看免费视频a| 国产精品一区二区果冻传媒| 好吊色欧美一区二区三区视频| 日韩av免费观影| 欧美国产日韩a欧美在线观看| 亚洲一区二区三区四区中文| av网址在线播放| 黑人巨大精品欧美一区二区免费| 在线观看高清免费视频| 久久久久久久久久久久电影| 亚洲精品国产精品国自产在线| 变态另类ts人妖一区二区| 亚洲91精品| 91高清免费视频| 91尤物国产福利在线观看| 高清国产午夜精品久久久久久| 蜜桃av噜噜一区二区三区| 幼a在线观看| 午夜日韩在线观看| 五月天中文字幕在线| theporn国产在线精品| 亚洲欧美综合另类中字| 青青草手机在线观看| 日韩国产精品91| **亚洲第一综合导航网站| 日本韩国精品一区二区| 亚洲精品国产成人久久av盗摄 | 国产精品丝袜一区二区| 欧美亚洲免费| 99国产视频| 超碰国产在线观看| 黄网站色欧美视频| 色男人天堂av| 精品国产一区二区三区久久久樱花| 欧美成人精品在线观看| 午夜精品免费观看| 成人一道本在线| 国产又大又长又粗又黄| 卡通欧美亚洲| 亚洲成人av资源网| 亚洲怡红院在线观看| 久久精品亚洲一区二区| 国产精品久久九九| 麻豆视频在线播放| 欧美午夜电影在线播放| 噜噜噜在线视频| 亚洲成人最新网站| 国产一区私人高清影院| 美女欧美视频在线观看免费| 五月婷婷另类国产| 国产综合内射日韩久| 亚洲精品一二三区区别| 国产日韩欧美影视| 95在线视频| 欧美亚洲图片小说| 日本一区二区视频在线播放| 老鸭窝毛片一区二区三区| 国模一区二区三区私拍视频| 在线看女人毛片| 日韩亚洲欧美在线观看| 99久久久免费精品| 久久99热这里只有精品| 亚洲欧洲日夜超级视频| 日韩不卡视频在线观看| 国产亚洲欧洲高清一区| 波多野结衣视频观看| 26uuu色噜噜精品一区二区| 亚洲精品久久久久久久蜜桃臀| 日韩欧美一级| 欧美黑人xxx| 免费看黄色一级视频| 亚洲国产精品嫩草影院| 亚洲熟女一区二区| 国产一区二区三区成人欧美日韩在线观看 | 91视频青青草| 国内精品伊人久久久久av影院 | 91小视频xxxx网站在线| 欧美一区2区视频在线观看| 国产探花在线播放| 成人97人人超碰人人99| 尤物av无码色av无码| 五月国产精品| 国产精品久久电影观看| 欧美一区二区三区在线观看免费| 欧美精选在线播放| 青草草在线视频| 成人精品国产免费网站| 女人喷潮完整视频| 精品久久电影| 91精品久久久久久久久久入口| 动漫一区在线| 精品国产制服丝袜高跟| 国产精品视频一区在线观看| 亚洲国产精品激情在线观看| 超碰成人在线播放| 国产精品大片免费观看| 欧美精品免费观看二区| 久久精品国产福利| 欧美国产日韩在线| 韩国福利在线| 欧美一区永久视频免费观看| 国产无精乱码一区二区三区| 久久久午夜精品| 午夜av中文字幕| 日韩视频久久| 亚洲午夜精品久久| 亚洲五码在线| 国产高清在线不卡| 亚洲综合图区| 亚洲午夜精品久久久久久久久久久久| 97国产精品久久久| 欧美日韩黄色大片| 天天色天天综合| 97精品视频在线观看自产线路二| 五月天婷婷激情视频| 国产一区日韩一区| 视频一区视频二区视频| 国产精品一区二区中文字幕| 国产精品亚发布| 丰满诱人av在线播放| 在线播放亚洲激情| 免费a视频在线观看| 在线欧美小视频| 国产精品白浆一区二小说| 亚洲国产电影在线观看| 国产污在线观看| 久久99国产精品成人| 国产免费黄视频| 欧美96在线丨欧| 亚洲精品乱码久久久久久蜜桃91 | 国产suv精品一区二区68| 99国产精品99久久久久久| 超碰在线免费av| 日韩va欧美va亚洲va久久| 国产freexxxx性播放麻豆| 热久久天天拍国产| 精品欧美国产一区二区三区不卡| 9999在线精品视频| 国产精品久久久久久久久免费看 | 中文字幕55页| 久久精品噜噜噜成人av农村| 国产xxxxx在线观看| 亚洲日本成人| 国产一级做a爰片久久毛片男| 97久久视频| 亚洲视频电影| 水蜜桃久久夜色精品一区| 欧美综合77777色婷婷| 国产精品无码乱伦| 日韩影视在线观看| 国产伦一区二区三区色一情| 亚洲精品18| 7777精品久久久大香线蕉小说| 国产原创一区| 国产精品一区二区三区在线播放| 欧美成人h版| 欧美中文在线视频| 在线天堂新版最新版在线8| 97国产精品免费视频| 草草影院在线| 欧美激情中文字幕乱码免费| av在线网址观看| 欧美精品性视频| 青春草视频在线| 欧美日韩成人黄色| 国产桃色电影在线播放| 欧美极品少妇xxxxx| 欧美xxxxhdvideosex| 久久久久久尹人网香蕉| 91探花在线观看| 欧美一级片久久久久久久| 中文字幕在线视频久| 国产成人精品av在线| 电影亚洲一区| 国产日韩中文字幕| 国产精品色婷婷在线观看| 91av一区二区三区| 精品网站aaa| 日本精品一区二区三区不卡无字幕| 国产欧美日韩免费观看| 亚洲狠狠婷婷综合久久久| 99久久精品费精品国产| 三上悠亚免费在线观看| 国产一区日韩欧美| 无码aⅴ精品一区二区三区浪潮 | www.cao超碰| 国产高清在线精品| 亚洲国产精品无码久久久久高潮| 91小视频在线观看| 99国产精品免费| 一区二区三区四区不卡在线 | 91视频免费在观看| 亚洲欧美中日韩| 日韩激情一区二区三区| 色综合久久久久综合99| 国产精品一区二区黑人巨大| 精品sm在线观看| 成人午夜影视| 欧美日韩成人在线视频| 青青青免费在线视频| 国产精品嫩草影院久久久| 日韩一级淫片| 激情丁香综合五月| 亚洲三级视频网站| 国产一区二区三区在线观看免费视频 | 日韩精品中文字幕在线一区| 香港三日本三级少妇66| 色琪琪综合男人的天堂aⅴ视频| 亚洲丝袜精品| 国产精品第10页| 成人在线视频中文字幕| 日韩中文一区| 亚洲精品少妇| 超碰在线资源站| 久久久久久亚洲综合影院红桃| 91高清免费观看| 欧美日韩视频免费播放| 国产美女免费视频| 亚洲欧洲日产国产网站| 丝袜美女在线观看| 国产乱肥老妇国产一区二| a看欧美黄色女同性恋| 亚洲一区二区三区乱码| 亚洲综合激情| 中文写幕一区二区三区免费观成熟| 91视频观看视频| 久草免费在线观看视频| 欧美影院精品一区| 日韩av免费观影| 久久欧美在线电影| 精品一区二区三区中文字幕视频| 欧美精品一区二区三区在线看午夜| 综合激情婷婷| 亚洲精品久久久久久宅男| 久久久99久久| 久久99精品波多结衣一区| 日韩欧美自拍偷拍| 免费a级毛片在线播放| 91麻豆精品国产自产在线观看一区| 又大又硬又爽免费视频| 亚洲免费网址| 视频免费在线观看| 一区二区三区在线播| 这里只有精品6| 国产一区二区三区视频 | 日韩成人影院| 国产xxxxx视频| 久久一区二区三区国产精品| 日韩污视频在线观看| 精品国产乱码久久久久久浪潮 | 2018国产精品视频| 51亚洲精品| 天堂а√在线中文在线| 国产中文字幕一区| 黄视频网站免费看| 欧美精品乱码久久久久久按摩| 97视频精彩视频在线观看| 国产成人精品免高潮在线观看 | av激情成人网| 色噜噜一区二区| 老司机午夜精品视频在线观看| 无码人妻aⅴ一区二区三区| 香蕉加勒比综合久久| 五月婷婷六月丁香| 欧美亚洲一区在线| 久操国产精品| 午夜免费精品视频| 欧美国产日韩精品免费观看| 日本欧美www| 中文字幕在线国产精品| 久久久久久久性潮| 黑人巨大国产9丨视频| 国产乱人伦精品一区二区在线观看 | 亚洲天堂中文字幕在线| www.精品av.com| 久久国产精品美女| 妞干网在线观看视频| 久久综合成人精品亚洲另类欧美 | 久久综合网络一区二区| 天天躁夜夜躁狠狠是什么心态| 欧美在线观看一区二区| 日韩免费啪啪| 国产日韩欧美在线视频观看| 久久精品亚洲人成影院| 国产xxx在线观看| 色综合久久综合网欧美综合网| 第一福利在线| 114国产精品久久免费观看| 精品动漫3d一区二区三区免费| 激情综合激情五月| 日韩欧美第一页| 婷婷在线视频观看| eeuss一区二区三区| 国产毛片一区| 女人黄色一级片| 日韩欧美亚洲国产精品字幕久久久| av在线小说| 亚洲高清乱码| 国产a区久久久| www.久久网| 欧美福利小视频| 国产综合久久久| 男生和女生一起差差差视频| 欧美日韩中文字幕| 午夜免费视频在线国产| 国产成人免费观看| 日韩高清中文字幕一区| 麻豆91精品91久久久| 亚洲男人天堂2019| 精品视频91| 青青在线免费观看视频| 一区二区三区色| 能在线看的av| 岛国一区二区三区高清视频| 日本欧美大码aⅴ在线播放|