[原]PHP-yar拓展源碼解讀六-transport篇

傳輸器結(jié)構(gòu)

yar底層用一個_yar_transport_interface結(jié)構(gòu)表示一個傳輸器,處理網(wǎng)絡(luò)IO相關(guān)事宜。

typedef struct _yar_transport_interface {
    void *data;//這個變量在并發(fā)調(diào)用章節(jié)才會講到,暫時不用管
    int  (*open)(struct _yar_transport_interface *self, zend_string *address, long options, char **msg);
    int  (*send)(struct _yar_transport_interface *self, struct _yar_request *request, char **msg);
    struct _yar_response * (*exec)(struct _yar_transport_interface *self, struct _yar_request *request);
    int  (*setopt)(struct _yar_transport_interface *self, long type, void *value, void *addition);
    int  (*calldata)(struct _yar_transport_interface *self, yar_call_data_t *calldata);
    void (*close)(struct _yar_transport_interface *self);
} yar_transport_interface_t;

yar_transport_interface_tyar_transport_t生成

typedef struct _yar_transport {
    const char *name;
    struct _yar_transport_interface * (*init)();
    void (*destroy)(yar_transport_interface_t *self);
    yar_transport_multi_t *multi;
} yar_transport_t;

兩者關(guān)系如圖。


傳輸器類圖.png

前面提過Yar是典型的OO風(fēng)格的C代碼。雖然說是C語言的實現(xiàn),但要理解這里幾個相關(guān)結(jié)構(gòu)必須要用面向?qū)ο蟮难酃狻?/p>

yar_transport_t結(jié)構(gòu)體類型相當(dāng)于一個工廠接口,聲明了一個工廠骨架。
yar_transport_t類型的兩個變量相當(dāng)于 兩個 工廠接口的實現(xiàn)對象,不同的變量使用不同的函數(shù)指針形成了多態(tài)。yar_transport_t管理著yar_transport_interface結(jié)構(gòu)體變量的構(gòu)造和析構(gòu),產(chǎn)生具體的yar_transport_interface類型的實例,每個yar_transport_interface實例都有自己的成員變量。

還有一種相似的理解思路。
yar_transport_t是一個抽象類。
yar_transport_t類型的兩個變量是該抽象類的兩個實現(xiàn)子類,yar_transport_t 聲明的方法都是類方法和類變量,對應(yīng)PHP中的 __construct()__destruct()
yar_transport_interface則是yar_transport_t這個抽象類的實例,其結(jié)構(gòu)體上的成員對應(yīng)過的是實例的成員變量和成員方法。

但無論是哪一種,理解的核心都是以下兩點:
1.yar_transport_interface是一個傳輸器對象,有自己的獨立數(shù)據(jù)和狀態(tài)。
2.yar_transport_t類型的變量和yar_transport_t類型構(gòu)成了多態(tài),用于生成不同類型的yar_transport_interface對象。
后文統(tǒng)一使用第一種方式去描述yar的傳輸器模塊。

傳輸器工廠類

yar_transport_t->name表示傳輸器工廠的名字,下文會用傳輸器工廠的名字借代該工廠構(gòu)造的傳輸器。
Yar目前支持HttpTcpUnixSock(僅單機內(nèi)調(diào)用可用)三種傳輸方式/協(xié)議,Http目前的傳輸器使用Curl實現(xiàn),傳輸器(工廠)的名字為curl,另外兩種兩種使用socket實現(xiàn),傳輸器名字為sock,不過由于PHP目前沒有Tcp/UnixSock的Yar Rpc Server,所以這個sock傳輸器很少會用到。
init()destroy()是函數(shù)指針,用于構(gòu)造和銷毀 yar_transport_interface_t實例。
multi()是另外一個工廠,用于構(gòu)造一個并發(fā)調(diào)度器實例,后續(xù)并發(fā)調(diào)用章節(jié)我們也會提到。

傳輸器類

