協(xié)程
協(xié)程,即協(xié)作式程序,其思想是,一系列互相依賴的協(xié)程間依次使用CPU,每次只有一個(gè)協(xié)程工作,而其他協(xié)程處于休眠狀態(tài)。協(xié)程可以在運(yùn)行期間的某個(gè)點(diǎn)上暫停執(zhí)行,并在恢復(fù)運(yùn)行時(shí)從暫停的點(diǎn)上繼續(xù)執(zhí)行。 協(xié)程已經(jīng)被證明是一種非常有用的程序組件,不僅被python、lua、ruby等腳本語言廣泛采用,而且被新一代面向多核的編程語言如golang rust-lang等采用作為并發(fā)的基本單位。 協(xié)程可以被認(rèn)為是一種用戶空間線程,與傳統(tǒng)的搶占式線程相比,有2個(gè)主要的優(yōu)點(diǎn):
與線程不同,協(xié)程是自己主動(dòng)讓出CPU,并交付他期望的下一個(gè)協(xié)程運(yùn)行,而不是在任何時(shí)候都有可能被系統(tǒng)調(diào)度打斷。因此協(xié)程的使用更加清晰易懂,并且多數(shù)情況下不需要鎖機(jī)制。
與線程相比,協(xié)程的切換由程序控制,發(fā)生在用戶空間而非內(nèi)核空間,因此切換的代價(jià)非常的小。
某種意義上,協(xié)程與線程的關(guān)系類似與線程與進(jìn)程的關(guān)系,多個(gè)協(xié)程會(huì)在同一個(gè)線程的上下文之中運(yùn)行。
網(wǎng)絡(luò)編程模型
我們首先來簡(jiǎn)單回顧一下一些常用的網(wǎng)絡(luò)編程模型。網(wǎng)絡(luò)編程模型可以大體的分為同步模型和異步模型兩類。
同步模型:
同步模型使用阻塞IO模式,在阻塞IO模式下調(diào)用read等IO函數(shù)時(shí)會(huì)阻塞線程直到IO完成或失敗。 同步模型的典型代表是thread_per_connection模型,每當(dāng)阻塞在主線程上的accept調(diào)用返回時(shí)則創(chuàng)建一個(gè)新的線程去服務(wù)于新的socket的讀/寫。這種模型的優(yōu)點(diǎn)是程序邏輯簡(jiǎn)潔,符合人的思維;缺點(diǎn)是可伸縮性收到線程數(shù)的限制,當(dāng)連接越來越多時(shí),線程也越來越多,頻繁的線程切換會(huì)嚴(yán)重拖累性能,同時(shí)不得不處理多線程同步的問題。
異步模型:
異步模型一般使用非阻塞IO模式,并配合epoll/select/poll等多路復(fù)用機(jī)制。在非阻塞模式下調(diào)用read,如果沒有數(shù)據(jù)可讀則立即返回,并通知用戶沒有可讀(EAGAIN/EWOULDBLOCK),而非阻塞當(dāng)前線程。異步模型可以使一個(gè)線程同時(shí)服務(wù)于多個(gè)IO對(duì)象。 異步模型的典型代表是reactor模型。在reactor模型中,我們將所有要處理的IO事件注冊(cè)到一個(gè)中心的IO多路復(fù)用器中(一般為epoll/select/poll),同時(shí)主線程阻塞在多路復(fù)用器上。一旦有IO事件到來或者就緒,多路復(fù)用器返回并將對(duì)應(yīng)的IO事件分發(fā)到對(duì)應(yīng)的處理器(即回調(diào)函數(shù))中,最后處理器調(diào)用read/write函數(shù)來進(jìn)行IO操作。
異步模型的特點(diǎn)是性能和可伸縮性比同步模型要好很多,但是其結(jié)構(gòu)復(fù)雜,不易于編寫和維護(hù)。在異步模型中,IO之前的代碼(IO任務(wù)的提交者)和IO之后的處理代碼(回調(diào)函數(shù))是割裂開來的。
【文章福利】由ntyco作者親授的協(xié)程訓(xùn)練營課程,現(xiàn)在免費(fèi)贈(zèng)送給大家,
linux系統(tǒng)下協(xié)程的實(shí)現(xiàn)與原理剖析訓(xùn)練營(上)
linux系統(tǒng)下協(xié)程的實(shí)現(xiàn)與原理剖析訓(xùn)練營(下)
c/c++ linux服務(wù)器開發(fā)免費(fèi)學(xué)習(xí)地址:c/c++ linux后臺(tái)服務(wù)器高級(jí)架構(gòu)師
包括C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協(xié)程,DPDK等
協(xié)程與網(wǎng)絡(luò)編程
協(xié)程的出現(xiàn)出現(xiàn)為克服同步模型和異步模型的缺點(diǎn),并結(jié)合他們的優(yōu)點(diǎn)提供了可能: 現(xiàn)在假設(shè)我們有3個(gè)協(xié)程A,B,C分別要進(jìn)行數(shù)次IO操作。這3個(gè)協(xié)程運(yùn)行在同一個(gè)調(diào)度器或者說線程的上下文中,并依次使用CPU。調(diào)度器在其內(nèi)部維護(hù)了一個(gè)多路復(fù)用器(epoll/select/poll)。 協(xié)程A首先運(yùn)行,當(dāng)它執(zhí)行到一個(gè)IO操作,但該IO操作并沒有立即就緒時(shí),A將該IO事件注冊(cè)到調(diào)度器中,并主動(dòng)放棄CPU。這時(shí)調(diào)度器將B切換到CPU上開始執(zhí)行,同樣,當(dāng)它碰到一個(gè)IO操作的時(shí)候?qū)O事件注冊(cè)到調(diào)度器中,并主動(dòng)放棄CPU。調(diào)度器將C切換到cpu上開始執(zhí)行。當(dāng)所有協(xié)程都被“阻塞”后,調(diào)度器檢查注冊(cè)的IO事件是否發(fā)生或就緒。假設(shè)此時(shí)協(xié)程B注冊(cè)的IO時(shí)間已經(jīng)就緒,調(diào)度器將恢復(fù)B的執(zhí)行,B將從上次放棄CPU的地方接著向下運(yùn)行。A和C同理。 這樣,對(duì)于每一個(gè)協(xié)程來說,它是同步的模型;但是對(duì)于整個(gè)應(yīng)用程序來說,它是異步的模型。
好了,原理說完了,我們來看一個(gè)實(shí)際的例子,echo server。
echo server
在這個(gè)例子中,我們將使用orchid庫來編寫一個(gè)echo server。orchid庫是一個(gè)構(gòu)建于boost基礎(chǔ)上的 協(xié)程/網(wǎng)絡(luò)IO C++庫。
echo server首先必須要處理連接事件,我們創(chuàng)建一個(gè)協(xié)程來專門處理連接事件:
typedef boost::shared_ptr socket_ptr;//處理ACCEPT事件的協(xié)程void handle_accept(orchid::coroutine_handle co) {try{? ? ? ? orchid::acceptor acceptor(co -> get_io_service());//構(gòu)建一個(gè)acceptoracceptor.bind_and_listen("5678",true);for(;;) {? ? ? ? ? ? socket_ptr sock(new orchid::socket(co -> get_scheduler().get_io_service()));? ? ? ? ? ? acceptor.accept(*sock,co);//在調(diào)度器上創(chuàng)建一個(gè)協(xié)程來服務(wù)新的socket。//第一個(gè)參數(shù)是要?jiǎng)?chuàng)建的協(xié)程的main函數(shù),//第二個(gè)參數(shù)是要?jiǎng)?chuàng)建的協(xié)程的棧的大小。co -> get_scheduler().spawn(boost::bind(handle_io,_1,sock),orchid::minimum_stack_size());? ? ? ? }? ? }? ? catch(orchid::io_error& e) {? ? ? ? ORCHID_ERROR("id %lu msg:%s",co->id(),e.what());? ? }}
在orchid中,協(xié)程的main函數(shù)必須滿足函數(shù)簽名void(orchid::coroutine_handle),如handle_accept所示,其中參數(shù)co是協(xié)程句柄,代表了當(dāng)前函數(shù)所位于的協(xié)程。
在上面的代碼中,我們創(chuàng)建了一個(gè)acceptor,并讓它監(jiān)聽5678端口,然后在"阻塞"等待連接到來,當(dāng)連接事件到來時(shí),創(chuàng)建一個(gè)新的協(xié)程來服務(wù)新的socket。處理套接字IO的協(xié)程如下:
//處理SOCKET IO事件的協(xié)程void handle_io(orchid::coroutine_handle co,socket_ptr sock) {orchid::buffered_reader reader(*sock,co,16);//在socket上構(gòu)建緩沖輸入流orchid::buffered_writer writer(*sock,co,16);//在socket上構(gòu)建緩沖輸出流try{? ? std::string line;? ? std::size_t n =0;for(;;) {? ? ? ? n = reader.read_until(line,'\n');? ? ? ? ORCHID_DEBUG("id %lu recv: %s",co->id(),line.c_str());? ? ? ? writer.write(line.c_str(),line.size());? ? ? ? writer.flush();? ? } }catch(constorchid::io_error& e) {if(e.code() == boost::asio::error::eof) {? ? ? ? ORCHID_DEBUG("id %lu msg:%s",co->id(),"socket closed by remote side!");? ? }else{? ? ? ? ORCHID_ERROR("id %lu msg:%s",co->id(),e.what());? ? }}
IO處理協(xié)程首先在傳入的套接字上創(chuàng)建了一個(gè)輸入流和一個(gè)輸出流,分別代表了TCP的輸入和輸出。然后不斷地從輸入流中讀取一行,并輸出到輸出流當(dāng)中。當(dāng)socket上的TCP連接斷開時(shí),會(huì)拋出orchid::io_error的異常,循環(huán)結(jié)束,值得注意的是eof事件也被當(dāng)成異常來拋出。對(duì)于不喜歡使用異常的用戶,orchid提供了另外一套使用boost::system::error_code的接口。同時(shí),對(duì)于熟悉asio的用戶,orchid提供了一套boost asio風(fēng)格的接口。
如果用戶需要無緩沖的讀寫socket或者自建緩沖,可以直接調(diào)用orchid::socket的read和write函數(shù),或者使用無緩沖的reader和writer。
細(xì)心的讀者可能已經(jīng)發(fā)現(xiàn),handle_io的函數(shù)簽名并不滿足void(orchid::coroutine_handle),回到handle_accept中,可以發(fā)現(xiàn),實(shí)際上我們使用了boost.bind對(duì)handle_io函數(shù)進(jìn)行了適配,使之符合函數(shù)簽名的要求。
最后是main函數(shù):
intmain() {orchid::scheduler sche;sche.spawn(handle_accept,orchid::coroutine::minimum_stack_size());//創(chuàng)建協(xié)程sche.run();}
總結(jié)
在上面這個(gè)echo server的例子中,我們采用了一種 coroutine per connection 的編程模型,與傳統(tǒng)的 thread per connection 模型一樣的簡(jiǎn)潔清晰,但是整個(gè)程序?qū)嶋H上運(yùn)行在同一線程當(dāng)中,所以我們也不需要處理多線程同步的問題。 由于協(xié)程的切換開銷遠(yuǎn)遠(yuǎn)小于線程,因此我們可以輕易的同時(shí)啟動(dòng)上千協(xié)程來同時(shí)服務(wù)上千連接,這是 thread per connection的模型很難做到的;在性能方面,整個(gè)底層的IO系統(tǒng)實(shí)際上是使用boost.asio這種高性能的異步io庫實(shí)現(xiàn)的。與IO所費(fèi)的時(shí)間相比,協(xié)程切換的開銷基本可以忽略。 因此通過orchid,我們可以在保持同步IO模型簡(jiǎn)潔性的同時(shí),獲得異步IO模型的高性能和擴(kuò)展性。