精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Kafka消費位點管理沒你想的那么簡單

云計算 Kafka
熟悉RocketMQ的小伙伴都知道RocketMQ已經(jīng)默認幫我實現(xiàn)好了消息消費失敗重試,消費位點自動提交,死信隊列等功能,那么kafka是否也是如此呢?

背景

如果你習慣了使用RocketMQ這種自動擋管理消費位點,消息失敗重試的方式。你再來使用kafka,會發(fā)現(xiàn)kafka這種手動擋的消費位點管理就沒那么容易了

熟悉RocketMQ的小伙伴都知道RocketMQ已經(jīng)默認幫我實現(xiàn)好了消息消費失敗重試,消費位點自動提交,死信隊列等功能,那么kafka是否也是如此呢?

kafka消費位點管理

kafka消費位點有兩種管理方式

  1. 手動提交消費位點
  2. 自動提交消費位點

自動提交消費位點

想要設置自動提交消費位點我們只需要設置兩個屬性

  1. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 自動提交消費位點
  2. ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG 自動提交消費位點的時間間隔

一個簡單的消費代碼如下

Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // 自動提交消費位點的時間間隔
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList(TOPIC_NAME));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            try {
                handlerMessage(record);
            } catch (Exception e) {
                log.error("處理消息異常: {}", record, e);
                // 循環(huán)繼續(xù)
            }

        }
    }

自動提交消費位點有幾個缺點

  1. 會出現(xiàn)重復消費:比如Consumer每5秒自動提交一次位移,如果在第4秒時,消費了消息,但是還沒有提交位移,此時Consumer掛掉了,那么下次Consumer啟動時,會從上次提交的位移開始消費,這樣就會導致消息重復消費。 當然比如出現(xiàn)Rebalance也是會出現(xiàn)重復消費的情況
  2. 無法精準控制消費位點

手動提交消費位點

手動提交消費位點又分兩種

  1. 同步提交(commitSync)
  2. 異步提交(commitAsync)

同步提交(commitSync)

同步提交的方式很簡單,就是每次消費完通過調(diào)用API consumer.commitSync。

相關(guān)的代碼如下:

Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(1));
            // 注意這里消費業(yè)務邏輯上消費失敗后的消息處理
            handlerMessage(records);
            try {
                // 消費成功后手動提交位點
                consumer.commitSync();
            } catch (CommitFailedException e) {
                // 消費位點提交失敗異常處理
                handleError(e); 
            }
        }

同步提交的方式有一個缺點,調(diào)用commitSync()時,Consumer會處于阻塞狀態(tài),直到broker返回提交成功,嚴重影響消費性能。

異步提交(commitAsync)

異步提交的方式很簡單,就是每次消費完通過調(diào)用API consumer.commitAsync。

Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(1));
            handlerMessage(records); // 處理消息
            consumer.commitAsync((offsets, exception) -> {
                if (exception != null)
                    handleError(exception);
            });
        }

commitAsync主要是提供了異步回調(diào),通過回調(diào)來通知消費位點是否提交成功。

異步提交消費位點也有一些缺點,比如消費位點不能重復提交。因為提交位點失敗后,重新提交位點可能更晚的消費位點已經(jīng)提交了,這里提交已經(jīng)是沒有意義的了。

spring-kafka消息消費

可以看到不管是同步提交消費位點還是異步提交消費位點,都有一些問題,想要寫出生產(chǎn)可用的消費代碼,需要注意的細節(jié)非常多。

比如消費失敗后的消息如何處理,是停止消費跳出循環(huán),還是說記錄消費失敗的消息,人工處理等。

這里我們可以簡單看看spring-kafka是如何消費消息的。

我們簡單看看主流程代碼:

圖片圖片

這里我們忽略源碼的一些其他細節(jié)。只分析主要的消費流程。

  • invokeOnMessage(cRecord); 處理消息

可以看到invokeOnMessage是被整個try-catch包裹的,這樣就保證了消費失敗后不會影響整個消費流程。

具體我們先看看消息正常處理的邏輯。

