精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

帶你漲姿勢的認(rèn)識一下Kafka Consumer

開發(fā) 架構(gòu) Kafka
之前我們介紹過了 Kafka 整體架構(gòu),Kafka 生產(chǎn)者,Kafka 生產(chǎn)的消息最終流向哪里呢?當(dāng)然是需要消費(fèi)了,要不只產(chǎn)生一系列數(shù)據(jù)沒有任何作用啊。

之前我們介紹過了 Kafka 整體架構(gòu),Kafka 生產(chǎn)者,Kafka 生產(chǎn)的消息最終流向哪里呢?當(dāng)然是需要消費(fèi)了,要不只產(chǎn)生一系列數(shù)據(jù)沒有任何作用啊,如果把 Kafka 比作餐廳的話,那么生產(chǎn)者就是廚師的角色,消費(fèi)者就是客人,只有廚師的話,那么炒出來的菜沒有人吃也沒有意義,如果只有客人沒有廚師的話,誰會去這個店吃飯呢?!所以如果你看完前面的文章意猶未盡的話,可以繼續(xù)讓你爽一爽。如果你沒看過前面的文章,那就從現(xiàn)在開始讓你爽。

Kafka 消費(fèi)者概念

應(yīng)用程序使用 KafkaConsumer 從 Kafka 中訂閱主題并接收來自這些主題的消息,然后再把他們保存起來。應(yīng)用程序首先需要創(chuàng)建一個 KafkaConsumer 對象,訂閱主題并開始接受消息,驗(yàn)證消息并保存結(jié)果。一段時間后,生產(chǎn)者往主題寫入的速度超過了應(yīng)用程序驗(yàn)證數(shù)據(jù)的速度,這時候該如何處理?如果只使用單個消費(fèi)者的話,應(yīng)用程序會跟不上消息生成的速度,就像多個生產(chǎn)者像相同的主題寫入消息一樣,這時候就需要多個消費(fèi)者共同參與消費(fèi)主題中的消息,對消息進(jìn)行分流處理。

Kafka 消費(fèi)者從屬于消費(fèi)者群組。一個群組中的消費(fèi)者訂閱的都是相同的主題,每個消費(fèi)者接收主題一部分分區(qū)的消息。下面是一個 Kafka 分區(qū)消費(fèi)示意圖

上圖中的主題 T1 有四個分區(qū),分別是分區(qū)0、分區(qū)1、分區(qū)2、分區(qū)3,我們創(chuàng)建一個消費(fèi)者群組1,消費(fèi)者群組中只有一個消費(fèi)者,它訂閱主題T1,接收到 T1 中的全部消息。由于一個消費(fèi)者處理四個生產(chǎn)者發(fā)送到分區(qū)的消息,壓力有些大,需要幫手來幫忙分擔(dān)任務(wù),于是就演變?yōu)橄聢D

這樣一來,消費(fèi)者的消費(fèi)能力就大大提高了,但是在某些環(huán)境下比如用戶產(chǎn)生消息特別多的時候,生產(chǎn)者產(chǎn)生的消息仍舊讓消費(fèi)者吃不消,那就繼續(xù)增加消費(fèi)者。

如上圖所示,每個分區(qū)所產(chǎn)生的消息能夠被每個消費(fèi)者群組中的消費(fèi)者消費(fèi),如果向消費(fèi)者群組中增加更多的消費(fèi)者,那么多余的消費(fèi)者將會閑置,如下圖所示

向群組中增加消費(fèi)者是橫向伸縮消費(fèi)能力的主要方式。總而言之,我們可以通過增加消費(fèi)組的消費(fèi)者來進(jìn)行水平擴(kuò)展提升消費(fèi)能力。這也是為什么建議創(chuàng)建主題時使用比較多的分區(qū)數(shù),這樣可以在消費(fèi)負(fù)載高的情況下增加消費(fèi)者來提升性能。另外,消費(fèi)者的數(shù)量不應(yīng)該比分區(qū)數(shù)多,因?yàn)槎喑鰜淼南M(fèi)者是空閑的,沒有任何幫助。

Kafka 一個很重要的特性就是,只需寫入一次消息,可以支持任意多的應(yīng)用讀取這個消息。換句話說,每個應(yīng)用都可以讀到全量的消息。為了使得每個應(yīng)用都能讀到全量消息,應(yīng)用需要有不同的消費(fèi)組。對于上面的例子,假如我們新增了一個新的消費(fèi)組 G2,而這個消費(fèi)組有兩個消費(fèi)者,那么就演變?yōu)橄聢D這樣

在這個場景中,消費(fèi)組 G1 和消費(fèi)組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來說它們屬于不同的應(yīng)用。

總結(jié)起來就是如果應(yīng)用需要讀取全量消息,那么請為該應(yīng)用設(shè)置一個消費(fèi)組;如果該應(yīng)用消費(fèi)能力不足,那么可以考慮在這個消費(fèi)組里增加消費(fèi)者。

