Go語言 非阻塞io

2018-07-25 16:08 更新

Go提供的網絡接口,在用戶層是阻塞的,這樣最符合人們的編程習慣。在runtime層面,是用epoll/kqueue實現的非阻塞io,為性能提供了保障。

如何實現

底層非阻塞io是如何實現的呢?簡單地說,所有文件描述符都被設置成非阻塞的,某個goroutine進行io操作,讀或者寫文件描述符,如果此刻io還沒準備好,則這個goroutine會被放到系統(tǒng)的等待隊列中,這個goroutine失去了運行權,但并不是真正的整個系統(tǒng)“阻塞”于系統(tǒng)調用。

后臺還有一個poller會不停地進行poll,所有的文件描述符都被添加到了這個poller中的,當某個時刻一個文件描述符準備好了,poller就會喚醒之前因它而阻塞的goroutine,于是goroutine重新運行起來。

這個poller是在后臺一直運行的,前面分析系統(tǒng)調度章節(jié)時為了簡化并沒有提起它。其實在proc.c文件中,runtime.main函數的第一行代碼就是

newm(sysmon, nil);

這個意思就是新建一個M并讓它運行sysmon函數,前面說過M就是機器的抽象,它會直接開一個物理線程。sysmon里面是個死循環(huán),每睡眠一小會兒就會調用runtime.epoll函數,這個sysmon就是所謂的poller。

poller是一個比gc更高優(yōu)先級的東西,何以見得呢?首先,垃圾回收只是用runtime.newproc建立出來的,它僅僅是個goroutine任務,而poller是直接用newm建立出來的,它跟startm是平級的。也就相當于gc只是線程池里的任務,而poller自身直接就是worker。然后,gc只是被觸發(fā)性地發(fā)生的,是被動的。而poller卻是每隔很短時間就會主動運行。

封裝層次

從最原始的epoll系統(tǒng)調用,到提供給用戶的網絡庫函數,可以分成三個封裝層次。這三個層次分別是,依賴于系統(tǒng)的api封裝,平臺獨立的runtime封裝,提供給用戶的庫的封裝。

最下面一層是依賴于系統(tǒng)部分的封裝。各個平臺下的實現并不一樣,比如linux下是封裝的epoll,freebsd下是封裝的kqueue。以linux為例,實現了一組調用epoll相關系統(tǒng)調用的封裝:

int32 runtime·epollcreate(int32 size);
int32 runtime·epollcreate1(int32 flags);
int32 runtime·epollctl(int32 epfd, int32 op, int32 fd, EpollEvent *ev);
int32 runtime·epollwait(int32 epfd, EpollEvent *ev, int32 nev, int32 timeout);
void runtime·closeonexec(int32 fd);

它們都是直接使用匯編調用系統(tǒng)調用實現的,比如:

TEXT runtime·epollcreate1(SB),7,$0
    MOVL    8(SP), DI
    MOVL    $291, AX            // syscall entry
    SYSCALL
    RET

這些函數還要繼續(xù)被封裝成下面一組函數:

runtime·netpollinit(void);
runtime·netpollopen(int32 fd, PollDesc *pd);
runtime·netpollready(G **gpp, PollDesc *pd, int32 mode);

runtime·netpollinit是對poller進行初始化。 runtime·netpollopen是對fd和pd進行關聯,實現邊沿觸發(fā)通知。 runtime·netpollready,使用前必須調用這個函數來表示fd是就緒的

不管是哪個平臺,最終都會將依賴于系統(tǒng)的部分封裝好,提供上面這樣一組函數供runtime使用。

接下來是平臺獨立的poller的封裝,也就是runtime層的封裝。這一層封裝是最復雜的,它對外提供的一組接口是:

func runtime_pollServerInit()
func runtime_pollOpen(fd int) (pd *PollDesc, errno int)
func runtime_pollClose(pd *PollDesc)
func runtime_pollReset(pd *PollDesc, mode int) (err int)
func runtime_pollWait(pd *PollDesc, mode int) (err int)
func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int)
func runtime_pollUnblock(pd *PollDesc)

