精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Spring boot集成Kafka之spring-kafka深入探秘

開發(fā) 后端 Kafka
Spring創(chuàng)建了一個項目Spring-kafka,封裝了Apache 的Kafka-client,用于在Spring項目里快速集成kafka。除了簡單的收發(fā)消息外,Spring-kafka還提供了很多高級功能,下面我們就來一一探秘這些用法。

前言                    

kafka是一個消息隊列產(chǎn)品,基于Topic partitions的設(shè)計,能達到非常高的消息發(fā)送處理性能。Spring創(chuàng)建了一個項目Spring-kafka,封裝了Apache 的Kafka-client,用于在Spring項目里快速集成kafka。除了簡單的收發(fā)消息外,Spring-kafka還提供了很多高級功能,下面我們就來一一探秘這些用法。

項目地址:https://github.com/spring-projects/spring-kafka

簡單集成

引入依賴 

  1. <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.6.RELEASE</version></dependency> 

添加配置 

  1. spring.kafka.producer.bootstrap-servers=127.0.0.1:9092 

測試發(fā)送和接收 

  1. /**  
  2.  * @author: kl @kailing.pub  
  3.  * @date: 2019/5/30  
  4.  */  
  5. @SpringBootApplication  
  6. @RestController  
  7. public class Application {  
  8.   private final Logger logger = LoggerFactory.getLogger(Application.class);  
  9.   public static void main(String[] args) {  
  10.     SpringApplication.run(Application.class, args);  
  11.   } 
  12.   @Autowired  
  13.   private KafkaTemplate<Object, Object> template;  
  14.   @GetMapping("/send/{input}")  
  15.   public void sendFoo(@PathVariable String input) {  
  16.     this.template.send("topic_input", input);  
  17.   }  
  18.   @KafkaListener(id = "webGroup"topics = "topic_input" 
  19.   public void listen(String input) {  
  20.     logger.info("input value: {}" , input);  
  21.   }  

啟動應(yīng)用后,在瀏覽器中輸入:http://localhost:8080/send/kl。就可以在控制臺看到有日志輸出了:input value: "kl"。基礎(chǔ)的使用就這么簡單。發(fā)送消息時注入一個KafkaTemplate,接收消息時添加一個@KafkaListener注解即可。

Spring-kafka-test嵌入式Kafka Server

不過上面的代碼能夠啟動成功,前提是你已經(jīng)有了Kafka Server的服務(wù)環(huán)境,我們知道Kafka是由Scala + Zookeeper構(gòu)建的,可以從官網(wǎng)下載部署包在本地部署。但是,我想告訴你,為了簡化開發(fā)環(huán)節(jié)驗證Kafka相關(guān)功能,Spring-Kafka-Test已經(jīng)封裝了Kafka-test提供了注解式的一鍵開啟Kafka Server的功能,使用起來也是超級簡單。本文后面的所有測試用例的Kafka都是使用這種嵌入式服務(wù)提供的。

引入依賴 

  1. <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><version>2.2.6.RELEASE</version><scope>test</scope></dependency> 

啟動服務(wù)

下面使用Junit測試用例,直接啟動一個Kafka Server服務(wù),包含四個Broker節(jié)點。 

  1. @RunWith(SpringRunner.class)@SpringBootTest(classes = ApplicationTests.class)@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})public class ApplicationTests {@Testpublic void contextLoads()throws IOException {    System.in.read();  }} 

如上:只需要一個注解@EmbeddedKafka即可,就可以啟動一個功能完整的Kafka服務(wù),是不是很酷。默認(rèn)只寫注解不加參數(shù)的情況下,是創(chuàng)建一個隨機端口的Broker,在啟動的日志中會輸出具體的端口以及默認(rèn)的一些配置項。不過這些我們在Kafka安裝包配置文件中的配置項,在注解參數(shù)中都可以配置,下面詳解下@EmbeddedKafka注解中的可設(shè)置參數(shù) :

  •  value:broker節(jié)點數(shù)量
  •  count:同value作用一樣,也是配置的broker的節(jié)點數(shù)量
  •  controlledShutdown:控制關(guān)閉開關(guān),主要用來在Broker意外關(guān)閉時減少此Broker上Partition的不可用時間