消費(fèi)者組和分區(qū)重平衡

消費(fèi)者組是什么

消費(fèi)者組(Consumer Group)是由一個或多個消費(fèi)者實(shí)例(Consumer Instance)組成的群組,具有可擴(kuò)展性和可容錯性的一種機(jī)制。消費(fèi)者組內(nèi)的消費(fèi)者共享一個消費(fèi)者組ID,這個ID 也叫做 Group ID,組內(nèi)的消費(fèi)者共同對一個主題進(jìn)行訂閱和消費(fèi),同一個組中的消費(fèi)者只能消費(fèi)一個分區(qū)的消息,多余的消費(fèi)者會閑置,派不上用場。

我們在上面提到了兩種消費(fèi)方式

  •  一個消費(fèi)者群組消費(fèi)一個主題中的消息,這種消費(fèi)模式又稱為點(diǎn)對點(diǎn)的消費(fèi)方式,點(diǎn)對點(diǎn)的消費(fèi)方式又被稱為消息隊(duì)列
  •  一個主題中的消息被多個消費(fèi)者群組共同消費(fèi),這種消費(fèi)模式又稱為發(fā)布-訂閱模式

消費(fèi)者重平衡

我們從上面的消費(fèi)者演變圖中可以知道這么一個過程:最初是一個消費(fèi)者訂閱一個主題并消費(fèi)其全部分區(qū)的消息,后來有一個消費(fèi)者加入群組,隨后又有更多的消費(fèi)者加入群組,而新加入的消費(fèi)者實(shí)例分?jǐn)偭俗畛跸M(fèi)者的部分消息,這種把分區(qū)的所有權(quán)通過一個消費(fèi)者轉(zhuǎn)到其他消費(fèi)者的行為稱為重平衡,英文名也叫做 Rebalance 。如下圖所示

重平衡非常重要,它為消費(fèi)者群組帶來了高可用性 和 伸縮性,我們可以放心的添加消費(fèi)者或移除消費(fèi)者,不過在正常情況下我們并不希望發(fā)生這樣的行為。在重平衡期間,消費(fèi)者無法讀取消息,造成整個消費(fèi)者組在重平衡的期間都不可用。另外,當(dāng)分區(qū)被重新分配給另一個消費(fèi)者時,消息當(dāng)前的讀取狀態(tài)會丟失,它有可能還需要去刷新緩存,在它重新恢復(fù)狀態(tài)之前會拖慢應(yīng)用程序。

消費(fèi)者通過向組織協(xié)調(diào)者(Kafka Broker)發(fā)送心跳來維護(hù)自己是消費(fèi)者組的一員并確認(rèn)其擁有的分區(qū)。對于不同不的消費(fèi)群體來說,其組織協(xié)調(diào)者可以是不同的。只要消費(fèi)者定期發(fā)送心跳,就會認(rèn)為消費(fèi)者是存活的并處理其分區(qū)中的消息。當(dāng)消費(fèi)者檢索記錄或者提交它所消費(fèi)的記錄時就會發(fā)送心跳。

如果過了一段時間 Kafka 停止發(fā)送心跳了,會話(Session)就會過期,組織協(xié)調(diào)者就會認(rèn)為這個 Consumer 已經(jīng)死亡,就會觸發(fā)一次重平衡。如果消費(fèi)者宕機(jī)并且停止發(fā)送消息,組織協(xié)調(diào)者會等待幾秒鐘,確認(rèn)它死亡了才會觸發(fā)重平衡。在這段時間里,死亡的消費(fèi)者將不處理任何消息。在清理消費(fèi)者時,消費(fèi)者將通知協(xié)調(diào)者它要離開群組,組織協(xié)調(diào)者會觸發(fā)一次重平衡,盡量降低處理停頓。

重平衡是一把雙刃劍,它為消費(fèi)者群組帶來高可用性和伸縮性的同時,還有有一些明顯的缺點(diǎn)(bug),而這些 bug 到現(xiàn)在社區(qū)還無法修改。

重平衡的過程對消費(fèi)者組有極大的影響。因?yàn)槊看沃仄胶膺^程中都會導(dǎo)致萬物靜止,參考 JVM 中的垃圾回收機(jī)制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虛擬機(jī)》中 p76 關(guān)于 Serial 收集器的描述):

更重要的是它在進(jìn)行垃圾收集時,必須暫停其他所有的工作線程。直到它收集結(jié)束。Stop The World 這個名字聽起來很帥,但這項(xiàng)工作實(shí)際上是由虛擬機(jī)在后臺自動發(fā)起并完成的,在用戶不可見的情況下把用戶正常工作的線程全部停掉,這對很多應(yīng)用來說都是難以接受的。

也就是說,在重平衡期間,消費(fèi)者組中的消費(fèi)者實(shí)例都會停止消費(fèi),等待重平衡的完成。而且重平衡這個過程很慢......

