精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

深入理解Netty編解碼、粘包拆包、心跳機制

開發 前端
本篇重點來理解Netty的編解碼、粘包拆包、心跳機制等實現原理進行講解。

[[346582]]

前言
Netty系列文章:

  • BIO 、NIO 、AIO 總結
  • Unix網絡編程中的五種IO模型
  • 深入理解IO多路復用實現機制
  • Netty核心功能與線程模型

前面我們講了 BIO、NIO、AIO 等一些基礎知識和Netty核心功能與線程模型,本篇重點來理解Netty的編解碼、粘包拆包、心跳機制等實現原理進行講解。

Netty編解碼
Netty 涉及到編解碼的組件有 Channel 、 ChannelHandler 、 ChannelPipe 等,我們先大概了解下這幾個組件的作用。

ChannelHandler
ChannelHandler 充當來處理入站和出站數據的應用程序邏輯容器。例如,實現 ChannelInboundHandler 接口(或 ChannelInboundHandlerAdapter),你就可以接收入站事件和數據,這些數據隨后會被你的應用程序的業務邏輯處理。當你要給連接的客戶端發送響應時,也可以從 ChannelInboundHandler 刷數據。你的業務邏輯通常下在一個或者多個 ChannelInboundHandler 中。

ChannelOutboundHandler 原理一樣,只不過它是用來處理出站數據的。

ChannelPipeline
ChannelPipeline 提供了 ChannelHandler 鏈的容器。以客戶端應用程序為例,如果有事件的運動方向是從客戶端到服務端,那么我們稱這些事件為出站的,即客戶端發送給服務端的數據會通過 pipeline 中的一系列 ChannelOutboundHandler (ChannelOutboundHandler 調用是從 tail 到 head 方向逐個調用每個 handler 的邏輯),并被這些 Hadnler 處理,反之稱為入站的,入站只調用 pipeline 里的 ChannelInboundHandler 邏輯(ChannelInboundHandler 調用是從 head 到 tail 方向 逐個調用每個 handler 的邏輯。)

編解碼器
當你通過Netty發送或者接受一個消息的時候,就將會發生一次數據轉換。入站消息會被解碼:從字節轉換為另一種格式(比如java對象);如果是出站消息,它會被編碼成字節。

Netty提供了一系列實用的編碼解碼器,它們都實現了ChannelInboundHadnler或者ChannelOutboundHandler接口。在這些類中, channelRead方法已經被重寫了。

以入站為例,對于每個從入站Channel讀取的消息,這個方法會被調用。隨后,它將調用由已知解碼器所提供的decode()方法進行解碼,并將已經解碼的字節轉發給ChannelPipeline中的下一個ChannelInboundHandler。

Netty提供了很多編解碼器,比如編解碼字符串的StringEncoder和StringDecoder,編解碼對象的ObjectEncoder和ObjectDecoder 等。

當然也可以通過集成ByteToMessageDecoder自定義編解碼器。

示例代碼
完整代碼在 Github :

https://github.com/Niuh-Study/niuh-netty.git

對應的包 com.niuh.netty.codec

Netty粘包拆包
TCP 粘包拆包是指發送方發送的若干包數據到接收方接收時粘成一包或某個數據包被拆開接收。如下圖所示,client 發送了兩個數據包 D1 和 D2,但是 server 端可能會收到如下幾種情況的數據。

程序演示
首先準備客戶端負責發送消息,連續發送5次消息,代碼如下:

  1. public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  2.  for (int i = 1; i <= 5; i++) { 
  3.      ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + " ", Charset.forName("utf-8")); 
  4.         ctx.writeAndFlush(byteBuf); 
  5.     } 

然后服務端作為接收方,接收并且打印結果:

  1. // count 變量,用于計數 
  2. private int count
  3.  
  4. @Override 
  5. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  6.  System.out.println("服務器讀取線程 " + Thread.currentThread().getName()); 
  7.  
  8.     ByteBuf buf = (ByteBuf) msg; 
  9.     byte[] bytes = new byte[buf.readableBytes()]; 
  10.     // 把ByteBuf的數據讀到bytes數組中 
  11.     buf.readBytes(bytes); 
  12.     String message = new String(bytes, Charset.forName("utf-8")); 
  13.     System.out.println("服務器接收到數據:" + message); 
  14.     // 打印接收的次數 
  15.     System.out.println("接收到的數據量是:" + (++this.count)); 

啟動服務端,再啟動兩個客戶端發送消息,服務端的控制臺可以看到這樣:

粘包的問題其實是隨機的,所以每次結果都不太一樣。

完整代碼在 Github :

https://github.com/Niuh-Study/niuh-netty.git

對應的包 com.niuh.splitpacket0

為什么出現粘包現象?
TCP 是面向連接的,面向流的,提供高可靠性服務。收發兩端(客戶端和服務器端)都要有成對的 socket,因此,發送端為了將多個發送給接收端的包,更有效的發送給對方,使用了優化方法(Nagle算法),將多次間隔較少且數據量小的數據,合并成一個大的數據塊,然后進行封包,這樣做雖然提供了效率,但是接收端就難以分辨出完整的數據包了,因為面向流的通信是無消息保護邊界的。

如何理解TCP是面向字節流的

  1. 應用程序和 TCP 的交互是一次一個數據塊(大小不等),但 TCP 把應用程序交下來的數據僅僅看成是一連串的無結構的字節流。TCP 并不知道所傳送的字節流的含義;
  2. 因此 TCP 不保證接收方應用程序所收到的數據塊和發送方應用程序所發出的數據塊具有對應大小的關系(例如,發送方應用程序交給發送方的 TCP 共 10 個數據塊,但接收方的 TCP 可能只用了 4 個就把收到的字節流交付上層的應用程序);
  3. 同時,TCP 不關心應用進程一次把多長的報文發送到 TCP 的緩存中,而是根據對方給出的窗口值和當前網絡阻塞的程度來決定一個報文段應包含多少個字節(UDP 發送的報文長度是應用進程給出的)。如果應用進程傳送到 TCP 緩存的數據塊太長,TCP 就可以把它劃分短一點再傳送。如果應用程序一次只發來一個字節,TCP 也可以等待積累有足夠多的字節后再構成報文段發送出去。