傳輸器有sockcurl兩種,前者應(yīng)用場景較少。
但是考慮其比較簡單,沒有并發(fā)調(diào)用模式,這里將以其為示例先講述傳輸器類中同步RPC調(diào)用模式的相關(guān)方法 ,至于curl傳輸器以及 并發(fā)RPC調(diào)用 在后面會用另外一篇文章單獨分析。實際上兩種傳輸器的接口方法語義和調(diào)用方式都有一定差異。

Open()

//socket.c
int php_yar_socket_open(yar_transport_interface_t *self, zend_string *address, long options, char **err_msg) /* {{{ */ {
    yar_socket_data_t *data = (yar_socket_data_t *)self->data;
    struct timeval tv;
    php_stream *stream = NULL;
    zend_string *errstr = NULL;
    char *persistent_key = NULL;
    int err;

    tv.tv_sec = (ulong)(YAR_G(connect_timeout) / 1000);
    tv.tv_usec = (ulong)((YAR_G(connect_timeout) % 1000)? (YAR_G(connect_timeout) % 1000) * 1000 : 0);
     //直到我看到這塊源碼,我才知道原來Yar是支持長連接的,長連接開關(guān)的來源為ini文件中的`yar.allow_persistent`,這個配置在php.net上也是找不到的,curl傳輸器同樣支持該配置。
    if (options & YAR_PROTOCOL_PERSISTENT) {
        data->persistent = 1;
        spprintf(&persistent_key, 0, "yar_%s", ZSTR_VAL(address));
    } else {
        data->persistent = 0;
    }

    //發(fā)起連接或根據(jù)uri獲取可復(fù)用的舊連接
    stream = php_stream_xport_create(ZSTR_VAL(address), ZSTR_LEN(address), 0, STREAM_XPORT_CLIENT|STREAM_XPORT_CONNECT, persistent_key, &tv, NULL, &errstr, &err);

    if (persistent_key) {
        efree(persistent_key);
    }

    if (stream == NULL) {
        spprintf(err_msg, 0, "Unable to connect to %s (%s)", ZSTR_VAL(address), strerror(errno));
        efree(errstr);
        return 0;
    }
    //sock傳輸器是同步阻塞的,不支持并行調(diào)用
    php_stream_set_option(stream, PHP_STREAM_OPTION_BLOCKING, 0, NULL);

#if ZEND_DEBUG
    stream->__exposed++;
#endif

    data->stream = stream;
        
    return 1;
} /* }}} */

yar_transport_interface->open()用于發(fā)起連接或初始化網(wǎng)絡(luò)IO相關(guān)的資源。
雖然這個socket傳輸器的名字叫'sock',但是socket方式也沒有直接使用socket()系函數(shù),而是使用了PHP提供的 機制封裝的統(tǒng)一接口,簡化相關(guān)通信細(xì)節(jié)。

Send()