private void invokeOnMessage(final ConsumerRecord<K, V> cRecord) {

   if (cRecord.value() instanceof DeserializationException ex) {
    throw ex;
   }
   if (cRecord.key() instanceof DeserializationException ex) {
    throw ex;
   }
   if (cRecord.value() == null && this.checkNullValueForExceptions) {
    checkDeser(cRecord, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
   }
   if (cRecord.key() == null && this.checkNullKeyForExceptions) {
    checkDeser(cRecord, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
   }
   doInvokeOnMessage(cRecord);
   if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) {
    ackCurrent(cRecord);
   }
   if (this.isCountAck || this.isTimeOnlyAck) {
    doProcessCommits();
   }
  }

這里主要是一些異常校驗,然后就是判斷是否可以提交消費位點。如果可以則調(diào)用doProcessCommits()進行正常的消費位點提交。

  • doProcessCommits() 消費位點處理

如果消費位點提交失敗也會進行一些異常處理。

private void doProcessCommits() {
   if (!this.autoCommit && !this.isRecordAck) {
    try {
     processCommits();
    }
    catch (CommitFailedException cfe) {
     if (this.remainingRecords != null && !this.isBatchListener) {
      ConsumerRecords<K, V> pending = this.remainingRecords;
      this.remainingRecords = null;
      List<ConsumerRecord<?, ?>> records = new ArrayList<>();
      for (ConsumerRecord<K, V> kvConsumerRecord : pending) {
       records.add(kvConsumerRecord);
      }
      this.commonErrorHandler.handleRemaining(cfe, records, this.consumer,
        KafkaMessageListenerContainer.this.thisOrParentContainer);
     }
    }
   }
  }

如果消費位點提交失敗則會調(diào)用commonErrorHandler進行異常處理。

commonErrorHandler有多個實現(xiàn)類,有一個默認實現(xiàn)DefaultErrorHandler

  • 消息消費失敗異常處理

如果消息消費失敗,也提供了一個異常處理擴展invokeErrorHandler(cRecord, iterator, e);

里面實際使用的也是DefaultErrorHandler

核心的處理邏輯主要還是在SeekUtils中封裝

  • DefaultErrorHandler
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
   Consumer<?, ?> consumer, MessageListenerContainer container) {

  SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
    getFailureTracker(), this.logger, getLogLevel());
 }
  • SeekUtils
public static void seekOrRecover(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,
    Consumer<?, ?> consumer, MessageListenerContainer container, boolean commitRecovered,
    RecoveryStrategy recovery, LogAccessor logger, Level level) {}

可以看到有一個RecoveryStrategy參數(shù),這個是消息消費失敗如何恢復,比如我們需要手動增加一個類似死信隊列的topic,這里消息消費失敗就會自動發(fā)送到我們的死信隊列

死信隊列的topic名字生成規(guī)則主要是topicName + -dlt

private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
  DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + "-dlt", cr.partition());

總結(jié)

可以看到如果我們單純的使用kafka-client原生的sdk來進行消息消費,是非常容易出現(xiàn)問題的。

我們需要很多細節(jié),比如

  1. 消息消費失敗了如何處理,是否需要重試,如果重試還是失敗怎么辦?丟掉還是手動處理丟到自己創(chuàng)建的死信隊列中。
  2. 消費位點提交失敗了如何處理。
  3. 消費位點是使用同步提交還是異步提交?或者混合提交?

所以如果spring boot項目還是建議使用spring相關(guān)已經(jīng)封裝好的kafka sdk。

非必要盡量不要使用原生的kafka-client sdk。

責任編輯:武曉燕 來源: 小奏技術(shù)
相關(guān)推薦

2015-04-30 10:12:13

開源云平臺OpenStack

2017-08-09 14:49:03

WebHTTPS瀏覽器

2014-08-25 10:17:54

數(shù)據(jù)中心管理

2021-03-29 13:00:50

代碼替換開發(fā)

2020-03-26 10:41:02

API網(wǎng)關(guān)大公司

2014-03-14 09:35:56

內(nèi)存優(yōu)化軟件內(nèi)存優(yōu)化

2015-06-24 10:32:13

訊鳥云計算會展

2016-01-07 10:17:48

2025-08-05 07:58:28

2021-08-02 15:24:19

Windows 11Windows微軟

2014-03-21 15:30:06

產(chǎn)品經(jīng)理PM能力

2023-12-28 12:07:21

2016-07-25 12:58:07

SDN路由故障排查

2013-01-15 10:09:43

Windows Ser