創(chuàng)建消費(fèi)者

上面的理論說的有點(diǎn)多,下面就通過代碼來講解一下消費(fèi)者是如何消費(fèi)的

在讀取消息之前,需要先創(chuàng)建一個 KafkaConsumer 對象。創(chuàng)建 KafkaConsumer 對象與創(chuàng)建 KafkaProducer 對象十分相似 --- 把需要傳遞給消費(fèi)者的屬性放在 properties 對象中,后面我們會著重討論 Kafka 的一些配置,這里我們先簡單的創(chuàng)建一下,使用3個屬性就足矣,分別是 bootstrap.server,key.deserializer,value.deserializer 。

這三個屬性我們已經(jīng)用過很多次了,如果你還不是很清楚的話,可以參考 帶你漲姿勢是認(rèn)識一下Kafka Producer

還有一個屬性是 group.id 這個屬性不是必須的,它指定了 KafkaConsumer 是屬于哪個消費(fèi)者群組。創(chuàng)建不屬于任何一個群組的消費(fèi)者也是可以的 

  1. Properties properties = new Properties();  
  2.         properties.put("bootstrap.server","192.168.1.9:9092");     properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");   properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
  3.  KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties); 

主題訂閱

創(chuàng)建好消費(fèi)者之后,下一步就開始訂閱主題了。subscribe() 方法接受一個主題列表作為參數(shù),使用起來比較簡單 

  1. consumer.subscribe(Collections.singletonList("customerTopic")); 

為了簡單我們只訂閱了一個主題 customerTopic,參數(shù)傳入的是一個正則表達(dá)式,正則表達(dá)式可以匹配多個主題,如果有人創(chuàng)建了新的主題,并且主題的名字與正則表達(dá)式相匹配,那么會立即觸發(fā)一次重平衡,消費(fèi)者就可以讀取新的主題。

要訂閱所有與 test 相關(guān)的主題,可以這樣做 

  1. consumer.subscribe("test.*"); 

輪詢

我們知道,Kafka 是支持訂閱/發(fā)布模式的,生產(chǎn)者發(fā)送數(shù)據(jù)給 Kafka Broker,那么消費(fèi)者是如何知道生產(chǎn)者發(fā)送了數(shù)據(jù)呢?其實(shí)生產(chǎn)者產(chǎn)生的數(shù)據(jù)消費(fèi)者是不知道的,KafkaConsumer 采用輪詢的方式定期去 Kafka Broker 中進(jìn)行數(shù)據(jù)的檢索,如果有數(shù)據(jù)就用來消費(fèi),如果沒有就再繼續(xù)輪詢等待,下面是輪詢等待的具體實(shí)現(xiàn) 

  1. try {  
  2.   while (true) {  
  3.     ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));  
  4.     for (ConsumerRecord<String, String> record : records) {  
  5.       int updateCount = 1 
  6.       if (map.containsKey(record.value())) {  
  7.         updateCount = (int) map.get(record.value() + 1);  
  8.       }  
  9.       map.put(record.value(), updateCount);  
  10.     }  
  11.   }  
  12. }finally {  
  13.   consumer.close();  
  •  這是一個無限循環(huán)。消費(fèi)者實(shí)際上是一個長期運(yùn)行的應(yīng)用程序,它通過輪詢的方式向 Kafka 請求數(shù)據(jù)。
  •  第三行代碼非常重要,Kafka 必須定期循環(huán)請求數(shù)據(jù),否則就會認(rèn)為該 Consumer 已經(jīng)掛了,會觸發(fā)重平衡,它的分區(qū)會移交給群組中的其它消費(fèi)者。傳給 poll() 方法的是一個超市時間,用 java.time.Duration 類來表示,如果該參數(shù)被設(shè)置為 0 ,poll() 方法會立刻返回,否則就會在指定的毫秒數(shù)內(nèi)一直等待 broker 返回?cái)?shù)據(jù)。
  •  poll() 方法會返回一個記錄列表。每條記錄都包含了記錄所屬主題的信息,記錄所在分區(qū)的信息、記錄在分區(qū)中的偏移量,以及記錄的鍵值對。我們一般會遍歷這個列表,逐條處理每條記錄。
  •  在退出應(yīng)用程序之前使用 close() 方法關(guān)閉消費(fèi)者。網(wǎng)絡(luò)連接和 socket 也會隨之關(guān)閉,并立即觸發(fā)一次重平衡,而不是等待群組協(xié)調(diào)器發(fā)現(xiàn)它不再發(fā)送心跳并認(rèn)定它已經(jīng)死亡。

線程安全性

在同一個群組中,我們無法讓一個線程運(yùn)行多個消費(fèi)者,也無法讓多個線程安全的共享一個消費(fèi)者。按照規(guī)則,一個消費(fèi)者使用一個線程,如果一個消費(fèi)者群組中多個消費(fèi)者都想要運(yùn)行的話,那么必須讓每個消費(fèi)者在自己的線程中運(yùn)行,可以使用 Java 中的 ExecutorService 啟動多個消費(fèi)者進(jìn)行進(jìn)行處理。