Kafka是多Broker架構(gòu)的高可用服務(wù),一個Topic對應(yīng)多個partition,一個Partition可以有多個副本Replication,這些Replication副本保存在多個Broker,用于高可用。但是,雖然存在多個分區(qū)副本集,當(dāng)前工作副本集卻只有一個,默認(rèn)就是首次分配的副本集【首選副本】為Leader,負(fù)責(zé)寫入和讀取數(shù)據(jù)。當(dāng)我們升級Broker或者更新Broker配置時需要重啟服務(wù),這個時候需要將partition轉(zhuǎn)移到可用的Broker。下面涉及到三種情況

  1.   直接關(guān)閉Broker:當(dāng)Broker關(guān)閉時,Broker集群會重新進行選主操作,選出一個新的Broker來作為Partition Leader,選舉時此Broker上的Partition會短時不可用
  2.   開啟controlledShutdown:當(dāng)Broker關(guān)閉時,Broker本身會先嘗試將Leader角色轉(zhuǎn)移到其他可用的Broker上
  3.   使用命令行工具:使用bin/kafka-preferred-replica-election.sh,手動觸發(fā)PartitionLeader角色轉(zhuǎn)移
  •  ports:端口列表,是一個數(shù)組。對應(yīng)了count參數(shù),有幾個Broker,就要對應(yīng)幾個端口號
  •  brokerProperties:Broker參數(shù)設(shè)置,是一個數(shù)組結(jié)構(gòu),支持如下方式進行Broker參數(shù)設(shè)置: 
  1. @EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.threads = 8"}) 
  •   kerPropertiesLocation:Broker參數(shù)文件設(shè)置

  功能同上面的brokerProperties,只是Kafka Broker的可設(shè)置參數(shù)達182個之多,都像上面這樣配置肯定不是最優(yōu)方案,所以提供了加載本地配置文件的功能,如: 

  1. @EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties"

默認(rèn)情況下,如果在使用KafkaTemplate發(fā)送消息時,Topic不存在,會創(chuàng)建一個新的Topic,默認(rèn)的分區(qū)數(shù)和副本數(shù)為如下Broker參數(shù)來設(shè)定

創(chuàng)建新的Topic 

  1. num.partitions = 1 #默認(rèn)Topic分區(qū)數(shù)  
  2. num.replica.fetchers = 1 #默認(rèn)副本數(shù) 

程序啟動時創(chuàng)建Topic 

  1. /**  
  2.  * @author: kl @kailing.pub  
  3.  * @date: 2019/5/31  
  4.  */  
  5. @Configuration  
  6. public class KafkaConfig {  
  7.   @Bean  
  8.   public KafkaAdmin admin(KafkaProperties properties){  
  9.     KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());  
  10.     admin.setFatalIfBrokerNotAvailable(true);  
  11.     return admin;  
  12.   }  
  13.   @Bean  
  14.   public NewTopic topic2() {  
  15.     return new NewTopic("topic-kl", 1, (short) 1);  
  16.   }  

如果Kafka Broker支持(1.0.0或更高版本),則如果發(fā)現(xiàn)現(xiàn)有Topic的Partition 數(shù)少于設(shè)置的Partition 數(shù),則會新增新的Partition分區(qū)。關(guān)于KafkaAdmin有幾個常用的用法如下:

setFatalIfBrokerNotAvailable(true):默認(rèn)這個值是False的,在Broker不可用時,不影響Spring 上下文的初始化。如果你覺得Broker不可用影響正常業(yè)務(wù)需要顯示的將這個值設(shè)置為True

setAutoCreate(false) : 默認(rèn)值為True,也就是Kafka實例化后會自動創(chuàng)建已經(jīng)實例化的NewTopic對象

initialize():當(dāng)setAutoCreate為false時,需要我們程序顯示的調(diào)用admin的initialize()方法來初始化NewTopic對象

代碼邏輯中創(chuàng)建

有時候我們在程序啟動時并不知道某個Topic需要多少Partition數(shù)合適,但是又不能一股腦的直接使用Broker的默認(rèn)設(shè)置,這個時候就需要使用Kafka-Client自帶的AdminClient來進行處理。上面的Spring封裝的KafkaAdmin也是使用的AdminClient來處理的。如: 

  1. @Autowired  
  2.   private KafkaProperties properties;  
  3.   @Test  
  4.   public void testCreateToipc(){  
  5.     AdminClient client = AdminClient.create(properties.buildAdminProperties());  
  6.     if(client !=null){  
  7.       try {  
  8.         Collection<NewTopic> newnewTopics = new ArrayList<>(1);  
  9.         newTopics.add(new NewTopic("topic-kl",1,(short) 1));  
  10.         client.createTopics(newTopics);  
  11.       }catch (Throwable e){  
  12.         e.printStackTrace();  
  13.       }finally {  
  14.         client.close();  
  15.       }  
  16.     }  
  17.   } 

ps:其他的方式創(chuàng)建Topic

上面的這些創(chuàng)建Topic方式前提是你的spring boot版本到2.x以上了,因為spring-kafka2.x版本只支持spring boot2.x的版本。在1.x的版本中還沒有這些api。下面補充一種在程序中通過Kafka_2.10創(chuàng)建Topic的方式

引入依賴 

  1. <dependency>  
  2.     <groupId>org.apache.kafka</groupId>  
  3.     <artifactId>kafka_2.10</artifactId>  
  4.     <version>0.8.2.2</version>  
  5. </dependency> 

api方式創(chuàng)建 

  1. @Test  
  2.   public void testCreateTopic()throws Exception{  
  3.     ZkClient zkClient =new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$)  
  4.     String topicName = "topic-kl" 
  5.     int partitions = 1 
  6.     int replication = 1 
  7.     AdminUtils.createTopic(zkClient,topicName,partitions,replication,new Properties());  
  8.   } 