2014-07-09 09:06:33

SDN自動化

2010-08-04 09:20:31

JavaScript

2009-06-22 14:02:00

2019-05-17 09:33:50

圖像識別三維重建文本識別

2020-01-03 08:44:05

TCP網(wǎng)絡協(xié)議三次握手

2013-02-19 09:21:01

Win 8
點贊
收藏

51CTO技術(shù)棧公眾號

中文字幕国产精品| 一本色道久久综合亚洲精品按摩| 91沈先生作品| 久久精品国产亚洲av香蕉| 国内精品国产成人国产三级粉色| 精品久久久香蕉免费精品视频| 日本一区二区三区视频在线播放| 国产乱码精品一区二区| 亚洲国产日本| 一区二区在线视频播放| 免费观看黄网站| 免费观看欧美大片| 亚洲精品一二三区| 久久成人资源| 99久久婷婷国产一区二区三区| 99国产精品| www.亚洲一区| 午夜理伦三级做爰电影| 免费欧美网站| 日本丰满少妇一区二区三区| 国产精品88久久久久久妇女| 你懂的视频在线免费| 日韩高清在线一区| 亚洲97在线观看| 国产激情无码一区二区三区| 神马久久一区二区三区| 精品国内片67194| 免费一区二区三区在线观看| 涩涩av在线| 一区二区三区精品| 伊人av成人| 国产在线一在线二| 99re成人在线| 成人动漫视频在线观看完整版| 在线播放成人av| 日韩精品一级二级 | 欧美精品黑人性xxxx| 国产主播在线看| 午夜影院免费在线| 亚洲少妇30p| 一区二区三区免费看| 精品av中文字幕在线毛片| 成人av电影在线| 国产99在线免费| a天堂视频在线| 极品少妇xxxx精品少妇偷拍| 国产精品久久久久一区二区| 久久久久久久久久久久久av| 夜夜嗨一区二区三区| 九九久久精品一区| 紧身裙女教师波多野结衣| 欧美在线电影| 中文字幕日韩电影| jizzjizz日本少妇| 91一区二区三区四区| 一区二区三区黄色| 国产精品情侣呻吟对白视频| 国产精品探花在线观看| 亚洲免费精彩视频| 欧美熟妇一区二区| 一本色道久久综合亚洲精品酒店| 日韩精品免费在线| 国产特黄级aaaaa片免| 日本在线中文字幕一区| 日韩精品中文字幕视频在线| 精品少妇人妻一区二区黑料社区| 神马午夜久久| 国产亚洲精品va在线观看| www.av天天| 色爱综合网欧美| 超碰精品一区二区三区乱码| 国产稀缺精品盗摄盗拍| 欧美精品啪啪| 国模视频一区二区| 国产专区第一页| 日韩 欧美一区二区三区| 国产欧美日韩精品专区| 97在线视频人妻无码| 国产98色在线|日韩| 久久久com| 免费人成在线观看网站| 国产精品视频你懂的| 91精品一区二区三区四区| av中文字幕在线观看| 亚洲不卡在线观看| 免费激情视频在线观看| www.久久热| 亚洲国产一区自拍| 免费看的黄色网| 91精品国产91久久久久久密臀 | 亚洲天堂手机| 欧美视频中文字幕| 少妇献身老头系列| 国产伦精品一区二区三区千人斩| 在线视频免费一区二区| 免费在线视频观看| 日韩精品国产欧美| av色综合网| 国产精品久久久久久久龚玥菲| 亚洲精品久久久蜜桃| 国产网站免费在线观看| 国产精品毛片无码| 亚洲美女在线看| 久久这里只有精品国产| 日韩 欧美一区二区三区| 91丝袜脚交足在线播放| 国产在线视频资源| 一区二区久久久| 黑森林精品导航| 丁香婷婷成人| 最好看的2019的中文字幕视频| 91九色视频在线| 99热一区二区| 一区三区自拍| 伊人男人综合视频网| av激情在线观看| 亚洲专区一区| 147欧美人体大胆444| 欧美日韩影视| 亚洲永久精品国产| 色婷婷狠狠18| 牛牛影视一区二区三区免费看| 色综久久综合桃花网| 日韩污视频在线观看| 精品综合久久久久久8888| 久久国产一区二区| 尤物视频在线看| 欧美日韩三级在线| 日韩av一二区| 好看的av在线不卡观看| 成人美女免费网站视频| 国产原创av在线| 精品国产1区2区| 手机在线播放av| 日本久久黄色| 国产极品精品在线观看| 午夜成人鲁丝片午夜精品| 亚洲精品国产一区二区三区四区在线| 中文字幕第80页| 亚洲丝袜美腿一区| 91av视频在线| 日韩在线观看视频一区| 亚洲激情图片小说视频| 999在线精品视频| 日韩综合精品| 成人欧美一区二区三区在线| 在线免费观看的av网站| 在线观看国产精品网站| 成人黄色免费网址| 久久久久99| 欧美亚洲免费高清在线观看 | 欧美情侣性视频| 国产精品乱码一区二区| 一色桃子久久精品亚洲| av噜噜在线观看| 91精品秘密在线观看| 成人在线视频网| av免费在线免费观看| 日韩一区二区在线免费观看| 丰满少妇高潮久久三区| 国产精品一二三四区| 又大又硬又爽免费视频| 国产极品模特精品一二| 69av成年福利视频| 可以在线观看的av| 精品视频资源站| 精品少妇一区二区三区密爱| 国内精品伊人久久久久av影院| 在线免费观看一区二区三区| www久久久| 欧美极品欧美精品欧美视频| 刘玥91精选国产在线观看| 欧美日韩一区二区免费在线观看 | 欧美日韩一二| 国产一区私人高清影院| 2021国产在线| 亚洲精品国产欧美| 久久久久精彩视频| 亚洲欧美日韩国产中文在线| 精品人妻一区二区免费| 久久精品一区二区国产| 亚洲国产精品久久久久婷婷老年| 91精品视频一区二区| 隔壁老王国产在线精品| 久久久久久久久亚洲精品| 欧美亚洲国产bt| 中文字幕av久久爽av| 91在线免费播放| 嫩草影院国产精品| 欧美激情1区2区| 免费在线成人av电影| 亚洲精品成a人ⅴ香蕉片| 欧美日本中文字幕| 国产美女视频一区二区三区| 日韩一区二区三区四区| 亚洲日本韩国在线| 亚洲视频在线一区| 亚洲欧美视频在线播放| 精品一区二区三区的国产在线播放 | 51ⅴ精品国产91久久久久久| 亚洲欧美视频一区二区| 亚洲成avwww人| 欧美日韩 一区二区三区| 一区二区免费在线| 在线观看免费小视频| 福利一区二区在线观看| 538任你躁在线精品免费| 亚洲高清久久| 亚洲在线视频一区二区| 西野翔中文久久精品字幕| 亚洲free嫩bbb| 香蕉成人影院| 青草青草久热精品视频在线观看| 成人免费网址| 中文字幕亚洲一区二区三区五十路| 黄色小视频免费在线观看| 欧美精品xxxxbbbb| 亚洲不卡在线视频| 天天射综合影视| 极品颜值美女露脸啪啪| 中文字幕乱码久久午夜不卡| 国产一级黄色录像| 国产成人综合亚洲91猫咪| 欧美日韩怡红院| 蜜桃久久av| 精品无码一区二区三区在线| 亚洲欧美在线专区| 亚洲精品永久www嫩草| 美女久久99| 国产成人亚洲欧美| 精品视频在线一区| 国产在线视频欧美| 成人午夜在线| 国产精品电影观看| 欧美电影网站| 欧洲精品毛片网站| 小h片在线观看| 69精品小视频| av剧情在线观看| 久久男人av资源网站| 秋霞在线午夜| 欧美精品videos另类日本| 在线视频国产区| 久久6精品影院| 视频在线观看入口黄最新永久免费国产| 日韩在线一区二区三区免费视频| h视频网站在线观看| 亚洲天堂网站在线观看视频| 免费黄色片在线观看| 亚洲欧美国产一本综合首页| 四虎成人免费在线| 日韩av中文字幕在线免费观看| 蜜臀久久精品久久久久| 亚洲第一福利网站| 神马午夜电影一区二区三区在线观看| 精品国产1区二区| 日韩一卡二卡在线| 日韩精品视频在线| 青青草免费观看免费视频在线| 麻豆精品视频在线| 天天综合天天综合色| 精品国产乱码久久久久久免费| a级片免费观看| 日韩精品资源二区在线| 国产高清免费观看| 欧美成人激情免费网| 老司机午夜福利视频| 亚洲精品国产美女| 国产人成在线观看| 日韩有码在线观看| 天天色天天射天天综合网| 久久琪琪电影院| 欧美色999| 国产主播精品在线| 999久久久久久久久6666| 久久福利电影| 色综合色综合| 欧美激情亚洲天堂| 国产精品综合色区在线观看| 成人免费视频久久| 国内久久精品视频| 国产精品无码一区二区三| 国产亚洲精品资源在线26u| 国产精品麻豆免费版现看视频| 亚洲特黄一级片| 日韩男人的天堂| 在线精品观看国产| 国产黄色片免费| 日韩精品免费在线| 看女生喷水的网站在线观看| 久久久久久久成人| 97欧美成人| 成人久久18免费网站漫画| 女人丝袜激情亚洲| 99热都是精品| 午夜亚洲激情| 天天做天天干天天操| 久久综合丝袜日本网| 老熟妇高潮一区二区三区| 欧美日韩精品在线| 国产剧情精品在线| 国产午夜精品视频| 精品日韩av| 国产日韩换脸av一区在线观看| 高清一区二区三区| 一级一片免费播放| 美女久久一区| 免费不卡的av| 1024亚洲合集| 波多野结衣绝顶大高潮| 日韩欧美国产wwwww| 97视频精彩视频在线观看| 国外成人性视频| 欧美日本三级| 夜夜爽99久久国产综合精品女不卡| 亚洲人成久久| 国产毛片久久久久久| 国产视频不卡一区| 女人十八岁毛片| 精品美女在线播放| av网站在线免费| 国产精品自拍偷拍视频| 日韩超碰人人爽人人做人人添| 欧美 另类 交| 美女在线视频一区| 9.1成人看片免费版| 亚洲成人高清在线| 性生活免费网站| 久久精品最新地址| 国产精品久久乐| 少妇精品久久久久久久久久| 亚洲少妇一区| 美女又爽又黄视频毛茸茸| 亚洲一区二区黄色| www.亚洲天堂.com| 欧美成人精品h版在线观看| 欧美一区二区三区婷婷| 亚洲精品成人a8198a| 日韩精品一区第一页| 天堂久久精品忘忧草| 色综合久久88色综合天天免费| 天天干天天干天天干| 韩国福利视频一区| 日韩精品视频一区二区三区| 成年人免费观看的视频| 蓝色福利精品导航| 波兰性xxxxx极品hd| 欧美高清性hdvideosex| 麻豆视频在线免费观看| 国产综合福利在线| 91超碰国产精品| 黄色a级三级三级三级| 亚洲黄色av一区| 黄色a在线观看| 97精品国产91久久久久久| 精品亚洲自拍| 东京热加勒比无码少妇| 久久精品视频一区二区三区| 无码人妻丰满熟妇区五十路| 国产亚洲欧洲在线| 青草综合视频| 国产人妻人伦精品| 成人国产精品免费观看动漫| 天天操天天干视频| 亚洲人av在线影院| 亚洲人成网站在线在线观看| 欧美日韩视频免费在线观看| 国产精品1区2区| 国产成人无码精品亚洲| 亚洲欧美日韩天堂| 国产精品第一| 国产激情片在线观看| bt7086福利一区国产| 中文字幕在线播| 久久精品在线视频| 精品人人人人| 亚洲福利精品视频| 亚洲免费观看视频| 午夜av免费观看| 国产欧美在线视频| 国产精品二区影院| 欧美特黄一区二区三区| 欧美欧美午夜aⅴ在线观看| 制服丝袜中文字幕在线| 久久er99热精品一区二区三区| 青青草原综合久久大伊人精品优势| 污软件在线观看| 精品国产免费视频| 国产亚洲一区二区手机在线观看| av电影一区二区三区| av一二三不卡影片| 亚洲天堂视频网| 国产69精品久久久久9999| 欧美日韩有码| 欧洲成人午夜精品无码区久久| 日韩欧美亚洲一二三区| av免费在线免费观看| 日韩亚洲一区在线播放| 国产乱码精品一区二区三 | 美女做暖暖视频免费在线观看全部网址91|