//socket.c
int php_yar_socket_send(yar_transport_interface_t* self, yar_request_t *request, char **msg) /* {{{ */ {
    fd_set rfds;
    zend_string *payload;
    struct timeval tv;
    int ret = -1, fd, retval;
    char buf[SEND_BUF_SIZE];
    yar_header_t header = {0};
    //self變量相當(dāng)于python方法的self參數(shù),也類似于手動傳遞一個PHP 的$this指針供方法使用,面向?qū)ο箫L(fēng)格的C常見做法。
    yar_socket_data_t *data = (yar_socket_data_t *)self->data;

    FD_ZERO(&rfds);
    //php的流轉(zhuǎn)文件描述符
    if (SUCCESS == php_stream_cast(data->stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&fd, 1) && fd >= 0) {
        PHP_SAFE_FD_SET(fd, &rfds);
    } else {
        spprintf(msg, 0, "Unable cast socket fd form stream (%s)", strerror(errno));
        return 0;
    }

    //request變量序列化
    if (!(payload = php_yar_request_pack(request, msg))) {
        return 0;
    }

    DEBUG_C(ZEND_ULONG_FMT": pack request by '%.*s', result len '%ld', content: '%.32s'", 
            request->id, 7, ZSTR_VAL(payload), ZSTR_LEN(payload), ZSTR_VAL(payload) + 8);

    //構(gòu)建header變量
    /* for tcp/unix RPC, we need another way to supports auth */
    php_yar_protocol_render(&header, request->id, "Yar PHP Client", NULL, ZSTR_LEN(payload), data->persistent? YAR_PROTOCOL_PERSISTENT : 0);

    memcpy(buf, (char *)&header, sizeof(yar_header_t));

    tv.tv_sec = (ulong)(YAR_G(timeout) / 1000);
    tv.tv_usec = (ulong)((YAR_G(timeout) % 1000)? (YAR_G(timeout) & 1000) * 1000 : 0);

    //php_select()是select()的別名宏,等待socket文件描述符可寫
    retval = php_select(fd+1, NULL, &rfds, NULL, &tv);

    if (retval == -1) {
        zend_string_release(payload);
        spprintf(msg, 0, "select error '%s'", strerror(errno));
        return 0;
    } else if (retval == 0) {
        zend_string_release(payload);
        spprintf(msg, 0, "select timeout '%ld' seconds reached", YAR_G(timeout));
        return 0;
    }

    //通過buff緩沖區(qū)循環(huán)發(fā)送所有報文(協(xié)議頭+載荷)
    if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
        size_t bytes_left = 0, bytes_sent = 0;

        if (ZSTR_LEN(payload) > (sizeof(buf) - sizeof(yar_header_t))) {
            memcpy(buf + sizeof(yar_header_t), ZSTR_VAL(payload), sizeof(buf) - sizeof(yar_header_t));
            if ((ret = php_stream_xport_sendto(data->stream, buf, sizeof(buf), 0, NULL, 0)) < 0) {
                zend_string_release(payload);
                spprintf(msg, 0, "unable to send data");
                return 0;
            }
        } else {
            memcpy(buf + sizeof(yar_header_t), ZSTR_VAL(payload), ZSTR_LEN(payload));
            if ((ret = php_stream_xport_sendto(data->stream, buf, sizeof(yar_header_t) + ZSTR_LEN(payload), 0, NULL, 0)) < 0) {
                zend_string_release(payload);
                spprintf(msg, 0, "unable to send data");
                return 0;
            }
        }

        bytes_sent = ret - sizeof(yar_header_t);
        bytes_left = ZSTR_LEN(payload) - bytes_sent;

wait_io:
        if (bytes_left) {
            retval = php_select(fd+1, NULL, &rfds, NULL, &tv);

            if (retval == -1) {
                zend_string_release(payload);
                spprintf(msg, 0, "select error '%s'", strerror(errno));
                return 0;
            } else if (retval == 0) {
                zend_string_release(payload);
                spprintf(msg, 0, "select timeout %ldms reached", YAR_G(timeout));
                return 0;
            }

            if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
                if ((ret = php_stream_xport_sendto(data->stream, ZSTR_VAL(payload) + bytes_sent, bytes_left, 0, NULL, 0)) > 0) {
                    bytes_left -= ret;
                    bytes_sent += ret;
                }
            }
            goto wait_io;
        }
    }

    zend_string_release(payload);

    return ret < 0? 0 : 1;
} /* }}} */

yar_transport_interface->send()接受一個request參數(shù),用于實際的IO發(fā)包。
備注:對于curl傳輸器,由于libcurl的api限制因為其send()并未進(jìn)行實際的發(fā)包,實際上發(fā)包收包都在exec()中

Exec()

