[原]PHP-yar拓展源碼解讀七-concurrent_client篇

異步任務(wù)的封裝

//yar_transport.h
typedef struct _yar_call_data {
    ulong sequence;
    zend_string *uri;
    zend_string *method;
    zval callback;
    zval ecallback;
    zval parameters;
    zval options;
} yar_call_data_t;

Yar用yar_call_data_t表示一個(gè)異步任務(wù),sequence是從1開(kāi)始的任務(wù)ID,除了sequence,其他基本上就是對(duì)應(yīng)Yar_Concurrent_Client::call()上的同名參數(shù)。為方便作為一個(gè)PHP變量使用該結(jié)構(gòu)體,該結(jié)構(gòu)體平時(shí)被Yar包裝成一個(gè)le_calldata型Resource。

異步客戶端

//從c源碼反推出的的PHP類定義,沒(méi)有PHP定義文件
class Yar_Concurrent_Client{
     /** @var Resource[] le_calldata資源數(shù)組 */
     protected static $_callstack;  
     /** @var Callable RPC成功的回調(diào) */
     protected static $_callback;
     /** @var Callable RPC失敗的回調(diào) */ 
     protected static $_error_callback;
     /** @var boolean $_start true 異步調(diào)用執(zhí)行中 false 未執(zhí)行 */
     protected static $_start;

    public static call($uri , $method, $parameters=null, $callback=null,$error_callback=null,$options):int
    public static loop($callback=null , $error_callback=null):bool

}

Yar_Concurrent_Client定義如上,
看下Yar_Concurrent_Client::call()用于注冊(cè)一個(gè)異步RPC調(diào)用

//yar_client.c
/* {{{ proto Yar_Concurrent_Client::call($uri, $method, $parameters = NULL, $callback = NULL, $error_callback = NULL, $options = array()) */
PHP_METHOD(yar_concurrent_client, call) {
    zend_string *uri, *method;
    zend_string *name = NULL;
    long sequence;
    zval *callstack, item, *status;
    zval *error_callback = NULL, *callback = NULL, *parameters = NULL, *options = NULL;
    yar_call_data_t *entry;

    if (zend_parse_parameters(ZEND_NUM_ARGS(), "SS|a!z!za",
                &uri, &method, &parameters, &callback, &error_callback, &options) == FAILURE) {
        return;
    }

    if (!ZSTR_LEN(uri)) {
        php_error_docref(NULL, E_WARNING, "first parameter is expected to be a valid rpc server uri");
        return;
    }

    if (strncasecmp(ZSTR_VAL(uri), "http://", sizeof("http://") - 1) 
            && strncasecmp(ZSTR_VAL(uri), "https://", sizeof("https://") - 1)) {
        php_error_docref(NULL, E_WARNING, "only http protocol is supported in concurrent client for now");
        return;
    }

    if (!method->len) {
        php_error_docref(NULL, E_WARNING, "second parameter is expected to be a valid rpc api name");
        return;
    }

    //回調(diào)有效性檢查
    if (callback && !Z_ISNULL_P(callback) &&
            !zend_is_callable(callback, 0, &name)) {
        php_error_docref1(NULL, ZSTR_VAL(name), E_ERROR, "fourth parameter is expected to be a valid callback");
        zend_string_release(name);
        RETURN_FALSE;
    }

    if (name) {
        zend_string_release(name);
        name = NULL;
    }

    if (error_callback && !Z_ISNULL_P(error_callback) &&
            !zend_is_callable(error_callback, 0, &name)) {
        php_error_docref1(NULL, ZSTR_VAL(name), E_ERROR, "fifth parameter is expected to be a valid error callback");
        zend_string_release(name);
        RETURN_FALSE;
    }

    if (name) {
        zend_string_release(name);
    }

    //執(zhí)行中的concurrent_client不能添加任務(wù)
    status = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_start"), 0);
    if (Z_TYPE_P(status) == IS_TRUE) {
        php_error_docref(NULL, E_WARNING, "concurrent client has already started");
        RETURN_FALSE;
    }

    entry = ecalloc(1, sizeof(yar_call_data_t));

    entry->uri = zend_string_copy(uri);
    entry->method = zend_string_copy(method);

    if (callback && !Z_ISNULL_P(callback)) {
        ZVAL_COPY(&entry->callback, callback);
    }
    if (error_callback && !Z_ISNULL_P(error_callback)) {
        ZVAL_COPY(&entry->ecallback, error_callback);
    }
    if (parameters && IS_ARRAY == Z_TYPE_P(parameters)) {
        ZVAL_COPY(&entry->parameters, parameters);
    }
    if (options && IS_ARRAY == Z_TYPE_P(options)) {
        ZVAL_COPY(&entry->options, options);
    }

    callstack = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0);
    //初始化Yar_Concurrent_Client::_callstack;
    if (Z_ISNULL_P(callstack)) {
        zval tmp;
        array_init(&tmp);
        zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), &tmp);
        zval_ptr_dtor(&tmp);
    }
    
    //用yar_call_data_t生成一個(gè)le_calldata資源,寫(xiě)入_callstack屬性
    ZVAL_RES(&item, zend_register_resource(entry, le_calldata));
    sequence = zend_hash_next_free_element(Z_ARRVAL_P(callstack));
    entry->sequence = sequence + 1;
    zend_hash_next_index_insert(Z_ARRVAL_P(callstack), &item);
    RETURN_LONG(entry->sequence);
}
/* }}} */

