精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

品 RocketMQ 源碼,學習并發編程三大神器

開發 開發工具
異步是更細粒度的使用系統資源的一種方式,在異步消息處理的過程中,通過 CompletableFuture 這個神器,各個線程各司其職,優雅且高效的提升了 RocketMQ 的性能。

筆者是 RocketMQ 的忠實粉絲,在閱讀源碼的過程中,學習到了很多編程技巧。

這篇文章,筆者結合 RocketMQ 源碼,分享并發編程三大神器的相關知識點。

圖片

1 CountDownLatch 實現網絡同步請求

CountDownLatch 是一個同步工具類,用來協調多個線程之間的同步,它能夠使一個線程在等待另外一些線程完成各自工作之后,再繼續執行。

下圖是 CountDownLatch 的核心方法:

圖片

我們可以認為它內置一個計數器,構造函數初始化計數值。每當線程執行 countDown 方法,計數器的值就會減一,當計數器的值為 0 時,表示所有的任務都執行完成,然后在 CountDownLatch 上等待的線程就可以恢復執行接下來的任務。

舉例,數據庫有100萬條數據需要處理,單線程執行比較慢,我們可以將任務分為5個批次,線程池按照每個批次執行,當5個批次整體執行完成后,打印出任務執行的時間 。

 long start = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(10);
int batchSize = 5;
CountDownLatch countDownLatch = new CountDownLatch(batchSize);
for (int i = 0; i < batchSize; i++) {
final int batchNumber = i;
executorService.execute(new Runnable() {
@Override
public void run(){
try {
doSomething(batchNumber);
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
});
}
countDownLatch.await();
System.out.println("任務執行耗時:" + (System.currentTimeMillis() - start) + "毫秒");

溫習完 CountDownLatch 的知識點,回到 RocketMQ 源碼。

筆者在沒有接觸網絡編程之前,一直很疑惑,網絡同步請求是如何實現的?

同步請求指:客戶端線程發起調用后,需要在指定的超時時間內,等到響應結果,才能完成本次調用。如果超時時間內沒有得到結果,那么會拋出超時異常。

RocketMQ 的同步發送消息接口見下圖:

圖片

追蹤源碼,真正發送請求的方法是通訊模塊的同步請求方法 invokeSyncImpl 。

圖片

整體流程:

發送消息線程 Netty channel 對象調用 writeAndFlush 方法后 ,它的本質是通過 Netty 的讀寫線程將數據包發送到內核 , 這個過程本身就是異步的;

ResponseFuture 類中內置一個 CountDownLatch 對象 ,responseFuture 對象調用 waitRepsone 方法,發送消息線程會阻塞 ;

圖片

客戶端收到響應命令后, 執行 processResponseCommand 方法,核心邏輯是執行 ResponseFuture 的 putResponse 方法。

圖片

該方法的本質就是填充響應對象,并調用 countDownLatch 的 countDown 方法 , 這樣發送消息線程就不再阻塞。

CountDownLatch 實現網絡同步請求是非常實用的技巧,在很多開源中間件里,比如 Metaq ,Xmemcached 都有類似的實現。

2 ReadWriteLock 名字服務路由管理

讀寫鎖是一把鎖分為兩部分:讀鎖和寫鎖,其中讀鎖允許多個線程同時獲得,而寫鎖則是互斥鎖。

它的規則是:讀讀不互斥,讀寫互斥,寫寫互斥,適用于讀多寫少的業務場景。

我們一般都使用 ReentrantReadWriteLock ,該類實現了 ReadWriteLock 。ReadWriteLock 接口也很簡單,其內部主要提供了兩個方法,分別返回讀鎖和寫鎖 。

 public interface ReadWriteLock {
//獲取讀鎖
Lock readLock();
//獲取寫鎖
Lock writeLock();
}

讀寫鎖的使用方式如下所示:

創建 ReentrantReadWriteLock 對象 , 當使用 ReadWriteLock 的時候,并不是直接使用,而是獲得其內部的讀鎖和寫鎖,然后分別調用 lock / unlock 方法 ;

private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

讀取共享數據 ;

Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
// TODO 查詢共享數據
} finally {
readLock.unlock();
}

寫入共享數據;

Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
// TODO 修改共享數據
} finally {
writeLock.unlock();
}

