從零開始實現簡單 RPC 框架 6:網路通訊之 Netty

網路通訊的開發,就涉及到一些開發框架:

Java NIO

Netty

Mina

等等。

理論上來說,類似於序列化器,可以為其定義一套統一的介面,讓不同型別的框架實現,事實上,

Dubbo

就是這麼幹的。

但是,作為一個簡單的 RPC 框架,

ccx-rpc

就先不統一了,因為基本上網路框架是不會換的,而且統一起來程式碼量巨大。

ccx-rpc

選擇的網路框架是

Netty

Netty

是一款大名鼎鼎的非同步事件驅動的網路應用程式框架,支援快速地開發可維護的高效能的面向協議的伺服器和客戶端。

Netty

在 JDK 自帶的

NIO

基礎之上進行了封裝,解決了 JDK 自身的一些問題,具備如下優點:

入門簡單,使用方便,文件齊全,無其他依賴,只依賴 JDK 就夠了。

高效能,高吞吐,低延遲,資源消耗少。

靈活的執行緒模型,支援阻塞和非阻塞的I/O 模型。

程式碼質量高,目前主流版本基本沒有 Bug。

下面我們先來介紹一下

Netty

的核心設計吧。

Netty 執行緒模型設計

#

服務收到請求之後,執行的邏輯大致有:編解碼、訊息派發、業務處理以及返回響應。這些邏輯是放到一個執行緒序列執行,還是分配到不同執行緒中執行,會對程式的效能產生很大的影響。優秀的執行緒模型對一個高效能網路庫來說是至關重要的。

Netty 採用了

Reactor

執行緒模型的設計。

什麼是 Reactor

#

Wikipedia 的定義是:

The reactor design pattern is an

event handling

pattern for handling service requests delivered concurrently to a service handler by

one or more inputs

。 The service handler then

demultiplexes

the incoming requests and

dispatches

them synchronously to the associated request handlers。

從上面的定義可以看出有幾個重點:

事件驅動

能處理一個或多個輸入源

多路複用、分發事件給對應的處理器

Reactor 執行緒模型有幾個角色:

Reactor:負責響應事件,將事件分發給綁定了該事件的 Handler 處理;

Handler:事件處理器,綁定了某類事件,負責對事件進行處理;

Acceptor:Handler 的一種,綁定了連線事件。當客戶端發起連線請求時,Reactor 會將 accept 事件分發給 Acceptor 處理。

簡單來說,其核心原理是 Reactor 負責監聽事件,在監聽到事件之後,分發給相關執行緒的處理器進行處理。

為什麼用 Reactor

#

我們先來看看

傳統阻塞 I/O 模型的缺點

每個連線都需要獨立執行緒處理,當併發數大時,建立執行緒數多,佔用資源

採用阻塞 I/O 模型,連線建立後,若當前執行緒沒有資料可讀,執行緒會阻塞在讀操作上,造成資源浪費

針對傳統阻塞 I/O 模型的兩個問題,可以採用如下的方案

基於池化思想,避免為每個連線建立執行緒,連線完成後將業務處理交給執行緒池處理

基於 I/O 複用模型,多個連線共用同一個阻塞物件,不用等待所有的連線。遍歷到有新資料可以處理時,作業系統會通知程式,執行緒跳出阻塞狀態,進行業務邏輯處理

Reactor 執行緒模型的思想就是

執行緒池

I/O複用

的結合。

為了幫助你更好地瞭解 Netty 執行緒模型的設計理念,我們將從最基礎的單 Reactor 單執行緒模型開始介紹,然後逐步增加模型的複雜度,最終到 Netty 目前使用的非常成熟的執行緒模型設計。

1。 單 Reactor 單執行緒

#

Reactor 物件監聽客戶端請求事件,收到事件後透過進行分發。

如果是

建立連線

事件,則由

Acceptor

透過 Accept 處理連線請求,然後建立一個 Handler 物件處理連線建立之後的業務請求。

如果是

讀寫

事件,則 Reactor 會將事件分發對應的

Handler

來處理,由單執行緒呼叫 Handler 物件來完成讀取資料、業務處理、傳送響應的完整流程。

具體情況如下圖所示:

從零開始實現簡單 RPC 框架 6:網路通訊之 Netty

單 Reactor 單執行緒的優點就是:執行緒模型簡單,沒有引入多執行緒,自然也就沒有多執行緒併發和競爭的問題。

但其缺點也非常明顯,那就是效能瓶頸問題,一個執行緒只能跑在一個 CPU 上,能處理的連線數是有限的,無法完全發揮多核 CPU 的優勢。一旦某個業務邏輯耗時較長,這唯一的執行緒就會卡在上面,無法處理其他連線的請求,程式進入假死的狀態,可用性也就降低了。正是由於這種限制,一般只會在

客戶端

使用這種執行緒模型。

2。 單 Reactor 多執行緒

#

其流程跟 “單 Reactor 單執行緒” 的流程差不多,也是 Acceptor 處理連線事件,Handler 處理讀寫事件。

