精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Netty源碼之Reactor模式

開發(fā) 前端
每個(gè)新連接創(chuàng)建一個(gè)線程來處理。對于長連接服務(wù),如果一個(gè)client和server保持一個(gè)連接的話,有多少個(gè)client接入,server就需要?jiǎng)?chuàng)建同等的線程來處理。線程上下文切換,數(shù)據(jù)同步和內(nèi)存消耗,對server來說,將是非常大的開銷。

 [[410505]]

學(xué)習(xí)目標(biāo)

  • 什么是Reactor模式?
  • Reactor模式由什么組成的?
  • Reactor模式解決什么問題?
  • Reactor模式線程模型有哪些?演進(jìn)過程?

web處理請求架構(gòu)

大多數(shù)web請求處理流程可以抽象成這幾個(gè)步驟:讀取(read),解碼(decode),處理(process),編碼(encode),發(fā)送(send),如下圖所示:

 

同時(shí),處理web請求通常有兩種架構(gòu):傳統(tǒng)基于線程架構(gòu)和事件驅(qū)動(dòng)架構(gòu)。

傳統(tǒng)基于線程架構(gòu)

概念

每個(gè)新連接創(chuàng)建一個(gè)線程來處理。對于長連接服務(wù),如果一個(gè)client和server保持一個(gè)連接的話,有多少個(gè)client接入,server就需要?jiǎng)?chuàng)建同等的線程來處理。線程上下文切換,數(shù)據(jù)同步和內(nèi)存消耗,對server來說,將是非常大的開銷。

代碼實(shí)現(xiàn)