整個(gè)call調(diào)用,除去防御代碼,其實(shí)就做了一件事,構(gòu)造一個(gè) le_calldata型Resource,并添加到Yar_Concurrent_Client::_callstack數(shù)組中。
接著是Yar_Concurrent_Client::loop()

//yar_client.c
PHP_METHOD(yar_concurrent_client, loop) {
    zend_string *name = NULL;
    zval *callstack;
    zval *callback = NULL, *error_callback = NULL;
    zval *status;
    uint ret = 0;

    if (zend_parse_parameters(ZEND_NUM_ARGS(), "|zz", &callback, &error_callback) == FAILURE) {
        return;
    }

    status = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_start"), 0);
    if (Z_TYPE_P(status) == IS_TRUE) {
        php_error_docref(NULL, E_WARNING, "concurrent client has already started");
        RETURN_FALSE;
    }

    if (callback && !Z_ISNULL_P(callback) &&
            !zend_is_callable(callback, 0, &name)) {
        php_error_docref1(NULL, ZSTR_VAL(name), E_ERROR, "first argument is expected to be a valid callback");
        zend_string_release(name);
        RETURN_FALSE;
    }

    if (name) {
        zend_string_release(name);
        name = NULL;
    }

    if (error_callback && !Z_ISNULL_P(error_callback) &&
            !zend_is_callable(error_callback, 0, &name)) {
        php_error_docref1(NULL, ZSTR_VAL(name), E_ERROR, "second argument is expected to be a valid error callback");
        zend_string_release(name);
        RETURN_FALSE;
    }
    if (name) {
        zend_string_release(name);
    }

    callstack = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0);
    if (Z_ISNULL_P(callstack) || zend_hash_num_elements(Z_ARRVAL_P(callstack)) == 0) {
        RETURN_TRUE;
    }

    //回調(diào)函數(shù)寫(xiě)入$_callback,$_error_callback 類成員變量
    if (callback && !Z_ISNULL_P(callback)) {
        zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), callback);
    }

    if (error_callback && !Z_ISNULL_P(error_callback)) {
        zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_error_callback"), error_callback);
    }

    //更新$_start類成員變量并執(zhí)行 php_yar_concurrent_client_hand()
    ZVAL_BOOL(status, 1);
    ret = php_yar_concurrent_client_handle(callstack);
    ZVAL_BOOL(status, 0);
    RETURN_BOOL(ret);
}

可見(jiàn)除了設(shè)置回調(diào)函數(shù),唯一實(shí)際有效的操作是調(diào)用php_yar_concurrent_client_hand()