唯一的區別就是:Handler 處理請求的時候,使用的是

執行緒池

來處理。

從零開始實現簡單 RPC 框架 6:網路通訊之 Netty

很明顯,單 Reactor 多執行緒的模型可以充分利用多核 CPU 的處理能力,提高整個系統的吞吐量,但引入多執行緒模型就要考慮執行緒併發、資料共享等問題。

在這個模型中,只有

一個執行緒

來處理 Reactor 監聽到的所有 I/O 事件,其中就包括連線建立事件以及讀寫事件,當連線數不斷增大的時候,這個唯一的 Reactor 執行緒也會遇到瓶頸。

3。 主從 Reactor 多執行緒

#

為了解決單 Reactor 多執行緒模型中的問題,我們可以引入多個 Reactor。

Reactor 主執行緒接收建立連線事件,然後給 Acceptor 處理。網路連線建立之後,主 Reactor 會將連線分配給 子 Reactor 進行後續監聽。

子 Reactor 分配到連線之後,負責監聽該連線上的讀寫事件。讀寫事件到來時分發給 Worker 執行緒池的 Handler 處理。

從零開始實現簡單 RPC 框架 6:網路通訊之 Netty

4。 Netty 執行緒模型

#

Netty 同時支援上述幾種執行緒模式,Netty 針對伺服器端的設計是在主從 Reactor 多執行緒模型的基礎上進行的修改,如下圖所示:

從零開始實現簡單 RPC 框架 6:網路通訊之 Netty

Netty 抽象出兩組執行緒池:BossGroup 專門用於接收客戶端的連線,WorkerGroup 專門用於網路的讀寫。

BossGroup 裡的執行緒 會監聽連線事件,與客戶端建立網路連線後,生成相應的 NioSocketChannel 物件,表示一條網路連線。之後會將 NioSocketChannel 註冊到 WorkerGroup 中某個執行緒上。

WorkerGroup 裡的執行緒會監聽對應連線上的讀寫事件,當監聽到讀寫事件的時候,會透過 Pipeline 新增的多個處理器進行處理,每個處理器中都可以包含一定的邏輯,例如編解碼、心跳、業務邏輯等。

Netty 的核心元件

#

介紹完 Netty 優秀的執行緒模型設計,接下來開始介紹 Netty 的核心元件。

1。 EventLoopGroup / EventLoop

#

在前面介紹 Netty 執行緒模型的時候,提到 BossGroup 和 WorkerGroup,他們就是

EventLoopGroup

,一個 EventLoopGroup 當中會包含一個或多個

EventLoop

,EventLoopGroup 提供 next 介面,可以從一組 EventLoop 裡面按照一定規則獲取其中一個 EventLoop 來處理任務。EventLoop 從表面上看是一個不斷迴圈的

執行緒

EventLoop 最常用的實現類是:NioEventLoop,一個 NioEventLoop 包含了一個 Selector 物件, 可以支援多個 Channel 註冊在其上,該 NioEventLoop 可以同時服務多個 Channel,每個 Channel 只能與一個 NioEventLoop 繫結,這樣就實現了執行緒與 Channel 之間的關聯。

EventLoop 並不是一個純粹的 I/O 執行緒,它除了負責 I/O 的讀寫之外,還兼顧處理以下兩類任務:

系統任務

:透過呼叫 NioEventLoop 的

execute(Runnable task)

方法實現,Netty 有很多系統任務,當 I/O 執行緒和使用者執行緒同時操作網路資源時,為了防止併發操作導致的鎖競爭,將使用者執行緒的操作封裝成任務放入訊息佇列中,由 I/O 執行緒負責執行,這樣就實現了局部無鎖化。

定時任務

:透過呼叫 NioEventLoop 的

schedule(Runnable command, long delay, TimeUnit unit)

方法實現。

2。 Channel

#

Channel 是 Netty 對網路連線的抽象,核心功能是執行網路 I/O 操作,是服務端和客戶端進行 I/O 資料互動的媒介。

工作流程:

當客戶端連線成功,將新建一個 Channel 於該客戶端進行繫結

Channel 從 EventLoopGroup 獲得一個 EventLoop,並註冊到該 EventLoop,Channel 生命週期內都和該 EventLoop 在一起(註冊時獲得selectionKey)

Channel 與客戶端進行網路連線、關閉和讀寫,生成相對應的 event(改變selectinKey資訊),觸發 EventLoop 排程執行緒進行執行

如果是讀事件,執行執行緒排程 Pipeline 來處理邏輯

3。 Pipeline

#

上面介紹 Channel 的時候提到,如果是讀事件,則透過 Pipeline 來處理。一個 Channel 對應一個 Pipeline,一個 Pipeline 由多個 Handler 串成一個有序的連結串列,一個 Handler 處理完,呼叫 next 獲得下一個 Handler 進行處理。

從零開始實現簡單 RPC 框架 6:網路通訊之 Netty