//socket.c
yar_response_t * php_yar_socket_exec(yar_transport_interface_t* self, yar_request_t *request) /* {{{ */ {
    fd_set rfds;
    struct timeval tv;
    yar_header_t *header;
    yar_response_t *response;
    int fd, retval, recvd;
    size_t len = 0, total_recvd = 0;
    char *msg, buf[RECV_BUF_SIZE], *payload = NULL;
    yar_socket_data_t *data = (yar_socket_data_t *)self->data;

    response = ecalloc(1, sizeof(yar_response_t));

    //php stram轉(zhuǎn)socket fd
    FD_ZERO(&rfds);
    if (SUCCESS == php_stream_cast(data->stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&fd, 1) && fd >= 0) {
        PHP_SAFE_FD_SET(fd, &rfds);
    } else {
        len = snprintf(buf, sizeof(buf), "Unable cast socket fd form stream (%s)", strerror(errno));
        php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
        return response;
    }

    tv.tv_sec = (ulong)(YAR_G(timeout) / 1000);
    tv.tv_usec = (ulong)((YAR_G(timeout) % 1000)? (YAR_G(timeout) & 1000) * 1000 : 0);

//根據(jù)協(xié)議頭長度循環(huán)收包
wait_io:
    retval = php_select(fd+1, &rfds, NULL, NULL, &tv);

    if (retval == -1) {
        len = snprintf(buf, sizeof(buf), "Unable to select %d '%s'", fd, strerror(errno));
        php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
        return response;
    } else if (retval == 0) {
        len = snprintf(buf, sizeof(buf), "select timeout %ldms reached", YAR_G(timeout));
        php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
        return response;
    }

    if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
        zval *retval, rret;
        //尚未解析出載荷信息/未解析出協(xié)議頭
        if (!payload) {
            if ((recvd = php_stream_xport_recvfrom(data->stream, buf, sizeof(buf), 0, NULL, NULL, NULL)) > 0) {
                //解析出協(xié)議頭
                if (!(header = php_yar_protocol_parse(buf))) {
                    php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
                    return response;
                }
                //根據(jù)協(xié)議頭獲取載荷長度并分配相應(yīng)內(nèi)存接受載荷數(shù)據(jù)
                payload = emalloc(header->body_len);
                len = header->body_len;
                total_recvd  = recvd - sizeof(yar_header_t);
                            
                memcpy(payload, buf + sizeof(yar_header_t), total_recvd);

                if (recvd < (sizeof(yar_header_t) + len)) {
                    goto wait_io;   
                }
            } else if (recvd < 0) {
                /* this should never happen */
                goto wait_io;
            }
        } else {
            if ((recvd = php_stream_xport_recvfrom(data->stream, payload + total_recvd, len - total_recvd, 0, NULL, NULL, NULL)) > 0) {
                total_recvd += recvd;
            }

            if (total_recvd < len) {
                goto wait_io;
            }
        }
        //此處已經(jīng)收包結(jié)束
        if (len) {
            //調(diào)用相應(yīng)打包器反序列化載荷數(shù)據(jù)成一個PHP數(shù)組變量到`rret`和`retval`中,具體參考response章節(jié)
            if (!(retval = php_yar_packager_unpack(payload, len, &msg, &rret))) {
                php_yar_response_set_error(response, YAR_ERR_PACKAGER, msg, strlen(msg));
                efree(msg);
                return response;
            }
            //isore數(shù)組變量轉(zhuǎn)response結(jié)構(gòu)
            php_yar_response_map_retval(response, retval);

            DEBUG_C(ZEND_ULONG_FMT": server response content packaged by '%.*s', len '%ld', content '%.32s'",
                    response->id, 7, payload, header->body_len, payload + 8);

            efree(payload);
            zval_ptr_dtor(retval);
        } else {
            php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
        }
        return response;
    } else {
        goto wait_io;
    }
} /* }}} */

yar_transport_interface->exec()用于IO收包,并將返回的數(shù)據(jù)封裝成response變量返回。

close()

//socket.c
void php_yar_socket_close(yar_transport_interface_t* self) /* {{{ */ {
    yar_socket_data_t *data = (yar_socket_data_t *)self->data;

    if (!data) {
        return;
    }
    //長連接模式不關(guān)閉連接
    if (!data->persistent && data->stream) {
        php_stream_close(data->stream);
    }

    efree(data);
    efree(self);

    return;
}
/* }}} */

yar_transport_interface->exec()->close()用于相關(guān)資源的回收和釋放。

yar_transport_interface的其他接口方法

yar_transport_interface->exec()->setopt()用于相關(guān)配置項的配置,socket傳輸器沒有實現(xiàn)并使用該接口,curl在exec()方法中用于設(shè)置超時信息。
yar_transport_interface->exec()->call_data()目前僅并發(fā)調(diào)用會用到,這個我們在并發(fā)調(diào)用章節(jié)再提及。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容