注意下ZkClient最后一個構(gòu)造入?yún)ⅲ且粋€序列化反序列化的接口實現(xiàn),博主測試如果不填的話,創(chuàng)建的Topic在ZK上的數(shù)據(jù)是有問題的,默認(rèn)的Kafka實現(xiàn)也很簡單,就是做了字符串UTF-8編碼處理。ZKStringSerializer$是Kafka中已經(jīng)實現(xiàn)好的一個接口實例,是一個Scala的伴生對象,在Java中直接調(diào)用點MODULE$就可以得到一個實例

命令方式創(chuàng)建 

  1. @Test  
  2.   public void testCreateTopic(){  
  3.     String [] optionsnew String[]{  
  4.         "--create",  
  5.         "--zookeeper","127.0.0.1:2181",  
  6.         "--replication-factor", "3",  
  7.         "--partitions", "3",  
  8.         "--topic", "topic-kl"  
  9.     };  
  10.     TopicCommand.main(options);  
  11.   } 

消息發(fā)送之KafkaTemplate探秘

獲取發(fā)送結(jié)果

異步獲取 

  1. template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {  
  2.       @Override  
  3.       public void onFailure(Throwable throwable) {  
  4.         ......  
  5.       }  
  6.       @Override  
  7.       public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {  
  8.         ....  
  9.       }  
  10.     }); 

同步獲取 

  1. ListenableFuture<SendResult<Object,Object>> future = template.send("topic-kl","kl");  
  2.     try {  
  3.       SendResult<Object,Object> result = future.get();  
  4.     }catch (Throwable e){  
  5.       e.printStackTrace();  
  6.     } 

kafka事務(wù)消息

默認(rèn)情況下,Spring-kafka自動生成的KafkaTemplate實例,是不具有事務(wù)消息發(fā)送能力的。需要使用如下配置激活事務(wù)特性。事務(wù)激活后,所有的消息發(fā)送只能在發(fā)生事務(wù)的方法內(nèi)執(zhí)行了,不然就會拋一個沒有事務(wù)交易的異常 

  1. spring.kafka.producer.transaction-id-prefix=kafka_tx

當(dāng)發(fā)送消息有事務(wù)要求時,比如,當(dāng)所有消息發(fā)送成功才算成功,如下面的例子:假設(shè)第一條消費發(fā)送后,在發(fā)第二條消息前出現(xiàn)了異常,那么第一條已經(jīng)發(fā)送的消息也會回滾。而且正常情況下,假設(shè)在消息一發(fā)送后休眠一段時間,在發(fā)送第二條消息,消費端也只有在事務(wù)方法執(zhí)行完成后才會接收到消息 

  1. @GetMapping("/send/{input}")  
  2.   public void sendFoo(@PathVariable String input) {  
  3.     template.executeInTransaction(t -> 
  4.       t.send("topic_input","kl");  
  5.       if("error".equals(input)){  
  6.         throw new RuntimeException("failed");  
  7.       }  
  8.       t.send("topic_input","ckl");  
  9.       return true;  
  10.     });  
  11.   } 

當(dāng)事務(wù)特性激活時,同樣,在方法上面加@Transactional注解也會生效 

  1. @GetMapping("/send/{input}")  
  2.   @Transactional(rollbackFor = RuntimeException.class)  
  3.   public void sendFoo(@PathVariable String input) { 
  4.      template.send("topic_input", "kl");  
  5.     if ("error".equals(input)) {  
  6.       throw new RuntimeException("failed");  
  7.     }  
  8.     template.send("topic_input", "ckl");  
  9.   } 

Spring-Kafka的事務(wù)消息是基于Kafka提供的事務(wù)消息功能的。而Kafka Broker默認(rèn)的配置針對的三個或以上Broker高可用服務(wù)而設(shè)置的。這邊在測試的時候為了簡單方便,使用了嵌入式服務(wù)新建了一個單Broker的Kafka服務(wù),出現(xiàn)了一些問題:如

1、事務(wù)日志副本集大于Broker數(shù)量,會拋如下異常: 

  1. Number of alive brokers '1' does not meet the required replication factor '3'   
  2. for the transactions state topic (configured via 'transaction.state.log.replication.factor'). 
  3. This error can be ignored if the cluster is starting up and not all brokers are up yet. 

默認(rèn)Broker的配置transaction.state.log.replication.factor=3,單節(jié)點只能調(diào)整為1

2、副本數(shù)小于副本同步隊列數(shù)目,會拋如下異常 

  1. Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2] 

默認(rèn)Broker的配置transaction.state.log.min.isr=2,單節(jié)點只能調(diào)整為1

ReplyingKafkaTemplate獲得消息回復(fù)

ReplyingKafkaTemplate是KafkaTemplate的一個子類,除了繼承父類的方法,新增了一個方法sendAndReceive,實現(xiàn)了消息發(fā)送\回復(fù)語義 

  1. RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record); 