//yar_client.c
int php_yar_concurrent_client_handle(zval *callstack) /* {{{ */ {
    char *msg;
    zval *calldata;
    zend_ulong sequence;
    yar_request_t *request;
    const yar_transport_t *factory;
    yar_transport_interface_t *transport;
    yar_transport_multi_interface_t *multi;

    //socket(Tcp/unixsock協(xié)議)傳輸器目前不支持并發(fā)調(diào)用
    factory = php_yar_transport_get(ZEND_STRL("curl"));
    multi = factory->multi->init();//調(diào)用curl_multi_init()初始化 yar-curl并發(fā)調(diào)度器

    //遍歷Yar_Concurrent_Client::_callstack 中的yar_call_data_t 結(jié)構(gòu)體
    ZEND_HASH_FOREACH_NUM_KEY_VAL(Z_ARRVAL_P(callstack), sequence, calldata) {
        yar_call_data_t *entry;
        long flags = 0;

        entry = (yar_call_data_t *)zend_fetch_resource(Z_RES_P(calldata), "Yar Call Data", le_calldata);

        if (!entry) {
            continue;
        }
        //下面的流程和curl的同步client大同小異
        //構(gòu)造request
        if (Z_ISUNDEF(entry->parameters)) {
            array_init(&entry->parameters);
        } 

        transport = factory->init();

        if (YAR_G(allow_persistent)) {
            if (!Z_ISUNDEF(entry->options)) {
                zval *flag = php_yar_client_get_opt(&entry->options, YAR_OPT_PERSISTENT);
                if (flag && (Z_TYPE_P(flag) == IS_TRUE || (Z_TYPE_P(flag) == IS_LONG && Z_LVAL_P(flag)))) {
                    flags |= YAR_PROTOCOL_PERSISTENT;
                }
            }
        }

         
        if (!(request = php_yar_request_instance(entry->method,
                        &entry->parameters, Z_ISUNDEF(entry->options)? NULL: & entry->options))) {
            transport->close(transport);
            factory->destroy(transport);
            return 0;
        }

        msg = (char*)&entry->options;
        //打開(kāi)并初始化一個(gè)libc-CURL句柄
        if (!transport->open(transport, entry->uri, flags, &msg)) {
            php_yar_client_trigger_error(1, YAR_ERR_TRANSPORT, msg);
            transport->close(transport);
            factory->destroy(transport);
            efree(msg);
            return 0;
        }

        DEBUG_C(ZEND_ULONG_FMT": call api '%s' at (%c)'%s' with '%d' parameters",
                request->id, ZSTR_VAL(request->method), (flags & YAR_PROTOCOL_PERSISTENT)? 'p' : 'r', entry->uri, 
                zend_hash_num_elements(Z_ARRVAL(request->parameters)));
        //設(shè)置libc-CURL要發(fā)送的數(shù)據(jù)
        if (!transport->send(transport, request, &msg)) {
            php_yar_client_trigger_error(1, YAR_ERR_TRANSPORT, msg);
            transport->close(transport);
            factory->destroy(transport);
            efree(msg);
            return 0;
        }

        //將entry存到transport實(shí)例的data對(duì)象中,方便后續(xù)使用
        transport->calldata(transport, entry);
        //注冊(cè)異步任務(wù)
        multi->add(multi, transport);
        php_yar_request_destroy(request);
    } ZEND_HASH_FOREACH_END();

    //發(fā)包收包 執(zhí)行回調(diào)方法
    if (!multi->exec(multi, php_yar_concurrent_client_callback)) {
        multi->close(multi);
        return 0;
    }
    //資源釋放
    multi->close(multi);
    return 1;
} /* }}} */

并發(fā)調(diào)度器

上面的流程大體上和之前介紹的同步調(diào)用的流程類似,但是多了一個(gè)multi = factory->multi->init();和該multi變量的相關(guān)調(diào)用。
multi相關(guān)結(jié)構(gòu)如下:

//yar_transport.h
//并發(fā)調(diào)度器
typedef struct _yar_transport_multi_interface {
    //對(duì)象成員變量
    void *data;
    //對(duì)象方法
    int (*add)(struct _yar_transport_multi_interface *self, yar_transport_interface_t *cp);
    int (*exec)(struct _yar_transport_multi_interface *self, yar_concurrent_client_callback *callback);
    void (*close)(struct _yar_transport_multi_interface *self);
} yar_transport_multi_interface_t;
//并發(fā)調(diào)度器工廠
typedef struct _yar_transport_multi {
    struct _yar_transport_multi_interface * (*init)();
} yar_transport_multi_t;