消費(fèi)者配置

到目前為止,我們學(xué)習(xí)了如何使用消費(fèi)者 API,不過只介紹了幾個最基本的屬性,Kafka 文檔列出了所有與消費(fèi)者相關(guān)的配置說明。大部分參數(shù)都有合理的默認(rèn)值,一般不需要修改它們,下面我們就來介紹一下這些參數(shù)。

  •  fetch.min.bytes

該屬性指定了消費(fèi)者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。broker 在收到消費(fèi)者的數(shù)據(jù)請求時,如果可用的數(shù)據(jù)量小于 fetch.min.bytes 指定的大小,那么它會等到有足夠的可用數(shù)據(jù)時才把它返回給消費(fèi)者。這樣可以降低消費(fèi)者和 broker 的工作負(fù)載,因?yàn)樗鼈冊谥黝}使用頻率不是很高的時候就不用來回處理消息。如果沒有很多可用數(shù)據(jù),但消費(fèi)者的 CPU 使用率很高,那么就需要把該屬性的值設(shè)得比默認(rèn)值大。如果消費(fèi)者的數(shù)量比較多,把該屬性的值調(diào)大可以降低 broker 的工作負(fù)載。

  •  fetch.max.wait.ms

我們通過上面的 fetch.min.bytes 告訴 Kafka,等到有足夠的數(shù)據(jù)時才會把它返回給消費(fèi)者。而 fetch.max.wait.ms 則用于指定 broker 的等待時間,默認(rèn)是 500 毫秒。如果沒有足夠的數(shù)據(jù)流入 kafka 的話,消費(fèi)者獲取的最小數(shù)據(jù)量要求就得不到滿足,最終導(dǎo)致 500 毫秒的延遲。如果要降低潛在的延遲,就可以把參數(shù)值設(shè)置的小一些。如果 fetch.max.wait.ms 被設(shè)置為 100 毫秒的延遲,而 fetch.min.bytes 的值設(shè)置為 1MB,那么 Kafka 在收到消費(fèi)者請求后,要么返回 1MB 的數(shù)據(jù),要么在 100 ms 后返回所有可用的數(shù)據(jù)。就看哪個條件首先被滿足。

  •  max.partition.fetch.bytes

該屬性指定了服務(wù)器從每個分區(qū)里返回給消費(fèi)者的最大字節(jié)數(shù)。它的默認(rèn)值時 1MB,也就是說,KafkaConsumer.poll() 方法從每個分區(qū)里返回的記錄最多不超過 max.partition.fetch.bytes 指定的字節(jié)。如果一個主題有20個分區(qū)和5個消費(fèi)者,那么每個消費(fèi)者需要至少4 MB的可用內(nèi)存來接收記錄。在為消費(fèi)者分配內(nèi)存時,可以給它們多分配一些,因?yàn)槿绻航M里有消費(fèi)者發(fā)生崩潰,剩下的消費(fèi)者需要處理更多的分區(qū)。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節(jié)數(shù)(通過 max.message.size 屬性配置大),否則消費(fèi)者可能無法讀取這些消息,導(dǎo)致消費(fèi)者一直掛起重試。 在設(shè)置該屬性時,另外一個考量的因素是消費(fèi)者處理數(shù)據(jù)的時間。消費(fèi)者需要頻繁的調(diào)用 poll() 方法來避免會話過期和發(fā)生分區(qū)再平衡,如果單次調(diào)用poll() 返回的數(shù)據(jù)太多,消費(fèi)者需要更多的時間進(jìn)行處理,可能無法及時進(jìn)行下一個輪詢來避免會話過期。如果出現(xiàn)這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會話過期時間。

  •  session.timeout.ms

這個屬性指定了消費(fèi)者在被認(rèn)為死亡之前可以與服務(wù)器斷開連接的時間,默認(rèn)是 3s。如果消費(fèi)者沒有在 session.timeout.ms 指定的時間內(nèi)發(fā)送心跳給群組協(xié)調(diào)器,就會被認(rèn)定為死亡,協(xié)調(diào)器就會觸發(fā)重平衡。把它的分區(qū)分配給消費(fèi)者群組中的其它消費(fèi)者,此屬性與 heartbeat.interval.ms 緊密相關(guān)。heartbeat.interval.ms 指定了 poll() 方法向群組協(xié)調(diào)器發(fā)送心跳的頻率,session.timeout.ms 則指定了消費(fèi)者可以多久不發(fā)送心跳。所以,這兩個屬性一般需要同時修改,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 應(yīng)該是 1s。把 session.timeout.ms 值設(shè)置的比默認(rèn)值小,可以更快地檢測和恢復(fù)崩憤的節(jié)點(diǎn),不過長時間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的重平衡。把該屬性的值設(shè)置得大一些,可以減少意外的重平衡,不過檢測節(jié)點(diǎn)崩潰需要更長的時間。

  •  auto.offset.reset