也就是我發(fā)送一條消息,能夠拿到消費者給我返回的結(jié)果。就像傳統(tǒng)的RPC交互那樣。當(dāng)消息的發(fā)送者需要知道消息消費者的具體的消費情況,非常適合這個api。如,一條消息中發(fā)送一批數(shù)據(jù),需要知道消費者成功處理了哪些數(shù)據(jù)。下面代碼演示了怎么集成以及使用ReplyingKafkaTemplate 

  1. /**  
  2.  * @author: kl @kailing.pub  
  3.  * @date: 2019/5/30  
  4.  */  
  5. @SpringBootApplication  
  6. @RestController 
  7.  public class Application {  
  8.   private final Logger logger = LoggerFactory.getLogger(Application.class);  
  9.   public static void main(String[] args) {  
  10.     SpringApplication.run(Application.class, args);  
  11.   }  
  12.   @Bean  
  13.   public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {  
  14.     ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("replies");  
  15.     repliesContainer.getContainerProperties().setGroupId("repliesGroup");  
  16.     repliesContainer.setAutoStartup(false);  
  17.     return repliesContainer;  
  18.   }  
  19.   @Bean  
  20.   public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf, ConcurrentMessageListenerContainer<String, String> repliesContainer) { 
  21.      return new ReplyingKafkaTemplate(pf, repliesContainer);  
  22.   }  
  23.   @Bean  
  24.   public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> pf) {  
  25.     return new KafkaTemplate(pf);  
  26.   }  
  27.   @Autowired  
  28.   private ReplyingKafkaTemplate template;  
  29.   @GetMapping("/send/{input}")  
  30.   @Transactional(rollbackFor = RuntimeException.class)  
  31.   public void sendFoo(@PathVariable String input) throws Exception {  
  32.     ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);  
  33.     RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);  
  34.     ConsumerRecord<String, String> consumerRecord = replyFuture.get();  
  35.     System.err.println("Return value: " + consumerRecord.value());  
  36.   }  
  37.   @KafkaListener(id = "webGroup"topics = "topic-kl" 
  38.   @SendTo  
  39.   public String listen(String input) {  
  40.     logger.info("input value: {}", input);  
  41.     return "successful";  
  42.   }  

Spring-kafka消息消費用法探秘

@KafkaListener的使用

前面在簡單集成中已經(jīng)演示過了@KafkaListener接收消息的能力,但是@KafkaListener的功能不止如此,其他的比較常見的,使用場景比較多的功能點如下:

  •  顯示的指定消費哪些Topic和分區(qū)的消息,
  •  設(shè)置每個Topic以及分區(qū)初始化的偏移量,
  •  設(shè)置消費線程并發(fā)度
  •  設(shè)置消息異常處理器 
  1. @KafkaListener(id = "webGroup"topicPartitions = {  
  2.       @TopicPartition(topic = "topic1"partitions = {"0", "1"}),  
  3.           @TopicPartition(topic = "topic2"partitions = "0" 
  4.               partitionOffsets = @PartitionOffset(partition = "1"initialOffset = "100"))  
  5.       },concurrency = "6",errorHandler = "myErrorHandler" 
  6.   public String listen(String input) {  
  7.     logger.info("input value: {}", input);  
  8.     return "successful";  
  9.   } 

其他的注解參數(shù)都很好理解,errorHandler需要說明下,設(shè)置這個參數(shù)需要實現(xiàn)一個接口KafkaListenerErrorHandler。而且注解里的配置,是你自定義實現(xiàn)實例在spring上下文中的Name。比如,上面配置為errorHandler = "myErrorHandler"。則在spring上線中應(yīng)該存在這樣一個實例: 

  1. /**  
  2.  * @author: kl @kailing.pub  
  3.  * @date: 2019/5/31  
  4.  */  
  5. @Service("myErrorHandler")  
  6. public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {  
  7.   Logger logger =LoggerFactory.getLogger(getClass()); 
  8.    @Override  
  9.   public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {  
  10.     logger.info(message.getPayload().toString());  
  11.     return null;  
  12.   }  
  13.   @Override  
  14.   public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<??> consumer) {  
  15.     logger.info(message.getPayload().toString());  
  16.     return null;  
  17.   }  

手動Ack模式

手動ACK模式,由業(yè)務(wù)邏輯控制提交偏移量。比如程序在消費時,有這種語義,特別異常情況下不確認(rèn)ack,也就是不提交偏移量,那么你只能使用手動Ack模式來做了。開啟手動首先需要關(guān)閉自動提交,然后設(shè)置下consumer的消費模式 

  1. spring.kafka.consumer.enable-auto-commit=false  
  2. spring.kafka.listener.ack-mode=manual 

上面的設(shè)置好后,在消費時,只需要在@KafkaListener監(jiān)聽方法的入?yún)⒓尤階cknowledgment 即可,執(zhí)行到ack.acknowledge()代表提交了偏移量 

  1. @KafkaListener(id = "webGroup"topics = "topic-kl" 
  2.   public String listen(String input, Acknowledgment ack) {  
  3.     logger.info("input value: {}", input);  
  4.     if ("kl".equals(input)) {  
  5.       ack.acknowledge();  
  6.     }  
  7.     return "successful";  
  8.   } 

@KafkaListener注解監(jiān)聽器生命周期

@KafkaListener注解的監(jiān)聽器的生命周期是可以控制的,默認(rèn)情況下,@KafkaListener的參數(shù)autoStartup = "true"。也就是自動啟動消費,但是也可以同過KafkaListenerEndpointRegistry來干預(yù)他的生命周期。KafkaListenerEndpointRegistry有三個動作方法分別如:start(),pause(),resume()/啟動,停止,繼續(xù)。如下代碼詳細(xì)演示了這種功能。 

  1. /**  
  2.  * @author: kl @kailing.pub  
  3.  * @date: 2019/5/30  
  4.  */  
  5. @SpringBootApplication  
  6. @RestController  
  7. public class Application {  
  8.   private final Logger logger = LoggerFactory.getLogger(Application.class);  
  9.   public static void main(String[] args) {  
  10.     SpringApplication.run(Application.class, args);  
  11.   }  
  12.   @Autowired  
  13.   private KafkaTemplate template;  
  14.   @GetMapping("/send/{input}")  
  15.   @Transactional(rollbackFor = RuntimeException.class)  
  16.   public void sendFoo(@PathVariable String input) throws Exception {  
  17.     ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);  
  18.     template.send(record);  
  19.   }  
  20.   @Autowired  
  21.   private KafkaListenerEndpointRegistry registry;  
  22.   @GetMapping("/stop/{listenerID}")  
  23.   public void stop(@PathVariable String listenerID){  
  24.     registry.getListenerContainer(listenerID).pause();  
  25.   }  
  26.   @GetMapping("/resume/{listenerID}")  
  27.   public void resume(@PathVariable String listenerID){  
  28.     registry.getListenerContainer(listenerID).resume();  
  29.   }  
  30.   @GetMapping("/start/{listenerID}")  
  31.   public void start(@PathVariable String listenerID){  
  32.     registry.getListenerContainer(listenerID).start();  
  33.   }  
  34.   @KafkaListener(id = "webGroup"topics = "topic-kl",autoStartup = "false" 
  35.   public String listen(String input) {  
  36.     logger.info("input value: {}", input);  
  37.     return "successful";  
  38.   }  

在上面的代碼中,listenerID就是@KafkaListener中的id值“webGroup”。項目啟動好后,分別執(zhí)行如下url,就可以看到效果了。

先發(fā)送一條消息:http://localhost:8081/send/ckl。因為autoStartup = "false",所以并不會看到有消息進入監(jiān)聽器。

接著啟動監(jiān)聽器:http://localhost:8081/start/webGroup。可以看到有一條消息進來了。

暫停和繼續(xù)消費的效果使用類似方法就可以測試出來了。

SendTo消息轉(zhuǎn)發(fā)

前面的消息發(fā)送響應(yīng)應(yīng)用里面已經(jīng)見過@SendTo,其實除了做發(fā)送響應(yīng)語義外,@SendTo注解還可以帶一個參數(shù),指定轉(zhuǎn)發(fā)的Topic隊列。常見的場景如,一個消息需要做多重加工,不同的加工耗費的cup等資源不一致,那么就可以通過跨不同Topic和部署在不同主機上的consumer來解決了。如: 

  1. @KafkaListener(id = "webGroup"topics = "topic-kl" 
  2.   @SendTo("topic-ckl")  
  3.   public String listen(String input) {  
  4.     logger.info("input value: {}", input);  
  5.     return input + "hello!";  
  6.   }  
  7.   @KafkaListener(id = "webGroup1"topics = "topic-ckl" 
  8.   public void listen2(String input) {  
  9.     logger.info("input value: {}", input);  
  10.   } 

消息重試和死信隊列的應(yīng)用

除了上面談到的通過手動Ack模式來控制消息偏移量外,其實Spring-kafka內(nèi)部還封裝了可重試消費消息的語義,也就是可以設(shè)置為當(dāng)消費數(shù)據(jù)出現(xiàn)異常時,重試這個消息。而且可以設(shè)置重試達到多少次后,讓消息進入預(yù)定好的Topic。也就是死信隊列里。下面代碼演示了這種效果: 

  1. @Autowired  
  2.   private KafkaTemplate template;  
  3.   @Bean  
  4.   public ConcurrentKafkaListenerContainerFactory<??> kafkaListenerContainerFactory(  
  5.       ConcurrentKafkaListenerContainerFactoryConfigurer configurer,  
  6.       ConsumerFactory<Object, Object> kafkaConsumerFactory,  
  7.       KafkaTemplate<Object, Object> template) {  
  8.     ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();  
  9.     configurer.configure(factory, kafkaConsumerFactory);  
  10.     //最大重試三次  
  11.     factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));  
  12.     return factory;  
  13.   }  
  14.   @GetMapping("/send/{input}")  
  15.   public void sendFoo(@PathVariable String input) {  
  16.     template.send("topic-kl", input);  
  17.   }  
  18.   @KafkaListener(id = "webGroup"topics = "topic-kl" 
  19.   public String listen(String input) {  
  20.     logger.info("input value: {}", input);  
  21.     throw new RuntimeException("dlt");  
  22.   }  
  23.   @KafkaListener(id = "dltGroup"topics = "topic-kl.DLT" 
  24.   public void dltListen(String input) {  
  25.     logger.info("Received from DLT: " + input);  
  26.   } 

上面應(yīng)用,在topic-kl監(jiān)聽到消息會,會觸發(fā)運行時異常,然后監(jiān)聽器會嘗試三次調(diào)用,當(dāng)?shù)竭_最大的重試次數(shù)后。消息就會被丟掉重試死信隊列里面去。死信隊列的Topic的規(guī)則是,業(yè)務(wù)Topic名字+“.DLT”。如上面業(yè)務(wù)Topic的name為“topic-kl”,那么對應(yīng)的死信隊列的Topic就是“topic-kl.DLT”

文末結(jié)語

最近業(yè)務(wù)上使用了kafka用到了Spring-kafka,所以系統(tǒng)性的探索了下Spring-kafka的各種用法,發(fā)現(xiàn)了很多好玩很酷的特性,比如,一個注解開啟嵌入式的Kafka服務(wù)、像RPC調(diào)用一樣的發(fā)送\響應(yīng)語義調(diào)用、事務(wù)消息等功能。希望此博文能夠幫助那些正在使用Spring-kafka或即將使用的人少走一些彎路少踩一點坑。 

 

責(zé)任編輯:龐桂玉 來源: Hollis
相關(guān)推薦

2024-08-05 08:45:35

SpringKafkaSCRAM

2020-04-23 15:59:04

SpringKafka集群

2025-05-06 03:10:00

KEDASpringRocketMQ

2024-02-22 18:12:18

微服務(wù)架構(gòu)設(shè)計模式

2023-06-07 15:25:19

Kafka版本日志

2023-06-07 15:34:21

架構(gòu)層次結(jié)構(gòu)

2023-10-11 14:37:21

工具開發(fā)

2023-01-10 07:52:15

2009-06-15 15:57:21

Spring工作原理

2025-07-25 09:24:16

2017-04-26 11:00:34

Spring BootHelloWorld詳解

2018-11-02 15:45:41

Spring BootRedis數(shù)據(jù)庫

2020-07-14 11:00:12

Spring BootRedisJava

2022-09-29 09:19:04

線程池并發(fā)線程

2025-01-23 08:53:15

2020-09-02 17:28:26

Spring Boot Redis集成

2024-01-05 13:26:00

KafkaTopicSpring

2024-01-26 07:48:10

SpringKafka提升

2021-12-28 11:13:05

安全認(rèn)證 Spring Boot

2021-01-05 05:36:39

設(shè)計Spring Boot填充
點贊
收藏

51CTO技術(shù)棧公眾號

欧美人xxxx| 99久久综合国产精品| 在线播放日韩欧美| 在线观看岛国av| 午夜伦理大片视频在线观看| 91在线你懂得| 成人伊人精品色xxxx视频| 免费在线视频一区二区| 国产一区二区欧美| 日韩丝袜美女视频| 已婚少妇美妙人妻系列| 2024最新电影免费在线观看 | 456亚洲影院| 蜜桃av免费在线观看| 中文字幕一区二区三区中文字幕| 91久久精品网| 欧美激情亚洲天堂| 免费在线看黄色| 久久综合给合久久狠狠狠97色69| 亚洲专区中文字幕| 特级西西444www大胆免费看| 精品动漫一区| 久久精品影视伊人网| 色一情一交一乱一区二区三区| 日韩一区二区三区色| 欧美视频自拍偷拍| 精品视频一区二区在线| 国产高清在线a视频大全| 国产精品女主播av| 欧美精品一区二区三区在线看午夜 | 天天综合天天做天天综合| www亚洲国产| 国产youjizz在线| 91麻豆蜜桃一区二区三区| 成人黄视频免费| 国产精品欧美久久久久天天影视| 久久精品首页| 91av视频导航| av资源吧首页| 欧美午夜视频| 欧美成人一二三| 成年人二级毛片| 日韩精品免费| 中文字幕精品一区久久久久| 成年人在线观看av| 欧美日韩一本| 亚洲加勒比久久88色综合 | 国产成人在线看| 成人h视频在线观看播放| 在线观看国产精品视频| 日本欧美一区二区三区乱码| 国产999精品| 久久久精品福利| 欧美亚洲一区| 日本中文字幕成人| 午夜精品免费观看| 久久亚洲一区| 国产精品va在线播放| 日韩熟女一区二区| 肉丝袜脚交视频一区二区| 国产精品成人在线| 亚洲一区精品在线观看| 美女在线视频一区| 成人精品视频99在线观看免费| 一区二区日韩在线观看| 激情久久五月天| 亚洲tv在线观看| 色一情一乱一伦一区二区三欧美 | 日本一区二区三区视频在线观看 | 日本丰满少妇做爰爽爽| 久热精品在线| 国产欧美在线视频| 91丨九色丨丰满| 国产精品66部| 国产在线精品二区| 国产一级免费在线观看| 国产精品剧情在线亚洲| 特级西西人体www高清大胆| 欧美videos另类精品| 午夜婷婷国产麻豆精品| 成人精品视频一区二区| 视频91a欧美| 精品日韩在线观看| 可以直接看的无码av| 色综合中文网| 久久视频在线直播| 免费中文字幕视频| 美女91精品| 国产日韩精品在线| 亚洲精品一区二区三区区别| 久久婷婷国产综合国色天香| 欧美日韩三区四区| 国产鲁鲁视频在线观看特色| 午夜精品久久久久久久久久久 | 中文在线最新版地址| 在线观看亚洲成人| 国产精品无码自拍| 久久综合色占| 欧美大片在线看免费观看| 九九热在线视频播放| 麻豆国产欧美日韩综合精品二区| 国产福利久久精品| av电影在线观看| 亚洲在线视频免费观看| 五月婷婷深爱五月| 福利片一区二区| 亚洲欧美日韩精品久久亚洲区| 国产又粗又硬又长又爽| 国产精品日本| 51国偷自产一区二区三区| 色鬼7777久久| 亚洲国产一区视频| 99国产精品久久久久久| 亚洲激情播播| 久久久综合免费视频| 亚洲视频久久久| 91色porny蝌蚪| 无颜之月在线看| 欧美成人毛片| 国产视频在线观看一区二区| 欧美人妻一区二区| 韩国成人福利片在线播放| 日本一区二区三区四区高清视频| 欧美xxxx免费虐| 欧美一区二区三区小说| 日本猛少妇色xxxxx免费网站| 亚洲美女黄网| av一区和二区| a在线免费观看| 欧美日韩国产高清一区二区| 国产黄色网址在线观看| 亚洲美女视频在线免费观看| dy888夜精品国产专区| 激情影院在线观看| 在线播放/欧美激情| 精品亚洲aⅴ无码一区二区三区| 国产日韩欧美在线播放不卡| 成人h在线播放| 超碰caoporn久久| 欧美精品第1页| 少妇的滋味中文字幕bd| 可以看av的网站久久看| 另类欧美小说| 中文日产幕无线码一区二区| 精品国产乱码久久久久久夜甘婷婷| 小泽玛利亚一区二区免费| 久久精品免费看| 日韩欧美视频一区二区三区四区 | 成人黄色片网站| 欧美69xxx| 91精品国产综合久久精品图片| 亚洲色图27p| 老司机精品视频在线| 亚洲一区在线直播| 视频91a欧美| 欧美理论电影在线播放| 99久久国产免费| 一区二区三区在线观看欧美| 中文字幕免费高清在线| 婷婷综合网站| 99porn视频在线| av在线加勒比| 亚洲乱码国产乱码精品精| 日韩中文字幕二区| 日韩中文一区| 99精品国产一区二区| 亚洲综合在线做性| 成人做爽爽免费视频| 国产精品一区二区电影| 国产精品嫩草影院一区二区| 亚洲a在线播放| 99精品国产一区二区| 久久婷婷国产精品| 日本中文字幕视频一区| 久久久av一区| 午夜精品久久久久久久99| 亚洲一区成人在线| 公侵犯人妻一区二区三区| 全国精品久久少妇| 黄色一级片av| 日韩在线影视| 国产精品视频999| 天堂av最新在线| 精品一区二区亚洲| 在线观看一二三区| 一区二区三区日韩欧美精品| 你懂得在线视频| 久久精品国产亚洲高清剧情介绍| 超级碰在线观看| 奇米影视777在线欧美电影观看| 国产成人精品在线| 天堂8中文在线| 亚洲欧洲偷拍精品| www.久久成人| 色婷婷综合久久| 国产高清在线免费观看| 91老师片黄在线观看| 久久综合在线观看| 国产精品毛片在线看| 三级在线免费观看| 国产亚洲电影| eeuss一区二区三区| 精品123区| 97超级碰碰碰| 蜜芽在线免费观看| 亚洲欧美国产日韩天堂区| av手机免费看| 欧美性感一类影片在线播放| 免费无遮挡无码永久在线观看视频| 久久久久久黄色| 稀缺小u女呦精品呦| 捆绑调教美女网站视频一区| 91av资源网| 黄色在线一区| 2025韩国大尺度电影| 国产亚洲第一伦理第一区| 国产自产精品| 综合激情网...| 91在线|亚洲| 欧美三级电影网址| 国产91免费观看| 小视频免费在线观看| 欧美极品第一页| а天堂中文在线官网| xxx成人少妇69| 国产天堂在线| 日韩久久精品成人| 色婷婷中文字幕| 日韩美一区二区三区| 91精品视频免费在线观看| 91福利小视频| 欧美一区二区三区网站| 亚洲成年人影院| 久草资源在线视频| 亚洲美女区一区| 国产极品美女在线| 一区二区中文视频| 成人性生交大片免费看无遮挡aⅴ| 99国产精品视频免费观看| 韩国三级在线看| 国产99一区视频免费| 亚洲性图第一页| 国产91在线观看| 欧美做受高潮中文字幕| 国产91综合网| 亚洲精品乱码久久| 99热99精品| 中文字幕一二三四区| 91麻豆精品在线观看| xxxx日本免费| 日本一区二区三区高清不卡| 微拍福利一区二区| 国产精品久久影院| 999精品视频在线观看播放| 成人免费在线播放视频| 日韩成人毛片视频| 亚洲一区二区三区四区不卡| 国产亚洲精久久久久久无码77777| 亚洲一区二区三区中文字幕| 久久精品这里有| 午夜影视日本亚洲欧洲精品| youjizz在线视频| 91久久线看在观草草青青| 这里只有精品国产| 欧美精品丝袜中出| 亚洲精品18p| 国产丝袜一区视频在线观看| 国产午夜在线视频| 久久精品影视伊人网| 欧美人与牲禽动交com| 久久久久久久国产| 中文字幕在线直播| 国产有码在线一区二区视频| 日韩一区免费| 欧美日韩在线观看一区| 91久久夜色精品国产按摩| 青青在线视频免费观看| 国产日韩欧美一区在线| 国产一区二区在线免费播放| 国产乱子轮精品视频| 日本黄色动态图| 中文字幕av在线一区二区三区| 午夜成人亚洲理伦片在线观看| 一区二区三区在线看| 黄色一级视频免费看| 欧美一区二区大片| 亚洲av成人精品一区二区三区在线播放 | 国产日韩中文在线中文字幕| 99久久久精品免费观看国产| 亚洲专区视频| 樱空桃在线播放| 亚洲专区免费| 国产不卡的av| 久久女同性恋中文字幕| 人妻少妇精品一区二区三区| 欧美性猛交xxxx富婆弯腰| 国产精品一区二区免费视频| 日韩av影视在线| 黄色免费在线观看网站| 欧美亚洲在线观看| 精品一级视频| 日本一区二区精品| 黄色成人91| 久久久久久久久久毛片| 久久久久久综合| 久久久一二三区| 欧美日韩久久久| 青青色在线视频| 欧美激情伊人电影| 一区二区三区日本视频| 欧美精品亚洲精品| 尤物在线精品| 一区二区三区四区毛片| 久久九九全国免费| 日本亚洲欧美在线| 欧美一卡二卡三卡四卡| 国产精品免费播放| 69av在线播放| jizz性欧美23| 女女百合国产免费网站| 免费在线看一区| 精品久久久久久中文字幕人妻最新| 亚洲一级片在线观看| 国产精品女人久久久| 日韩中文字幕亚洲| 电影一区电影二区| 日本成人黄色| 欧美一级视频| bl动漫在线观看| 亚洲已满18点击进入久久| 国产三级第一页| 俺也去精品视频在线观看| 777午夜精品电影免费看| 美女黄毛**国产精品啪啪| 亚洲裸体俱乐部裸体舞表演av| 免费黄视频在线观看| 亚洲色图在线播放| 国产又粗又猛又爽又黄的视频一| 最近2019中文字幕大全第二页| 精品3atv在线视频| 日韩av电影免费观看| 免费视频一区二区三区在线观看| 朝桐光av一区二区三区| 性做久久久久久| 爽爽视频在线观看| 8050国产精品久久久久久| 偷拍一区二区| 黄色一级一级片| 欧美经典一区二区三区| 免费观看日批视频| 色爱av美腿丝袜综合粉嫩av | 欧美色电影在线| av电影在线观看| 91久久国产综合久久91精品网站| 999久久久亚洲| 激情成人在线观看| 亚洲成人免费看| 青青草视频在线免费观看| 日韩av男人的天堂| 日韩综合网站| 亚洲高清av一区二区三区| 一区二区三区在线观看网站| 内射后入在线观看一区| 欧美性在线观看| 欧美亚洲国产精品久久| 中文av字幕在线观看| 一区二区三区高清不卡| 天天综合在线视频| 国产不卡视频在线| 欧美激情成人| 四虎永久免费观看| 欧美午夜电影在线| 尤物网在线观看| http;//www.99re视频| 亚洲中字黄色| 天堂网avav| 日韩国产精品一区| 99re久久| 成人免费在线视频播放| 99re免费视频精品全部| 国产成人av免费| 欧美国产日韩一区二区| 天天做夜夜做人人爱精品| 黄色永久免费网站| 亚洲狠狠爱一区二区三区| 你懂的免费在线观看| 成人午夜在线视频一区| 99在线精品免费视频九九视| 国产视频三区四区| 欧美videos大乳护士334| 欧美大胆性生话| 8x8x华人在线| 国产亚洲1区2区3区| 精品国产av鲁一鲁一区| 日本sm极度另类视频| 一区二区中文| 丰腴饱满的极品熟妇| 日韩精品一区二区三区中文不卡| 欧美xo影院| av在线播放亚洲|