TCP發送報文一般是 3 個時機

  1. 緩沖區數據達到,最大報文長度 MSS;
  2. 由發送端的應用進程指明要求發送報文段,即 TCP 支持的推送(push)操作;
  3. 當發送方的一個計時器期限到了,即使長度不超過 MSS,也發送。

解決方案
一般解決粘包拆包問題有 4 種辦法
1.在數據的末尾添加特殊的符號標識數據包的邊界。通常會加\n、\r、\t或者其他的符號
學習 HTTP、FTP 等,使用回車換行符號;

2.在數據的頭部聲明數據的長度,按長度獲取數據
將消息分為 head 和 body,head 中包含 body 長度的字段,一般 head 的第一個字段使用 int 值來表示 body 長度;

3.規定報文的長度,不足則補空位。讀取時按規定好的長度來讀取。比如 100 字節,如果不夠就補空格;
4.使用更復雜的應用層協議。
使用LineBasedFrameDecoder
LineBasedFrameDecoder 是Netty內置的一個解碼器,對應的編碼器是 LineEncoder。

原理是上面講的第一種思路,在數據末尾加上特殊符號以標識邊界。默認是使用換行符\n。

用法很簡單,發送方加上編碼器:

  1. @Override 
  2. protected void initChannel(SocketChannel ch) throws Exception { 
  3.  //添加編碼器,使用默認的符號\n,字符集是UTF-8 
  4.     ch.pipeline().addLast(new LineEncoder(LineSeparator.DEFAULT, CharsetUtil.UTF_8)); 
  5.     ch.pipeline().addLast(new TcpClientHandler()); 

接收方加上解碼器:

  1. @Override 
  2. protected void initChannel(SocketChannel ch) throws Exception { 
  3.  //解碼器需要設置數據的最大長度,我這里設置成1024 
  4.  ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); 
  5.  //給pipeline管道設置業務處理器 
  6.  ch.pipeline().addLast(new TcpServerHandler()); 

然后在發送方,發送消息時在末尾加上標識符:

  1. @Override 
  2. public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  3.     for (int i = 1; i <= 5; i++) { 
  4.   //在末尾加上默認的標識符\n 
  5.      ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + StringUtil.LINE_FEED, Charset.forName("utf-8")); 
  6.         ctx.writeAndFlush(byteBuf); 
  7.  } 

于是我們再次啟動服務端和客戶端,在服務端的控制臺可以看到:

在數據的末尾添加特殊的符號標識數據包的邊界,粘包、拆包的問題就得到解決了。

注意:數據末尾一定是分隔符,分隔符后面不要再加上數據,否則會當做下一條數據的開始部分。下面是錯誤演示:

  1. @Override 
  2. public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  3.     for (int i = 1; i <= 5; i++) { 
  4.   //在末尾加上默認的標識符\n 
  5.      ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + StringUtil.LINE_FEED + "[我是分隔符后面的字符串]", Charset.forName("utf-8")); 
  6.         ctx.writeAndFlush(byteBuf); 
  7.  } 

服務端的控制臺就會看到這樣的打印信息:

使用自定義長度幀解碼器
使用這個解碼器解決粘包問題的原理是上面講的第二種,在數據的頭部聲明數據的長度,按長度獲取數據。這個解碼器構造器需要定義5個參數,相對較為復雜一點,先看參數的解釋:

  • maxFrameLength 發送數據包的最大長度
  • lengthFieldOffset 長度域的偏移量。長度域位于整個數據包字節數組中的開始下標。
  • lengthFieldLength 長度域的字節數長度。長度域的字節數長度。
  • lengthAdjustment 長度域的偏移量矯正。如果長度域的值,除了包含有效數據域的長度外,還包含了其他域(如長度域自身)長度,那么,就需要進行矯正。矯正的值為:包長 - 長度域的值 – 長度域偏移 – 長度域長。
  • initialBytesToStrip 丟棄的起始字節數。丟棄處于此索引值前面的字節。

前面三個參數比較簡單,可以用下面這張圖進行演示:

矯正偏移量是什么意思呢?

是假設你的長度域設置的值除了包括有效數據的長度還有其他域的長度包含在里面,那么就要設置這個值進行矯正,否則解碼器拿不到有效數據。

丟棄的起始字節數。這個比較簡單,就是在這個索引值前面的數據都丟棄,只要后面的數據。一般都是丟棄長度域的數據。當然如果你希望得到全部數據,那就設置為0。

下面就在消息接收端使用自定義長度幀解碼器,解決粘包的問題:

  1. @Override 
  2. protected void initChannel(SocketChannel ch) throws Exception { 
  3.  //數據包最大長度是1024 
  4.     //長度域的起始索引是0 
  5.     //長度域的數據長度是4 
  6.     //矯正值為0,因為長度域只有 有效數據的長度的值 
  7.     //丟棄數據起始值是4,因為長度域長度為4,我要把長度域丟棄,才能得到有效數據 
  8.     ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); 
  9.     ch.pipeline().addLast(new TcpClientHandler()); 

