skynet源碼分析(8)--skynet的網(wǎng)絡(luò)

作者:shihuaping0918@163.com,轉(zhuǎn)載請注明作者
網(wǎng)絡(luò)部分是一個(gè)服務(wù)器最基礎(chǔ)最核心的部分,這個(gè)技術(shù)也已經(jīng)是非常成熟了,現(xiàn)在已經(jīng)很少有人自己實(shí)現(xiàn)一個(gè)網(wǎng)絡(luò)相關(guān)的庫了。skynet的網(wǎng)絡(luò)庫是自己實(shí)現(xiàn)的。

網(wǎng)絡(luò)底層的技術(shù)在windows上是完成端口(IOCP),在linux上是EPOLL,在mac/freebsd上是kqueue。這些技術(shù)都是能夠承載高負(fù)載高并發(fā)網(wǎng)絡(luò)請求的。IOCP和EPOLL都是異步IO,kqueue我不熟悉,就不敢評論了。

實(shí)際上云風(fēng)只實(shí)現(xiàn)了epoll和kqueue,windows上的變種請自行搜索吧。

epoll和kqueue的實(shí)現(xiàn)分別在skynet_epoll.h和epoll_kqueue.h當(dāng)中。epoll的函數(shù)其實(shí)就是epoll_create/epoll_ctl/epoll_del/epoll_wait這幾個(gè),要注意的是skynet中的epoll_create的參數(shù)是1024。所以連接數(shù)上不去的話很可能就是這里限制了。

skynet在skynet_poll.h中根據(jù)平臺(tái)的不同包含了不同的頭文件,屏蔽了平臺(tái)相關(guān)性。然后在socket_server.c中實(shí)現(xiàn)了網(wǎng)絡(luò)服務(wù)的邏輯。

然后skynet在skynet_socket.c中對socket_server.c中的邏輯再次做了一個(gè)封裝,還添加了socket客戶端相關(guān)的函數(shù),就是connect/send/close之類的函數(shù)。

為了方便lua層使用socket,在lua-socket.c中再將對skynet_socket.c進(jìn)行了一次封裝。這個(gè)封裝就是c語言層和lua語言層的相互轉(zhuǎn)換。目前只支持tcp和udp,基于tcp上的http/websocket之類統(tǒng)統(tǒng)是不支持的。

前面講到socket有一個(gè)單獨(dú)的線程,這個(gè)線程的代碼如下:

static void *
thread_socket(void *p) {
    struct monitor * m = p;
    skynet_initthread(THREAD_SOCKET);
    for (;;) {
        int r = skynet_socket_poll(); //看這里
        if (r==0)
            break;
        if (r<0) {
            CHECK_ABORT
            continue;
        }
        wakeup(m,0);
    }
    return NULL;
}

socket線程一直調(diào)用skynet_socket_poll,這和普通網(wǎng)絡(luò)服務(wù)器寫法是一樣的。普通網(wǎng)絡(luò)服務(wù)器也是創(chuàng)建socket,綁定socket,添加到epoll,然后epoll_wait等待事件的發(fā)生。

skynet中的的連接會(huì)有一個(gè)狀態(tài)流轉(zhuǎn)的過程,可以理解為狀態(tài)機(jī)。

#define SOCKET_TYPE_LISTEN 3  //監(jiān)聽
#define SOCKET_TYPE_CONNECTING 4 //連接中
#define SOCKET_TYPE_CONNECTED 5 //已連接
#define SOCKET_TYPE_HALFCLOSE 6 //半雙工,半連接
#define SOCKET_TYPE_PACCEPT 7 //有連接進(jìn)來
#define SOCKET_TYPE_BIND 8 //綁定

再來重點(diǎn)看一下socket_server.c這個(gè)文件,它做了很多事情,代碼量也比較大,有1800多行。這個(gè)文件要詳細(xì)的講呢,一是篇幅會(huì)特別大,二是大部分是網(wǎng)絡(luò)操作,也和skynet本身沒太大關(guān)聯(lián)性。所以就不會(huì)細(xì)講了,只會(huì)挑一些我認(rèn)為比較重要的東西去講。這一篇主要講一下對網(wǎng)絡(luò)的控制命令。