上圖黃色部分即為 Handler,一個 Handler 可以是 Inbound、OutBound。處理入站事件時,Handler 按照正向順序執行。處理出站事件時,Handler 按照

反向

順序執行。

常規 Pipeline 的 Handler 註冊程式碼如下:

new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline p = ch。pipeline(); // 30 秒之內沒有收到客戶端請求的話就關閉連線 p。addLast(new IdleStateHandler(30, 0, 0, TimeUnit。SECONDS)); // 編解碼器 p。addLast(new NettyEncoder()); p。addLast(new NettyDecoder()); // RPC 訊息處理器 p。addLast(serviceHandlerGroup, new NettyServerHandler()); }}

4。 ByteBuf

#

在進行跨程序遠端互動的時候,我們需要以位元組的形式傳送和接收資料,傳送端和接收端都需要一個高效的資料容器來快取位元組資料,ByteBuf 就扮演了這樣一個數據容器的角色。

ByteBuf 類似於一個

位元組陣列

,其中維護了一個讀索引(

readerIndex

)和一個寫索引(

writerIndex

),分別用來控制對 ByteBuf 中資料的讀寫操作。還有一個

capacity

用來記錄緩衝區的總長度,當寫資料超過

capacity

時,ByteBuf 會自動擴容,直到

capacity

達到

maxCapacity

ByteBuf 的結構如下:

Netty 中主要分為以下三大類 ByteBuf:

Heap Buffer

(堆緩衝區):這是最常用的一種 ByteBuf,它將資料儲存在 JVM 的堆空間,其底層實現是在 JVM 堆內分配一個數組,實現資料的儲存。堆緩衝區可以快速分配,當不使用時也可以由 GC 輕鬆釋放。

建立程式碼:

Unpooled。buffer()

或者

ctx。alloc()。buffer()

Direct Buffer

(直接緩衝區):直接緩衝區會使用堆外記憶體儲存資料,不會佔用 JVM 堆的空間,使用時應該考慮應用程式要使用的最大記憶體容量以及如何及時釋放。直接緩衝區在使用 Socket 傳遞資料時效能很好,當然,它也是有缺點的,因為沒有了 JVM GC 的管理,在分配記憶體空間和釋放記憶體時,比堆緩衝區更復雜,Netty 主要使用記憶體池來解決這樣的問題。

建立程式碼:

Unpooled。directBuffer()

或者

ctx。alloc()。directBuffer()

Composite Buffer

(複合緩衝區):我們可以建立多個不同的 ByteBuf,然後提供一個這些 ByteBuf 組合的檢視,也就是 CompositeByteBuf。它就像一個列表,可以動態新增和刪除其中的 ByteBuf。

例如:一條訊息由 header 和 body 兩部分組成,將 header 和 body 組裝成一條訊息傳送出去,可能 body 相同,只是 header 不同,使用CompositeByteBuf 就不用每次都重新分配一個新的緩衝區。

建立程式碼:

Unpooled。compositeBuffer()

或者

ctx。alloc()。compositeBuffer()

Netty 使用 ByteBuf 物件作為資料容器,進行 I/O 讀寫操作,其實 Netty 的記憶體管理也是圍繞著 ByteBuf 物件高效地分配和釋放。從

記憶體管理

角度來看,ByteBuf 可分為 Unpooled 和 Pooled 兩類。

Unpooled

:是指

非池化

的記憶體管理方式。每次分配時直接呼叫系統 API 向作業系統申請 ByteBuf,在使用完成之後,透過系統呼叫進行釋放。Unpooled 將記憶體管理完全交給系統,不做任何特殊處理,使用起來比較方便,對於申請和釋放操作不頻繁、操作成本比較低的 ByteBuf 來說,是比較好的選擇。

使用示例:

Unpooled。buffer()

Pooled

:是指

池化

的記憶體管理方式。該方式會預先申請一塊大記憶體形成記憶體池,在需要申請 ByteBuf 空間的時候,會將記憶體池中一部分合理的空間封裝成 ByteBuf 給服務使用,使用完成後回收到記憶體池中。前面提到 DirectByteBuf 底層使用的堆外記憶體管理比較複雜,池化技術很好地解決了這一問題。

池化分配器 ByteBufAllocator 需要從 ChannelHandlerContext 中獲取例項:

ByteBufAllocator byteBufAllocator = ctx。alloc()

,然後再生成 ByteBuf 物件:byteBufAllocator。buffer()`

最後,我們來總結一下 ByteBuf 的優點:

透過內建的複合緩衝區型別實現了透明的零複製。

容量可以按需增長。

在讀和寫這兩種模式之間切換不需要呼叫 ByteBuffer 的 flip()方法。

讀和寫使用了不同的索引。

支援引用計數。

支援池化。

總結

#

上面我們介紹了 Netty 優秀的執行緒模型和核心元件,Netty 優秀的設計還有很多,感興趣的讀者可以再去深入瞭解,以上介紹的已經夠寫一個 RPC 框架了。

接下來,我們就要講到網路通訊的核心實現了,敬請期待!