RocketMQ架構上主要分為四部分,如下圖所示 :

圖片

Producer :消息發布的角色,Producer 通過 MQ 的負載均衡模塊選擇相應的 Broker 集群隊列進行消息投遞,投遞的過程支持快速失敗并且低延遲。

Consumer :消息消費的角色,支持以 push 推,pull 拉兩種模式對消息進行消費。

BrokerServer :Broker主要負責消息的存儲、投遞和查詢以及服務高可用保證。

NameServer :名字服務是一個非常簡單的 Topic 路由注冊中心,其角色類似 Dubbo 中的zookeeper,支持Broker的動態注冊與發現。

NameServer 是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。Broker 啟動之后會向所有 NameServer 定期(每 30s)發送心跳包(路由信息),NameServer 會定期掃描 Broker 存活列表,如果超過 120s 沒有心跳則移除此 Broker 相關信息,代表下線。

那么 NameServer 如何保存路由信息呢?

圖片

路由信息通過幾個 HashMap 來保存,當 Broker 向 Nameserver 發送心跳包(路由信息),Nameserver 需要對 HashMap 進行數據更新,但我們都知道 HashMap 并不是線程安全的,高并發場景下,容易出現 CPU 100% 問題,所以更新 HashMap 時需要加鎖,RocketMQ 使用了  JDK 的讀寫鎖 ReentrantReadWriteLock 。

更新路由信息,操作寫鎖

圖片

查詢主題信息,操作讀鎖

圖片

讀寫鎖適用于讀多寫少的場景,比如名字服務,配置服務等。

3 CompletableFuture 異步消息處理

RocketMQ 主從架構中,主節點與從節點之間數據同步/復制的方式有同步雙寫和異步復制兩種模式。

異步復制是指消息在主節點落盤成功后就告訴客戶端消息發送成功,無需等待消息從主節點復制到從節點,消息的復制由其他線程完成。

同步雙寫是指主節點將消息成功落盤后,需要等待從節點復制成功,再告訴客戶端消息發送成功。

同步雙寫模式是阻塞的,筆者按照 RocketMQ 4.6.1 源碼,整理出主節點處理一個發送消息的請求的時序圖。

圖片

整體流程:

生產者將消息發送到 Broker , Broker 接收到消息后,發送消息處理器 SendMessageProcessor 的執行線程池SendMessageExecutor 線程池來處理發送消息命令;

執行 ComitLog 的 putMessage 方法;

ComitLog 內部先執行 appendMessage 方法;

然后提交一個 GroupCommitRequest 到同步復制服務 HAService  ,等待 HAService 通知 GroupCommitRequest 完成;

返回寫入結果并響應客戶端 。

我們可以看到:發送消息的執行線程需要等待消息復制從節點 , 并將消息返回給生產者才能開始處理下一個消息。

RocketMQ 4.6.1 源碼中,執行線程池的線程數量是 1 ,假如線程處理主從同步速度慢了,系統在這一瞬間無法處理新的發送消息請求,造成 CPU 資源無法被充分利用 , 同時系統的吞吐量也會降低。

那么優化同步雙寫呢 ?

從 RocketMQ 4.7 開始,RocketMQ 引入了 CompletableFuture 實現了異步消息處理 。

發送消息的執行線程不再等待消息復制到從節點后再處理新的請求,而是提前生成 CompletableFuture 并返回 ;

HAService 中的線程在復制成功后,調用 CompletableFuture 的 complete 方法,通知 remoting 模塊響應客戶端(線程池:PutMessageExecutor ) 。

