得物自研API網關實踐之路
一、業務背景
老網關使用 Spring Cloud Gateway (下稱SCG)技術框架搭建,SCG基于webflux 編程范式,webflux是一種響應式編程理念,響應式編程對于提升系統吞吐率和性能有很大幫助; webflux 的底層構建在netty之上性能表現優秀;SCG屬于spring生態的產物,具備開箱即用的特點,以較低的使用成本助力得物早期的業務快速發展;但是隨著公司業務的快速發展,流量越來越大,網關迭代的業務邏輯越來越多,以及安全審計需求的不斷升級和穩定性需求的提高,SCG在以下幾個方面逐步暴露了一系列的問題。
網絡安全
從網絡安全角度來講,對公網暴露接口無疑是一件風險極高的事情,網關是對外網絡流量的重要橋梁,早期的接口暴露采用泛化路由的模式,即通過正則形式( /api/v1/app/order/** )的路由規則開放接口,單個應用服務往往只配置一個泛化路由,后續上線新接口時外部可以直接訪問;這帶來了極大的安全風險,很多時候業務開發的接口可能僅僅是內部調用,但是一不小心就被泛化路由開放到了公網,甚至很多時候沒人講得清楚某個服務具體有多少接口屬于對外,多少對內;另一方面從監控數據來看,黑產勢力也在不斷對我們的接口做滲透試探。
協同效率
引入了接口注冊機制,所有對外暴露接口逐一注冊到網關,未注冊接口不可訪問,安全的問題得到了解決但同時帶來了性能問題,SCG采用遍歷方式匹配路由規則,接口注冊模式推廣后路由接口注冊數量迅速提升到3W+,路由匹配性能出現嚴重問題;泛化路由的時代,一個服務只有一個路由配置,變動頻率很低,配置工作由網關關開發人員負責,效率尚可,接口注冊模式將路由工作轉移到了業務開發同學的身上,這就得引入一套完整的路由審核流程,以提升協同效率;由于路由信息早期都存在配置中心,同時這么大的數據量給配置中心也帶來極大的壓力和穩定性風險。
性能與維護成本
業務迭代的不斷增多,也使得API網關堆積了很多的業務邏輯,這些業務邏輯分散在不同的filter中,為了降低開發成本,網關只有一套主線分支,不同集群部署的代碼完全相同,但是不同集群的業務屬性不同,所需要的filter 邏輯是不一樣的;如內網網關集群幾乎沒什么業務邏輯,但是App集群可能需要幾十個filter的邏輯協同工作;這樣的一套代碼對內網網關而言,存在著大量的性能浪費;如何平衡維護成本和運行效率是個需要思考的問題。
穩定性風險
API網關作為基礎服務,承載全站的流量出入,穩定性無疑是第一優先級,但其定位決定了絕不可能是一個簡單的代理層,在穩定運行的同時依然需要承接大量業務需求,例如C端用戶登錄下線能力,App強升能力,B端場景下的鑒權能力等;很難想象較長一段時間以來,網關都保持著雙周一次的發版頻率;頻繁的發版也帶來了一些問題,實例啟動初期有很多資源需要初始化,此時承接的流量處理時間較長,存在著明顯的接口超時現象;早期的每次發版幾乎都會導致下游服務的接口短時間內超時率大幅提高,而且往往設計多個服務一起出現類似情況;為此甚至拉了一個網關發版公告群,提前置頂發版公告,讓業務同學和NOC有一個心里預期;在發布升級期間盡可能讓業務服務無感知這是個剛需。
定制能力
流量灰度是網關最常見的功能之一,對于新版本迭代,業務服務的某個節點發布新版本后希望引入少部分流量試跑觀察,但很遺憾SCG原生并不支持,需要對負載均衡算法進行手動改寫才可以,此外基于流量特征的定向節點路由也需要手動開發,在SCG中整個負載均衡算法屬于比較核心的模塊,不對外直接暴露,存在較高的改造成本。
B端業務和C端業務存在著很大的不同,例如對接口的響應時間的忍受度是不一樣的,B端場景下下載一個報表用戶可以接受等待10s或者1分鐘,但是C端用戶現在沒有這個耐心。作為代理層針對以上的場景,我們需要針對不同接口定制不同的超時時間,原生的SCG顯然也不支持。
諸如此類的定制需求還有還多,我們并不寄希望于開源產品能夠開箱即用滿足全部需求,但至少定制性拓展性足夠好。上手改造成本低。
二、技術痛點
SCG主要使用了webflux技術,webflux的底層構建在reactor-netty之上,而reactor-netty構建于netty之上;SCG能夠和spring cloud 的技術棧的各組件,完美適配,做到開箱即用,以較低的使用成本助力得物早期的業務快速發展;但是使用webflux也是需要付出一定成本,首先它會額外增加編碼人員的心智負擔,需要理解流的概念和常用的操作函數,諸如map, flatmap, defer 等等;其次異步非阻塞的編碼形式,充斥著大量的回調函數,會導致順序性業務邏輯被割裂開來,增加代碼閱讀理理解成本;此外經過多方面評估我們發現SCG存在以下缺點:
內存泄露問題
SCG存在較多的內存泄漏問題,排查困難,且官方遲遲未能修復,長期運行會導致服務觸發OOM并宕機;以下為github上SCG官方開源倉庫的待解決的內存泄漏問題,大約有16個之多。
SCG內存泄漏BUG
下圖可以看到SCG在長期運行的過程中內存使用一直在增長,當增長到機器內存上限時當前節點將不可用,聯系到網關單節點所承接的QPS 在幾千,可想而知節點宕機帶來的危害有多大;一段時間以來我們需要對SCG網關做定期重啟。
SCG生產實例內存增長趨勢
響應式編程范式復雜
基于webflux 中的flux 和mono ,在對request和response信息讀取修改時,編碼復雜度高,代碼理解困難,下圖是對body信息進行修改時的代碼邏輯。
圖片對requestBody 進行修改的方式
多層抽象的性能損耗
盡管相比于傳統的阻塞式網關,SCG的性能已經足夠優秀,但相比原生的netty仍然比較低下,SCG依賴于webflux編程范式,webflux構建于reactor-netty之上,reactor-netty 構建于netty 之上, 多層抽象存在較大的性能損耗。
SCG依賴層級
一般認為程序調用棧越深性能越差;下圖為只有一個filter的情況下的調用棧,可以看到存在大量的 webflux 中的 subscribe() 和onNext() 方法調用,這些方法的執行不關聯任何業務邏輯,屬于純粹的框架運行層代碼,粗略估算下沒有引入任何邏輯的情況下SCG的調用棧深度在 90+ ,如果引入多個filter處理不同的業務邏輯,線程棧將進一步加深,當前網關的業務復雜度實際棧深度會達到120左右,也就是差不多有四分之三的非業務棧損耗,這個比例是有點夸張的。
圖片
SCG filter 調用棧深度
路由能力不完善
原生的的SCG并不支持動態路由管理,路由的配置信息通過大量的KV配置來做,平均一個路由配置需要三到四條KV配置信息來支撐,這些配置數據一般放在諸如Apollo或者ark 這樣的配置中心,即使是添加了新的配置SCG并不能動態識別,需要引入動態刷新路由配置的能力。另一方面路由匹配算法通過遍歷所有的路由信息逐一匹配的模式,當接口級別的路由數量急劇膨脹時,性能是個嚴重問題。
SCG路由匹配算法為On時間復雜度
預熱時間長,冷啟動RT尖刺大
SCG中LoadBalancerClient 會調用choose方法來選擇合適的endpoint 作為本次RPC發起調用的真實地址,由于是懶加載,只有在有真實流量觸發時才會加載創建相關資源;在觸發底層的NamedContextFactory#getContext 方法時存在一個全局鎖導致,woker線程在該鎖上大量等待。
NamedContextFactory#getContext方法存在全局鎖
SCG發布時超時報錯增多
定制性差,數據流控制耦合
SCG在開發運維過程中已經出現了較多的針對源碼改造的場景,如動態路由,路由匹配性能優化等;其設計理念老舊,控制流和數據流混合使用,架構不清晰,如對路由管理操作仍然耦合在filter中,即使引入引入spring mvc方式管理,依然綁定使用webflux編程范式,同時也無法做到控制流端口獨立,存在一定安全風險。
filter中對路由進行管理
三、方案調研
理想中的網關
綜合業務需求和技術痛點,我們發現理想型的網關應該是這個樣子的:
支持海量接口注冊,并能夠在運行時支持動態添加修改路由信息,具備出色路由匹配性能
編程范式盡可能簡單,降低開發人員心智負擔,同時最好是開發人員表較為熟悉的語言
性能足夠好,至少要等同于目前SCG的性能,RT99線和ART較低
穩定性好,無內存泄漏,能夠長時間持續穩定運行,發布升級期間要盡可能下游無感
拓展能力強,支持超時定制,多網絡協議支持,http,Dubbo等,生態完善
架構設計清晰,數據流與控制流分離,集成UI控制面
開源網關對比
基于以上需求,我們對市面上的常見網關進行了調研,以下幾個開源方案對比。
圖片
結合當前團隊的技術棧,我們傾向于選擇Java技術棧的開源產品,唯一可選的只有zuul2 ,但是zuul2路由注冊和穩定性方面也不能夠滿足我們的需求,也沒有實現數控分離的架構設計。因此唯有走上自研之路。
四、自研架構
通常而言代理網關分為透明代理與非透明代理,其主要區別在于對于流量是否存在侵入性,這里的侵入性主要是指對請求和響應數據的修改;顯然API Gateway的定位決定了必然會對流量進行數據調整,常見的調整主要有 添加或者修改head 信息,加密或者解密 query params head ,以及 requestbody 或者responseBody,可以說http請求的每一個部分數據都存在修改的可能性,這要求代理層必須要完全解析數據包信息,而非簡單的做一個路由器轉發功能。
傳統的服務器架構,以reactor架構為主。boss線程和worker線程的明確分工,boss線程負責連接建立創建;worker線程負責已經建立的連接的讀寫事件監聽處理,同時會將部分復雜業務的處理放到獨立的線程池中,進而避免worker線程的執行事件過長影響對網絡事件處理的及時性;由于網關是IO密集型服務,相對來說計算內容較少,可以不必引入這樣的業務線程池;直接基于netty 原生reactor架構實現。
Reactor多線程架構
圖片
為了只求極致性能和降低多線程編碼的數據競爭,單個請求從接收到轉發后端,再到接收后端服務響應,以及最終的回寫給client端,這一些列操作被設計為完全閉合在一個workerEventLoop線程中處理;這需要worker線程中執行的IO類型操作全部實現異步非阻塞化,確保worker線程的高速運轉;這樣的架構和NGINX很類似;我們稱之為 thread-per-core模式。
圖片
API網關組件架構
圖片
數據流控制流分離
數據面板專注于流量代理,不處理任何admin 類請求,控制流監聽獨立的端口,接收管理指令。
圖片
五、核心設計
請求上下文封裝
新的API網關底層仍然基于Netty,其自帶的http協議解析handler可以直接使用。基于netty框架的編程范式,需要將相關在初始化時逐一注冊用到的 Handler。
Client到Proxy鏈路Handler 執行順序
HttpServerCodec 負責HTTP請求的解析;對于體積較大的Http請求,客戶端可能會拆成多個小的數據包進行發送,因此在服務端需要適當的封裝拼接,避免收到不玩整的http請求;HttpObjectAggregator 負責整個請求的拼裝組合。
拿到HTTP請求的全部信息后在業務handler 中進行處理;如果請求體積過大直接拋棄;使用ServerWebExchange 對象封裝請求上下文信息,其中包含了client2Proxy的channel, 以及負責處理該channel 的eventLoop 線程等信息,考慮到整個請求的處理過程中可能可能在不同階段傳遞一些拓展信息,引入了getAttributes 方法 用于存儲需要傳遞的數據;此外ServerWebExchange 接口的基本遵循了SCG的設計規范,保證了在遷移業務邏輯時的最小化改動;具體到實現類,可以參考如下代碼:
@Getter
public class DefaultServerWebExchange implements ServerWebExchange {
private final Channel client2ProxyChannel;
private final Channel proxy2ClientChannel;
private final EventLoop executor;
private ServerHttpRequest request;
private ServerHttpResponse response;
private final Map<String, Object> attributes;
}DefaultServerWebExchange
Client2ProxyHttpHandler作為核心的入口handler 負責將接收到的FullHttpRequest 進行封裝和構建ServerWebExchange 對象,其核心邏輯如下。可以看到對于數據讀取封裝的邏輯較為簡單,并沒有植入常見的業務邏輯,封裝完對象后隨即調用 Request filter chain。
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) {
try {
Channel client2ProxyChannel = ctx.channel();
DefaultServerHttpRequest serverHttpRequest = new DefaultServerHttpRequest(fullHttpRequest, client2ProxyChannel);
ServerWebExchange serverWebExchange = new DefaultServerWebExchange(client2ProxyChannel,(EventLoop) ctx.executor(), serverHttpRequest, null);
// request filter chain
this.requestFilterChain.filter(serverWebExchange);
}catch (Throwable t){
log.error("Exception caused before filters!\n {}",ExceptionUtils.getStackTrace(t));
ByteBufHelper.safeRelease(fullHttpRequest);
throw t;
}
}Client2ProxyHttpHandler 精簡后的代碼
FilterChain設計
FilterChain可以解決異步請求發送出去后,還沒收到響應,但是順序邏輯已經執行完成的尷尬;例如當我們在上文的。
channelRead0 方法中發起某個鑒權RPC調用時,處于性能考慮只能使用非阻塞的方式,按照netty的非阻塞編碼API最終要引入類似如下的 callback 機制,在業務邏輯上在沒有收到RPC的響應之前該請求的處理應該“暫停”,等待收到響應時才能繼續后續的邏輯執行; 也就是下面代碼中的下一步執行邏輯并不能執行,正確的做法是將nextBiz() 方包裹在 callBack() 方法內,由callBack() 觸發后續邏輯的執行;這只是發起一次RPC調用的情況,在實際的的日常研發過程中存在著鑒權,風控,集群限流(Redis)等多次RPC調用,這就導致這樣的非阻塞代碼編寫將異常復雜。
ChannelFuture writeFuture = channel.writeAndFlush(asyncRequest.httpRequest);
writeFuture.addListener(future -> {
if(future.isSuccess()) {
callBack();
}
}
);
nextBiz();非阻塞調用下的業務邏輯編排
對于這樣的復雜場景,采用filterChain模式可以很好的解決;首先RequestFilterChain().filter(serverWebExchange); 后不存在任何邏輯;發起請求時 ,當前filter執行結束,由于此時沒有調用chain.filter(exchange); 所以不會繼續執行下一個filter,發送請求到下游的邏輯也不會執行;當前請求的處理流程暫時中止,eventloop 線程將切換到其他請求的處理過程上;當收到RPC響應時,chain.filter(exchange) 被執行,之前中斷的流程被重新拉起。
public void filter(ServerWebExchange exchange) {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
try {
filter.filter(exchange, chain);
}catch (Throwable e){
log.error("Filter chain unhandle backward exception! Request path {}, FilterClass: {}, exception: {}", exchange.getRequest().getPath(), filter.getClass(), ExceptionUtils.getFullStackTrace(e));
ResponseDecorator.failResponse(exchange,500, "網關內部錯誤!filter chain exception!");
}
}
}基于filterChain的調用模式
對于filter的執行需要定義先后順序,這里參考了SCG的方案,每個filter返回一個order值。不同的地方在于DAG的設計不允許 order值重復,因為在order重復的情況下,很難界定到底哪個Filter 先執行,存在模糊地帶,這不是我們期望看到的;DAG中的Filter 執行順序為order值從小到大,且不允許order值重復。為了易于理解,這里將Filter拆分為了 requestFilter,和responseFilter;分別代表請求的處理階段 和拿到下游響應階段,responseFilter 遵循同樣的邏輯執行順序與不可重復性。
public interface GatewayFilter extends Ordered {
void filter(ServerWebExchange exchange, GatewayFilterChain chain);
}
public interface ResponseFilter extends GatewayFilter { }
public interface RequestFilter extends GatewayFilter { }filter接口設計
路由管理與匹配
以SCG網關注冊的路由數量為基準,網關節點的需要支撐的路由規則數量是上萬級別的,按照得物目前的業務量,上限不超過5W,為了保證匹配性能,路由規則放在分布式緩存中顯然是不合適的,需要保存在節點的內存中。類似于在nginx上配置上萬條location 規則,手動維護難度可想而知,即使在配置中心管理起來也很麻煩,所以需要引入獨立路由管理模塊。
在匹配的效率上也需要進一步優化,SCG的路由匹配策略為普通的循環迭代逐一匹配,時間效率為On,在路由規則膨脹到萬級別后,性能急劇拉胯,結合得物的接口規范,新網關采用Hash匹配模式,將匹配消息提升到O1;hash的key為接口的path, 需要強調的是在同一個網關集群中,path是唯一的,這里的path并不等價于業務服務的接口path, 絕大多數時候存在一些剪裁,例如在業務服務的編寫的/order/detail接口,在網關實際注冊的接口可能為/api/v1/app/order/detail;由于使用了path作為key進行hash匹配。常見的restful 接口顯然是不支持的,確切的講基于path傳參數模式的接口均不支持;出于某些歷史原因,網關保留了類似nginx 的前綴匹配的支持,但是這部分功能不對外開放。
public class Route implements Ordered {
private final String id;
private final int skipCount;
private final URI uri;
}route類設計
route的URI字段中包含了,需要路由到的具體服務名,這里也可以稱之為host ,route 信息會暫存在 exchange對象的 attributes 屬性中, 在后續的loadbalance階段host信息會被進一步替換為真實的 endpoint。
private Route lookupRoute(ServerWebExchange exchange) {
String path = exchange.getRequest().getPath();
CachingRouteLocator locator = (CachingRouteLocator) routeLocator;
Route exactRoute = pathRouteMap.getOrDefault(path, null);
if (exactRoute != null) {
exchange.getAttributes().put(DAGApplicationConfig.GATEWAY_ROUTE_CACHE, route);
return exactRoute;
}
}路由匹配邏輯
單線程閉環
為了更好地利用CPU,以及減少不必要的數據競爭,將單個請求的處理全部閉合在一個線程當中;這意味著這個請求的業務邏輯處理,RPC調用,權限驗證,限流token獲取都將始終由某個固定線程處理。netty中 網絡連接被抽象為channel,channel 與eventloop線程的對應關系為 N對1,一個channel 僅能被一個eventloop 線程所處理,這在處理用戶請求時沒有問題,但是在接收請求完畢向下游轉發請求時,我們碰到了一些挑戰,下游的連接往往是連接池在管理,連接池的管理是另一組eventLoop線程在負責,為了保持閉環需要將連接池的線程設定為處理當前請求的線程,并且只能是這一個線程;這樣一來,默認狀態下啟動的N個線程(N 與機器核心數相同),分別需要管理一個連接池;thread-per-core 模式的性能已經在nginx開源組件上得到驗證。
圖片
連接管理優化
為了滿足單線程閉環,需要將連接池的管理線程設置為當前的 eventloop 線程,最終我們通過threadlocal 進行線程與連接池的綁定;通常情況下netty自帶的連接池 FixedChannelPool 可以滿足我們大部分場景下的需求,這樣的連接池也是適用于多線程的場景;由于新網關使用thread-per-core模式并將請求處理的全生命周期閉合在單個線程中,所有為了線程安全的額外操作不再必要且存在性能浪費;為此需要對原生連接池做一些優化, 連接的獲取和釋放簡化為對鏈表結構的簡單getFirst , addLast。
對于RPC 而言,無論是HTTP,還是Dubbo,Redis等最終底層都需要用到TCP連接,將構建在TCP連接上的數據解析協議與連接剝離后,我們發現這種純粹的連接管理是可以復用的,對于連接池而言不需要知道具體連接的用途,只需要維持到特定endpoint的連接穩定即可,那么這里的RPC服務的連接仍然可以放入連接池中進行托管;最終的連接池設計架構圖。
AsyncClient設計
對于七層流量而言基本全部都是Http請求,同樣在RPC請求中 http協議也占了大多數,考慮到還會存在少量的dubbo, Redis 等協議通信的場景。因此需要抽象出一套異步調用框架來支撐;這樣的框架需要具備超時管理,回調執行,錯誤輸出等功能,更重要的是具備協議無關性質, 為了更方便使用需要支持鏈式調用。
發起一次RPC調用通常可以分為以下幾步:
- 獲取目標地址和使用的協議, 目標服務為集群部署時,需要使用loadbalance模塊
- 封裝發送的請求,這樣的請求在應用層可以具體化為某個Request類,網絡層序列化為二進制數據流
- 出于性能考慮選擇非阻塞式發送,發送動作完成后開始計算超時
- 接收數據響應,由于采用非阻塞模式,這里的發送線程并不會以block的方式等待數據
- 在超時時間內完成數據處理,或者觸發超時導致連接取消或者關閉
AsyncClient 模塊內容并不復雜,AsyncClient為抽象類不區分使用的網絡協議;ConnectionPool 作為連接的管理者被client所引用,獲取連接的key 使用 protocol+ip+port 再適合不過;通常在某個具體的連接初始化階段就已經確定了該channel 所使用的協議,因此初始化時會直接綁定協議Handler;當協議為HTTP請求時,HttpClientCodec 為HTTP請求的編解碼handler;也可以是構建在TCP協議上的 Dubbo, Mysql ,Redis 等協議的handler。
首先對于一個請求的不同執行階段需要引入狀態定位,這里引入了 STATE 枚舉:
enum STATE{
INIT,SENDING,SEND,SEND_SUCCESS,FAILED,TIMEOUT,RECEIVED
}其次在執行過程中設計了 AsyncContext作為信息存儲的載體,內部包含request和response信息,作用類似于上文提到的ServerWebExchange;channel資源從連接池中獲取,使用完成后需要自動放回。
public class AsyncContext<Req, Resp> implements Cloneable{
STATE state = STATE.INIT;
final Channel usedChannel;
final ChannelPool usedChannelPool;
final EventExecutor executor;
final AsyncClient<Req, Resp> agent;
Req request;
Resp response;
ResponseCallback<Resp> responseCallback;
ExceptionCallback exceptionCallback;
int timeout;
long deadline;
long sendTimestamp;
Promise<Resp> responsePromise;
}AsyncContextAsyncClient 封裝了基本的網絡通信能力,不拘泥于某個固定的協議,可以是Redis, http,Dubbo 等。當將數據寫出去之后,該channel的非阻塞調用立即結束,在沒有收到響應之前無法對AsyncContext 封裝的數據做進一步處理,如何在收到數據時將接收到的響應和之前的請求管理起來這是需要面對的問題,channel 對象 的attr 方法可以用于臨時綁定一些信息,以便于上下文切換時傳遞數據,可以在發送數據時將AsyncContext對象綁定到該channel的某個固定key上。當channel收到響應信息時,在相關的 AsyncClientHandler 里面取出AsyncContext。
public abstract class AsyncClient<Req, Resp> implements Client {
private static final int defaultTimeout = 5000;
private final boolean doTryAgain = false;
private final ChannelPoolManager channelPoolManager = ChannelPoolManager.getChannelPoolManager();
protected static AttributeKey<AsyncRequest> ASYNC_REQUEST_KEY = AttributeKey.valueOf("ASYNC_REQUEST");
public abstract ApplicationProtocol getProtocol();
public AsyncContext<Req, Resp> newRequest(EventExecutor executor, String endpoint, Req request) {
final ChannelPoolKey poolKey = genPoolKey(endpoint);
ChannelPool usedChannelPool = channelPoolManager.acquireChannelPool(executor, poolKey);
return new AsyncContext<>(this,executor,usedChannelPool,request, defaultTimeout, executor.newPromise());
}
public void submitSend(AsyncContext<Req, Resp> asyncContext){
asyncContext.state = AsyncContext.STATE.SENDING;
asyncContext.deadline = asyncContext.timeout + System.currentTimeMillis();
ReferenceCountUtil.retain(asyncContext.request);
Future<Resp> responseFuture = trySend(asyncContext);
responseFuture.addListener((GenericFutureListener<Future<Resp>>) future -> {
if(future.isSuccess()){
ReferenceCountUtil.release(asyncContext.request);
Resp response = future.getNow();
asyncContext.responseCallback.callback(response);
}
});
}
/**
* 嘗試從連接池中獲取連接并發送請求,若失敗返回錯誤
*/
private Promise<Resp> trySend(AsyncContext<Req, Resp> asyncContext){
Future<Channel> acquireFuture = asyncContext.usedChannelPool.acquire();
asyncContext.responsePromise = asyncContext.executor.newPromise();
acquireFuture.addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> channelFuture) throws Exception {
sendNow(asyncContext,channelFuture);
}
});
return asyncContext.responsePromise;
}
private void sendNow(AsyncContext<Req, Resp> asyncContext, Future<Channel> acquireFuture){
boolean released = false;
try {
if (acquireFuture.isSuccess()) {
NioSocketChannel channel = (NioSocketChannel) acquireFuture.getNow();
released = true;
assert channel.attr(ASYNC_REQUEST_KEY).get() == null;
asyncContext.usedChannel = channel;
asyncContext.state = AsyncContext.STATE.SEND;
asyncContext.sendTimestamp = System.currentTimeMillis();
channel.attr(ASYNC_REQUEST_KEY).set(asyncContext);
ChannelFuture writeFuture = channel.writeAndFlush(asyncContext.request);
channel.eventLoop().schedule(()-> doTimeout(asyncContext), asyncContext.timeout, TimeUnit.MILLISECONDS);
} else {
asyncContext.responsePromise.setFailure(acquireFuture.cause());
}
} catch (Exception e){
throw new Error("Unexpected Exception.............!");
}finally {
if(!released) {
ReferenceCountUtil.safeRelease(asyncContext.request);
}
}
}
}AsyncClient核心源碼
public class AsyncClientHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
AsyncContext asyncContext = ctx.attr(AsyncClient.ASYNC_REQUEST_KEY).get();
try {
asyncContext.state = AsyncContext.STATE.RECEIVED;
asyncContext.releaseChannel();
asyncContext.responsePromise.setSuccess(msg);
}catch (Throwable t){
log.error("Exception raised when set Success callback. Exception \n: {}", ExceptionUtils.getFullStackTrace(t));
ByteBufHelper.safeRelease(msg);
throw t;
}
}
}AsyncClientHandler
通過上面幾個類的封裝得到了一個易用使用的 AsyncClient,下面的代碼為調用權限系統的案例:
final FullHttpRequest httpRequest = HttpRequestUtil.getDefaultFullHttpRequest(newAuthReq, serviceInstance, "/auth/newCheckSls");
asyncClient.newRequest(exchange.getExecutor(), endPoint,httpRequest)
.timeout(timeout)
.onComplete(response -> {
String checkResultJson = response.content().toString(CharsetUtil.UTF_8);
response.release();
NewAuthResult result = Jsons.parse(checkResultJson,NewAuthResult.class);
TokenResult tokenResult = this.buildTokenResult(result);
String body = exchange.getAttribute(DAGApplicationConfig.REQUEST_BODY);
if (tokenResult.getUserInfoResp() != null) {
UserInfoResp userInfo = tokenResult.getUserInfoResp();
headers.set("userid", userInfo.getUserid() == null ? "" : String.valueOf(userInfo.getUserid()));
headers.set("username", StringUtils.isEmpty(userInfo.getUsername()) ? "" : userInfo.getUsername());
headers.set("name", StringUtils.isEmpty(userInfo.getName()) ? "" : userInfo.getName());
chain.filter(exchange);
} else {
log.error("{},heads: {},response: {}", path, headers, tokenResult);
int code = tokenResult.getCode() != null ? tokenResult.getCode().intValue() : ResultCode.UNAUTHO.code;
ResponseDecorator.failResponse(exchange, code, tokenResult.getMsg());
}
})
.onError(throwable -> {
log.error("Request service {},occur an exception {}",endPoint, throwable);
ResponseDecorator.failResponseWithStatus(exchange,HttpResponseStatus.INTERNAL_SERVER_ERROR,"AuthFilter 驗證失敗");
})
.sendRequest();asyncClient的使用
請求超時管理
一個請求的處理時間不能無限期拉長, 超過某個閾值的情況下App的頁面會被取消 ,長時間的加載卡頓不如快速報錯帶來的體驗良好;顯然網關需要針對接口做超時處理,尤其是在向后端服務發起請求的過程,通常我們會設置一個默認值,例如3秒鐘,超過這個時間網關會向請求端回寫timeout的失敗信息,由于網關下游接入的服務五花八門,可能是RT敏感型的C端業務,也可能是邏輯較重B端服務接口,甚至是存在大量計算的監控大盤接口。這就導致不同接口對超時時間的訴求不一樣,因此針對每個接口的超時時間設定應該被獨立出來,而不是統一配置成一個值。
asyncClient.newRequest(exchange.getExecutor(), endPoint,httpRequest)
.timeout(timeout)
.onComplete(response -> {
String checkResultJson = response.content().toString(CharsetUtil.UTF_8);
//..........
})
.onError(throwable -> {
log.error("Request service {},occur an exception {}",endPoint, throwable);
ResponseDecorator.failResponseWithStatus(exchange,HttpResponseStatus.INTERNAL_SERVER_ERROR,"AuthFilter 驗證失敗");
})
.sendRequest();asyncClient 的鏈式調用設計了 timeout方法,用于傳遞超時時間,我們可以通過一個全局Map來配置這樣的信息。
Map<String,Integer> 其key為全路徑的path 信息,V為設定的超時時間,單位為ms, 至于Map的信息在實際配置過程中如何承載,使用ARK配置或者Mysql 都很容易實現。處于并發安全和性能的極致追求,超時事件的設定和調度最好能夠在與當前channel綁定的線程中執行,慶幸的是 EventLoop線程自帶schedule 方法。具體來看上文的 AsyncClient 的56行。schedule 方法內部以堆結構的方式實現了對超時時間進行管理,整體性能尚可。
堆外內存管理優化
常見的堆外內存手動管理方式無非是引用計數,不同處理邏輯可能針對 RC (引用計數) 的值做調整,到某個環節的業務邏輯處理后已經不記得當前的引用計數值是多少了,甚至是前面的RC增加了,后面的RC忘記減少了;但換個思路,在數據回寫給客戶端后我們肯定要把這個請求整個生命周期所申請的堆外內存全部釋放掉,堆外內存在回收的時候條件只有一個,就是RC值為0 ,那么在最終的release的時候,我們引入一個safeRelase的思路 , 如果當前的RC>0 就不停的 release ,直至為0;因此只要把這樣的邏輯放在netty的最后一個Handler中即可保證內存得到有效釋放。
public static void safeRelease(Object msg){
if(msg instanceof ReferenceCounted){
ReferenceCounted ref = (ReferenceCounted) msg;
int refCount = ref.refCnt();
for(int i=0; i<refCount; i++){
ref.release();
}
}
}safeRelease
響應時間尖刺優化
由于DAG 選擇了復用spring 的 loadbalance 模塊,但這樣一來就會和SCG一樣存在啟動初期的響應時間尖刺問題;為此我們進一步分析RibbonLoadBalancerClient 的構建過程,發現其用到了NamedContextFactory,該類的 contexts 變量保存了每一個serviceName對應的一個獨立context,這種使用模式帶來大量的性能浪費。
public abstract class NamedContextFactory<C extends NamedContextFactory.Specification>implements DisposableBean, ApplicationContextAware {
//1. contexts 保存 key -> ApplicationContext 的map
private Map<String, AnnotationConfigApplicationContext> contexts = new ConcurrentHashMap<>();
//........
}在實際運行中 RibbonLoadBalancerClient 會調用choose方法來選擇合適的endpoint 作為本次RPC發起調用的真實地址;choose 方法執行過程中會觸發 getLoadBalancer() 方法執行,可以看到該方法的可以按照傳入的serviceId 獲取專屬于這個服務的LoadBalancer,事實上這樣的設計有點多此一舉。大部分情況下,每個服務的負載均衡算法都一致的,完全可以復用一個LoadBalancer對象;該方法最終是從spring 容器中獲取 LoadBalancer。
class RibbonLoadBalancerClient{
//..........
private SpringClientFactory clientFactory;
@Override
public ServiceInstance choose(String serviceId) {
return choose(serviceId, null);
}
public ServiceInstance choose(String serviceId, Object hint) {
Server server = getServer(getLoadBalancer(serviceId), hint);
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
//.........
}RibbonLoadBalancerClient
由于是懶加載,實際流量觸發下才會執行,因此第一次執行時,RibbonLoadBalancerClient 對象并不存在,需要初始化創建,創建時大量線程并發調用SpringClientFactory#getContext 方法, 鎖在同一個對象上,出現大量的RT尖刺。這也解釋了為什么SCG網關在發布期間會出現響應時間大幅度抖動的現象。
public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification>{
//............
protected AnnotationConfigApplicationContext getContext(String name) {
if (!this.contexts.containsKey(name)) {
synchronized (this.contexts) {
if (!this.contexts.containsKey(name)) {
this.contexts.put(name, createContext(name));
}
}
}
return this.contexts.get(name);
}
//.........
}SpringClientFactory
在后期的壓測過程中,發現 DAG的線程數量遠超預期,基于thread-per-core的架構模式下,過多的線程對性能損害比較大,尤其是當負載上升到較高水位時。上文提到默認情況下,每個服務都會創建獨立loadBalanceClient , 而在其內部又會啟動獨立的線程去同步當前關聯的serviceName對應的可用serverList, 網關的特殊性導致需要接入的服務數量極為龐大,進而導致運行一段時間后DAG的線程數量急劇膨脹,對于同步serverList 這樣的動作而言,完全可以采用非阻塞的方式從注冊中心拉取相關的serverList , 這種模式下單線程足以滿足性能要求。
圖片
serverList的更新前后架構對比
通過預先初始化的方式以及全局只使用1個context的方式,可以將這里冷啟動尖刺消除,改造后的測試結果符合預期。
圖片
通過進一步修改優化spring loadbalance serverList 同步機制,降低90%線程數量的使用。
優化前線程數量(725)
優化后線程數量(72)
集群限流改造優化
首先來看DAG 啟動后sentinel相關線程,類似的問題,線程數量非常多,需要針對性優化。
Sentinel 線程數
sentinel線程分析優化:

最終優化后的線程數量為4個
sentinel原生限流源碼分析如下,進一步分析SphU#entry方法發現其底調用 FlowRuleCheck#passClusterCheck;在passClusterCheck方法中發現底層網絡IO調用為阻塞式,;由于該方法的執行線程為workerEventLoop,因此需要使用上文提到的AsyncClient 進行優化。
private void doSentinelFlowControl(ServerWebExchange exchange, GatewayFilterChain chain, String resource){
Entry urlEntry = null;
try {
if (!StringUtil.isEmpty(resource)) {
//1. 檢測是否限流
urlEntry = SphU.entry(resource, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
}
//2. 通過,走業務邏輯
chain.filter(exchange);
} catch (BlockException e) {
//3. 攔截,直接返回503
ResponseDecorator.failResponseWithStatus(exchange, HttpResponseStatus.SERVICE_UNAVAILABLE, ResultCode.SERVICE_UNAVAILABLE.message);
} catch (RuntimeException e2) {
Tracer.traceEntry(e2, urlEntry);
log.error(ExceptionUtils.getFullStackTrace(e2));
ResponseDecorator.failResponseWithStatus(exchange, HttpResponseStatus.INTERNAL_SERVER_ERROR,HttpResponseStatus.INTERNAL_SERVER_ERROR.reasonPhrase());
} finally {
if (urlEntry != null) {
urlEntry.exit();
}
ContextUtil.exit();
}
}SentinelGatewayFilter(sentinel 適配SCG的邏輯)
public class RedisTokenService implements InitializingBean {
private final RedisAsyncClient client = new RedisAsyncClient();
private final RedisChannelPoolKey connectionKey;
public RedisTokenService(String host, int port, String password, int database, boolean ssl){
connectionKey = new RedisChannelPoolKey(String host, int port, String password, int database, boolean ssl);
}
//請求token
public Future<TokenResult> asyncRequestToken(ClusterFlowRule rule){
....
sendMessage(redisReqMsg,this.connectionKey)
}
private Future<TokenResult> sendMessage(RedisMessage requestMessage, EventExecutor executor, RedisChannelPoolKey poolKey){
AsyncRequest<RedisMessage,RedisMessage> request = client.newRequest(executor, poolKey,requestMessage);
DefaultPromise<TokenResult> tokenResultFuture = new DefaultPromise<>(request.getExecutor());
request.timeout(timeout)
.onComplete(response -> {
...
tokenResultFuture.setSuccess(response);
})
.onError(throwable -> {
...
tokenResultFuture.setFailure(throwable);
}).sendRequest();
return tokenResultFuture;
}
}RedisTokenService
最終的限流Filter代碼如下:
public class SentinelGatewayFilter implements RequestFilter {
@Resource
RedisTokenService tokenService;\
@Override
public void filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//當前為 netty NioEventloop 線程
ServerHttpRequest request = exchange.getRequest();
String resource = request.getPath() != null ? request.getPath() : "";
//判斷是否有集群限流規則
ClusterFlowRule rule = ClusterFlowManager.getClusterFlowRule(resource);
if (rule != null) {
//異步非阻塞請求token
tokenService.asyncRequestToken(rule,exchange.getExecutor())
.addListener(future -> {
TokenResult tokenResult;
if (future.isSuccess()) {
tokenResult = (TokenResult) future.getNow();
} else {
tokenResult = RedisTokenService.FAIL;
}
if(tokenResult == RedisTokenService.FAIL || tokenResult == RedisTokenService.ERROR){
log.error("Request cluster token failed, will back to local flowRule check");
}
ClusterFlowManager.setTokenResult(rule.getRuleId(), tokenResult);
doSentinelFlowControl(exchange, chain, resource);
});
} else {
doSentinelFlowControl(exchange, chain, resource);
}
}
}改造后適配DAG的SentinelGatewayFilter
六、壓測性能
DAG高壓表現
wrk -t32 -c1000 -d60s -s param-delay1ms.lua --latency http://a.b.c.d:xxxxx
DAG網關的QPS、實時RT、錯誤率、CPU、內存監控圖;在CPU占用80% 情況下,能夠支撐的QPS在4.5W。
DAG網關的QPS、RT 折線圖;
圖片
DAG在CPU占用80% 情況下,能夠支撐的QPS在4.5W,ART 19ms
SAG高壓表現
wrk -t32 -c1000 -d60s -s param-delay1ms.lua --latency http://a.b.c.d:xxxxx
SCG網關的QPS、實時RT、錯誤率、CPU、內存監控圖:
圖片
SCG網關的QPS、RT 折線圖:
圖片
SCG在CPU占用95% 情況下,能夠支撐的QPS在1.1W,ART 54.1ms
DAG低壓表現
wrk -t5 -c20 -d120s -s param-delay1ms.lua --latency http://a.b.c.d:xxxxx
DAG網關的QPS、實時RT、錯誤率、CPU、內存:
DAG網關的QPS、RT 折線圖:
DAG在QPS 1.1W情況下,CPU占用30%,ART 1.56ms
數據對比
圖片
結論
滿負載情況下,DAG要比SCG的吞吐量高很多,QPS幾乎是4倍,RT反而消耗更低,SCG在CPU被打滿后,RT表現出現嚴重性能劣化。DAG的吞吐控制和SCG一樣情況下,CPU和RT損耗下降了更多。DAG在最大壓力下,內存消耗比較高,達到了75%左右,不過到峰值后,就不再會有大幅變動了。對比壓測結果,結論令人欣喜,SCG作為Java生態當前使用最廣泛的網關,其性能屬于一線水準,DAG的性能達到其4倍以上也是遠超意料,這樣的結果給與研發同學極大的鼓舞。
七、投產收益
安全性提升
完善的接口級路由管理
基于接口注冊模式的全新路由上線,包含了接口注冊的申請人,申請時間,接口場景備注信息等,接口管理更加嚴謹規范;結合路由組功能可以方便的查詢當前服務的所有對外接口信息,某種程度上具備一定的API查詢管理能力;同時為了緩解用戶需要檢索的接口太多的尷尬,引入了一鍵收藏功能,大部分時候用戶只需要切換到已關注列表即可。
注冊接口列表
接口收藏
防滲透能力極大增強
早期的泛化路由,給黑產的滲透帶來了極大的想象空間和安全隱患,甚至可以在外網直接訪問某些業務的配置信息。
黑產接口滲透
接口注冊模式啟用后,所有未注冊的接口均無法訪問,防滲透能力提升一個臺階,同時自動推送異常接口訪問信息。
404接口訪問異常推送
穩定性增強
內存泄漏問題解決
通過一系列手段改進優化和嚴格的測試,新網關的內存使用更加穩健,內存增長曲線直接拉平,徹底解決了泄漏問題。
老網關內存增長趨勢
新網關內存增長趨勢
響應時間尖刺消除
通過預先初始化 & context 共用等手段,去除了運行時并發創建多個context 搶占全局鎖的開銷,冷啟動RT尖刺降低99% ;關于spring load balance 模塊的更多優化細節可以參考這篇博客:Spring LoadBalance 存在問題與優化。
壓測數據對比
圖片
實際生產監控
趨勢圖上略有差異,但是從非200請求的絕對值上看,這種差異可以忽略, 對比發布期間和非發布期間異常請求的數量,發現基本沒有區別,這代表著以往的發布期間的響應時間尖刺基本消除,做到了發布期間業務服務徹底無感知。
1月4日發布期間各節點流量變化
1月4日異常請求狀態數量監控(發布期間)
1月5日異常請求狀態數量監控(無發布)
降本增效
資源占用下降50% +
SCG平均CPU占用
DAG資源占用
JDK17升級收益
得益于ZGC的優秀算法,JVM17 在GC暫停時間上取得了出色的成果,網關作為延遲敏感型應用對GC的暫停時間尤為看重,為此我們組織升級了JDK17 版本;下面為同等流量壓力情況下的配置不同GC的效果對比,可以看到GC的暫停時間從平均70ms 降低到1ms 內,RT99線得到大幅度提升;吞吐量不再受流量波動而大幅度變化,性能表現更加穩定;同時網關的平均響應時間損耗降低5%。
JDK8-G1 暫停時間表現
JDK17-ZGC暫停時間表現
吞吐量方面,G1伴隨流量的變化呈現出一定的波動趨勢,均線在99.3%左右。ZGC的吞吐量則比較穩定,維持在無限接近100%的水平。
JDK8-G1 吞吐量
JDK17-ZGC吞吐量
對于實際業務接口的影響,從下圖中可以看到平均響應時間有所下降,這里的RT差值表示接口經過網關層的損耗時間;不同接口的RT差值損耗是不同的,這可能和請求響應體的大小,是否經過登錄驗證,風控驗證等業務邏輯有關。
JDK17與JDK8 ART對比
需要指出的是ZGC對于一般的RT敏感型應用有很大提升, 服務的RT 99線得到顯著改善。但是如果當前應用大量使用了堆外內存的方式,則提升相對較弱,如大量使用netty框架的應用, 因為這些應用的大部分數據都是通過手動釋放的方式進行管理。
八、思考總結
架構演進
API網關的自研并非一蹴而就,而是經歷了多次業務迭代循序漸進的過程;從早期的泛化路由引發的安全問題處理,到后面的大量路由注冊,帶來的匹配性能下降 ,以及最終壓垮老網關最后一根稻草的內存泄漏問題;在不同階段需要使用不同的應對策略,早期業務快速迭代,大量的需求堆積,最快的時候一個功能點的改動需要三四天內上線 ,我們很難有足夠的精力去做一些深層次的改造,這個時候需求導向為優先,功能性建設完善優先,是一個快速奔跑的建設期;伴隨體量的增長安全和穩定性的重視程度逐步拔高,繼而推進了這些方面的大量建設;從拓展SCG的原有功能到改進框架源碼,以及最終的自研重寫,可以說新的API網關是一個業務推進而演化出來的產物,也只有這樣 ”生長“ 出來的架構產品才能更好的契合業務發展的需要。
技術思考
開源的API網關有很多,但是自研的案例并不多,我們能夠參考的方案也很有限。除了幾個業界知名的產品外,很多開源的項目參考的價值并不大;從自研的目標來看,我們最基本的要求是性能和穩定性要優于現有的開源產品,至少Java的生態是這樣;這就要求架構設計和代碼質量上必須比現有的開源產品更加優秀,才有可能;為此我們深度借鑒了流量代理界的常青樹Nginx,發現基于Linux 多進程模型下的OS,如果要發揮出最大效能,單CPU核心支撐單進程(線程)是效率最高的模式。可以將OS的進程調度開銷最小化同時將高速緩存miss降到最低,此外還要盡可能減少或者消除數據競爭,避免鎖等待和自旋帶來的性能浪費;DAG的整個技術架構可以簡化的理解為引入了獨立控制流的多線程版的Nginx。
中間件的研發創新存在著較高的難度和復雜性,更何況是在業務不斷推進中換引擎。在整個研發過程中,為了盡可能適配老的業務邏輯,對原有的業務邏輯的改動最小化,新網關對老網關的架構層接口做了全面適配;換句話說新引擎的對外暴露的核心接口與老網關保持一致,讓老的業務邏輯在0改動或者僅改動少量幾行代碼后就能在新網關上直接跑,能夠極大幅度降低我們的測試回歸成本,因為這些代碼本身的邏輯正確性,已經在生產環境得到了大量驗證。這樣的適配器模式同樣適用于其他組件和業務開發。
作為底層基礎組件的開發人員,要對自己寫下的每一行代碼都有清晰的認識,不了解的地方一定要多翻資料,多讀源碼,模棱兩可的理解是絕對不夠的;常見的開源組件雖然說大部分代碼都是資深開發人員寫出來的,但是有程序員的地方就有bug ,要帶著審慎眼光去看到這些組件,而不是一味地使用盲從,所謂盡信書不如無書;很多中間件的基本原理都是相通的,如常見Raft協議,基于epoll的reactor網絡架構,存儲領域的零拷貝技術,預寫日志,常見的索引技術,hash結構,B+樹,LSM樹等等。一個成熟的中間件往往會涉及多個方向的技術內容。研發人員并不需要每一個組件都涉獵極深,也不現實,掌握常見的架構思路和技巧以及一些基本的技術點,做到對一兩個組件做到熟稔于心。思考和理解到位了,很容易觸類旁通。
穩定性把控
自研基礎組件是一項浩大的工程,可以預見代碼量會極為龐大,如何有效管理新項目的代碼質量是個棘手的問題; 原有業務邏輯的改造也需要回歸測試;現實的情況是中間件團隊沒有專職的測試,質量保證完全依賴開發人員;這就對開發人員的代碼質量提出了極高的要求,一方面我們通過與老網關適配相同的代理引擎接口,降低遷移成本和業務邏輯出現bug的概率;另一方面還對編碼質量提出了高標準,平均每周兩到三次的CodeReview;80%的單元測試行覆蓋率要求。
網關作為流量入口,承接全司最高流量,對穩定性的要求極為苛刻。最理想的狀態是在業務服務沒有任何感知的情況下,我們將新網關逐步替換上去;為此我們對新網關上線的過程做了充分的準備,嚴格控制上線過程;具體來看整個上線流程分為以下幾個階段:
第一階段
我們在壓測環境長時間高負載壓測,持續運行時間24小時以上,以檢測內存泄漏等穩定性問題。同時利用性能檢測工具抓取熱點火焰圖,做針對性優化。
第二階段
發布測試環境試跑,采用并行試跑的方式,新老網關同時對外提供服務(流量比例1 :1,初期新網關承接流量可能只有十分之一),一旦用戶反饋的問題可能跟新網關有關,或者發現異常case,立即關停新網關的流量。待查明原因并確認修復后,重新引流。
第三階段
上線預發,小得物環境試跑,由于這些環境流量不大,依然可以并行長時間試跑,發現問題解決問題。
第四階段
生產引流,單節點從萬分之一比例開始灰度,逐步引流放大,每個階段停留24小時以上,觀察修正后再放大,循環此過程;基于單節點承擔正常比例流量后,再次抓取火焰圖,基于真實流量場景下的性能熱點做針對性優化。





