接著編寫發送端代碼,根據解碼器的設置,進行發送:

  1. @Override 
  2. public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  3.  for (int i = 1; i <= 5; i++) { 
  4.      String str = "msg No" + i; 
  5.         ByteBuf byteBuf = Unpooled.buffer(1024); 
  6.         byte[] bytes = str.getBytes(Charset.forName("utf-8")); 
  7.         //設置長度域的值,為有效數據的長度 
  8.         byteBuf.writeInt(bytes.length); 
  9.         //設置有效數據 
  10.         byteBuf.writeBytes(bytes); 
  11.         ctx.writeAndFlush(byteBuf); 
  12.     } 

然后啟動服務端,客戶端,我們可以看到控制臺打印結果:

可以看到,利用自定義長度幀解碼器解決了粘包問題。

使用Google Protobuf編解碼器
Netty官網上是明顯寫著支持Google Protobuf的,如下圖所示:

Google Protobuf是什么
官網的原話: Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.

翻譯一下:Protocol buffers是Google公司的與語言無關、平臺無關、可擴展的序列化數據的機制,類似XML,但是更小、更快、更簡單。您只需定義一次數據的結構化方式,然后就可以使用特殊生成的源代碼,輕松地將結構化數據寫入和讀取到各種數據流中,并支持多種語言。

在rpc或tcp通信等很多場景都可以使用。通俗來講,如果客戶端和服務端使用的是不同的語言,那么在服務端定義一個數據結構,通過protobuf轉化為字節流,再傳送到客戶端解碼,就可以得到對應的數據結構。這就是protobuf神奇的地方。并且,它的通信效率極高,“一條消息數據,用protobuf序列化后的大小是json的10分之一,xml格式的20分之一,是二進制序列化的10分之一”。

Google Protobuf 官網 :

https://developers.google.cn/protocol-buffers/

為什么使用Google Protobuf
在一些場景下,數據需要在不同的平臺,不同的程序中進行傳輸和使用,例如某個消息是用C++程序產生的,而另一個程序是用java寫的,當前者產生一個消息數據時,需要在不同的語言編寫的不同的程序中進行操作,如何將消息發送并在各個程序中使用呢?這就需要設計一種消息格式,常用的就有json和xml,protobuf出現的則較晚。

Google Protobuf優點

  • protobuf 的主要優點是簡單,快;
  • protobuf將數據序列化為二進制之后,占用的空間相當小,基本僅保留了數據部分,而xml和json會附帶消息結構在數據中;
  • protobuf使用起來很方便,只需要反序列化就可以了,而不需要xml和json那樣層層解析。

Google Protobuf安裝
因為我這里是Mac系統,Mac下面除了用dmg、pkg來安裝軟件外,比較方便的還有用brew命令進行安裝 , 它能幫助安裝其他所需要的依賴,從而減少不必要的麻煩。

安裝最新版本的protoc

1.從github上下載 protobuf3
https://github.com/protocolbuffers/protobuf/releases/tag/v3.13.0

Mac系統選擇第一個,如下圖所示:

2.下載成功后,切換到root用戶

  1. sudo -i 

3.解壓壓縮包,并進入你自己解壓的目錄

  1. tar xyf protobuf-all-3.13.0.tar.gz 
  2. cd protobuf-3.13.0 

4.設置編譯目錄

  1. ./configure --prefix=/usr/local/protobuf 

5.安裝

  1. make 
  2. make install 

6.配置環境變量
第一步:找到.bash_profile文件并編輯

  1. cd ~ 
  2. open .bash_profile 

第二步:然后在打開的bash_profile文件末尾添加如下配置:

  1. export PROTOBUF=/usr/local/protobuf  
  2. export PATH=$PROTOBUF/bin:$PATH 

第三步:source一下使文件生效

  1. source .bash_profile 

7.測試安裝結果

  1. protoc --version 

使用Google Protobuf
以下步驟參考Google Protobuf的github項目的指南。

https://github.com/protocolbuffers/protobuf/tree/master/java

第一步:添加maven依賴

  1. <dependency> 
  2.   <groupId>com.google.protobuf</groupId> 
  3.   <artifactId>protobuf-java</artifactId> 
  4.   <version>3.11.0</version> 
  5. </dependency> 

第二步:編寫proto文件Message.proto