這一組函數是由runtime封裝好,提供給net包調用的。里面定義了一個PollDesc的結構體,將fd和對應的goroutine封裝起來,從而實現當goroutine讀寫fd阻塞時,將goroutine變?yōu)镚waiting。等一下回頭再看實現的細節(jié)。

最后一層封裝層次是提供給用戶的net包。在net包中網絡文件描述符都是用一個netFD結構體來表示的,其中有個成員就是pollDesc。

// 網絡文件描述符
type netFD struct {
    sysmu  sync.Mutex
    sysref int

    // must lock both sysmu and pollDesc to write
    // can lock either to read
    closing bool

    // immutable until Close
    sysfd       int
    family      int
    sotype      int
    isConnected bool
    sysfile     *os.File
    net         string
    laddr       Addr
    raddr       Addr

    // serialize access to Read and Write methods
    rio, wio sync.Mutex

    // wait server
    pd pollDesc
}

所有用戶的net包的調用最終調用到pollDesc的上面那一組函數中,這樣就實現了當goroutine讀或寫阻塞時會被放到等待隊列。最終的效果就是用戶層阻塞,底層非阻塞。

文件描述符和goroutine

當一個goroutine進行io阻塞時,會去被放到等待隊列。這里面就關鍵的就是建立起文件描述符和goroutine之間的關聯。pollDesc結構體就是完成這個任務的。它的結構體定義如下:

struct PollDesc
{
    PollDesc* link;    // in pollcache, protected by pollcache.Lock
    Lock;        // protectes the following fields
    int32    fd;
    bool    closing;
    uintptr    seq;    // protects from stale timers and ready notifications
    G*    rg;    // 因讀這個fd而阻塞的G,等待READY信號
    Timer    rt;    // read deadline timer (set if rt.fv != nil)
    int64    rd;    // read deadline
    G*    wg;    // 因寫這個fd而阻塞的goroutines
    Timer    wt;
    int64    wd;
};

這個結構體是重用的,其中l(wèi)ink就是將它鏈起來。PollDesc對象必須是類型穩(wěn)定的,因為在描述符關閉/重用之后我們會得到epoll/kqueue就緒通知。結構體中有一個seq序號,穩(wěn)定的通知是通過使用這個序號實現的,當deadline改變或者描述符重用時,序號會增加。

runtime_pollServerInit的實現就是調用更下層的runtime·netpollinit函數。 runtime_pollOpen從PollDesc結構體緩存中拿一個出來,設置好它的fd。之所以叫Open而不是new,就是因為PollDesc結構體是重用的。 runtime_pollClose函數調用runtime·netpollclose后將PollDesc結構體放回緩存。

這些都還沒涉及到fd與goroutine交互部分,僅僅是直接對epoll的調用。從下面這個函數可以看到fd與goroutine交互部分:

func runtime_pollWait(pd *PollDesc, mode int) (err int)

它會調用到netpollblock,這個函數是這樣子的:

static void
netpollblock(PollDesc *pd, int32 mode)
{
    G **gpp;

    gpp = &pd->rg;
    if(mode == 'w')
        gpp = &pd->wg;
    if(*gpp == READY) {
        *gpp = nil;
        return;
    }
    if(*gpp != nil)
        runtime·throw("epoll: double wait");
    *gpp = g;
    runtime·park(runtime·unlock, &pd->Lock, "IO wait");
    runtime·lock(pd);
}

最后的runtime.park函數,就是將當前的goroutine(調用者)設置為waiting狀態(tài)。

上面這一部分是goroutine被放到等待隊列的部分,下面看它被喚醒的部分。在sysmon函數中,會不停地調用runtime.epoll,這個函數對就緒的網絡連接進行poll,返回可運行的goroutine。epoll只能知道哪個fd就緒了,那么它怎么知道哪個goroutine就緒了呢?原來epoll的data域存放的就是PollDesc結構體指針。因此就可以得到其中的goroutine了。


以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號