我們分析下 RocketMQ 4.9.4 核心代碼:

Broker 接收到消息后,發送消息處理器 SendMessageProcessor 的執行線程池SendMessageExecutor 線程池來處理發送消息命令;

調用 SendMessageProcessor 的 asyncProcessRequest 方法;

圖片

調用 Commitlog 的 aysncPutMessage 方法寫入消息 ;

圖片

這段代碼中,當 commitLog 執行完 appendMessage 后, 需要執行刷盤任務和同步復制兩個任務。但這兩個任務并不是同步執行,而是異步的方式。

復制線程復制消息后,喚醒 future ;

圖片

組裝響應命令 ,并將響應命令返回給客戶端。

為了便于理解這一段消息發送處理過程的線程模型,筆者在 RocketMQ 源碼中做了幾處埋點,修改 Logback 的日志配置,發送一條普通的消息,觀察服務端日志。

圖片

從日志中,我們可以觀察到:

發送消息的執行線程(圖中紅色)在執行完創建刷盤 Future 和同步復制 future 之后,并沒有等待這兩個任務執行完成,而是在結束 asyncProcessRequest 方法后就可以處理發送消息請求了 ;

刷盤線程和復制線程執行完各自的任務后,喚醒 future,然后通過刷盤線程組裝存儲結果,最后通過 PutMessageExecutor 線程池(圖中黃色)將響應命令返回給客戶端。

筆者一直認為:異步是更細粒度的使用系統資源的一種方式,在異步消息處理的過程中,通過 CompletableFuture  這個神器,各個線程各司其職,優雅且高效的提升了 RocketMQ 的性能。

責任編輯:武曉燕 來源: 勇哥java實戰分享
相關推薦

2020-09-29 07:38:22

Python裝飾器框架

2022-07-02 08:40:00

并發編程

2016-09-13 19:21:07

CTO管理技術

2018-06-08 10:18:22

Python裝飾器迭代器

2023-12-06 07:36:27

前端開發

2022-10-17 08:07:13

Go 語言并發編程

2021-03-18 00:14:29

JavaCyclicBarri高并發

2021-03-04 07:24:24

JavaSemaphore高并發

2021-03-11 00:05:55

Java高并發編程

2021-01-31 20:51:55

PuppeteerNode核心

2021-09-13 09:28:10

PuppeteerNode 庫DevTools 協議

2021-05-27 12:10:42

前端puppeteer代碼

2023-03-30 19:17:54

語言編程

2024-04-29 09:06:46

線程初始化源碼

2023-05-05 07:12:09

GPT產品主題

2017-01-05 14:01:38

linux密碼高強度

2025-08-15 11:33:09

2019-07-17 10:55:40

Kubernetes工具Katacoda

2015-07-03 10:46:26

PHP程序員工作高效

2024-11-22 08:00:00

Netty開發
點贊
收藏

51CTO技術棧公眾號