該屬性指定了消費(fèi)者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下的該如何處理。它的默認(rèn)值是 latest,意思指的是,在偏移量無效的情況下,消費(fèi)者將從最新的記錄開始讀取數(shù)據(jù)。另一個值是 earliest,意思指的是在偏移量無效的情況下,消費(fèi)者將從起始位置處開始讀取分區(qū)的記錄。

  •  enable.auto.commit

我們稍后將介紹幾種不同的提交偏移量的方式。該屬性指定了消費(fèi)者是否自動提交偏移量,默認(rèn)值是 true,為了盡量避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為 false,由自己控制何時提交偏移量。如果把它設(shè)置為 true,還可以通過 auto.commit.interval.ms 屬性來控制提交的頻率

  •  partition.assignment.strategy

我們知道,分區(qū)會分配給群組中的消費(fèi)者。PartitionAssignor 會根據(jù)給定的消費(fèi)者和主題,決定哪些分區(qū)應(yīng)該被分配給哪個消費(fèi)者,Kafka 有兩個默認(rèn)的分配策略Range 和 RoundRobin

  •  client.id

該屬性可以是任意字符串,broker 用他來標(biāo)識從客戶端發(fā)送過來的消息,通常被用在日志、度量指標(biāo)和配額中

  •  max.poll.records

該屬性用于控制單次調(diào)用 call() 方法能夠返回的記錄數(shù)量,可以幫你控制在輪詢中需要處理的數(shù)據(jù)量。

  •  receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫數(shù)據(jù)時用到的 TCP 緩沖區(qū)也可以設(shè)置大小。如果它們被設(shè)置為 -1,就使用操作系統(tǒng)默認(rèn)值。如果生產(chǎn)者或消費(fèi)者與 broker 處于不同的數(shù)據(jù)中心內(nèi),可以適當(dāng)增大這些值,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬。

提交和偏移量的概念

特殊偏移

我們上面提到,消費(fèi)者在每次調(diào)用poll() 方法進(jìn)行定時輪詢的時候,會返回由生產(chǎn)者寫入 Kafka 但是還沒有被消費(fèi)者消費(fèi)的記錄,因此我們可以追蹤到哪些記錄是被群組里的哪個消費(fèi)者讀取的。消費(fèi)者可以使用 Kafka 來追蹤消息在分區(qū)中的位置(偏移量)

消費(fèi)者會向一個叫做 _consumer_offset 的特殊主題中發(fā)送消息,這個主題會保存每次所發(fā)送消息中的分區(qū)偏移量,這個主題的主要作用就是消費(fèi)者觸發(fā)重平衡后記錄偏移使用的,消費(fèi)者每次向這個主題發(fā)送消息,正常情況下不觸發(fā)重平衡,這個主題是不起作用的,當(dāng)觸發(fā)重平衡后,消費(fèi)者停止工作,每個消費(fèi)者可能會分到對應(yīng)的分區(qū),這個主題就是讓消費(fèi)者能夠繼續(xù)處理消息所設(shè)置的。

如果提交的偏移量小于客戶端最后一次處理的偏移量,那么位于兩個偏移量之間的消息就會被重復(fù)處理

如果提交的偏移量大于最后一次消費(fèi)時的偏移量,那么處于兩個偏移量中間的消息將會丟失

既然_consumer_offset 如此重要,那么它的提交方式是怎樣的呢?下面我們就來說一下

提交方式

KafkaConsumer API 提供了多種方式來提交偏移量

自動提交

最簡單的方式就是讓消費(fèi)者自動提交偏移量。如果 enable.auto.commit 被設(shè)置為true,那么每過 5s,消費(fèi)者會自動把從 poll() 方法輪詢到的最大偏移量提交上去。提交時間間隔由 auto.commit.interval.ms 控制,默認(rèn)是 5s。與消費(fèi)者里的其他東西一樣,自動提交也是在輪詢中進(jìn)行的。消費(fèi)者在每次輪詢中會檢查是否提交該偏移量了,如果是,那么就會提交從上一次輪詢中返回的偏移量。

提交當(dāng)前偏移量

把 auto.commit.offset 設(shè)置為 false,可以讓應(yīng)用程序決定何時提交偏移量。使用 commitSync() 提交偏移量。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。

commitSync() 將會提交由 poll() 返回的最新偏移量,如果處理完所有記錄后要確保調(diào)用了 commitSync(),否則還是會有丟失消息的風(fēng)險(xiǎn),如果發(fā)生了在均衡,從最近一批消息到發(fā)生在均衡之間的所有消息都將被重復(fù)處理。

異步提交

異步提交 commitAsync() 與同步提交 commitSync() 最大的區(qū)別在于異步提交不會進(jìn)行重試,同步提交會一致進(jìn)行重試。