下面列出來的都是消息類型,用一個(gè)字符來表示,很不直接,很不好記
/*
    The first byte is TYPE

    S Start socket
    B Bind socket
    L Listen socket
    K Close socket
    O Connect to (Open)
    X Exit
    D Send package (high)
    P Send package (low)
    A Send UDP package
    T Set opt
    U Create UDP socket
    C set udp address
 */
//請求消息頭,上面的操作都是通過發(fā)送消息來實(shí)現(xiàn)的
struct request_package {
    uint8_t header[8];  // 6 bytes dummy
    union {
        char buffer[256];
        struct request_open open;
        struct request_send send;
        struct request_send_udp send_udp;
        struct request_close close;
        struct request_listen listen;
        struct request_bind bind;
        struct request_start start;
        struct request_setopt setopt;
        struct request_udp udp;
        struct request_setudp set_udp;
    } u;
    uint8_t dummy[256];
};
這里就是處理請求的地方
// return type
static int
ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
    int fd = ss->recvctrl_fd;
    // the length of message is one byte, so 256+8 buffer size is enough.
    uint8_t buffer[256];
    uint8_t header[2];
    block_readpipe(fd, header, sizeof(header));
    int type = header[0];
    int len = header[1];
    block_readpipe(fd, buffer, len);
    // ctrl command only exist in local fd, so don't worry about endian.
    switch (type) {
    case 'S':
        return start_socket(ss,(struct request_start *)buffer, result);
    case 'B':
        return bind_socket(ss,(struct request_bind *)buffer, result);
    case 'L':
        return listen_socket(ss,(struct request_listen *)buffer, result);
    case 'K':
        return close_socket(ss,(struct request_close *)buffer, result);
    case 'O':
        return open_socket(ss, (struct request_open *)buffer, result);  //在這里面調(diào)用connect
    case 'X':
        result->opaque = 0;
        result->id = 0;
        result->ud = 0;
        result->data = NULL;
        return SOCKET_EXIT;
    case 'D':
        return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_HIGH, NULL);
    case 'P':
        return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_LOW, NULL);
    case 'A': {
        struct request_send_udp * rsu = (struct request_send_udp *)buffer;
        return send_socket(ss, &rsu->send, result, PRIORITY_HIGH, rsu->address);
    }
    case 'C':
        return set_udp_address(ss, (struct request_setudp *)buffer, result);
    case 'T':
        setopt_socket(ss, (struct request_setopt *)buffer);
        return -1;
    case 'U':
        add_udp_socket(ss, (struct request_udp *)buffer);
        return -1;
    default:
        fprintf(stderr, "socket-server: Unknown ctrl %c.\n",type);
        return -1;
    };

    return -1;
}

以‘O'為例分析一下這個(gè)流程,skynet是基于消息的,這個(gè)是它的設(shè)計(jì)理念。所以對socket的操作,也都是基于消息的。它是怎么做的呢,首先創(chuàng)建一個(gè)recvctrl_fd,把消息發(fā)到recvctrl_fd。然后每次socket_server_poll被調(diào)用的時(shí)候,會(huì)select這個(gè)recvctrl_fd,看有沒消息。如果有就調(diào)用上面的crl_cmd函數(shù)。

過程大致都講清楚了,現(xiàn)在來看'O’這個(gè)命令,也就是打開一個(gè)連接是怎么一個(gè)過程。
1.lua層的connect函數(shù)對應(yīng)的是lconnect函數(shù),

static int
lconnect(lua_State *L) {
    size_t sz = 0;
    const char * addr = luaL_checklstring(L,1,&sz);
    char tmp[sz];
    int port = 0;
    const char * host = address_port(L, tmp, addr, 2, &port);
    if (port == 0) {
        return luaL_error(L, "Invalid port");
    }
    struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
    int id = skynet_socket_connect(ctx, host, port);  //將Lua數(shù)據(jù)轉(zhuǎn)為c數(shù)據(jù)以后調(diào)c函數(shù)
    lua_pushinteger(L, id);

    return 1;
}

2.lconnect函數(shù)調(diào)用了skynet_socket_connect函數(shù)。

int 
skynet_socket_connect(struct skynet_context *ctx, const char *host, int port) {
    uint32_t source = skynet_context_handle(ctx);
    return socket_server_connect(SOCKET_SERVER, source, host, port);
}

3.skynet_socket_connect函數(shù)又調(diào)了socket_server_connect