我們稱呼上面的那個(gè)(yar_transport_multi_interface_t)類型的變量為為 并發(fā)調(diào)度器。
看過(guò)傳輸器章節(jié)的結(jié)構(gòu)關(guān)系分析,這幾個(gè)結(jié)構(gòu)理解起來(lái)應(yīng)該也毫無(wú)壓力。

由于C語(yǔ)言中實(shí)現(xiàn)“繼承”所用的"父類"的結(jié)構(gòu)和“子類實(shí)例”的結(jié)構(gòu)是完全一致的,我們不能像PHP/JAVA一樣隨意往子類中添加成員變量(其實(shí)還是有辦法的,參考PHP內(nèi)核的zend_function相關(guān)結(jié)構(gòu)的實(shí)現(xiàn),思路不一樣),所以上面的(void *)data變量并不是對(duì)應(yīng)我們理解的對(duì)象中的某個(gè)變量,而是所有變量,子類對(duì)象通過(guò)存放不同類型的指針來(lái)“持有"不同數(shù)量和不同類型的成員變量。
curl的并發(fā)傳輸器實(shí)例中,其該data指針的類型為yar_curl_multi_data_t:

//curl.c
typedef struct _yar_curl_multi_data_t {
    CURLM *cm;//libcurl的批處理句柄,也是curl并發(fā)調(diào)度器的核心
    yar_transport_interface_t *chs;//curl傳輸器實(shí)例
} yar_curl_multi_data_t;

另外yar_transport_interface_t也有一個(gè)data用于同樣的用途,curl"子類"實(shí)例中其類型為_yar_curl_data_t:

typedef struct _yar_curl_data_t {
    CURL        *cp;//libcurl-CURL普通句柄
    char        persistent;//是否長(zhǎng)連接
    smart_str   buf;.//返回報(bào)文存放用的buf
    smart_str   postfield;//post報(bào)文
    php_url     *host;
    yar_call_data_t *calldata;//異步任務(wù)
    yar_curl_plink_t *plink;//長(zhǎng)連接用的
    struct curl_slist *headers;//http header頭
    yar_transport_interface_t *next;//傳輸器鏈表
#if LIBCURL_VERSION_NUM < 0x071100
    zend_string *address;
#endif
} yar_curl_data_t;

相關(guān)數(shù)據(jù)結(jié)構(gòu)介紹完了,這里看下curl傳輸器的實(shí)現(xiàn)下multl的成員方法.

Init()

yar_transport_curl_multi->init()代表并發(fā)調(diào)度器的初始化,即構(gòu)造器,除了內(nèi)存分配和成員方法設(shè)置,核心就是調(diào)用libcurl的curl_multi_init生成一個(gè)libcurl-CURLM句柄

yar_transport_multi_interface_t * php_yar_curl_multi_init() /* {{{ */ {
    yar_transport_multi_interface_t *multi = emalloc(sizeof(yar_transport_multi_interface_t));
    yar_curl_multi_data_t *data = ecalloc(1, sizeof(yar_curl_multi_data_t));

    data->cm = curl_multi_init();

    multi->data     = data;
    multi->add      = php_yar_curl_multi_add_handle;
    multi->exec     = php_yar_curl_multi_exec;
    multi->close    = php_yar_curl_multi_close;

    return multi;
} /* }}} */

Add()

yar_transport_multi_interface_t->add()用于向異步調(diào)度器注冊(cè)一個(gè)異步調(diào)用。

int php_yar_curl_multi_add_handle(yar_transport_multi_interface_t *self, yar_transport_interface_t *handle) /* {{{ */ {
    yar_curl_multi_data_t *multi = (yar_curl_multi_data_t *)self->data;
    yar_curl_data_t *data = (yar_curl_data_t *)handle->data;
        //libcurl相關(guān)配置
    php_yar_curl_prepare(handle);
        //往CURLM棧添加CURL句柄
    curl_multi_add_handle(multi->cm, data->cp);
        //將最后注冊(cè)的傳輸器添加到傳輸器鏈表表頭
    if (multi->chs) {
        data->next = multi->chs;
        multi->chs = handle;
    } else {
        multi->chs = handle;
    }

    return 1;
} /* }}} */