同步和異步組合提交

一般情況下,針對偶爾出現(xiàn)的提交失敗,不進(jìn)行重試不會有太大的問題,因?yàn)槿绻峤皇∈且驗(yàn)榕R時問題導(dǎo)致的,那么后續(xù)的提交總會有成功的。但是如果在關(guān)閉消費(fèi)者或再均衡前的最后一次提交,就要確保提交成功。

因此,在消費(fèi)者關(guān)閉之前一般會組合使用commitAsync和commitSync提交偏移量。

提交特定的偏移量

消費(fèi)者API允許調(diào)用 commitSync() 和 commitAsync() 方法時傳入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。 

 

責(zé)任編輯:龐桂玉 來源: segmentfault
相關(guān)推薦

2022-12-07 08:13:55

CNI抽象接口

2023-05-03 09:09:28

Golang數(shù)組

2022-09-08 13:58:39

Spring高并發(fā)異步

2013-04-17 11:21:59

Windows PhoWindows Pho

2018-12-24 09:51:22

CPU天梯圖Inter

2023-05-29 08:32:40

JAVA重寫重載

2024-05-27 00:00:00

AmpPHP非阻塞

2018-04-02 09:07:36

CIO

2020-10-15 07:13:53

算法監(jiān)控數(shù)據(jù)

2025-08-11 01:00:00

2020-02-10 14:26:10

GitHub代碼倉庫

2020-12-10 08:44:35

WebSocket輪詢Comet

2020-09-25 19:53:39

數(shù)據(jù)

2020-04-26 09:59:00

黑客機(jī)器學(xué)習(xí)網(wǎng)絡(luò)安全

2022-03-07 06:34:22

CQRS數(shù)據(jù)庫數(shù)據(jù)模型

2012-07-12 15:08:59

WebGL

2021-06-29 19:27:53

JAVA方法接口

2022-07-20 08:55:02

區(qū)塊鏈技術(shù)數(shù)據(jù)記錄

2020-02-20 11:32:09

Kafka概念問題

2022-01-17 14:25:14

索引數(shù)據(jù)庫搜索
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