int 
socket_server_connect(struct socket_server *ss, uintptr_t opaque, const char * addr, int port) {
    struct request_package request;
    int len = open_request(ss, &request, opaque, addr, port);
    if (len < 0)
        return -1;
    send_request(ss, &request, 'O', sizeof(request.u.open) + len); //注意'O’
    return request.u.open.id;
}

4.socket_server_connect調(diào)用了一個(gè)叫send_request的函數(shù)

static void
send_request(struct socket_server *ss, struct request_package *request, char type, int len) {
    request->header[6] = (uint8_t)type;
    request->header[7] = (uint8_t)len;
    for (;;) {
//注意write和ss->send_ctrl_fd
        ssize_t n = write(ss->sendctrl_fd, &request->header[6], len+2);
        if (n<0) {
            if (errno != EINTR) {
                fprintf(stderr, "socket-server : send ctrl command error %s.\n", strerror(errno));
            }
            continue;
        }
        assert(n == len+2);
        return;
    }
}

到這個(gè)函數(shù)以后,比較清楚地看到,數(shù)據(jù)被發(fā)送到sendctrl_fd這個(gè)描述符上了。
5.從send_request來看,數(shù)據(jù)明明是發(fā)到了sendctrl_fd上面,而上面講的取數(shù)據(jù)是從recvctrl_fd上取的,明顯對不上。這到底是怎么回事?是的,確實(shí)是這樣,還有一段代碼沒有看的話,這個(gè)確實(shí)是解釋不通的。

    int fd[2];
    poll_fd efd = sp_create();
    if (sp_invalid(efd)) {
        fprintf(stderr, "socket-server: create event pool failed.\n");
        return NULL;
    }
    if (pipe(fd)) { //管道操作,fd[0]為讀取端,fd[1]為寫入端
        sp_release(efd);
        fprintf(stderr, "socket-server: create socket pair failed.\n");
        return NULL;
    }
    if (sp_add(efd, fd[0], NULL)) {
        // add recvctrl_fd to event poll
        fprintf(stderr, "socket-server: can't add server fd to event pool.\n");
        close(fd[0]);
        close(fd[1]);
        sp_release(efd);
        return NULL;
    }

    struct socket_server *ss = MALLOC(sizeof(*ss));
    ss->event_fd = efd;
    ss->recvctrl_fd = fd[0];  //其它這個(gè)是管道的讀取端
    ss->sendctrl_fd = fd[1]; //這個(gè)是管道的寫入端
    ss->checkctrl = 1;

6.上面的幾行代碼說明了sendctrl_fd是管道的寫入端,recvctrl_fd是管道的讀取端,這就解釋了上面5的疑問。因?yàn)楣艿缹懭攵说臄?shù)據(jù)都會(huì)到讀取端。所以從sendctrl_fd寫進(jìn)去,會(huì)到recvctrl_fd里。

7.socket_server_poll中select函數(shù)檢查recvctrl_fd,如果有消息,進(jìn)入case控制語句,然后到'O‘,調(diào)用open_socket,在這個(gè)函數(shù)里,會(huì)調(diào)用大家熟悉的connect函數(shù)。打開一個(gè)連接到目標(biāo)ip和端口的連接。

為什么操作一個(gè)網(wǎng)絡(luò)要費(fèi)這么大的勁呢,繞來繞去非常的不直觀。因?yàn)閟kynet是基于消息的,而且每個(gè)服務(wù)都有一個(gè)monitor,每個(gè)消息處理的時(shí)候要盡可能的短,這樣才不會(huì)阻塞服務(wù)里其它的請求。而connect這種明顯是阻塞的,當(dāng)然也可以寫成非阻塞的,但是非阻塞的話,你需要不斷地掛起,因?yàn)榉亲枞麑?shí)際上是基于select技術(shù)來實(shí)現(xiàn)的。而不斷地掛起,這個(gè)就很麻煩,寫起來很痛苦而且很容易出錯(cuò)。因此云風(fēng)把這些都放到網(wǎng)絡(luò)線程中來做,這樣就不會(huì)影響工作線程。但是這樣做也有它的缺點(diǎn),那就是網(wǎng)絡(luò)線程可能會(huì)被阻塞,網(wǎng)絡(luò)線程被阻塞就會(huì)導(dǎo)致服務(wù)無響應(yīng)?;蛘邔?dǎo)致大量的數(shù)據(jù)包積累,引起波峰。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容