Exec()

yar_transport_multi_interface_t->exec()執(zhí)行異步調(diào)度器,基于select事件模型,使用curl_multi_perform收發(fā)所有數(shù)據(jù)。
其實(shí)Yar差不多從初版開(kāi)始就有一個(gè)epoll事件模型實(shí)現(xiàn)的異步調(diào)度器,不過(guò)啟用需要自行在config.h開(kāi)啟ENABLE_EPOLL宏。

int php_yar_curl_multi_exec(yar_transport_multi_interface_t *self, yar_concurrent_client_callback *f) /* {{{ */ {
    int running_count, rest_count;
    yar_curl_multi_data_t *multi;


    multi = (yar_curl_multi_data_t *)self->data;
    while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));

    //調(diào)用注冊(cè)的回調(diào)函數(shù),Yar回調(diào)回調(diào)函數(shù)有兩個(gè)時(shí)機(jī)
    //一個(gè)在某個(gè)請(qǐng)求返回報(bào)文接受完畢時(shí),這個(gè)是我們一般意義上理解的回調(diào)函數(shù)
   //另一個(gè)是Yar的特色處理方式,在一個(gè)在所有請(qǐng)求發(fā)出后,這時(shí)會(huì)用空參數(shù)回調(diào)Yar_Concurrent_Client的回調(diào)函數(shù)讓用戶先行處理其他事情。第二種情況對(duì)應(yīng)的就是下面這面這句。
    if (!f(NULL, YAR_ERR_OKEY, NULL)) {
        goto bailout;
    }

    //用戶回調(diào)函數(shù)執(zhí)行后的異常檢查
    if (EG(exception)) {
        goto onerror;
    }
    //重復(fù)調(diào)用curl_multi_perform直到所有句柄的數(shù)據(jù)首發(fā)完成
    if (running_count) {
        rest_count = running_count;
        do {
            int max_fd, return_code;
            struct timeval tv;
            fd_set readfds;
            fd_set writefds;
            fd_set exceptfds;

            FD_ZERO(&readfds);
            FD_ZERO(&writefds);
            FD_ZERO(&exceptfds);

            curl_multi_fdset(multi->cm, &readfds, &writefds, &exceptfds, &max_fd);
            if (max_fd == -1) {
                /*  When  max_fd  returns  with  -1,  you  need  to  wait  a  while  and then proceed and call
                    curl_multi_perform anyway, How long to wait? I would suggest 100 milliseconds at least */
                tv.tv_sec = 0;
                tv.tv_usec = 50000; /* sleep 50ms */
                select(1, &readfds, &writefds, &exceptfds, &tv);
                while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));
                continue;
            } 

            /* maybe we should use curl_multi_timeout like:
             * curl_multi_timeout(curlm, (long *)&curl_timeout);
             * if (curl_timeout == 0) {
             *    continue;
             * } else if (curl_timeout == -1) {
             *    tv.tv_sec = (ulong)(YAR_G(timeout) / 1000);
             *    tv.tv_usec = (ulong)((YAR_G(timeout) % 1000)? (YAR_G(timeout) & 1000) * 1000 : 0);
             * } else {
             *    tv.tv_sec  =  curl_timeout / 1000;
             *    tv.tv_usec = (curl_timeout % 1000) * 1000;
             * }
             */
            tv.tv_sec = (ulong)(YAR_G(timeout) / 1000);
            tv.tv_usec = (ulong)((YAR_G(timeout) % 1000)? (YAR_G(timeout) & 1000) * 1000 : 0);

            return_code = select(max_fd + 1, &readfds, &writefds, &exceptfds, &tv);
            if (return_code > 0) {
                while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));
            } else if (-1 == return_code) {
                php_error_docref(NULL, E_WARNING, "select error '%s'", strerror(errno));
                goto onerror;
            } else {
                /* timeout */
                php_error_docref(NULL, E_WARNING, "select timeout %ldms reached", YAR_G(timeout));
                goto onerror;
            }

            //每完成任意一個(gè)請(qǐng)求就執(zhí)行一次回調(diào)
            if (rest_count > running_count) {
                int ret = php_yar_curl_multi_parse_response(multi, f);
                if (ret == -1) {
                    goto bailout;
                } else if (ret == 0) {
                    goto onerror;
                }
                rest_count = running_count;
            }
        } while (running_count);
    } else {
        //將各個(gè)連接的返回?cái)?shù)據(jù)分別打包成response
        int ret = php_yar_curl_multi_parse_response(multi, f);
        if (ret == -1) {
            goto bailout;
        } else if (ret == 0) {
            goto onerror;
        }
    }

    return 1;
