如何使用Go建開(kāi)發(fā)高負(fù)載WebSocket服務(wù)器
嗨,大家好! 我的名字是Sergey Kamardin,我是Mail.Ru的工程師。
介紹
首先介紹我們的故事的上下文,應(yīng)該介紹幾點(diǎn)我們?yōu)槭裁葱枰@個(gè)服務(wù)器。
Mail.Ru有很多有狀態(tài)的系統(tǒng)。 用戶(hù)電子郵件存儲(chǔ)是其中之一。 跟蹤系統(tǒng)中的狀態(tài)變化和系統(tǒng)事件有幾種方法。 這主要是通過(guò)定期系統(tǒng)輪詢(xún)或關(guān)于其狀態(tài)變化的系統(tǒng)通知。
兩種方式都有利弊。 但是當(dāng)涉及郵件時(shí),用戶(hù)收到新郵件的速度越快越好。
郵件輪詢(xún)涉及每秒大約50,000個(gè)HTTP查詢(xún),其中60%返回304狀態(tài),這意味著郵箱沒(méi)有變化。
因此,為了減少服務(wù)器上的負(fù)載并加快郵件傳遞給用戶(hù),決定通過(guò)編寫(xiě)發(fā)布-訂閱服務(wù)器,一方面將接收有關(guān)狀態(tài)更改的通知,另一方面則會(huì)收到這種通知的訂閱。
先前
現(xiàn)在
***個(gè)方案顯示了以前的樣子。 瀏覽器定期輪詢(xún)API,并查詢(xún)有關(guān)Storage(郵箱服務(wù))的更改。
第二個(gè)方案描述了新架構(gòu)。 瀏覽器與通知API建立WebSocket連接,通知API是Bus服務(wù)器的客戶(hù)端。收到新的電子郵件后,Storage會(huì)向Bus(1)發(fā)送一條通知,由Bus發(fā)送到訂閱者。 API確定連接以發(fā)送接收到的通知,并將其發(fā)送到用戶(hù)的瀏覽器(3)。
所以今天我們將討論API或WebSocket服務(wù)器。 我們的服務(wù)器將有大約300萬(wàn)個(gè)在線(xiàn)連接。
實(shí)現(xiàn)方式
讓我們看看如何使用Go函數(shù)實(shí)現(xiàn)服務(wù)器的某些部分,而無(wú)需任何優(yōu)化。
在進(jìn)行net/http ,我們來(lái)談?wù)勎覀內(nèi)绾伟l(fā)送和接收數(shù)據(jù)。 站在WebSocket協(xié)議(例如JSON對(duì)象) 之上的數(shù)據(jù)在下文中將被稱(chēng)為分組 。
我們開(kāi)始實(shí)現(xiàn)包含通過(guò)WebSocket連接發(fā)送和接收這些數(shù)據(jù)包的Channel結(jié)構(gòu)。
channel 結(jié)構(gòu)
- // Packet represents application level data.
- type Packet struct {
- ...
- }
- // Channel wraps user connection.
- type Channel struct {
- conn net.Conn // WebSocket connection.
- send chan Packet // Outgoing packets queue.
- }
- func NewChannel(conn net.Conn) *Channel {
- c := &Channel{
- conn: conn,
- send: make(chan Packet, N),
- }
- go c.reader()
- go c.writer()
- return c
- }
注意這里有reader和writer連個(gè)goroutines。 每個(gè)goroutine都需要自己的內(nèi)存棧, 根據(jù)操作系統(tǒng)和Go版本可能具有2到8 KB的初始大小。
在300萬(wàn)個(gè)在線(xiàn)連接的時(shí)候,我們將需要24 GB的內(nèi)存 (堆棧為4 KB)用于維持所有連接。 這還沒(méi)有計(jì)算為Channel結(jié)構(gòu)分配的內(nèi)存,傳出的數(shù)據(jù)包c(diǎn)h.send和其他內(nèi)部字段消耗的內(nèi)存。
- I/O goroutines
我們來(lái)看看“reader”的實(shí)現(xiàn):
- func (c *Channel) reader() {
- // We make a buffered read to reduce read syscalls.
- buf := bufio.NewReader(c.conn)
- for {
- pkt, _ := readPacket(buf)
- c.handle(pkt)
- }
- }
這里我們使用bufio.Reader來(lái)減少read() syscalls的數(shù)量,并讀取與buf緩沖區(qū)大小一樣的數(shù)量。 在***循環(huán)中,我們期待新數(shù)據(jù)的到來(lái)。 請(qǐng)記住: 預(yù)計(jì)新數(shù)據(jù)將會(huì)來(lái)臨。 我們稍后會(huì)回來(lái)。
我們將離開(kāi)傳入數(shù)據(jù)包的解析和處理,因?yàn)閷?duì)我們將要討論的優(yōu)化不重要。 但是, buf現(xiàn)在值得我們注意:默認(rèn)情況下,它是4 KB,這意味著我們需要另外12 GB內(nèi)存。 “writer”有類(lèi)似的情況:
- func (c *Channel) writer() {
- // We make buffered write to reduce write syscalls.
- buf := bufio.NewWriter(c.conn)
- for pkt := range c.send {
- _ := writePacket(buf, pkt)
- buf.Flush()
- }
- }
我們遍歷c.send ,并將它們寫(xiě)入緩沖區(qū)。細(xì)心讀者已經(jīng)猜到的,我們的300萬(wàn)個(gè)連接還將消耗12 GB的內(nèi)存。
HTTP
我們已經(jīng)有一個(gè)簡(jiǎn)單的Channel實(shí)現(xiàn),現(xiàn)在我們需要一個(gè)WebSocket連接才能使用。
注意:如果您不知道WebSocket如何工作。客戶(hù)端通過(guò)稱(chēng)為升級(jí)的特殊HTTP機(jī)制切換到WebSocket協(xié)議。 在成功處理升級(jí)請(qǐng)求后,服務(wù)器和客戶(hù)端使用TCP連接來(lái)交換二進(jìn)制WebSocket幀。 這是連接中的框架結(jié)構(gòu)的描述。
- import (
- "net/http"
- "some/websocket"
- )
- http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
- conn, _ := websocket.Upgrade(r, w)
- ch := NewChannel(conn)
- //...
- })
請(qǐng)注意, http.ResponseWriter為bufio.Reader和bufio.Writer (使用4 KB緩沖區(qū))進(jìn)行內(nèi)存分配,用于*http.Request初始化和進(jìn)一步的響應(yīng)寫(xiě)入。
無(wú)論使用什么WebSocket庫(kù),在成功響應(yīng)升級(jí)請(qǐng)求后, 服務(wù)器在responseWriter.Hijack()調(diào)用之后,連同TCP連接一起接收 I/O緩沖區(qū)。
提示:在某些情況下, go:linkname 可用于 通過(guò)調(diào)用 net/http.putBufio{Reader,Writer} 將緩沖區(qū)返回到 net/http 內(nèi) 的 sync.Pool 。
因此,我們需要另外24 GB的內(nèi)存來(lái)維持300萬(wàn)個(gè)鏈接。
所以,我們的程序即使什么都沒(méi)做,也需要72G內(nèi)存。
優(yōu)化
我們來(lái)回顧介紹部分中談到的內(nèi)容,并記住用戶(hù)連接的行為。 切換到WebSocket之后,客戶(hù)端發(fā)送一個(gè)包含相關(guān)事件的數(shù)據(jù)包,換句話(huà)說(shuō)就是訂閱事件。 然后(不考慮諸如ping/pong等技術(shù)信息),客戶(hù)端可能在整個(gè)連接壽***不發(fā)送任何其他信息。
連接壽命可能是幾秒到幾天。
所以在最多的時(shí)候,我們的Channel.reader()和Channel.writer()正在等待接收或發(fā)送數(shù)據(jù)的處理。 每個(gè)都有4 KB的I/O緩沖區(qū)。
現(xiàn)在很明顯,某些事情可以做得更好,不是嗎?
Netpoll
你還記得bufio.Reader.Read()內(nèi)部,Channel.reader()實(shí)現(xiàn)了在沒(méi)有新數(shù)據(jù)的時(shí)候conn.read()會(huì)被鎖。如果連接中有數(shù)據(jù),Go運(yùn)行時(shí)“喚醒”我們的goroutine并允許它讀取下一個(gè)數(shù)據(jù)包。 之后,goroutine再次鎖定,期待新的數(shù)據(jù)。 讓我們看看Go運(yùn)行時(shí)如何理解goroutine必須被“喚醒”。 如果我們看看conn.Read()實(shí)現(xiàn) ,我們將在其中看到net.netFD.Read()調(diào)用 :
- // net/fd_unix.go
- func (fd *netFD) Read(p []byte) (n int, err error) {
- //...
- for {
- n, err = syscall.Read(fd.sysfd, p)
- if err != nil {
- n = 0
- if err == syscall.EAGAIN {
- if err = fd.pd.waitRead(); err == nil {
- continue
- }
- }
- }
- //...
- break
- }
- //...
- }
Go在非阻塞模式下使用套接字。 EAGAIN表示,套接字中沒(méi)有數(shù)據(jù),并且在從空套接字讀取時(shí)不會(huì)被鎖定,操作系統(tǒng)將控制權(quán)返還給我們。
我們從連接文件描述符中看到一個(gè)read()系統(tǒng)調(diào)用。 如果讀取返回EAGAIN錯(cuò)誤 ,則運(yùn)行時(shí)會(huì)使pollDesc.waitRead()調(diào)用 :
- // net/fd_poll_runtime.go
- func (pd *pollDesc) waitRead() error {
- return pd.wait('r')
- }
- func (pd *pollDesc) wait(mode int) error {
- res := runtime_pollWait(pd.runtimeCtx, mode)
- //...
- }
如果我們深入挖掘 ,我們將看到netpoll是使用Linux中的epoll和BSD中的kqueue來(lái)實(shí)現(xiàn)的。 為什么不使用相同的方法來(lái)進(jìn)行連接? 我們可以分配一個(gè)讀緩沖區(qū),只有在真正有必要時(shí)才使用goroutine:當(dāng)套接字中有真實(shí)可讀的數(shù)據(jù)時(shí)。
在github.com/golang/go上, 導(dǎo)出netpoll函數(shù)有問(wèn)題 。
擺脫goroutines
假設(shè)我們有Go的netpoll實(shí)現(xiàn) 。 現(xiàn)在我們可以避免使用內(nèi)部緩沖區(qū)啟動(dòng)Channel.reader() goroutine,并在連接中訂閱可讀數(shù)據(jù)的事件:
- ch := NewChannel(conn)
- // Make conn to be observed by netpoll instance.
- poller.Start(conn, netpoll.EventRead, func() {
- // We spawn goroutine here to prevent poller wait loop
- // to become locked during receiving packet from ch.
- go Receive(ch)
- })
- // Receive reads a packet from conn and handles it somehow.
- func (ch *Channel) Receive() {
- buf := bufio.NewReader(ch.conn)
- pkt := readPacket(buf)
- c.handle(pkt)
- }
使用Channel.writer()更容易,因?yàn)橹挥挟?dāng)我們要發(fā)送數(shù)據(jù)包時(shí),我們才能運(yùn)行g(shù)oroutine并分配緩沖區(qū):
- func (ch *Channel) Send(p Packet) {
- if c.noWriterYet() {
- go ch.writer()
- }
- ch.send <- p
- }
請(qǐng)注意,當(dāng)操作系統(tǒng)在 write() 系統(tǒng)調(diào)用時(shí)返回 EAGAIN 時(shí),我們不處理這種情況 。 對(duì)于這種情況,我們傾向于Go運(yùn)行時(shí)那樣處理。 如果需要,它可以以相同的方式來(lái)處理。
從ch.send (一個(gè)或幾個(gè))讀出傳出的數(shù)據(jù)包后,writer將完成其操作并釋放goroutine棧和發(fā)送緩沖區(qū)。
***! 通過(guò)擺脫兩個(gè)連續(xù)運(yùn)行的goroutine中的堆棧和I/O緩沖區(qū),我們節(jié)省了48 GB 。
資源控制
大量的連接不僅涉及高內(nèi)存消耗。 在開(kāi)發(fā)服務(wù)器時(shí),我們會(huì)經(jīng)歷重復(fù)的競(jìng)爭(zhēng)條件和死鎖,常常是所謂的自動(dòng)DDoS,這種情況是當(dāng)應(yīng)用程序客戶(hù)端肆意嘗試連接到服務(wù)器,從而破壞服務(wù)器。
例如,如果由于某些原因我們突然無(wú)法處理ping/pong消息,但是空閑連接的處理程序會(huì)關(guān)閉這樣的連接(假設(shè)連接斷開(kāi),因此沒(méi)有提供數(shù)據(jù)),客戶(hù)端會(huì)不斷嘗試連接,而不是等待事件。
如果鎖定或超載的服務(wù)器剛剛停止接受新連接,并且負(fù)載均衡器(例如,nginx)將請(qǐng)求都傳遞給下一個(gè)服務(wù)器實(shí)例,那壓力將是巨大的。
此外,無(wú)論服務(wù)器負(fù)載如何,如果所有客戶(hù)端突然想要以任何原因發(fā)送數(shù)據(jù)包(大概是由于錯(cuò)誤原因),則先前節(jié)省的48 GB將再次使用,因?yàn)槲覀儗?shí)際恢復(fù)到初始狀態(tài)goroutine和并對(duì)每個(gè)連接分配緩沖區(qū)。
Goroutine池
我們可以使用goroutine池來(lái)限制同時(shí)處理的數(shù)據(jù)包數(shù)量。 這是一個(gè)go routine池的簡(jiǎn)單實(shí)現(xiàn):
- package gopool
- func New(size int) *Pool {
- return &Pool{
- work: make(chan func()),
- sem: make(chan struct{}, size),
- }
- }
- func (p *Pool) Schedule(task func()) error {
- select {
- case p.work <- task:
- case p.sem <- struct{}{}:
- go p.worker(task)
- }
- }
- func (p *Pool) worker(task func()) {
- defer func() { <-p.sem }
- for {
- task()
- task = <-p.work
- }
- }
現(xiàn)在我們的netpoll代碼如下:
- pool := gopool.New(128)
- poller.Start(conn, netpoll.EventRead, func() {
- // We will block poller wait loop when
- // all pool workers are busy.
- pool.Schedule(func() {
- Receive(ch)
- })
- })
所以現(xiàn)在我們讀取數(shù)據(jù)包可以在池中使用了空閑的goroutine。
同樣,我們將更改Send() :
- pool := gopool.New(128)
- func (ch *Channel) Send(p Packet) {
- if c.noWriterYet() {
- pool.Schedule(ch.writer)
- }
- ch.send <- p
- }
而不是go ch.writer() ,我們想寫(xiě)一個(gè)重用的goroutine。 因此,對(duì)于N goroutines池,我們可以保證在N請(qǐng)求同時(shí)處理并且到達(dá)N + 1我們不會(huì)分配N(xiāo) + 1緩沖區(qū)進(jìn)行讀取。 goroutine池還允許我們限制新連接的Accept()和Upgrade() ,并避免大多數(shù)情況下被DDoS打垮。
零拷貝升級(jí)
讓我們從WebSocket協(xié)議中偏離一點(diǎn)。 如前所述,客戶(hù)端使用HTTP升級(jí)請(qǐng)求切換到WebSocket協(xié)議。 協(xié)議是樣子:
- GET /ws HTTP/1.1
- Host: mail.ru
- Connection: Upgrade
- Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
- Sec-Websocket-Version: 13
- Upgrade: websocket
- HTTP/1.1 101 Switching Protocols
- Connection: Upgrade
- Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
- Upgrade: websocket
也就是說(shuō),在我們的例子中,我們需要HTTP請(qǐng)求和header才能切換到WebSocket協(xié)議。 這個(gè)知識(shí)點(diǎn)和http.Request的內(nèi)部實(shí)現(xiàn)表明我們可以做優(yōu)化。我們會(huì)在處理HTTP請(qǐng)求時(shí)拋棄不必要的內(nèi)存分配和復(fù)制,并放棄標(biāo)準(zhǔn)的net/http服務(wù)器。
例如, http.Request 包含一個(gè)具有相同名稱(chēng)的頭文件類(lèi)型的字段,它通過(guò)將數(shù)據(jù)從連接復(fù)制到值字符串而無(wú)條件填充所有請(qǐng)求頭。 想像一下這個(gè)字段中可以保留多少額外的數(shù)據(jù),例如大型Cookie頭。
但是要做什么呢?
WebSocket實(shí)現(xiàn)
不幸的是,在我們的服務(wù)器優(yōu)化時(shí)存在的所有庫(kù)都允許我們對(duì)標(biāo)準(zhǔn)的net/http服務(wù)器進(jìn)行升級(jí)。 此外,所有庫(kù)都不能使用所有上述讀寫(xiě)優(yōu)化。 為使這些優(yōu)化能夠正常工作,我們必須使用一個(gè)相當(dāng)?shù)图?jí)別的API來(lái)處理WebSocket。 要重用緩沖區(qū),我們需要procotol函數(shù)看起來(lái)像這樣:
- func ReadFrame(io.Reader) (Frame, error)
- func WriteFrame(io.Writer, Frame) error
如果我們有一個(gè)這樣的API的庫(kù),我們可以從連接中讀取數(shù)據(jù)包,如下所示(數(shù)據(jù)包寫(xiě)入看起來(lái)差不多):
- // getReadBuf, putReadBuf are intended to
- // reuse *bufio.Reader (with sync.Pool for example).
- func getReadBuf(io.Reader) *bufio.Reader
- func putReadBuf(*bufio.Reader)
- // readPacket must be called when data could be read from conn.
- func readPacket(conn io.Reader) error {
- buf := getReadBuf()
- defer putReadBuf(buf)
- buf.Reset(conn)
- frame, _ := ReadFrame(buf)
- parsePacket(frame.Payload)
- //...
- }
簡(jiǎn)而言之,現(xiàn)在是制作我們自己庫(kù)的時(shí)候了。
- github.com/gobwas/ws
為了避免將協(xié)議操作邏輯強(qiáng)加給用戶(hù),我們編寫(xiě)了WS庫(kù)。 所有讀寫(xiě)方法都接受標(biāo)準(zhǔn)的io.Reader和io.Writer接口,可以使用或不使用緩沖或任何其他I/O包裝器。
除了來(lái)自標(biāo)準(zhǔn)net/http升級(jí)請(qǐng)求之外, ws支持零拷貝升級(jí) ,升級(jí)請(qǐng)求的處理和切換到WebSocket,而無(wú)需內(nèi)存分配或復(fù)制。 ws.Upgrade()接受io.ReadWriter ( net.Conn實(shí)現(xiàn)了這個(gè)接口)。 換句話(huà)說(shuō),我們可以使用標(biāo)準(zhǔn)的net.Listen()并將接收到的連接從ln.Accept()立即傳遞給ws.Upgrade() 。 該庫(kù)可以復(fù)制任何請(qǐng)求數(shù)據(jù)以供將來(lái)在應(yīng)用程序中使用(例如, Cookie以驗(yàn)證會(huì)話(huà))。
以下是升級(jí)請(qǐng)求處理的基準(zhǔn) :標(biāo)準(zhǔn)net/http服務(wù)器與net.Listen()加零拷貝升級(jí):
- BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op
- BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op
切換到ws和零拷貝升級(jí)節(jié)省了另外24 GB內(nèi)存 - 這是由net/http處理程序請(qǐng)求處理時(shí)為I/O緩沖區(qū)分配的空間。
概要
讓我們結(jié)合代碼告訴你我們做的優(yōu)化。
- 讀取內(nèi)部緩沖區(qū)的goroutine是非常昂貴的。 解決方案 :netpoll(epoll,kqueue); 重用緩沖區(qū)。
- 寫(xiě)入內(nèi)部緩沖區(qū)的goroutine是非常昂貴的。 解決方案 :必要時(shí)啟動(dòng)goroutine; 重用緩沖區(qū)。
- DDOS,netpoll將無(wú)法工作。 解決方案 :重新使用數(shù)量限制的goroutines。
- net/http不是處理升級(jí)到WebSocket的最快方法。 解決方案 :在連接上使用零拷貝升級(jí)。
這就是服務(wù)器代碼的樣子:
- import (
- "net"
- "github.com/gobwas/ws"
- )
- ln, _ := net.Listen("tcp", ":8080")
- for {
- // Try to accept incoming connection inside free pool worker.
- // If there no free workers for 1ms, do not accept anything and try later.
- // This will help us to prevent many self-ddos or out of resource limit cases.
- err := pool.ScheduleTimeout(time.Millisecond, func() {
- conn := ln.Accept()
- _ = ws.Upgrade(conn)
- // Wrap WebSocket connection with our Channel struct.
- // This will help us to handle/send our app's packets.
- ch := NewChannel(conn)
- // Wait for incoming bytes from connection.
- poller.Start(conn, netpoll.EventRead, func() {
- // Do not cross the resource limits.
- pool.Schedule(func() {
- // Read and handle incoming packet(s).
- ch.Recevie()
- })
- })
- })
- if err != nil {
- time.Sleep(time.Millisecond)
- }
- }
結(jié)論
過(guò)早優(yōu)化是萬(wàn)惡之源。 Donald Knuth
當(dāng)然,上述優(yōu)化是有意義的,但并非所有情況都如此。 例如,如果可用資源(內(nèi)存,CPU)和在線(xiàn)連接數(shù)之間的比例相當(dāng)高(服務(wù)器很閑),則優(yōu)化可能沒(méi)有任何意義。 但是,您可以從哪里需要改進(jìn)以及改進(jìn)內(nèi)容中受益匪淺。




