傳統(tǒng)基于線程架構(gòu)通常采用BIO的方式來實(shí)現(xiàn),代碼如下:

  1. public class Server implements Runnable { 
  2.  
  3.     int port; 
  4.  
  5.     public Server(int port) { 
  6.         this.port = port; 
  7.     } 
  8.  
  9.     @Override 
  10.     public void run() { 
  11.         try { 
  12.             ServerSocket serverSocket = new ServerSocket(port); 
  13.             while (true){ 
  14.                 System.out.println("等待新連接..."); 
  15.                 new Thread(new Handler(serverSocket.accept())).start(); 
  16.             } 
  17.         } catch (IOException e) { 
  18.             e.printStackTrace(); 
  19.         } 
  20.     } 
  21.  
  22.     static class Handler implements Runnable{ 
  23.  
  24.         private Socket socket; 
  25.  
  26.         public Handler(Socket socket){ 
  27.             this.socket = socket; 
  28.         } 
  29.  
  30.         @Override 
  31.         public void run() { 
  32.             try { 
  33.                 byte[] input = new byte[1024]; 
  34.  
  35.                 this.socket.getInputStream().read(input); 
  36.                 byte[] output = process(input); 
  37.                 this.socket.getOutputStream().write(output); 
  38.                 this.socket.getOutputStream().flush(); 
  39.                 this.socket.close(); 
  40.                 System.out.println("響應(yīng)完成!"); 
  41.             } catch (IOException e) { 
  42.                 e.printStackTrace(); 
  43.             } 
  44.         } 
  45.  
  46.         private byte[] process(byte[] input) { 
  47.             System.out.println("讀取內(nèi)容:" + new String(input)); 
  48.             return input; 
  49.         } 
  50.     } 
  51.  
  52.     public static void main(String[] args) throws InterruptedException { 
  53.         Thread thread = new Thread(new Server(2021)); 
  54.         thread.setDaemon(true); 
  55.         thread.start(); 
  56.  
  57.         synchronized (Server.class) { 
  58.             Server.class.wait(); 
  59.         } 
  60.     } 

為了避免線程創(chuàng)建銷毀的開銷,我們通常會(huì)采用線程池,但是同樣也有很大的弊端:

  • 同步阻塞IO,讀寫阻塞,線程等待時(shí)間過長
  • 在制定線程策略的時(shí)候,只能根據(jù)CPU的數(shù)目來限定可用線程資源,不能根據(jù)連接并發(fā)數(shù)目來制定,也就是連接有限制。否則很難保證對客戶端請求的高效和公平。
  • 多線程之間的上下文切換,造成線程使用效率并不高,并且不易擴(kuò)展
  • 狀態(tài)數(shù)據(jù)以及其他需要保持一致的數(shù)據(jù),需要采用并發(fā)同步控制

應(yīng)用場景

既然傳統(tǒng)基于線程架構(gòu)弊端這么多,它存在還有什么價(jià)值?它的應(yīng)用場景是什么?

傳統(tǒng)基于線程架構(gòu)適用于連接數(shù)目比較小且一次傳輸大量數(shù)據(jù)的場景,比如上傳,下載。

事件驅(qū)動(dòng)架構(gòu)

事件驅(qū)動(dòng)架構(gòu):可以把線程和連接解耦,線程只用于執(zhí)行事件注冊的回調(diào)函數(shù)。事件驅(qū)動(dòng)架構(gòu)由事件生產(chǎn)者和事件消費(fèi)者組成,前者是事件的來源,它只負(fù)責(zé)監(jiān)聽哪些事件發(fā)生;后者是直接處理事件或者事件發(fā)生時(shí),響應(yīng)事件的實(shí)體。

Reactor模式

什么是Reactor模式?

Reactor模式是事件驅(qū)動(dòng)架構(gòu)的一種具體實(shí)現(xiàn)方法,簡而言之,就是一個(gè)單線程進(jìn)行循環(huán)監(jiān)聽就緒IO事件,并將就緒IO事件分發(fā)給對應(yīng)的回調(diào)函數(shù)。

Reactor模式由什么組成的?

Reactor模式分為兩個(gè)重要組成部分,Reactor和Handler。 Reactor(反應(yīng)器):循環(huán)監(jiān)聽就緒IO事件,并分發(fā)給回調(diào)函數(shù)。 Handler(回調(diào)函數(shù)):執(zhí)行對應(yīng)IO事件的實(shí)際業(yè)務(wù)邏輯。

Reactor模式解決什么問題?

反應(yīng)器模式可以實(shí)現(xiàn)同步的多路復(fù)用,同步是指按照事件到達(dá)的順序分發(fā)處理。反應(yīng)器 接收來自不同的客戶端的消息、請求和連接,盡管客戶端是并發(fā)的,但是反應(yīng)器可以按照事件到達(dá)的順序觸發(fā)回調(diào)函數(shù)。因此,Reactor模式將連接和線程解耦,不需要為每個(gè)連接創(chuàng)建單獨(dú)線程。這個(gè)問題和C10K問題相同,提供了一個(gè)解決思路。

Reactor模式下的三種模型

單線程模型:IO事件輪詢,分發(fā)(accept)和IO事件執(zhí)行(read,decode,compute,encode,send)都在一個(gè)線程中完成,如下圖所示:

在單線程模型下,不僅IO操作在Reactor線程上,而非IO操作(handlder中process()方法)也在Reactor線程上執(zhí)行了,當(dāng)非IO操作執(zhí)行慢的話,這會(huì)大大延遲IO請求響應(yīng),所以應(yīng)該把非IO操作拆出來,來加速Reactor線程對IO請求響應(yīng),就出現(xiàn)多線程模型。

單線程模型實(shí)現(xiàn):

  1. // reactor 
  2. public class Reactor implements Runnable { 
  3.  
  4.     int port; 
  5.     Selector selector; 
  6.     ServerSocketChannel serverSocket; 
  7.  
  8.     public Reactor(int port) throws IOException { 
  9.         this.port = port; 
  10.         // 創(chuàng)建serverSocket對象 
  11.         serverSocket = ServerSocketChannel.open(); 
  12.         // 綁定端口 
  13.         serverSocket.socket().bind(new InetSocketAddress(port)); 
  14.         // 配置非阻塞 
  15.         serverSocket.configureBlocking(false); 
  16.         // 創(chuàng)建selector對象 
  17.         selector = Selector.open(); 
  18.         // serversocket注冊到selector上,幫忙監(jiān)聽accpet事件 
  19.         serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverSocket,selector)); 
  20.         /** 還可以使用 SPI provider,來創(chuàng)建selector和serversocket對象 
  21.         SelectorProvider p = SelectorProvider.provider(); 
  22.         selector = p.openSelector(); 
  23.         serverSocket = p.openServerSocketChannel(); 
  24.         */ 
  25.     } 
  26.  
  27.     @Override 
  28.     public void run() { 
  29.         try { 
  30.             while (!Thread.interrupted()) { 
  31.                 System.out.println("start select event..."); 
  32.                 selector.select(); 
  33.                 Set selectedKeys = selector.selectedKeys(); 
  34.                 Iterator it = selectedKeys.iterator(); 
  35.                 while (it.hasNext()) { 
  36.                     dispatch((SelectionKey)it.next()); 
  37.                 } 
  38.                 selectedKeys.clear(); 
  39.             } 
  40.         } catch (IOException e) { 
  41.             e.printStackTrace(); 
  42.         } 
  43.     } 
  44.  
  45.     private void dispatch(SelectionKey key) { 
  46.         Runnable r = (Runnable) key.attachment(); 
  47.         if (r != null) { 
  48.             r.run(); 
  49.         } 
  50.     } 
  51.  
  52.  
  53.     public static void main(String[] args) throws IOException, InterruptedException { 
  54.         Thread thread = new Thread(new Reactor(2021)); 
  55.         thread.start(); 
  56.         synchronized (Reactor.class) { 
  57.             Reactor.class.wait(); 
  58.         } 
  59.     } 
  60. // acceptor調(diào)度器 
  61. public class Acceptor implements Runnable { 
  62.  
  63.     ServerSocketChannel serverSocket; 
  64.     Selector selector; 
  65.  
  66.     public Acceptor(ServerSocketChannel serverSocket,Selector selector) { 
  67.         this.serverSocket = serverSocket; 
  68.         this.selector = selector; 
  69.     } 
  70.  
  71.     @Override 
  72.     public void run() { 
  73.         try { 
  74.             SocketChannel socket = this.serverSocket.accept(); 
  75.             if (socket != null) { 
  76.                 new Handler(selector,socket); 
  77.             } 
  78.  
  79.         } catch (IOException e) { 
  80.             e.printStackTrace(); 
  81.         } 
  82.     } 
  83. // 回調(diào)函數(shù)handler 
  84. public class Handler implements Runnable { 
  85.  
  86.     Selector selector; 
  87.     SocketChannel socket; 
  88.     SelectionKey sk; 
  89.  
  90.     ByteBuffer input = ByteBuffer.allocate(1024); 
  91.     ByteBuffer output = ByteBuffer.allocate(1024); 
  92.     static final int READING = 0, SENDING = 1; 
  93.     int state = READING; 
  94.  
  95.  
  96.     public Handler(Selector selector, SocketChannel socket) throws IOException { 
  97.         this.selector = selector; 
  98.         this.socket = socket; 
  99.  
  100.         this.socket.configureBlocking(false); 
  101.         sk = this.socket.register(selector,0); 
  102.         sk.attach(this); 
  103.         sk.interestOps(SelectionKey.OP_READ); 
  104.         selector.wakeup(); 
  105.     } 
  106.  
  107.     @Override 
  108.     public void run() { 
  109.         try{ 
  110.             if (state == READING) { 
  111.                 read(); 
  112.             } else if (state == SENDING) { 
  113.                 send(); 
  114.             } 
  115.         } catch (IOException ex) { 
  116.             ex.printStackTrace(); 
  117.         } 
  118.     } 
  119.  
  120.     private void read() throws IOException { 
  121.         socket.read(input); 
  122.         if (inputIsComplete()) { 
  123.             // 執(zhí)行業(yè)務(wù)邏輯代碼 
  124.             process(); 
  125.             state = SENDING; 
  126.             // Normally also do first write now 
  127.             sk.interestOps(SelectionKey.OP_WRITE); 
  128.         } 
  129.     } 
  130.  
  131.     private void send() throws IOException { 
  132.         socket.write(output); 
  133.         socket.close(); 
  134.         if (outputIsComplete()) sk.cancel(); 
  135.     } 
  136.  
  137.     boolean inputIsComplete() { return true;} 
  138.  
  139.     boolean outputIsComplete() {return true;} 
  140.     // 處理非IO操作(業(yè)務(wù)邏輯代碼) 
  141.     void process(){ 
  142.         String msg = new String(input.array()); 
  143.         System.out.println("讀取內(nèi)容:" + msg); 
  144.         output.put(msg.getBytes()); 
  145.         output.flip(); 
  146.     } 
  • 多線程模型:與單線程模型不同的是添加一個(gè)業(yè)務(wù)線程池,將非IO操作(業(yè)務(wù)邏輯處理)交給業(yè)務(wù)線程池來處理,提高Reactor線程的IO響應(yīng),如圖所示:

 

在多線程模型下,雖然將非IO操作拆出去了,但是所有IO操作都在Reactor單線程中完成的。在高負(fù)載、高并發(fā)場景下,也會(huì)成為瓶頸,于是對Reactor單線程進(jìn)行了優(yōu)化,出現(xiàn)了主從線程模型。

多線程模型實(shí)現(xiàn):

  1. public class Reactor implements Runnable { 
  2.  
  3.     int port; 
  4.     Selector selector; 
  5.     ServerSocketChannel serverSocket; 
  6.  
  7.  
  8.     public Reactor(int port) throws IOException { 
  9.         this.port = port; 
  10.  
  11.         // 創(chuàng)建serverSocket對象 
  12.         serverSocket = ServerSocketChannel.open(); 
  13.         // 綁定端口 
  14.         serverSocket.socket().bind(new InetSocketAddress(port)); 
  15.         // 配置非阻塞 
  16.         serverSocket.configureBlocking(false); 
  17.  
  18.         // 創(chuàng)建selector對象 
  19.         selector = Selector.open(); 
  20.         // serversocket注冊到selector上,幫忙監(jiān)聽accpet事件 
  21.         serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor("Acceptor",serverSocket,selector)); 
  22.  
  23.         /** 還可以使用 SPI provider,來創(chuàng)建selector和serversocket對象 
  24.         SelectorProvider p = SelectorProvider.provider(); 
  25.         selector = p.openSelector(); 
  26.         serverSocket = p.openServerSocketChannel(); 
  27.         */ 
  28.     } 
  29.  
  30.     @Override 
  31.     public void run() { 
  32.         try { 
  33.             while (!Thread.interrupted()) { 
  34.                 System.out.println("start select event..."); 
  35.                 selector.select(); 
  36.                 Set selectedKeys = selector.selectedKeys(); 
  37.                 Iterator it = selectedKeys.iterator(); 
  38.                 while (it.hasNext()) { 
  39.                     dispatch((SelectionKey)it.next()); 
  40.                 } 
  41.                 selectedKeys.clear(); 
  42.             } 
  43.         } catch (IOException e) { 
  44.             e.printStackTrace(); 
  45.         } 
  46.     } 
  47.  
  48.     private void dispatch(SelectionKey key) { 
  49.         SelfRunable r = (SelfRunable) key.attachment(); 
  50.         if (r != null) { 
  51.             System.out.println("dispatch to " + r.getName() + "===="); 
  52.             r.run(); 
  53.         } 
  54.     } 
  55.  
  56.  
  57.     public static void main(String[] args) throws IOException, InterruptedException { 
  58.  
  59.         Thread thread = new Thread(new Reactor(2021)); 
  60.         thread.start(); 
  61.  
  62.         synchronized (Reactor.class) { 
  63.             Reactor.class.wait(); 
  64.         } 
  65.  
  66.  
  67.     } 
  68.  
  69. public class Acceptor implements SelfRunable { 
  70.     ServerSocketChannel serverSocket; 
  71.     Selector selector; 
  72.     String name
  73.     public Acceptor(String name, ServerSocketChannel serverSocket,Selector selector) { 
  74.         this.name = name
  75.         this.serverSocket = serverSocket; 
  76.         this.selector = selector; 
  77.     } 
  78.  
  79.     @Override 
  80.     public void run() { 
  81.         try { 
  82.             SocketChannel socket = this.serverSocket.accept(); 
  83.             if (socket != null) { 
  84.                 new Handler("handler_" + ((InetSocketAddress)socket.getLocalAddress()).getPort(), selector,socket); 
  85.             } 
  86.  
  87.         } catch (IOException e) { 
  88.             e.printStackTrace(); 
  89.         } 
  90.     } 
  91.  
  92.     @Override 
  93.     public String getName() { 
  94.         return this.name
  95.     } 
  96.  
  97. public class Handler implements SelfRunable { 
  98.     String name
  99.     Selector selector; 
  100.     SocketChannel socket; 
  101.     SelectionKey sk; 
  102.     ByteBuffer input = ByteBuffer.allocate(1024); 
  103.     ByteBuffer output = ByteBuffer.allocate(1024); 
  104.     static final int READING = 0, SENDING = 1,  PROCESSING = 3; 
  105.     volatile int state = READING; 
  106.     static ExecutorService poolExecutor = Executors.newFixedThreadPool(5); 
  107.  
  108.     public Handler(String name, Selector selector, SocketChannel socket) throws IOException { 
  109.         this.selector = selector; 
  110.         this.socket = socket; 
  111.         this.name = name
  112.  
  113.         this.socket.configureBlocking(false); 
  114.         sk = this.socket.register(selector,0); 
  115.         sk.attach(this); 
  116.         sk.interestOps(SelectionKey.OP_READ); 
  117.         selector.wakeup(); 
  118.     } 
  119.  
  120.     @Override 
  121.     public void run() { 
  122.         try{ 
  123.             System.out.println("state:" + state); 
  124.             if (state == READING) { 
  125.                 read(); 
  126.             } else if (state == SENDING) { 
  127.                 send(); 
  128.             } 
  129.         } catch (IOException ex) { 
  130.             ex.printStackTrace(); 
  131.         } 
  132.     } 
  133.  
  134.     synchronized void read() throws IOException { 
  135.         socket.read(input); 
  136.         if (inputIsComplete()) { 
  137.             state = PROCESSING; 
  138.            poolExecutor.execute(new Processer()); 
  139.         } 
  140.     } 
  141.  
  142.     synchronized void processAndHandOff() { 
  143.         System.out.println("processAndHandOff========="); 
  144.         process(); 
  145.         state = SENDING; // or rebind attachment 
  146.         sk.interestOps(SelectionKey.OP_WRITE); 
  147.         selector.wakeup(); 
  148.         System.out.println("processAndHandOff finish ! ========="); 
  149.     } 
  150.  
  151.     private void send() throws IOException { 
  152.         System.out.println("start send ..."); 
  153.         socket.write(output); 
  154.         socket.close(); 
  155.         System.out.println("start send finish!"); 
  156.         if (outputIsComplete()) sk.cancel(); 
  157.     } 
  158.  
  159.     boolean inputIsComplete() { return true;} 
  160.  
  161.     boolean outputIsComplete() {return true;} 
  162.  
  163.     void process(){ 
  164.         String msg = new String(input.array()); 
  165.         System.out.println("讀取內(nèi)容:" + msg); 
  166.         output.put(msg.getBytes()); 
  167.         output.flip(); 
  168.     } 
  169.  
  170.     @Override 
  171.     public String getName() { 
  172.         return this.name
  173.     } 
  174.  
  175.     class Processer implements Runnable { 
  176.         public void run() { processAndHandOff(); } 
  177.     } 
  • 主從線程模型: 相比多線程模型而言,對于多核cpu,為了充分利用資源,將Reactor拆分成了mainReactor 和 subReactor,但是,主從線程模型也有弊端,不適合大量數(shù)據(jù)傳輸。 mainReactor:負(fù)責(zé)監(jiān)聽接收(accpet)新連接,將新連接后續(xù)操作交給subReactor來處理,通常由一個(gè)線程處理。 subReactor: 負(fù)責(zé)處理IO的讀寫操作,通常由多個(gè)線程處理。 非IO操作依然由業(yè)務(wù)線程池來處理。

主從線程模型實(shí)現(xiàn):

  1. public class Reactor implements Runnable { 
  2.  
  3.     int port; 
  4.     Selector selector; 
  5.     ServerSocketChannel serverSocket; 
  6.     int SUBREACTOR_SIZE = 1; 
  7.     SubReactor[] subReactorPool = new SubReactor[SUBREACTOR_SIZE]; 
  8.  
  9.  
  10.     public Reactor(int port) throws IOException { 
  11.         this.port = port; 
  12.  
  13.         // 創(chuàng)建serverSocket對象 
  14.         serverSocket = ServerSocketChannel.open(); 
  15.         // 綁定端口 
  16.         serverSocket.socket().bind(new InetSocketAddress(port)); 
  17.         // 配置非阻塞 
  18.         serverSocket.configureBlocking(false); 
  19.  
  20.         // 創(chuàng)建selector對象 
  21.         selector = Selector.open(); 
  22.         // serversocket注冊到selector上,幫忙監(jiān)聽accpet事件 
  23.         serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor("Acceptor",serverSocket,subReactorPool)); 
  24.  
  25.         // 初始化subreactor pool 
  26.         initSubReactorPool(); 
  27.  
  28.  
  29.         /** 還可以使用 SPI provider,來創(chuàng)建selector和serversocket對象 
  30.         SelectorProvider p = SelectorProvider.provider(); 
  31.         selector = p.openSelector(); 
  32.         serverSocket = p.openServerSocketChannel(); 
  33.         */ 
  34.     } 
  35.  
  36.     @Override 
  37.     public void run() { 
  38.         try { 
  39.             while (!Thread.interrupted()) { 
  40.                 System.out.println("mainReactor start select event..."); 
  41.                 selector.select(); 
  42.                 Set selectedKeys = selector.selectedKeys(); 
  43.                 Iterator it = selectedKeys.iterator(); 
  44.                 while (it.hasNext()) { 
  45.                     dispatch((SelectionKey)it.next()); 
  46.                 } 
  47.                 selectedKeys.clear(); 
  48.             } 
  49.         } catch (IOException e) { 
  50.             e.printStackTrace(); 
  51.         } 
  52.     } 
  53.  
  54.     void initSubReactorPool() { 
  55.         try { 
  56.             for (int i = 0; i < SUBREACTOR_SIZE; i++) { 
  57.                 subReactorPool[i] = new SubReactor("SubReactor" + i); 
  58.             } 
  59.         } catch (IOException ex) { /* ... */ } 
  60.     } 
  61.  
  62.     private void dispatch(SelectionKey key) { 
  63.         SelfRunable r = (SelfRunable) key.attachment(); 
  64.         if (r != null) { 
  65.             System.out.println("mainReactor dispatch to " + r.getName() + "===="); 
  66.             r.run(); 
  67.         } 
  68.     } 
  69.  
  70.  
  71.     public static void main(String[] args) throws IOException, InterruptedException { 
  72.  
  73.         Thread thread = new Thread(new Reactor(2021)); 
  74.         thread.start(); 
  75.  
  76.         synchronized (Reactor.class) { 
  77.             Reactor.class.wait(); 
  78.         } 
  79.     } 
  80.  
  81. public class SubReactor implements SelfRunable { 
  82.  
  83.     private Selector selector; 
  84.     private String name
  85.     private List<SelfRunable> task = new ArrayList<SelfRunable>(); 
  86.  
  87.     public SubReactor(String name) throws IOException { 
  88.         this.name = name
  89.         selector = Selector.open(); 
  90.         new Thread(this).start(); 
  91.     } 
  92.  
  93.     @Override 
  94.     public String getName() { 
  95.         return this.name
  96.     } 
  97.  
  98.     @Override 
  99.     public void run() { 
  100.         try { 
  101.             while (!Thread.interrupted()) { 
  102.                 System.out.println("subReactor start select event..."); 
  103.                 selector.select(5000); 
  104.                 Set selectedKeys = selector.selectedKeys(); 
  105.                 Iterator it = selectedKeys.iterator(); 
  106.                 while (it.hasNext()) { 
  107.                     dispatch((SelectionKey)it.next()); 
  108.                 } 
  109.                 selectedKeys.clear(); 
  110.  
  111.             } 
  112.         } catch (IOException e) { 
  113.             e.printStackTrace(); 
  114.         } 
  115.     } 
  116.  
  117.     private void dispatch(SelectionKey key) { 
  118.         SelfRunable r = (SelfRunable) key.attachment(); 
  119.         if (r != null) { 
  120.             System.out.println("subReactor dispatch to " + r.getName() + "===="); 
  121.             r.run(); 
  122.         } 
  123.     } 
  124.  
  125.     public Selector getSelector(){ 
  126.         return this.selector; 
  127.     } 
  128.  
  129.     public void submit(SelfRunable runnable) { 
  130.         task.add(runnable); 
  131.     } 
  132.  
  133.  
  134. public class Acceptor implements SelfRunable { 
  135.  
  136.     int next = 0; 
  137.     String name
  138.     SubReactor[] subReactorPool; 
  139.     ServerSocketChannel serverSocket; 
  140.  
  141.     public Acceptor(String name, ServerSocketChannel serverSocket,SubReactor[] subReactorPool) { 
  142.         this.name = name
  143.         this.serverSocket = serverSocket; 
  144.         this.subReactorPool = subReactorPool; 
  145.     } 
  146.  
  147.     @Override 
  148.     public void run() { 
  149.         try { 
  150.             SocketChannel socket = this.serverSocket.accept(); 
  151.             if (socket != null) { 
  152.                 new Handler("handler", subReactorPool[next].getSelector(),socket); 
  153.             } 
  154.             if (++next == subReactorPool.length) {next=0;} 
  155.  
  156.         } catch (IOException e) { 
  157.             e.printStackTrace(); 
  158.         } 
  159.     } 
  160.  
  161.     @Override 
  162.     public String getName() { 
  163.         return this.name
  164.     } 
  165.  
  166. public class Handler implements SelfRunable { 
  167.  
  168.     String name
  169.     Selector selector; 
  170.     SocketChannel socket; 
  171.     SelectionKey sk; 
  172.  
  173.     ByteBuffer input = ByteBuffer.allocate(1024); 
  174.     ByteBuffer output = ByteBuffer.allocate(1024); 
  175.     static final int READING = 0, SENDING = 1,  PROCESSING = 3; 
  176.     volatile int state = READING; 
  177.  
  178.     static ExecutorService poolExecutor = Executors.newFixedThreadPool(5); 
  179.  
  180.     public Handler(String name, Selector selector, SocketChannel socket) throws IOException { 
  181.         this.selector = selector; 
  182.         this.socket = socket; 
  183.         this.name = name
  184.  
  185.         this.socket.configureBlocking(false); 
  186.         sk = this.socket.register(this.selector,0); 
  187.         sk.attach(this); 
  188.         sk.interestOps(SelectionKey.OP_READ); 
  189.         selector.wakeup(); 
  190.     } 
  191.  
  192.     @Override 
  193.     public void run() { 
  194.         try{ 
  195.             System.out.println("state:" + state); 
  196.             if (state == READING) { 
  197.                 read(); 
  198.             } else if (state == SENDING) { 
  199.                 send(); 
  200.             } 
  201.         } catch (IOException ex) { 
  202.             ex.printStackTrace(); 
  203.         } 
  204.     } 
  205.  
  206.     synchronized void read() throws IOException { 
  207.         socket.read(input); 
  208.         if (inputIsComplete()) { 
  209.             state = PROCESSING; 
  210.            poolExecutor.execute(new Processer()); 
  211.         } 
  212.     } 
  213.  
  214.     synchronized void processAndHandOff() { 
  215.         System.out.println("processAndHandOff========="); 
  216.         process(); 
  217.         state = SENDING; // or rebind attachment 
  218.         sk.interestOps(SelectionKey.OP_WRITE); 
  219.         selector.wakeup(); 
  220.         System.out.println("processAndHandOff finish ! ========="); 
  221.     } 
  222.  
  223.     private void send() throws IOException { 
  224.         System.out.println("start send ..."); 
  225.         socket.write(output); 
  226.         socket.close(); 
  227.         System.out.println("start send finish!"); 
  228.         if (outputIsComplete()) sk.cancel(); 
  229.     } 
  230.  
  231.     boolean inputIsComplete() { return true;} 
  232.  
  233.     boolean outputIsComplete() {return true;} 
  234.  
  235.     void process(){ 
  236.         String msg = new String(input.array()); 
  237.         System.out.println("讀取內(nèi)容:" + msg); 
  238.         output.put(msg.getBytes()); 
  239.         output.flip(); 
  240.     } 
  241.  
  242.     @Override 
  243.     public String getName() { 
  244.         return this.name
  245.     } 
  246.  
  247.     class Processer implements Runnable { 
  248.         public void run() { processAndHandOff(); } 
  249.     } 

Reactor線程模型演進(jìn)

模型

簡介

弊端

單線程模型

IO/非IO操作都在Reactor單線程中完成

非IO操作執(zhí)行慢,影響IO操作響應(yīng)延遲

多線程模型

拆分非IO操作交給業(yè)務(wù)線程池執(zhí)行,IO操作由Reator單線程執(zhí)行

高并發(fā),高負(fù)載場景下,Reactor單線程會(huì)成為瓶頸

主從線程模型

Reactor單線程拆分為mainReactor和subReactor

不適合大量數(shù)據(jù)傳輸

Netty線程模型

Reactor主從線程模型-抽象模型

  • 創(chuàng)建ServerSocketChannel過程(創(chuàng)建channel,配置非阻塞)
  • ServerSocketChannel注冊到mainReactor的selector對象上,監(jiān)聽accept事件
  • mainReactor的selector監(jiān)聽到新連接SocketChannel,將SocketChannel注冊到subReactor的selector對象上,監(jiān)聽read/write事件
  • subReactor的selector監(jiān)聽到read/write事件,移交給業(yè)務(wù)線程池(對應(yīng)netty的pipeline)

Netty線程模型

我們再好好看看mainReactor和subReactor,其實(shí)這兩個(gè)類功能非常相似,所以Netty將mainReactor和subReactor統(tǒng)一成了EventLoop。對于Netty零基礎(chǔ)的,請參考這個(gè)Reactor主從線程模型-抽象模型和下面這張圖來理解EventLoop。

 

 

責(zé)任編輯:武曉燕 來源: 今日頭條
相關(guān)推薦

2022-03-04 08:10:35

NettyIO模型Reactor

2022-03-06 12:15:38

NettyReactor線程

2025-05-08 10:25:00

Netty網(wǎng)絡(luò)編程框架

2021-06-16 14:18:37

NettyReactor線程模型

2022-10-25 08:23:09

Reactor模式I/O

2012-08-24 09:58:09

ReactorDSSC

2022-03-10 07:58:12

ReactorNetty運(yùn)轉(zhuǎn)架構(gòu)

2022-09-29 15:39:10

服務(wù)器NettyReactor

2020-12-11 11:04:07

NettyIO

2022-02-09 09:37:54

ReactorNettyI/O

2024-11-22 08:00:00

Netty開發(fā)

2019-01-15 10:54:03

高性能ServerReactor

2022-05-24 15:46:51

Wi-FiSTA模式

2024-10-24 20:48:04

Netty線程Java

2020-08-21 07:23:50

工廠模式設(shè)計(jì)

2021-04-26 17:38:40

ReactorProactor網(wǎng)絡(luò)

2021-09-27 08:56:44

NettyChannelHand架構(gòu)

2015-03-31 18:26:43

陌陌社交

2015-09-08 13:39:10

JavaScript設(shè)計(jì)模式

2012-01-13 15:59:07

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

国产精品波多野结衣| 欧美国产日韩在线| 污色网站在线观看| 成a人片在线观看| 成人免费黄色大片| 国产精品美女网站| 久久亚洲AV无码| 夜夜躁狠狠躁日日躁2021日韩| 欧美三片在线视频观看 | 你懂的视频欧美| 欧美三级中文字幕| 97久久国产亚洲精品超碰热| 欧美少妇另类| 国产一区二区三区黄视频| 51色欧美片视频在线观看| 欧美日韩生活片| 精品自拍偷拍| 911国产精品| 国产91美女视频| 国产日产一区二区| 国产午夜亚洲精品理论片色戒| 91亚洲精品在线观看| 日韩欧美成人一区二区三区 | 国产福利一区二区| 国产福利成人在线| 日韩精品国产一区二区| 五月天久久网站| 亚洲欧美日韩精品久久奇米色影视| 奇米视频7777| 裤袜国产欧美精品一区| 一区2区3区在线看| 在线精品日韩| 极品美乳网红视频免费在线观看 | 加勒比精品视频| 伊人久久大香伊蕉在人线观看热v| 亚洲电影一区二区三区| 久久免费视频2| www.亚洲.com| 久久久99精品久久| 久久精品一二三区| 日韩一级片免费在线观看| 精品亚洲porn| 国产精品日韩欧美大师| 伊人中文字幕在线观看| 夜夜嗨一区二区| 久久乐国产精品| 波多野结衣不卡视频| 日韩aaaa| 中文字幕亚洲自拍| 中文字幕av久久爽一区| 国产成人黄色| 亚洲欧美三级伦理| 人妻体内射精一区二区| 亚洲国产精品嫩草影院久久av| 精品久久国产字幕高潮| 日本黄色大片在线观看| 免费精品一区二区三区在线观看| 欧美精品一卡两卡| 三上悠亚在线一区二区| 主播大秀视频在线观看一区二区| 在线视频欧美精品| 午夜免费福利在线| 国产亚洲精品精品国产亚洲综合| 欧美中文字幕一区二区三区亚洲| 少妇人妻互换不带套| 亚洲高清黄色| 精品视频全国免费看| 91福利国产成人精品播放| av成人亚洲| 欧美丰满美乳xxx高潮www| 亚洲激情在线看| 日韩欧美一级| 日韩av在线网址| 国精品无码人妻一区二区三区| 亚洲另类春色校园小说| 亚洲天堂一区二区三区| xxxxx99| 999国产精品永久免费视频app| 色av吧综合网| 美女福利视频在线观看| 伊人激情综合| 国产成人精品电影久久久| 国模私拍一区二区| 国产在线精品免费| 国产一区二区三区黄| 青青久在线视频免费观看| 国产欧美日韩综合| 红桃一区二区三区| 松下纱荣子在线观看| 欧美日韩一卡二卡三卡| 91成人在线观看喷潮蘑菇| 精品五月天堂| 在线视频中文亚洲| 免费一级a毛片夜夜看| 国产农村妇女毛片精品久久莱园子 | 国产精品亚洲一区| 国产精品久久久久久久龚玥菲| 国产精品第一页第二页第三页| 日本久久久网站| 亚洲欧美在线成人| 日韩欧美黄色影院| 野外性满足hd| 欧美精品国产| 国产精品27p| 成人毛片在线免费观看| 国产亚洲一区二区三区四区| 国产在线拍揄自揄拍无码| 国产免费拔擦拔擦8x高清在线人| 欧美日韩亚洲另类| 波多野结衣影院| 色偷偷综合网| 奇米4444一区二区三区 | 久久久国产精品黄毛片| 久久综合婷婷| 国产乱码精品一区二区三区日韩精品 | 成年免费在线观看| 国产一区二区看久久| 欧洲视频一区二区三区| a'aaa级片在线观看| 欧美日韩精品系列| 魔女鞋交玉足榨精调教| 激情婷婷亚洲| 成人午夜小视频| 国产精品久久久久久久龚玥菲| 精品久久久久久久久久国产| wwwxxx色| 中文字幕人成人乱码| 国产日韩欧美综合| 国产色a在线| 色一情一伦一子一伦一区| 催眠调教后宫乱淫校园| 欧美成人69av| 亚洲综合自拍一区| 欧美jizz18性欧美| 在线不卡的av| 黑人と日本人の交わりビデオ| 先锋亚洲精品| 看高清中日韩色视频| а√天堂8资源中文在线| 日韩午夜激情av| 在线免费观看亚洲视频| 国产美女久久久久| 一级特黄妇女高潮| 在线高清欧美| 久久精品国产亚洲7777| 中文字幕无线码一区 | av永久免费观看| 一本色道久久综合亚洲精品高清 | 国产精品30p| 国产成人av影院| 中文字幕色一区二区| 国产成人精品一区二区三区视频| 亚洲精品小视频| 中文字幕一区二区三区精品| 99在线精品免费| 日本中文字幕亚洲| 亚洲欧洲国产精品一区| 欧美成人性色生活仑片| 国产精品久久免费| 亚洲激情网站免费观看| 国内av一区二区| 日韩国产在线| 国产精品亚洲网站| 最新av网站在线观看| 婷婷综合久久一区二区三区| 亚洲精品久久一区二区三区777| 亚洲欧美综合| 成人黄色片视频网站| 性爱视频在线播放| 欧美另类高清zo欧美| 黄色一级视频免费观看| 丁香五精品蜜臀久久久久99网站 | 亚洲狠狠婷婷综合久久久久图片| 亚洲精品婷婷| 久久久精彩视频| 粉嫩一区二区| 亚洲人成网站999久久久综合| 国产又大又粗又爽| 欧美极品美女视频| www.五月天色| 欧美午夜a级限制福利片| 狠狠色噜噜狠狠色综合久| 国产在线精彩视频| 亚洲区中文字幕| 中文字幕一区二区三区人妻四季 | 亚洲资源av| 日韩精品伦理第一区| 亚洲午夜国产成人| 欧美国产日韩中文字幕在线| 性xxxx搡xxxxx搡欧美| 日韩欧美亚洲范冰冰与中字| 日本高清黄色片| 国产不卡免费视频| 成人在线免费观看av| 欧洲激情综合| 成人午夜一级二级三级| 日本大片在线播放| 国产亚洲精品久久久久久牛牛 | 蜜桃久久精品成人无码av| 麻豆专区一区二区三区四区五区| 美女黄色免费看| 国产一区二区精品久| 96pao国产成视频永久免费| xxx性欧美| 中文国产成人精品| 丰满人妻一区二区三区免费视频| 色偷偷一区二区三区| 神马久久精品综合| www.在线欧美| 亚洲免费黄色网| 美女日韩在线中文字幕| 中文字幕超清在线免费观看| 国内精品偷拍| 国产精品免费视频久久久| 欧亚在线中文字幕免费| 日韩中文在线中文网三级| 国产综合在线播放| 欧美亚一区二区| 日本少妇xxxx动漫| 樱花草国产18久久久久| 少妇av片在线观看| 成人一区在线观看| 中文字幕亚洲影院| 久久男女视频| 日韩一级性生活片| 99精品全国免费观看视频软件| 精品国产免费一区二区三区 | 精品人伦一区二区三区蜜桃免费 | 国产极品久久久| 欧美午夜精品久久久| 中文字幕69页| 午夜精品久久久久| 欧美日韩成人免费观看| 中文一区一区三区高中清不卡| 欧美体内she精高潮| 精品一二线国产| 国产嫩草在线观看| 亚洲专区一区二区三区| 91免费黄视频| 日韩午夜在线| 久久福利一区二区| 我不卡神马影院| 亚洲欧美成人一区| 欧美精品中文字幕亚洲专区| 成人福利在线视频| 99久久伊人| 国产精品丝袜白浆摸在线| 亚洲v国产v在线观看| yellow91字幕网在线| 最近中文字幕日韩精品| 国产尤物视频在线| 日韩成人在线视频观看| 日本成人动漫在线观看| 日韩精品一区二区三区视频播放 | 国产精品一区二区3区| 欧美极品videos大乳护士| 国a精品视频大全| 第四色日韩影片| 久久久久久久一区二区| 亚洲综合伊人久久大杳蕉| 在线电影欧美日韩一区二区私密| 国产精品一级伦理| 国产一区二区三区在线免费观看| 精品推荐蜜桃传媒| 亚洲人成电影在线观看天堂色| 日本不卡视频一区二区| 亚洲跨种族黑人xxx| 欧美少妇另类| 国产亚洲欧洲在线| 午夜福利一区二区三区| 亚洲品质视频自拍网| 国产美女性感在线观看懂色av| 亚洲视频在线观看网站| 国产精品久久久久一区二区国产| 日韩少妇与小伙激情| 搞黄网站在线观看| 久久久久久国产精品三级玉女聊斋 | 国产一区二区美女| 在线免费看黄色片| 亚洲国产精品成人综合色在线婷婷 | 国产精品原创视频| 国产免费黄色一级片| 我爱我色成人网| 日本精品久久久| 婷婷午夜社区一区| 国产精品三级网站| 成人永久在线| 激情小说综合网| 久久99国内| 伊人av成人| 欧美88av| 日韩免费毛片视频| 美女看a上一区| 91精品人妻一区二区三区蜜桃2| 成人aa视频在线观看| 国产黄色大片免费看| 亚洲精品欧美综合四区| 日本一级片免费看| 欧美亚洲动漫制服丝袜| 一区二区三区免费在线| 日韩av在线直播| 日本在线观看网站| 欧美精品videossex88| 一级毛片久久久| 91超碰在线电影| 伊人久久大香线蕉综合网站 | 欧美**vk| 亚洲精品高清视频| 一区二区毛片| 日韩在线不卡一区| av中文字幕亚洲| 亚洲色图100p| 色999日韩国产欧美一区二区| 国产有码在线观看| 精品亚洲国产成av人片传媒| 国产午夜视频在线观看| 韩国一区二区电影| 日韩精品一级毛片在线播放| 精品91免费| 91精品91| 亚洲一区二区福利视频| 91一区一区三区| 黄色一级片中国| 91福利区一区二区三区| 亚洲日本在线播放| 久久精品免费电影| 欧美天堂视频| 九色91在线视频| 欧美精品大片| 捷克做爰xxxⅹ性视频| 国产午夜精品久久| 神马久久久久久久| 精品国产一区二区三区忘忧草| 一区二区三区视频在线观看视频| 7777精品视频| 亚洲国产合集| 国产原创popny丨九色| 国产综合色在线| 一区二区三区影视| 欧美日韩免费高清一区色橹橹| 亚洲三级黄色片| 97精品在线视频| 日韩系列在线| 欧美三级在线观看视频| 国产成人精品亚洲日本在线桃色 | 亚洲日本青草视频在线怡红院| 97人妻一区二区精品视频| 亚洲精品美女在线观看播放| 国产午夜精品久久久久免费视| 91精品啪aⅴ在线观看国产| 久久激情电影| 91制片厂毛片| 国产精品国产三级国产a| 18国产免费视频| 在线观看视频99| 99欧美精品| 久久视频免费在线| 国产精品影视在线| 久久久精品视频免费观看| 欧美三日本三级三级在线播放| 国产精品久久久久一区二区国产| 国产精品69久久| 日本成人小视频| 6080国产精品| 一区二区三区免费网站| 国产高中女学生第一次| 久久久成人精品| 国产一区福利| 欧美精品一区免费| 久久夜色精品一区| 伊人久久亚洲综合| 日韩亚洲一区二区| 国产区一区二| 黄色一级视频片| 久久久久国产精品人| 自拍偷拍色综合| 久久成人这里只有精品| 日韩欧美一级| 免费在线观看视频a| 成人午夜视频网站| 夜夜爽妓女8888视频免费观看| 中文字幕成人在线| 亚州欧美在线| 91国视频在线| 国产精品色噜噜| 精品国产区一区二| 欧美精品www| 大片网站久久| 中文国产在线观看| 亚洲电影一区二区三区| 9i精品一二三区| 99c视频在线| 男女精品视频| 卡通动漫亚洲综合| 欧美精品一区二区三区一线天视频| 国产v日韩v欧美v| 欧美日韩视频在线一区二区观看视频| 国产一区二区三区黄视频| 日本五十熟hd丰满| 色婷婷综合成人av|