onerror:
    return 0;
bailout:
    self->close(self);
    zend_bailout();
    return 0;
} /* }}} */

Close()

yar_transport_multi_interface_t->close()用于各種資源的清理。

void php_yar_curl_multi_close(yar_transport_multi_interface_t *self) /* {{{ */ {
    yar_curl_multi_data_t *multi = (yar_curl_multi_data_t *)self->data;

    if (multi->chs) {
        yar_transport_interface_t *p, *q;
        p = multi->chs;
        for( ; p;) {
            yar_curl_data_t *data = (yar_curl_data_t *)p->data;
            q = data->next;
            curl_multi_remove_handle(multi->cm, data->cp);
            p->close(p);
            p = q;
        }
    }
    curl_multi_cleanup(multi->cm);
    efree(multi);
    efree(self);
    return ;
} /* }}} */

數(shù)據(jù)提取和反序列化

php_yar_curl_multi_parse_response()用于從收包完成后的CURLM,解析出response并執(zhí)行回調(diào)。

static int php_yar_curl_multi_parse_response(yar_curl_multi_data_t *multi, yar_concurrent_client_callback *f) /* {{{ */ {
    int msg_in_sequence;
    CURLMsg *msg;

    //遍歷所有CURL的傳輸結(jié)果
    do {
        msg = curl_multi_info_read(multi->cm, &msg_in_sequence);
        if (msg && msg->msg == CURLMSG_DONE) {
            uint found = 0;
            yar_transport_interface_t *handle = multi->chs, *q = NULL;

            //遍歷傳輸器鏈表取出對(duì)應(yīng)的傳輸器
            while (handle) {
                if (msg->easy_handle == ((yar_curl_data_t*)handle->data)->cp) {
                    if (q) {
                        ((yar_curl_data_t *)q->data)->next = ((yar_curl_data_t*)handle->data)->next;
                    } else {
                        multi->chs = ((yar_curl_data_t*)handle->data)->next;
                    }
                    found = 1;
                    break;
                }
                q = handle;
                handle = ((yar_curl_data_t*)handle->data)->next;
            }

            if (found) {
                long http_code = 200;
                yar_response_t *response;
                yar_curl_data_t *data = (yar_curl_data_t *)handle->data;

                response = php_yar_response_instance();

                if (msg->data.result == CURLE_OK) {
                    curl_multi_remove_handle(multi->cm, data->cp);
                    //異常返回
                    if(curl_easy_getinfo(data->cp, CURLINFO_RESPONSE_CODE, &http_code) == CURLE_OK && http_code != 200) {
                        char buf[128];
                        uint len = snprintf(buf, sizeof(buf), "server responsed non-200 code '%ld'", http_code);

                        php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
                        //調(diào)用失敗回調(diào)函數(shù)
                        if (!f(data->calldata, YAR_ERR_TRANSPORT, response)) {
                            //用戶在回調(diào)里使用了die()
                            handle->close(handle);
                            php_yar_response_destroy(response);
                            return -1;
                        }
                        if (EG(exception)) {
                            //異常處理
                            handle->close(handle);
                            php_yar_response_destroy(response);
                            return 0;
                        }
                        handle->close(handle);
                        php_yar_response_destroy(response);
                        continue;
                    } else {
                        //成功返回
                        if (data->buf.s) {
                            char *msg = NULL;
                            zval *retval, rret;
                            yar_header_t *header;
                            char *payload;
                            size_t payload_len;

                            smart_str_0(&data->buf);

                            payload = ZSTR_VAL(data->buf.s);
                            payload_len = ZSTR_LEN(data->buf.s);

                            //協(xié)議頭解析
                            if (!(header = php_yar_protocol_parse(payload))) {
                                php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
                            } else {
                                /* skip over the leading header */
                                payload += sizeof(yar_header_t);
                                payload_len -= sizeof(yar_header_t);

                            //去掉協(xié)議頭后解析成isroe數(shù)組
                                if (!(retval = php_yar_packager_unpack(payload, payload_len, &msg, &rret))) {
                                    php_yar_response_set_error(response, YAR_ERR_PACKAGER, msg, strlen(msg));
                                } else {
                                    //isroe數(shù)組轉(zhuǎn)response結(jié)構(gòu)
                                    php_yar_response_map_retval(response, retval);
                                    DEBUG_C(ZEND_ULONG_FMT": server response content packaged by '%.*s', len '%ld', content '%.32s'", response->id, 7, payload, header->body_len, payload + 8);
                                    zval_ptr_dtor(retval);
                                }
                                if (msg) {
                                    efree(msg);
                                }
                            }
                        } else {
                            php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
                        }
                        //調(diào)用對(duì)應(yīng)回調(diào)函數(shù)
                        if (!f(data->calldata, response->status, response)) {
                            handle->close(handle);
                            php_yar_response_destroy(response);
                            return -1;
                        }
                        if (EG(exception)) {
                            handle->close(handle);
                            php_yar_response_destroy(response);
                            return 0;
                        }
                    }
                } else {
                    char *err = (char *)curl_easy_strerror(msg->data.result);
                    php_yar_response_set_error(response, YAR_ERR_TRANSPORT, err, strlen(err));
                    if (!f(data->calldata, YAR_ERR_TRANSPORT, response)) {
                        handle->close(handle);
                        php_yar_response_destroy(response);
                        return -1;
                    }
                    if (EG(exception)) {
                        handle->close(handle);
                        php_yar_response_destroy(response);
                        return 0;
                    }
                }
                handle->close(handle);
                php_yar_response_destroy(response);
            } else {
                php_error_docref(NULL, E_WARNING, "unexpected transport info missed");
            }
        }
    } while (msg_in_sequence);

    return 1;
}
/* }}} */