如何編寫.proto文件的相關文檔說明,可以去官網查看 下面寫一個例子,請看示范:

  1. syntax = "proto3"; //版本 
  2. option java_outer_classname = "MessagePojo";//生成的外部類名,同時也是文件名 
  3.  
  4. message Message { 
  5.     int32 id = 1;//Message類的一個屬性,屬性名稱是id,序號為1 
  6.     string content = 2;//Message類的一個屬性,屬性名稱是content,序號為2 

第三步:使用編譯器,通過.proto文件生成代碼

在執行上面的安裝步驟后,進入到 bin 目錄下,可以看到一個可執行文件 protoc

  1. cd /usr/local/protobuf/bin/ 

然后復制前面寫好的Message.proto文件到此目錄下,如圖所示:

輸入命令:

  1. protoc --java_out=. Message.proto 

然后就可以看到生成的MessagePojo.java文件。最后把文件復制到IDEA項目中。

第四步:在發送端添加編碼器,在接收端添加解碼器

客戶端添加編碼器,對消息進行編碼。

  1. @Override 
  2. protected void initChannel(SocketChannel ch) throws Exception { 
  3.  //在發送端添加Protobuf編碼器 
  4.     ch.pipeline().addLast(new ProtobufEncoder()); 
  5.  ch.pipeline().addLast(new TcpClientHandler()); 

服務端添加解碼器,對消息進行解碼。

  1. @Override 
  2. protected void initChannel(SocketChannel ch) throws Exception { 
  3.  //添加Protobuf解碼器,構造器需要指定解碼具體的對象實例 
  4.  ch.pipeline().addLast(new ProtobufDecoder(MessagePojo.Message.getDefaultInstance())); 
  5.  //給pipeline管道設置處理器 
  6.  ch.pipeline().addLast(new TcpServerHandler()); 

第五步:發送消息

客戶端發送消息,代碼如下:

  1. @Override 
  2. public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  3.  //使用的是構建者模式進行創建對象 
  4.  MessagePojo.Message message = MessagePojo 
  5.       .Message 
  6.             .newBuilder() 
  7.             .setId(1) 
  8.             .setContent("一角錢,起飛~"
  9.             .build(); 
  10.     ctx.writeAndFlush(message); 

服務端接收到數據,并且打印:

  1. @Override 
  2. protected void channelRead0(ChannelHandlerContext ctx, MessagePojo.Message messagePojo) throws Exception { 
  3.     System.out.println("id:" + messagePojo.getId()); 
  4.     System.out.println("content:" + messagePojo.getContent()); 

測試結果正確:

分析Protocol的粘包、拆包
實際上直接使用Protocol編解碼器還是存在粘包問題的。

證明一下,發送端循環一百次發送100條"一角錢,起飛"的消息,請看發送端代碼演示:

  1. @Override 
  2. public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  3.  for (int i = 1; i <= 100; i++) { 
  4.   MessagePojo.Message message = MessagePojo 
  5.       .Message 
  6.             .newBuilder() 
  7.             .setId(i) 
  8.             .setContent(i + "號一角錢,起飛~"
  9.             .build(); 
  10.       ctx.writeAndFlush(message); 
  11.  } 

這時,啟動服務端,客戶端后,可能只有打印幾條消息或者在控制臺看到如下錯誤:

com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.

意思是:分析protocol消息時,輸入意外地在字段中間結束。這可能意味著輸入被截斷,或者嵌入的消息誤報了自己的長度。

其實就是粘包問題,多條數據合并成一條數據了,導致解析出現異常。

解決Protocol的粘包、拆包問題
只需要在發送端加上編碼器 ProtobufVarint32LengthFieldPrepender

  1. @Override 
  2. protected void initChannel(SocketChannel ch) throws Exception { 
  3.  ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); 
  4.     ch.pipeline().addLast(new ProtobufEncoder()); 
  5.     ch.pipeline().addLast(new TcpClientHandler()); 

接收方加上解碼器 ProtobufVarint32FrameDecoder

  1. @Override 
  2. protected void initChannel(SocketChannel ch) throws Exception { 
  3.  ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); 
  4.  ch.pipeline().addLast(new ProtobufDecoder(MessagePojo.Message.getDefaultInstance())); 
  5.  //給pipeline管道設置處理器 
  6.  ch.pipeline().addLast(new TcpServerHandler()); 

然后再啟動服務端和客戶端,我們可以看到正常了~

ProtobufVarint32LengthFieldPrepender 編碼器的工作如下:

  1.  * BEFORE ENCODE (300 bytes)       AFTER ENCODE (302 bytes) 
  2.  * +---------------+               +--------+---------------+ 
  3.  * | Protobuf Data |-------------->| Length | Protobuf Data | 
  4.  * |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  | 
  5.  * +---------------+               +--------+---------------+ 
  6. @Sharable 
  7. public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> { 
  8.     @Override 
  9.     protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { 
  10.         int bodyLen = msg.readableBytes(); 
  11.         int headerLen = computeRawVarint32Size(bodyLen); 
  12.         //寫入請求頭,消息長度 
  13.         out.ensureWritable(headerLen + bodyLen); 
  14.         writeRawVarint32(out, bodyLen); 
  15.         //寫入數據 
  16.         out.writeBytes(msg, msg.readerIndex(), bodyLen); 
  17.     } 

ProtobufVarint32FrameDecoder 解碼器的工作如下:

  1. * BEFORE DECODE (302 bytes)       AFTER DECODE (300 bytes) 
  2. * +--------+---------------+      +---------------+ 
  3. * | Length | Protobuf Data |----->| Protobuf Data | 
  4. * | 0xAC02 |  (300 bytes)  |      |  (300 bytes)  | 
  5. * +--------+---------------+      +---------------+ 
  6. ublic class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder { 
  7.    @Override 
  8.    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 
  9.        //標記讀取的下標位置 
  10.        in.markReaderIndex(); 
  11.        //獲取讀取的下標位置 
  12.        int preIndex = in.readerIndex(); 
  13.        //解碼,獲取消息的長度,并且移動讀取的下標位置 
  14.        int length = readRawVarint32(in); 
  15.        //比較解碼前和解碼后的下標位置,如果相等。表示字節數不夠讀取,跳到下一輪 
  16.        if (preIndex == in.readerIndex()) { 
  17.            return
  18.        } 
  19.        //如果消息的長度小于0,拋出異常 
  20.        if (length < 0) { 
  21.            throw new CorruptedFrameException("negative length: " + length); 
  22.        } 
  23.        //如果不夠讀取一個完整的數據,reset還原下標位置。 
  24.        if (in.readableBytes() < length) { 
  25.            in.resetReaderIndex(); 
  26.        } else { 
  27.            //否則,把數據寫入到out,接收端就拿到了完整的數據了 
  28.            out.add(in.readRetainedSlice(length)); 
  29.        } 

總結:

  • 發送端通過編碼器在發送的時候在消息體前面加上一個描述數據長度的數據塊。
  • 接收方通過解碼器先獲取描述數據長度的數據塊,知道完整數據的長度,然后根據數據長度獲取一條完整的數據。

Netty心跳檢測機制
何為心跳
所謂心跳, 即在 TCP 長連接中, 客戶端和服務器之間定期發送的一種特殊的數據包, 通知對方自己還在線, 以確保 TCP 連接的有效性.

注:心跳包還有另一個作用,經常被忽略,即:一個連接如果長時間不用,防火墻或者路由器就會斷開該連接。

在 Netty 中, 實現心跳機制的關鍵是 IdleStateHandler, 看下它的構造器:

  1. public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { 
  2.  this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS); 

三個參數的含義如下:

  • readerIdleTimeSeconds: 讀超時。即當在指定的時間間隔內沒有從 Channel 讀取到數據時, 會觸發一個 READER_IDLE 的 IdleStateEvent 事件。
  • writerIdleTimeSeconds: 寫超時。 即當在指定的時間間隔內沒有數據寫入到 Channel 時, 會觸發一個 WRITER_IDLE 的 IdleStateEvent 事件。
  • allIdleTimeSeconds: 讀/寫超時。 即當在指定的時間間隔內沒有讀或寫操作時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件。

注:這三個參數默認的時間單位是秒。若需要指定其他時間單位,可以使用另一個構造方法:

  1. public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { 
  2.  this(false, readerIdleTime, writerIdleTime, allIdleTime, unit); 

要實現Netty服務端心跳檢測機制需要在服務器端的ChannelInitializer中加入如下的代碼:

  1. pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS)); 

Netty心跳源碼分析
初步地看下IdleStateHandler源碼,先看下IdleStateHandler中的channelRead方法:

紅框代碼其實表示該方法只是進行了透傳,不做任何業務邏輯處理,讓channelPipe中的下一個handler處理channelRead方法;

我們再看看channelActive方法:

這里有個initialize的方法,這是IdleStateHandler的精髓,接著探究:

這邊會觸發一個Task,ReaderIdleTimeoutTask,這個task里的run方法源碼是這樣的:

第一個紅框代碼是用當前時間減去最后一次channelRead方法調用的時間,假如這個結果是6s,說明最后一次調用channelRead已經是6s 之前的事情了,你設置的是5s,那么nextDelay則為-1,說明超時了,那么第二個紅框代碼則會觸發下一個handler的 userEventTriggered方法:

如果沒有超時則不觸發userEventTriggered方法。

Netty心跳檢測代碼示例
服務端

  1. package com.niuh.netty.heartbeat; 
  2.  
  3. import io.netty.bootstrap.ServerBootstrap; 
  4. import io.netty.channel.ChannelFuture; 
  5. import io.netty.channel.ChannelInitializer; 
  6. import io.netty.channel.ChannelPipeline; 
  7. import io.netty.channel.EventLoopGroup; 
  8. import io.netty.channel.nio.NioEventLoopGroup; 
  9. import io.netty.channel.socket.SocketChannel; 
  10. import io.netty.channel.socket.nio.NioServerSocketChannel; 
  11. import io.netty.handler.codec.string.StringDecoder; 
  12. import io.netty.handler.codec.string.StringEncoder; 
  13. import io.netty.handler.timeout.IdleStateHandler; 
  14.  
  15. import java.util.concurrent.TimeUnit; 
  16.  
  17. public class HeartBeatServer { 
  18.  
  19.     public static void main(String[] args) throws Exception { 
  20.         EventLoopGroup boss = new NioEventLoopGroup(); 
  21.         EventLoopGroup worker = new NioEventLoopGroup(); 
  22.         try { 
  23.             ServerBootstrap bootstrap = new ServerBootstrap(); 
  24.             bootstrap.group(boss, worker) 
  25.                     .channel(NioServerSocketChannel.class) 
  26.                     .childHandler(new ChannelInitializer<SocketChannel>() { 
  27.                         @Override 
  28.                         protected void initChannel(SocketChannel ch) throws Exception { 
  29.                             ChannelPipeline pipeline = ch.pipeline(); 
  30.                             pipeline.addLast("decoder", new StringDecoder()); 
  31.                             pipeline.addLast("encoder", new StringEncoder()); 
  32.                             //IdleStateHandler的readerIdleTime參數指定超過3秒還沒收到客戶端的連接, 
  33.                             //會觸發IdleStateEvent事件并且交給下一個handler處理,下一個handler必須 
  34.                             //實現userEventTriggered方法處理對應事件 
  35.                             pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS)); 
  36.                             pipeline.addLast(new HeartBeatServerHandler()); 
  37.                         } 
  38.                     }); 
  39.             System.out.println("netty server start。。"); 
  40.             ChannelFuture future = bootstrap.bind(9000).sync(); 
  41.             future.channel().closeFuture().sync(); 
  42.         } catch (Exception e) { 
  43.             e.printStackTrace(); 
  44.         } finally { 
  45.             worker.shutdownGracefully(); 
  46.             boss.shutdownGracefully(); 
  47.         } 
  48.     } 

服務端回調處理類

  1. package com.niuh.netty.heartbeat; 
  2.  
  3. import io.netty.channel.ChannelHandlerContext; 
  4. import io.netty.channel.SimpleChannelInboundHandler; 
  5. import io.netty.handler.timeout.IdleStateEvent; 
  6.  
  7. public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> { 
  8.  
  9.     int readIdleTimes = 0; 
  10.  
  11.     @Override 
  12.     protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { 
  13.         System.out.println(" ====== > [server] message received : " + s); 
  14.         if ("Heartbeat Packet".equals(s)) { 
  15.             ctx.channel().writeAndFlush("ok"); 
  16.         } else { 
  17.             System.out.println(" 其他信息處理 ... "); 
  18.         } 
  19.     } 
  20.  
  21.     @Override 
  22.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 
  23.         IdleStateEvent event = (IdleStateEvent) evt; 
  24.  
  25.         String eventType = null
  26.         switch (event.state()) { 
  27.             case READER_IDLE: 
  28.                 eventType = "讀空閑"
  29.                 readIdleTimes++; // 讀空閑的計數加1 
  30.                 break; 
  31.             case WRITER_IDLE: 
  32.                 eventType = "寫空閑"
  33.                 // 不處理 
  34.                 break; 
  35.             case ALL_IDLE: 
  36.                 eventType = "讀寫空閑"
  37.                 // 不處理 
  38.                 break; 
  39.         } 
  40.  
  41.  
  42.  
  43.         System.out.println(ctx.channel().remoteAddress() + "超時事件:" + eventType); 
  44.         if (readIdleTimes > 3) { 
  45.             System.out.println(" [server]讀空閑超過3次,關閉連接,釋放更多資源"); 
  46.             ctx.channel().writeAndFlush("idle close"); 
  47.             ctx.channel().close(); 
  48.         } 
  49.     } 
  50.  
  51.     @Override 
  52.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  53.         System.err.println("=== " + ctx.channel().remoteAddress() + " is active ==="); 
  54.     } 

客戶端

  1. package com.niuh.netty.heartbeat; 
  2.  
  3. import io.netty.bootstrap.Bootstrap; 
  4. import io.netty.channel.*; 
  5. import io.netty.channel.nio.NioEventLoopGroup; 
  6. import io.netty.channel.socket.SocketChannel; 
  7. import io.netty.channel.socket.nio.NioSocketChannel; 
  8. import io.netty.handler.codec.string.StringDecoder; 
  9. import io.netty.handler.codec.string.StringEncoder; 
  10.  
  11. import java.util.Random; 
  12.  
  13. public class HeartBeatClient { 
  14.     public static void main(String[] args) throws Exception { 
  15.         EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); 
  16.         try { 
  17.             Bootstrap bootstrap = new Bootstrap(); 
  18.             bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) 
  19.                     .handler(new ChannelInitializer<SocketChannel>() { 
  20.                         @Override 
  21.                         protected void initChannel(SocketChannel ch) throws Exception { 
  22.                             ChannelPipeline pipeline = ch.pipeline(); 
  23.                             pipeline.addLast("decoder", new StringDecoder()); 
  24.                             pipeline.addLast("encoder", new StringEncoder()); 
  25.                             pipeline.addLast(new HeartBeatClientHandler()); 
  26.                         } 
  27.                     }); 
  28.  
  29.             System.out.println("netty client start。。"); 
  30.             Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel(); 
  31.             String text = "Heartbeat Packet"
  32.             Random random = new Random(); 
  33.             while (channel.isActive()) { 
  34.                 int num = random.nextInt(10); 
  35.                 Thread.sleep(2 * 1000); 
  36.                 channel.writeAndFlush(text); 
  37.             } 
  38.         } catch (Exception e) { 
  39.             e.printStackTrace(); 
  40.         } finally { 
  41.             eventLoopGroup.shutdownGracefully(); 
  42.         } 
  43.     } 
  44.  
  45.     static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> { 
  46.  
  47.         @Override 
  48.         protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { 
  49.             System.out.println(" client received :" + msg); 
  50.             if (msg != null && msg.equals("idle close")) { 
  51.                 System.out.println(" 服務端關閉連接,客戶端也關閉"); 
  52.                 ctx.channel().closeFuture(); 
  53.             } 
  54.         } 
  55.     } 

PS:以上代碼提交在 Github :

https://github.com/Niuh-Study/niuh-netty.git

責任編輯:姜華 來源: 今日頭條
相關推薦

2025-04-10 10:15:30

2021-07-15 10:35:16

NettyTCPJava

2021-10-08 09:38:57

NettyChannelHand架構

2024-12-19 11:00:00

TCP網絡通信粘包

2023-10-19 11:12:15

Netty代碼

2024-12-04 13:52:30

2019-10-25 00:32:12

TCP粘包Netty

2019-10-17 11:06:32

TCP粘包通信協議

2011-03-02 12:33:00

JavaScript

2022-05-06 16:18:00

Block和 C++OC 類lambda

2010-07-26 11:27:58

Perl閉包

2020-01-06 15:23:41

NettyTCP粘包

2021-03-09 22:30:47

TCP拆包協議

2017-01-13 22:42:15

iosswift

2020-09-30 14:07:05

Kafka心跳機制API

2022-04-28 08:38:09

TCP協議解碼器

2020-11-02 13:06:42

Java裝箱拆箱

2017-05-03 17:00:16

Android渲染機制

2023-10-13 13:30:00

MySQL鎖機制

2012-05-31 02:54:07

HadoopJava
點贊
收藏

51CTO技術棧公眾號

亚洲综合在线电影| 日本国产在线| 精品9999| 国产亚洲aⅴaaaaaa毛片| 亚洲一级片免费| 影音先锋在线视频| 91亚洲大成网污www| 国产99久久精品一区二区| 激情高潮到大叫狂喷水| 亚洲视频精选| 色成年激情久久综合| 天堂v在线视频| 欧日韩在线视频| 奇米精品一区二区三区在线观看| 久久综合九色九九| 欧美一区二区三区成人精品| 国产成人免费9x9x人网站视频| 亚洲美女视频一区| 欧美一区二区三区在线免费观看| 国产女人爽到高潮a毛片| 国产视频一区欧美| 欧美精品一本久久男人的天堂| 精品人妻无码一区二区三区| 欧美精品影院| 欧美日产在线观看| 黄色片一级视频| 超碰在线cao| 亚洲人精品一区| 日本午夜精品电影| 国产又爽又黄网站亚洲视频123| 国产在线观看一区二区| 国产成人精品久久久| 日本黄色片视频| 欧美成人有码| 久久精品国产一区| 国产真人真事毛片视频| 亚洲精品小区久久久久久| 91精品国产色综合久久ai换脸| 激情视频综合网| 中文字幕资源网在线观看免费| 亚洲一区中文日韩| 超级碰在线观看| 日韩毛片久久久| 国产喷白浆一区二区三区| 国产一区喷水| 色婷婷激情五月| 成人国产视频在线观看| 俄罗斯精品一区二区| www.黄色av| 国产精品伊人色| 亚洲一区亚洲二区| 国产特级黄色片| 国产一区二区电影| 91热福利电影| 精品人妻一区二区三区含羞草 | 欧美一级视频在线播放| 日本性爱视频在线观看| 亚洲一区二区在线观看视频| 老司机午夜网站| а√天堂资源地址在线下载| 亚洲特黄一级片| 青草全福视在线| 亚洲h片在线看| 亚洲永久精品国产| 日韩 欧美 视频| 精品三级久久| 色综合久久88色综合天天| 老熟妇仑乱视频一区二区| 人人鲁人人莫人人爱精品| 91福利在线播放| 亚洲一级免费观看| dy888夜精品国产专区| 亚洲欧美偷拍视频| 久久亚洲欧洲| 国产精品成人av性教育| 中文字幕黄色av| 国产在线麻豆精品观看| 99久久精品久久久久久ai换脸| 高清毛片aaaaaaaaa片| 成人av电影在线观看| 久久精品日韩| 男人久久精品| 中文字幕一区二区三区在线不卡| 国产性生活免费视频| 丰乳肥臀在线| 欧美色xxxx| 亚欧激情乱码久久久久久久久| 日韩一级特黄| 亚洲电影在线看| 中文字幕在线观看免费高清| 小说区亚洲自拍另类图片专区| 欧美精品video| 国产成人无码av| 韩国三级中文字幕hd久久精品| 动漫美女被爆操久久久| 国产在线视频福利| 亚洲人成网站影音先锋播放| 免费av手机在线观看| 午夜无码国产理论在线| 欧美一区二区精品在线| 久久久久久九九九九九| 911精品美国片911久久久| 午夜精品久久久久久久男人的天堂| 国产真人无遮挡作爱免费视频| 国内一区二区视频| 欧美大陆一区二区| 99视频免费在线观看| 欧美性猛交xxxxx免费看| 色网站在线视频| 国产成人精品免费视| 久久久久999| 日本免费观看视| 国产一区999| 蜜桃传媒视频第一区入口在线看| 麻豆av在线导航| 欧美性猛交xxxx| 精品国产aⅴ一区二区三区东京热 久久久久99人妻一区二区三区 | 欧美激情偷拍| 国产不卡av在线| 亚洲乱码国产乱码精品精软件| 91美女视频网站| 国产性生活免费视频| 成人在线视频免费| 日韩精品免费在线播放| 国产一级视频在线| 韩国精品免费视频| 亚洲精品永久www嫩草| 国产美女精品写真福利视频| 日韩一区国产二区欧美三区| 国产又黄又粗的视频| 亚洲欧美日韩精品一区二区| 99理论电影网| 少妇视频在线| 宅男在线国产精品| 少妇视频一区二区| 免费黄网站欧美| 欧美日韩精品免费观看| 激情国产在线| 亚洲国产精品嫩草影院久久| 亚洲国产美女视频| 精品在线免费观看| 亚洲高清视频一区| 素人啪啪色综合| 亚洲精品色婷婷福利天堂| 国产黄色片免费看| 不卡区在线中文字幕| 国产成a人亚洲精v品在线观看| 精品一区二区三区在线观看视频| 视频一区视频二区国产精品| 亚洲精品毛片一区二区三区| 久久精品人人做| 青青在线视频免费| 日本欧美国产| 成人疯狂猛交xxx| 午夜视频在线看| 欧美视频一区二| 免费成人美女女在线观看| 美女mm1313爽爽久久久蜜臀| 亚洲一区精品视频| 91亚洲精品在看在线观看高清| 久久精品国亚洲| 国产一区二区自拍视频| 亚洲欧美日韩中文字幕一区二区三区 | 69久久夜色精品国产69| 色哟哟国产精品色哟哟| 精品美女永久免费视频| 日本丰满少妇裸体自慰| 久久久夜夜夜| 亚洲人成人77777线观看| 欧美最新精品| 精品国产一区久久久| 国产肥老妇视频| 亚洲精品视频在线看| 无码人妻精品一区二区三| 国产视频亚洲| 亚洲一区二区高清视频| 天堂av一区| 5278欧美一区二区三区| yw在线观看| 欧美大胆人体bbbb| 国产区一区二区三| 国产精品网站在线观看| 丰满饥渴老女人hd| 久久精品一区二区国产| 一区二区精品国产| 国产日韩三级| 国产精品久久久久久av福利软件| 黄网站在线免费| 日韩av在线网页| 亚洲手机在线观看| 精品成人av一区| 少妇愉情理伦三级| 福利一区在线观看| 99久久国产宗和精品1上映| 亚洲精品2区| 免费成人看片网址| 久久九九精品视频| 日本高清久久天堂| 丝袜在线观看| 一本色道久久88综合日韩精品| www.好吊色| 欧美亚洲一区二区在线观看| 国产性一乱一性一伧一色| 国产婷婷色一区二区三区| 性xxxxxxxxx| 蜜桃av噜噜一区| 日本丰满少妇xxxx| 天天av综合| 欧美深深色噜噜狠狠yyy| 中文字幕一区二区三区日韩精品| 国产精品www色诱视频| 金瓶狂野欧美性猛交xxxx| 日韩中文在线中文网在线观看| 亚洲卡一卡二卡三| 91精品视频网| 波多野结衣一区二区在线| 亚洲国产一区二区视频| 精品国产国产综合精品| 久久人人爽人人爽| 无码人妻丰满熟妇啪啪网站| 免费黄网站欧美| 丁香啪啪综合成人亚洲| 亚洲精品系列| 日本久久久网站| 中文字幕乱码亚洲无线精品一区| 欧美亚洲国产免费| 日韩电影不卡一区| 懂色av一区二区三区在线播放| 亚洲男男av| 国产精品久久久精品| 在线免费日韩片| 91精品国产高清久久久久久91| 在线中文免费视频| 久久久国产精品视频| 日本在线免费播放| 在线播放国产精品| 成年人视频免费在线观看| 亚洲男人第一av网站| 偷拍自拍在线| 亚洲精品美女久久久久| 亚洲日本国产精品| 日韩av在线网| 男同在线观看| 亚洲品质视频自拍网| 欧美女同网站| 亚洲乱码一区二区| 欧美婷婷久久五月精品三区| 日韩激情在线视频| 手机看片福利在线| 日韩av一卡二卡| 性xxxx视频播放免费| 精品视频偷偷看在线观看| 日韩精品系列| 一区二区欧美在线| 97电影在线看视频| 精品国产一区二区三区在线观看| 日本最黄一级片免费在线| 色婷婷久久av| 午夜影院免费在线| 欧美激情一区二区三区在线视频观看| 国产乱码在线| 45www国产精品网站| 亚洲第一二三四区| 国产在线不卡精品| 亚洲国产视频二区| 国产在线一区二区三区欧美| 午夜先锋成人动漫在线| 日韩一本精品| 一区二区在线| 日韩av在线第一页| 视频在线观看一区二区三区| 午夜欧美福利视频| 国产乱淫av一区二区三区| 免费看91视频| 久久影院午夜论| 国产精品久久久久久成人| 亚洲激情在线激情| 综合激情网五月| 欧美老肥妇做.爰bbww| 国产av一区二区三区| 亚洲精品国产电影| lutube成人福利在线观看| 久久精品国产亚洲精品2020| free性欧美| 国产99在线|中文| 九九九九九九精品任你躁| 韩国成人av| 99久久精品网| 黄色一级片播放| 久久99精品久久久久久动态图 | 日韩在线网址| 蜜桃视频在线观看成人| 亚洲精品国产成人影院| 久久久久久久中文| 久久成人精品无人区| 国产一级免费片| 国产精品乱码人人做人人爱| 黄色小说在线观看视频| 欧美自拍丝袜亚洲| 日韩在线视频第一页| 一色桃子一区二区| 91探花在线观看| 成人国产精品一区二区| 羞羞答答一区二区| 天天想你在线观看完整版电影免费| 免播放器亚洲| 国产精品一级无码| 国产精品每日更新在线播放网址| 国产午夜福利片| 6080yy午夜一二三区久久| 狠狠v欧美ⅴ日韩v亚洲v大胸| 欧美日本高清视频| 成人国产综合| 久久久久久久久四区三区| 欧美成人中文| www.久久av.com| 欧美激情中文字幕一区二区| 日韩福利片在线观看| 日韩欧美一级特黄在线播放| 77777影视视频在线观看| 91精品国产91久久久久久久久| 免费看一区二区三区| 亚洲视频在线二区| 日韩中文字幕一区二区三区| 精品久久久久久无码人妻| ●精品国产综合乱码久久久久| 无码人妻精品一区二区蜜桃色欲| 精品福利一区二区三区免费视频| 毛片免费不卡| 国产日本欧美一区二区三区在线| 国产精品嫩草影院在线看| 国产一区二区在线视频播放| 成人免费va视频| 欧美片一区二区| 91精品国产91久久久久久最新毛片 | 国产精品天天干| 大荫蒂欧美视频另类xxxx| 亚洲男人天堂久久| 另类色图亚洲色图| 成人在线精品| 免费成人进口网站| 国产一区二区三区在线观看免费视频| 在线观看亚洲大片短视频| 在线亚洲欧美专区二区| 国产黄色在线| 国产精品久久久久久久久久久不卡 | 在线视频国内自拍亚洲视频| 偷拍自拍在线| 欧美最顶级的aⅴ艳星| 婷婷亚洲成人| 日韩免费毛片视频| 91视频国产资源| 国产精品视频123| 精品一区精品二区| 成人免费网站视频| 日韩精品久久久免费观看| 日本欧美一区二区在线观看| 91麻豆精品国产91久久综合| 欧美日精品一区视频| 日本欧美在线视频免费观看| 成人免费看片视频| 欧美在线亚洲| 大乳护士喂奶hd| 色综合久久中文综合久久97| 国产视频在线看| 成人性生交xxxxx网站| 女同性一区二区三区人了人一 | 国产精品男女| 免费观看精品视频| 国产网站一区二区| 国产精品一区二区黑人巨大| 欧美猛交ⅹxxx乱大交视频| 哺乳挤奶一区二区三区免费看| 日韩av一二三四区| 久久久精品免费网站| 国产精品乱码久久久| 欧美黑人巨大精品一区二区| 天堂av一区二区三区在线播放| 亚洲成人福利在线观看| 亚洲欧美激情小说另类| 亚洲精品国产av| 国产精品第8页| 欧美1区3d| 受虐m奴xxx在线观看| 欧美日本一区二区三区| 成人福利影视| 亚洲国产精品综合| 粉嫩一区二区三区在线看| jizz国产在线观看| 久久av.com| 精品在线91| 国产ts在线观看| 欧美色综合影院| 波多野结衣乳巨码无在线观看| 日韩精品一区二区三区四区五区 | 国产精品一区毛片| 成人在线观看高清| 日韩精品丝袜在线| 韩国一区二区三区视频| 99热成人精品热久久66|