巨乳诱惑日韩免费av| 国产高清亚洲| 97se亚洲国产综合自在线不卡| 91精品国产乱码久久久久久久久 | 国产精品一区二区三区网站| 久久久久久久久久久久av| 亚洲激情视频小说| 不卡一区视频| 欧美午夜电影在线| 裸体裸乳免费看| 免费毛片在线| 国产 日韩 欧美大片| 国产极品jizzhd欧美| 欧美黄色免费看| 欧美精选视频在线观看| 精品少妇一区二区三区| 国产高潮免费视频| 黄视频网站在线观看| 一区在线播放视频| 欧美久久久久久久| 欧美性猛交 xxxx| 久草在线在线精品观看| 欧美一级片在线播放| 日本中文在线视频| av永久不卡| 日韩精品在线电影| www.欧美com| 国产麻豆一区二区三区| 欧美性受xxxx| 97在线免费公开视频| 欧美大片黄色| 亚洲日本在线视频观看| 久久久一本精品99久久精品66| 国产成人毛毛毛片| 激情综合网av| 国产精品久久久久9999| 国产精品视频免费播放| 在线观看日韩av电影| 久久91超碰青草是什么| 国产一区二区播放| 天天天综合网| 俺去啦;欧美日韩| 欧美老女人性生活视频| 精品72久久久久中文字幕| 亚洲欧美国产另类| 少妇大叫太粗太大爽一区二区| 国产伦精品一区二区三区在线播放 | 伊人久久精品| 91精品免费观看| 天天色天天综合网| 电影一区中文字幕| 欧美一级日韩不卡播放免费| 97超碰成人在线| 久久久久久久性潮| 欧美三级电影一区| 99sesese| 国产精区一区二区| 欧美成人精精品一区二区频| 色综合久久久无码中文字幕波多| 免费欧美网站| 精品国产成人系列| 小毛片在线观看| 伊人成综合网yiren22| 亚洲三级黄色在线观看| 欧美激情久久久久久久| 手机在线电影一区| 欧美精品亚州精品| 69精品久久久| 久久婷婷久久| 国产精品丝袜视频| 国产男女无套免费网站| 国产成人精品一区二区三区网站观看| 成人免费在线看片| 亚洲欧美综合在线观看| 国产欧美日韩视频一区二区| 亚洲精品中字| 天天干在线视频论坛| 午夜精品久久久久久久蜜桃app| 天天夜碰日日摸日日澡性色av| 日本美女一区| 7777精品伊人久久久大香线蕉完整版 | www.好吊操| 美女100%一区| 欧美日产在线观看| 美女伦理水蜜桃4| 你微笑时很美电视剧整集高清不卡| 一本色道久久综合亚洲精品小说| 中文字幕电影av| 一本色道久久| 国产在线一区二区三区| 欧洲成人一区二区三区| 日本一区免费视频| av网站大全免费| 粉嫩91精品久久久久久久99蜜桃| 日韩一卡二卡三卡四卡| 无码人妻精品一区二区中文| 女人香蕉久久**毛片精品| 91精品国产91久久久久福利| 亚洲熟女乱色一区二区三区久久久| 丁香五精品蜜臀久久久久99网站| 欧美一区二区视频17c| 成人av免费| 欧美性xxxx在线播放| 九一精品久久久| 日本福利一区| 粗暴蹂躏中文一区二区三区| 欧美一区二区三区网站| 国产成人av一区二区三区在线 | 成人在线视频网| 日韩中文字幕免费在线观看| 国产精品嫩草久久久久| 波多野结衣之无限发射| 久久69av| 自拍偷拍亚洲一区| 日本高清不卡码| 国产成人福利片| 一区二区在线观看网站| 男人皇宫亚洲男人2020| 亚洲国产精品女人久久久| 91麻豆精品久久毛片一级| 另类国产ts人妖高潮视频| 99re视频在线| 免费看a在线观看| 在线精品亚洲一区二区不卡| 中国极品少妇videossexhd| 无码一区二区三区视频| 国产福利视频一区| 日本大片在线观看| 午夜久久久久久久久| 国产在线观看免费播放| 91九色精品国产一区二区| 国产成人jvid在线播放| 青青九九免费视频在线| 午夜电影网一区| 日韩黄色一区二区| 欧美激情视频一区二区三区免费| 国产欧美一区二区三区在线| 触手亚洲一区二区三区| 色哦色哦哦色天天综合| 中文字幕xxx| 99在线精品免费视频九九视 | 亚洲最好看的视频| 午夜精品www| 欧美天堂在线视频| 午夜婷婷国产麻豆精品| 久久精品女同亚洲女同13| 欧美三级午夜理伦三级中文幕| 成人午夜一级二级三级| 超碰在线免费播放| 欧美一区二区三区性视频| 91久久久久久久久久久久久久| 捆绑变态av一区二区三区 | 极品尤物久久久av免费看| av日韩免费电影| www.8ⅹ8ⅹ羞羞漫画在线看| 亚洲国产精品va在线看黑人动漫| 国产无码精品在线观看| 99国产精品国产精品久久| 免费在线a视频| 日韩丝袜视频| 国产成人小视频在线观看| jizz在线免费观看| 3atv一区二区三区| 国产一级黄色av| 99久久精品久久久久久清纯| 国模吧无码一区二区三区| 九九精品在线| 国产精品一区二区女厕厕| 久久国产精品免费观看| 好吊色这里只有精品| 男女裸体影院高潮| 精品蜜桃传媒| 国产精品电影一区| 中文字幕乱伦视频| 国产精品久久久久影院色老大| 三年中国国语在线播放免费| 区一区二视频| 91久久久在线| 草草影院在线| 亚洲欧美日韩国产中文| 艳妇乳肉豪妇荡乳av| 亚洲精品va在线观看| 小毛片在线观看| 日本在线不卡一区| 天天爱天天做天天操| 国产精品超碰| 国产精品福利在线| av毛片在线播放| 日韩精品高清视频| 一级黄色a视频| 亚洲国产cao| 少妇视频在线播放| 成人综合在线观看| 冲田杏梨av在线| 精品成人在线| 亚洲高清视频一区| 好吊妞视频这里有精品| 国产精品高清在线| 久久香蕉av| 最近2019年手机中文字幕| 国产综合无码一区二区色蜜蜜| 色综合av在线| 在线看的片片片免费| 岛国av在线一区| 最新中文字幕免费视频| 激情亚洲网站| 精品少妇人妻av一区二区| 亚洲人成网www| 国产成人一区二区三区免费看| 天堂久久午夜av| 欧美夫妻性生活xx| 免费a在线看| 亚洲人成网站999久久久综合| 国产精品视频第一页| 欧美性少妇18aaaa视频| 免费日韩在线视频| 中文字幕一区不卡| 微拍福利一区二区| 99久久伊人精品| 亚洲成人福利视频| 韩国精品久久久| 我要看一级黄色大片| 久久精品国产清高在天天线| 人人妻人人做人人爽| 亚洲精品国产首次亮相| 性欧美精品一区二区三区在线播放| 欧美亚洲国产日韩| 99久久国产免费免费| 亚洲影视资源| 国产主播欧美精品| 日韩欧美三区| 国产中文字幕91| 欧美v亚洲v综合v国产v仙踪林| 国产精品高潮粉嫩av| 欧美xxx性| 国产国语videosex另类| 成人性教育av免费网址| 国产91成人video| 交100部在线观看| 91精品国产99| 日韩大片免费观看| 国产91精品视频在线观看| 男人的天堂免费在线视频| 性欧美在线看片a免费观看| av漫画网站在线观看| 久久久久国色av免费观看性色 | 99久久婷婷国产综合精品| 亚洲av无码专区在线播放中文| 久久99精品久久久久| 污污的网站免费| 精品无人区卡一卡二卡三乱码免费卡| 色片在线免费观看| 韩国视频一区二区| 免费看91视频| 99精品视频一区二区| 18禁裸乳无遮挡啪啪无码免费| 久久先锋影音av鲁色资源网| 久久久久亚洲av无码a片| 国产精品色哟哟| 午夜精品福利在线视频| 亚洲国产精品久久人人爱蜜臀| 日韩三级免费看| 在线观看网站黄不卡| 曰批又黄又爽免费视频| 91精品国产高清一区二区三区蜜臀| 国产女人高潮毛片| 欧美成人精品3d动漫h| 视频在线观看你懂的| 亚洲网站在线播放| www免费视频观看在线| 欧美国产视频日韩| 中文在线免费二区三区| 国产一区二中文字幕在线看| 涩爱av色老久久精品偷偷鲁| 精品欧美一区二区三区久久久| 国产成人高清| 成人免费看片视频在线观看| 亚洲视频1区| 玖玖爱视频在线| 成人av网址在线| 欧美人与性囗牲恔配| 亚洲猫色日本管| 天天干天天干天天操| 欧美另类变人与禽xxxxx| 黄色av网址在线| 亚洲视频日韩精品| 在线播放蜜桃麻豆| 日韩av大片在线| 欧美成年网站| 欧美一区观看| 欧美网站在线| 九色91popny| 波多野结衣在线一区| 99自拍偷拍视频| 精品毛片网大全| 国产精品伦理一区| 精品偷拍一区二区三区在线看| 77导航福利在线| 午夜精品理论片| 91麻豆精品国产综合久久久| 蜜桃91精品入口| 欧美色图麻豆| 亚洲国产日韩欧美在线观看| 成人美女视频在线观看18| 中文字幕乱码av| 黑人巨大精品欧美一区免费视频| 国产裸体无遮挡| 亚洲一区二区国产| 精品众筹模特私拍视频| 国产日韩欧美91| 国产亚洲一区| 国产一区二区在线视频播放| 国产精品123| 欧美色视频一区二区三区在线观看| 欧美日韩在线影院| 欧美熟妇另类久久久久久不卡| 精品国偷自产在线| 亚洲综合av一区二区三区| 久久精品国产一区二区三区日韩| 欧美aⅴ99久久黑人专区| 精品亚洲一区二区三区四区| 久久精品夜色噜噜亚洲a∨| 日产精品久久久久久久| 日韩一区二区三区免费观看| 免费网站成人| 国产美女主播一区| 欧美一级本道电影免费专区| 69堂免费视频| 播五月开心婷婷综合| 国产精品第一页在线观看| 欧美va亚洲va| 欧洲精品二区| 99国精产品一二二线| 欧美福利一区| 中文字幕avav| 亚洲精品久久久久久国产精华液| 一级特黄色大片| 久久久91精品国产| 99久久999| 91视频成人免费| 国产乱对白刺激视频不卡| 国产97免费视频| 91精品国产综合久久久久久| 黄色精品在线观看| 91超碰rencao97精品| 中出一区二区| 97精品人人妻人人| 午夜欧美视频在线观看| 午夜成人鲁丝片午夜精品| 欧美在线精品免播放器视频| 欧美调教在线| 不卡av免费在线| 亚洲国产精品99久久久久久久久| 91丨九色丨海角社区| 国产一区二区三区18| 久久免费影院| 黄黄视频在线观看| 成人午夜电影小说| www.com国产| 国产一区二区三区在线免费观看| 成人国产激情在线| 超薄肉色丝袜足j调教99| 成人中文字幕电影| 黑人精品无码一区二区三区AV| 亚洲欧洲av一区二区| 久久亚洲精品人成综合网| 美女黄色片网站| 99精品视频中文字幕| 国产精华7777777| 伦伦影院午夜日韩欧美限制| 99香蕉久久| 日韩av播放器| 自拍偷自拍亚洲精品播放| 免费国产羞羞网站视频| 日韩av123| 欧美aⅴ99久久黑人专区| 欧洲一级黄色片| 欧美日韩成人综合| av资源一区| 亚洲电影网站| av在线综合网| 精品乱码一区内射人妻无码| 欧美大片在线看| 国产一区二区三区四区二区| 国产高清av片| 色综合一个色综合亚洲| 黄网页免费在线观看| 久久国产一区| 寂寞少妇一区二区三区| 国产成人亚洲精品自产在线| www国产亚洲精品久久网站| 国产精品极品| 国产美女视频免费看| 亚洲电影激情视频网站| 在线免费av电影| 久久国产精品一区二区三区| 狠狠色丁香久久婷婷综合_中| 五月婷婷激情网| 欧美成人精品一区二区| 精品99久久|