回調(diào)用戶注冊(cè)方法

最后一個(gè)核心方法php_yar_concurrent_client_callback()用于回調(diào)用戶提供的回調(diào)函數(shù)

nt php_yar_concurrent_client_callback(yar_call_data_t *calldata, int status, yar_response_t *response) /* {{{ */ {
    zval code, retval, retval_ptr;
    zval callinfo, *callback, *func_params;
    zend_bool bailout = 0;
    uint params_count, i;

  
    //第一個(gè)條件分支對(duì)應(yīng)響應(yīng)回調(diào),第二個(gè)分支對(duì)應(yīng)所有請(qǐng)求都發(fā)出后的那次空回調(diào)
    if (calldata) {
                //根據(jù)請(qǐng)求結(jié)果選擇成功/失敗回調(diào)函數(shù),其中單個(gè)請(qǐng)求的回調(diào)函數(shù)優(yōu)先于Yar_Concurrent_Client(loop)的回調(diào)函數(shù)
        /* data callback */
        if (status == YAR_ERR_OKEY) {
            if (!Z_ISUNDEF(calldata->callback)) {
                callback = &calldata->callback;
            } else {
                callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), 0);
            }
            params_count = 2;
        } else {
            if (!Z_ISUNDEF(calldata->ecallback)) {
                callback = &calldata->ecallback;
            } else {
                callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_error_callback"), 0);
            }
            params_count = 3;
        }

        //沒(méi)有合適回調(diào)直接拋出Error或這打印遠(yuǎn)程服務(wù)方法的返回值
        if (Z_ISNULL_P(callback)) {
            if (status != YAR_ERR_OKEY) {
                if (!Z_ISUNDEF(response->err)) {
                    php_yar_client_handle_error(0, response);
                } else {
                    php_error_docref(NULL, E_WARNING, "[%d]:unknown Error", status);
                }
            } else if (!Z_ISUNDEF(response->retval)) {
                zend_print_zval(&response->retval, 1);
            }
            return 1;
        }

        if (status == YAR_ERR_OKEY) {
            if (Z_ISUNDEF(response->retval)) {
                php_yar_client_trigger_error(0, YAR_ERR_REQUEST, "%s", "server responsed empty response");
                return 1;
            }
            ZVAL_COPY(&retval, &response->retval);
        } else {
            ZVAL_LONG(&code, status);
            ZVAL_COPY(&retval, &response->err);
        }

        array_init(&callinfo);
        //回調(diào)函數(shù)的最后一個(gè)參數(shù)$callinfo,包含請(qǐng)求ID,uri,和遠(yuǎn)程方法名
        add_assoc_long_ex(&callinfo, "sequence", sizeof("sequence") - 1, calldata->sequence);
        add_assoc_str_ex(&callinfo, "uri", sizeof("uri") - 1, zend_string_copy(calldata->uri));
        add_assoc_str_ex(&callinfo, "method", sizeof("method") - 1, zend_string_copy(calldata->method));
    } else {
        callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), 0);
        if (Z_ISNULL_P(callback)) {
            return 1;
        }
        params_count = 2;
    }

   
    if (calldata && (status != YAR_ERR_OKEY)) {
         //失敗回調(diào)方法接受三個(gè)參數(shù) function($code,$retval,$callinfo)){}
        func_params = safe_emalloc(sizeof(zval), 3, 0);
        ZVAL_COPY_VALUE(&func_params[0], &code);
        ZVAL_COPY_VALUE(&func_params[1], &retval);
        ZVAL_COPY_VALUE(&func_params[2], &callinfo);
    } else if (calldata) {
        //成功的回調(diào)方法接受2個(gè)參數(shù) function($retval,$callinfo)){}
        func_params = safe_emalloc(sizeof(zval), 2, 0);
        ZVAL_COPY_VALUE(&func_params[0], &retval);
        ZVAL_COPY_VALUE(&func_params[1], &callinfo);
    } else {
       //所有請(qǐng)求都發(fā)出后的首次回調(diào)參數(shù)都是空
        func_params = safe_emalloc(sizeof(zval), 2, 0);
        ZVAL_NULL(&func_params[0]);
        ZVAL_NULL(&func_params[1]);
    }

    //調(diào)用回調(diào)方法,清理相關(guān)資源
    zend_try {
        if (call_user_function_ex(EG(function_table), NULL, callback,
                    &retval_ptr, params_count, func_params, 0, NULL) != SUCCESS) {
            for (i = 0; i < params_count; i++) {
                zval_ptr_dtor(&func_params[i]); 
            }
            efree(func_params);
            if (calldata) {
                php_error_docref(NULL, E_WARNING, "call to callback failed for request: '%s'", ZSTR_VAL(calldata->method));
            } else {
                php_error_docref(NULL, E_WARNING, "call to initial callback failed");
            }
            return 1;
        }
    } zend_catch {
        bailout = 1;
    } zend_end_try();

    if (!Z_ISUNDEF(retval_ptr)) {
        zval_ptr_dtor(&retval_ptr);
    }

    for (i = 0; i < params_count; i++) {
        zval_ptr_dtor(&func_params[i]); 
    }
    efree(func_params);
    return bailout? 0 : 1;
} /* }}} */

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 傳輸器結(jié)構(gòu) yar底層用一個(gè)_yar_transport_interface結(jié)構(gòu)表示一個(gè)傳輸器,處理網(wǎng)絡(luò)IO相關(guān)事...
    bromine閱讀 1,053評(píng)論 0 0
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評(píng)論 19 139
  • 序 Yar是鳥(niǎo)哥惠新宸寫(xiě)的一款并行RPC框架,是國(guó)內(nèi)PHP圈內(nèi)主流的RPC方案選擇,也是筆者公司服務(wù)化體系中的基礎(chǔ)...
    bromine閱讀 1,141評(píng)論 0 1
  • 高一分班,我和Y同學(xué)分到一個(gè)班。有一天發(fā)本子我不小心把Y同學(xué)的本子發(fā)到了他同桌那,他同桌把本子扔給他,我才知道他...
    txdyc_28456閱讀 355評(píng)論 1 1
  • webpack搭建React 1. 全局安裝Webpack, Babel, Webpack-dev-server:...
    H5日常記閱讀 659評(píng)論 0 0

友情鏈接更多精彩內(nèi)容