91高清在线观看视频| 久久精品国产亚洲av香蕉| 欧美日韩国产网站| 中文字幕制服丝袜成人av| 亚洲精品日韩av| 国产午夜视频在线播放| 国产亚洲一区二区三区啪| 欧美军同video69gay| 亚洲精品蜜桃久久久久久| 日av在线播放| 黄页网站大全一区二区| 韩国三级电影久久久久久| 精品人妻一区二区三区蜜桃视频 | 国产精品久久色| 国产欧美高清在线| a黄色片在线观看| 97久久人人超碰| 成人黄色大片在线免费观看| 国产成人亚洲精品自产在线| 日韩国产一区二区| 欧美成人bangbros| 天堂av在线网站| www.综合网.com| 国产精品久久久久影院| 激情一区二区三区| 99久久精品日本一区二区免费| 一本色道88久久加勒比精品| 久久天天躁夜夜躁狠狠躁2022| 精品久久久久久中文字幕人妻最新| 国产精品久久久久久久久久久久久久久| 第一福利永久视频精品 | 神马亚洲视频| 国产精品亚洲第一 | 色婷婷综合久久久久中文| 亚洲在线观看一区| 男女视频在线观看免费| 成人性色生活片| 91在线观看网站| 一区二区日韩在线观看| 日韩成人一区二区| 欧洲日韩成人av| 欧美日韩精品区| 韩日精品在线| 欧美国产日韩视频| 麻豆亚洲av熟女国产一区二| 亚洲国产一区二区三区在线播放 | 蜜桃传媒一区二区亚洲| 日韩高清影视在线观看| 日韩成人在线播放| 91视频在线免费| 国产精品三p一区二区| 日韩三级中文字幕| 黄色片子免费看| 99re视频这里只有精品| 亚洲片av在线| 亚洲黄色在线网站| 欧美一级全黄| 亚洲国产毛片完整版| 五月天丁香社区| 成人看片黄a免费看视频| 日韩精品一区二| 91亚洲一线产区二线产区| 视频二区欧美毛片免费观看| 欧美一卡在线观看| 超碰人人cao| 凹凸av导航大全精品| 亚洲国产成人精品电影| 国产毛片毛片毛片毛片毛片毛片| 国产精品久av福利在线观看| 亚洲第一区中文99精品| 少妇精品一区二区| 久久99影视| 色久欧美在线视频观看| 粉嫩av性色av蜜臀av网站| 欧美日韩亚洲国产精品| 97国产在线视频| 久久青青草原亚洲av无码麻豆| 久久这里只有| 成人黄色片在线| 亚洲欧美另类视频| 91亚洲精品久久久蜜桃| 婷婷五月色综合| av网站导航在线观看免费| 亚洲综合视频网| 哪个网站能看毛片| 日韩久久一区| 精品国产乱码久久久久久久久| 91精品小视频| 成人在线免费视频观看| 久久97精品久久久久久久不卡 | 亚洲影视在线播放| 国产综合av在线| 国产美女久久| 精品国产伦一区二区三区观看方式 | 日韩欧美激情视频| 日产欧产美韩系列久久99| 91精品视频专区| 天堂成人在线| 国产精品国产三级国产专播品爱网| www婷婷av久久久影片| a一区二区三区| 91精品国产综合久久久久久久 | 国产午夜精品福利| 国产女主播av| 成人日韩精品| 亚洲第一综合天堂另类专| xxxxx99| 亚洲啪啪91| 国产精品视频在线观看| 隣の若妻さん波多野结衣| 久久精品视频在线免费观看| 九九久久九九久久| 婷婷午夜社区一区| 精品国产乱码久久久久久1区2区| 嘿嘿视频在线观看| 99精品视频免费观看| 国产色综合天天综合网| 日本亚洲一区| 亚洲丰满少妇videoshd| 五月花丁香婷婷| 久久99国产成人小视频| 国内精品在线一区| 国产绳艺sm调教室论坛| 亚洲国产精品精华液ab| 成人一对一视频| 精品久久久久久久久久岛国gif| 亚洲人免费视频| 日韩精品――中文字幕| 国产精品一二三在| 五月天色婷婷综合| 欧美aaa级| 伊人成人开心激情综合网| 国产在线观看黄色| caoporm超碰国产精品| 国产精品无码电影在线观看 | 国产精品二区三区| 黄色免费在线观看| 欧美日韩精品一区二区在线播放| 成人午夜剧场视频网站| 亚洲另类黄色| 999国产在线| 91三级在线| 欧美一区二区三区思思人| 91 在线视频| 精品在线亚洲视频| 中文字幕一区二区三区最新| 久久99久久久精品欧美| 中文字幕亚洲精品| 91av久久久| 亚洲视频一二区| 亚洲黄色av片| 欧美在线影院| 成人欧美一区二区三区视频xxx| 成人在线观看免费网站| 欧美一级夜夜爽| 欧美精品久久久久性色| 国产福利电影一区二区三区| www.国产亚洲| 成人线上播放| 91福利视频在线观看| 日韩a在线看| 欧美性生活影院| 永久免费观看片现看| 久久99久久99| 国产激情片在线观看| 高清精品视频| 日本电影亚洲天堂| av在线电影观看| 欧美高清www午色夜在线视频| 91精品少妇一区二区三区蜜桃臀| 韩国一区二区三区| 妺妺窝人体色777777| 亚洲欧洲美洲国产香蕉| 国产精品久久久久久超碰| 中文日本在线观看| 日韩午夜激情视频| 亚洲精品午夜国产va久久成人| 久久久精品人体av艺术| 亚洲精品20p| 极品尤物久久久av免费看| 久久精品一二三区| 日韩欧美激情| 97色在线视频观看| h视频在线播放| 日韩欧美在线影院| 9i精品福利一区二区三区| 国产精品久久久久三级| 无码人妻丰满熟妇啪啪网站| 羞羞视频在线观看欧美| 致1999电视剧免费观看策驰影院| 大香伊人久久精品一区二区| 国产成人精品免高潮费视频| 成人午夜在线影视| 精品一区二区亚洲| 国产视频一区二区三| 欧美性jizz18性欧美| 久草视频手机在线| 久久综合九色欧美综合狠狠 | 99久久精品国产一区| 男女啪啪网站视频| 国内在线观看一区二区三区| 日韩.欧美.亚洲| 国产精品一区二区精品| 91精品国产精品| 黄色视屏免费在线观看| 亚洲精品一区二区三区不| 国产女人18毛片水真多| 欧美日韩亚洲天堂| 久久黄色免费视频| 国产精品免费aⅴ片在线观看| 午夜福利三级理论电影| 奇米综合一区二区三区精品视频| 久久久性生活视频| 精品国产不卡| 精品视频一区二区三区四区| 欧美男女视频| 国产精品99一区| ririsao久久精品一区| 久久中文字幕一区| 在线看的av网站| 国产午夜精品理论片a级探花| 亚洲第一天堂在线观看| 在线播放日韩导航| 成年人视频免费| 欧美日韩在线看| 国产精品1000| 一区二区成人在线| 全网免费在线播放视频入口| 国产精品免费丝袜| 中文字幕免费视频| 久久久综合视频| 内射中出日韩无国产剧情| 风间由美性色一区二区三区| 麻豆网站免费观看| 国产麻豆精品在线观看| 99九九99九九九99九他书对| 麻豆91在线看| gogogo高清免费观看在线视频| 日韩高清不卡在线| 国产成人精品无码播放| 久久久久综合| 国产av无码专区亚洲精品| 一区二区亚洲| 免费一级特黄毛片| 夜夜夜久久久| 国产熟女高潮视频| 日韩成人伦理电影在线观看| 激情网站五月天| 日韩 欧美一区二区三区| 在线免费视频一区| 久久91精品久久久久久秒播| 中文字幕视频三区| 国产美女一区二区三区| 中文字幕第22页| 国产福利一区在线| 国产黄色片在线| 日韩亚洲精品视频| 国产婷婷一区二区三区久久| 欧美日韩久久久久久| 国产一区二区在线视频观看| 51精品视频一区二区三区| 国产精品无码AV| 日韩欧美一级二级三级久久久 | 欧美午夜精品一区二区三区| 免费黄色一级大片| 欧美人与性动xxxx| 性猛交xxxx乱大交孕妇印度| 精品sm捆绑视频| 日本一本草久在线中文| 中文字幕av一区二区| 韩国中文字幕在线| 欧美激情精品久久久久久大尺度| а√天堂8资源中文在线| 欧美一级片免费在线| 电影一区电影二区| 91夜夜揉人人捏人人添红杏| a看欧美黄色女同性恋| 欧美日韩亚洲一区二区三区在线观看| 欧美综合另类| av在线免费观看国产| 99热免费精品在线观看| 日本美女高潮视频| 国产成人午夜电影网| 30一40一50老女人毛片| 亚洲欧洲日本在线| 日本一区二区欧美| 欧美日韩亚洲另类| 国产18精品乱码免费看| 亚洲午夜av久久乱码| 特级毛片在线| 国产精品99蜜臀久久不卡二区| 亚洲乱码一区| 日韩一二三区不卡在线视频| 欧美成人一品| 国产成人精品无码播放| 国产成人av福利| 日本乱子伦xxxx| 亚洲成人久久影院| 中文字幕av免费观看| 亚洲高清在线观看| 黄色av免费在线| 国产成人在线一区二区| 日韩精品中文字幕吗一区二区| 欧美在线视频二区| 激情欧美亚洲| www.com久久久| 久久久久国产精品人| 久久久久久久久久久网 | 国产999精品久久久| 秋霞影院一区| 亚洲一区二区三区欧美| 亚洲欧美卡通另类91av| 波多野结衣三级视频| 国产精品家庭影院| 99精品人妻国产毛片| 精品国产凹凸成av人网站| 日本中文字幕在线播放| 欧美一级视频在线观看| 日韩在线精品强乱中文字幕| 亚洲国产高清国产精品| 午夜一区不卡| 久久久午夜精品福利内容| 亚洲精品国产一区二区精华液 | 日韩va亚洲va欧洲va国产| av中文字幕在线播放| 国产精品美女在线| 国产不卡av一区二区| 久久精品免费一区二区| 成人av电影在线网| 国产奶水涨喷在线播放| 日韩欧美国产一区二区在线播放| 黄在线免费观看| 成人欧美在线观看| 天天综合网网欲色| 亚欧激情乱码久久久久久久久| 国产亚洲综合在线| 无码人妻一区二区三区线| 日韩精品在线视频观看| 欧美裸体视频| 久久大片网站| 国产日韩综合| 熟女少妇一区二区三区| 日韩欧美成人网| 国产中文字幕在线观看| 国产成人精品视频在线| 黄色不卡一区| 超碰在线播放91| 国产精品少妇自拍| 一区二区三区亚洲视频| 久久天堂电影网| 2020国产精品极品色在线观看| 久久亚洲a v| 成人av电影在线播放| 国产手机在线视频| 国产偷亚洲偷欧美偷精品| gogo亚洲高清大胆美女人体| 日本一区二区精品| 久久精品国内一区二区三区| frxxee中国xxx麻豆hd| 欧美一卡二卡三卡| 俺来俺也去www色在线观看| 国产中文一区二区| 乱人伦精品视频在线观看| 欧美激情 一区| 欧美裸体一区二区三区| 青青青国内视频在线观看软件| 国产精品国产三级欧美二区| 在线视频精品| 三级黄色片在线观看| 日韩一区二区精品葵司在线| 51精品在线| 偷拍视频一区二区| 国产精品1区二区.| 91视频免费网址| 中文字幕久久久| 视频在线一区| 妺妺窝人体色www在线小说| 国产偷v国产偷v亚洲高清| 国产精品乱码一区二区| 97免费视频在线| 日韩精品久久| 日本泡妞xxxx免费视频软件| 欧美视频中文字幕在线| 欧美一区二区三区在线观看免费| 91精品国产高清久久久久久91裸体 | 国产日韩综合av| 国产成人精品无码高潮| 91av在线免费观看| 希岛爱理一区二区三区| 国产在线观看无码免费视频| 欧美日韩亚洲不卡| av资源中文在线| 亚洲一二三区在线| eeuss影院一区二区三区| 在线观看亚洲国产| 97成人超碰免| 亚洲精品一区二区在线看| 日韩网站在线播放| 日韩色视频在线观看| 国产精品亚洲d|