異步任務的封裝
//yar_transport.h
/*
 * Describes one queued asynchronous RPC call.
 * Instances are allocated zeroed by Yar_Concurrent_Client::call(), so any
 * zval member that call() did not fill stays IS_UNDEF; later code tests the
 * members with Z_ISUNDEF() to detect "not supplied".
 */
typedef struct _yar_call_data {
ulong sequence; /* task id: call() stores the callstack's next free index + 1 */
zend_string *uri; /* RPC server uri passed to call() */
zend_string *method; /* RPC api name passed to call() */
zval callback; /* per-call success callback, IS_UNDEF when none was given */
zval ecallback; /* per-call error callback, IS_UNDEF when none was given */
zval parameters; /* argument array for the remote method */
zval options; /* per-call options array */
} yar_call_data_t;
Yar用yar_call_data_t表示一個異步任務,sequence是從1開始的任務ID,除了sequence,其他成員基本上就是對應Yar_Concurrent_Client::call()上的同名參數。為方便作為一個PHP變量使用該結構體,該結構體平時被Yar包裝成一個le_calldata型Resource。
異步客戶端
// PHP class declaration reverse-engineered from the C source; the extension ships no PHP definition file.
class Yar_Concurrent_Client{
/** @var Resource[] array of le_calldata resources, one per queued call */
protected static $_callstack;
/** @var Callable callback invoked when an RPC succeeds */
protected static $_callback;
/** @var Callable callback invoked when an RPC fails */
protected static $_error_callback;
/** @var boolean $_start true while the asynchronous calls are being executed, false otherwise */
protected static $_start;
public static call($uri , $method, $parameters=null, $callback=null,$error_callback=null,$options):int
public static loop($callback=null , $error_callback=null):bool
}
Yar_Concurrent_Client定義如上。
先看下Yar_Concurrent_Client::call(),它用于注冊一個異步RPC調用:
//yar_client.c
/* {{{ proto Yar_Concurrent_Client::call($uri, $method, $parameters = NULL, $callback = NULL, $error_callback = NULL, $options = array()) */
/*
 * Yar_Concurrent_Client::call(): queue one asynchronous RPC call.
 *
 * Validates the arguments, wraps them in a yar_call_data_t, registers that
 * struct as an le_calldata resource and appends the resource to the static
 * $_callstack array. Nothing is sent here; the requests go out in loop().
 *
 * Returns the 1-based sequence id of the queued call, FALSE when a callback
 * is invalid or the client is already running, and nothing (NULL) when the
 * uri/method validation fails.
 */
PHP_METHOD(yar_concurrent_client, call) {
	zend_string *uri, *method;
	zend_string *name = NULL;
	long sequence;
	zval *callstack, item, *status;
	zval *error_callback = NULL, *callback = NULL, *parameters = NULL, *options = NULL;
	yar_call_data_t *entry;

	if (zend_parse_parameters(ZEND_NUM_ARGS(), "SS|a!z!za",
				&uri, &method, &parameters, &callback, &error_callback, &options) == FAILURE) {
		return;
	}

	if (!ZSTR_LEN(uri)) {
		php_error_docref(NULL, E_WARNING, "first parameter is expected to be a valid rpc server uri");
		return;
	}

	/* strncasecmp() returns 0 on match, so this is true only when neither prefix matches */
	if (strncasecmp(ZSTR_VAL(uri), "http://", sizeof("http://") - 1)
			&& strncasecmp(ZSTR_VAL(uri), "https://", sizeof("https://") - 1)) {
		php_error_docref(NULL, E_WARNING, "only http protocol is supported in concurrent client for now");
		return;
	}

	if (!ZSTR_LEN(method)) {
		php_error_docref(NULL, E_WARNING, "second parameter is expected to be a valid rpc api name");
		return;
	}

	/* validate the success callback */
	if (callback && !Z_ISNULL_P(callback) &&
			!zend_is_callable(callback, 0, &name)) {
		php_error_docref1(NULL, ZSTR_VAL(name), E_ERROR, "fourth parameter is expected to be a valid callback");
		zend_string_release(name);
		RETURN_FALSE;
	}

	if (name) {
		zend_string_release(name);
		name = NULL;
	}

	/* validate the error callback */
	if (error_callback && !Z_ISNULL_P(error_callback) &&
			!zend_is_callable(error_callback, 0, &name)) {
		php_error_docref1(NULL, ZSTR_VAL(name), E_ERROR, "fifth parameter is expected to be a valid error callback");
		zend_string_release(name);
		RETURN_FALSE;
	}

	if (name) {
		zend_string_release(name);
	}

	/* tasks cannot be added while the concurrent client is executing */
	status = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_start"), 0);
	if (Z_TYPE_P(status) == IS_TRUE) {
		php_error_docref(NULL, E_WARNING, "concurrent client has already started");
		RETURN_FALSE;
	}

	/* ecalloc zeroes the struct, so zval members that are not copied below stay IS_UNDEF */
	entry = ecalloc(1, sizeof(yar_call_data_t));
	entry->uri = zend_string_copy(uri);
	entry->method = zend_string_copy(method);

	if (callback && !Z_ISNULL_P(callback)) {
		ZVAL_COPY(&entry->callback, callback);
	}
	if (error_callback && !Z_ISNULL_P(error_callback)) {
		ZVAL_COPY(&entry->ecallback, error_callback);
	}
	if (parameters && IS_ARRAY == Z_TYPE_P(parameters)) {
		ZVAL_COPY(&entry->parameters, parameters);
	}
	if (options && IS_ARRAY == Z_TYPE_P(options)) {
		ZVAL_COPY(&entry->options, options);
	}

	callstack = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0);
	/* lazily initialise Yar_Concurrent_Client::$_callstack to an empty array */
	if (Z_ISNULL_P(callstack)) {
		zval tmp;
		array_init(&tmp);
		zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), &tmp);
		zval_ptr_dtor(&tmp);
		/* re-read so the code below does not depend on the property being updated in place */
		callstack = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0);
	}

	/* wrap the yar_call_data_t in an le_calldata resource and append it to $_callstack */
	ZVAL_RES(&item, zend_register_resource(entry, le_calldata));
	sequence = zend_hash_next_free_element(Z_ARRVAL_P(callstack));
	entry->sequence = sequence + 1;
	zend_hash_next_index_insert(Z_ARRVAL_P(callstack), &item);

	RETURN_LONG(entry->sequence);
}
/* }}} */
整個call調用,除去防御代碼,其實就做了一件事:構造一個le_calldata型Resource,并添加到Yar_Concurrent_Client::_callstack數組中。
接著是Yar_Concurrent_Client::loop()
//yar_client.c
/*
 * Yar_Concurrent_Client::loop(): send every queued call and dispatch callbacks.
 *
 * Refuses to run when $_start is already true, validates the two optional
 * callbacks, stores them in $_callback / $_error_callback and hands the
 * callstack to php_yar_concurrent_client_handle(). Returns TRUE right away
 * when nothing is queued, otherwise the handler's result as a boolean.
 */
PHP_METHOD(yar_concurrent_client, loop) {
	zval *on_success = NULL, *on_failure = NULL;
	zval *running, *queue;
	zend_string *callable_name = NULL;
	int has_success, has_failure;
	uint handled = 0;

	if (FAILURE == zend_parse_parameters(ZEND_NUM_ARGS(), "|zz", &on_success, &on_failure)) {
		return;
	}

	running = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_start"), 0);
	if (IS_TRUE == Z_TYPE_P(running)) {
		php_error_docref(NULL, E_WARNING, "concurrent client has already started");
		RETURN_FALSE;
	}

	/* a callback counts as "given" only when it was passed and is not null */
	has_success = (on_success != NULL) && !Z_ISNULL_P(on_success);
	has_failure = (on_failure != NULL) && !Z_ISNULL_P(on_failure);

	if (has_success && !zend_is_callable(on_success, 0, &callable_name)) {
		php_error_docref1(NULL, ZSTR_VAL(callable_name), E_ERROR, "first argument is expected to be a valid callback");
		zend_string_release(callable_name);
		RETURN_FALSE;
	}
	if (callable_name != NULL) {
		zend_string_release(callable_name);
		callable_name = NULL;
	}

	if (has_failure && !zend_is_callable(on_failure, 0, &callable_name)) {
		php_error_docref1(NULL, ZSTR_VAL(callable_name), E_ERROR, "second argument is expected to be a valid error callback");
		zend_string_release(callable_name);
		RETURN_FALSE;
	}
	if (callable_name != NULL) {
		zend_string_release(callable_name);
	}

	queue = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0);
	if (Z_ISNULL_P(queue) || 0 == zend_hash_num_elements(Z_ARRVAL_P(queue))) {
		/* nothing was queued */
		RETURN_TRUE;
	}

	/* publish the callbacks through the static class properties */
	if (has_success) {
		zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), on_success);
	}
	if (has_failure) {
		zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_error_callback"), on_failure);
	}

	/* flag $_start for the duration of the run */
	ZVAL_BOOL(running, 1);
	handled = php_yar_concurrent_client_handle(queue);
	ZVAL_BOOL(running, 0);

	RETURN_BOOL(handled);
}
可見除了設置回調函數,唯一實際有效的操作是調用php_yar_concurrent_client_handle()
//yar_client.c
/*
 * Turns every le_calldata resource in `callstack` into a prepared curl
 * transport, registers the transports with one concurrent scheduler and runs it.
 *
 * Returns 1 when the scheduler ran to completion, 0 on any failure. On every
 * failure path the scheduler is closed, which also closes the transports that
 * had already been registered with it.
 */
int php_yar_concurrent_client_handle(zval *callstack) /* {{{ */ {
	char *msg;
	zval *calldata;
	zend_ulong sequence;
	yar_request_t *request;
	const yar_transport_t *factory;
	yar_transport_interface_t *transport;
	yar_transport_multi_interface_t *multi;

	/* only the curl transport is used for concurrent calls */
	factory = php_yar_transport_get(ZEND_STRL("curl"));
	/* create the concurrent scheduler */
	multi = factory->multi->init();

	/* walk the yar_call_data_t entries stored in Yar_Concurrent_Client::$_callstack */
	ZEND_HASH_FOREACH_NUM_KEY_VAL(Z_ARRVAL_P(callstack), sequence, calldata) {
		yar_call_data_t *entry;
		long flags = 0;

		entry = (yar_call_data_t *)zend_fetch_resource(Z_RES_P(calldata), "Yar Call Data", le_calldata);
		if (!entry) {
			continue;
		}

		/* a call without parameters is sent with an empty array */
		if (Z_ISUNDEF(entry->parameters)) {
			array_init(&entry->parameters);
		}

		transport = factory->init();

		if (YAR_G(allow_persistent)) {
			if (!Z_ISUNDEF(entry->options)) {
				zval *flag = php_yar_client_get_opt(&entry->options, YAR_OPT_PERSISTENT);
				if (flag && (Z_TYPE_P(flag) == IS_TRUE || (Z_TYPE_P(flag) == IS_LONG && Z_LVAL_P(flag)))) {
					flags |= YAR_PROTOCOL_PERSISTENT;
				}
			}
		}

		/* build the request */
		if (!(request = php_yar_request_instance(entry->method,
						&entry->parameters, Z_ISUNDEF(entry->options)? NULL: &entry->options))) {
			transport->close(transport);
			factory->destroy(transport);
			multi->close(multi);
			return 0;
		}

		/* the options zval is handed to open() through the message out-parameter */
		msg = (char*)&entry->options;
		/* open and initialise the transport */
		if (!transport->open(transport, entry->uri, flags, &msg)) {
			php_yar_client_trigger_error(1, YAR_ERR_TRANSPORT, msg);
			php_yar_request_destroy(request);
			transport->close(transport);
			factory->destroy(transport);
			efree(msg);
			multi->close(multi);
			return 0;
		}

		DEBUG_C(ZEND_ULONG_FMT": call api '%s' at (%c)'%s' with '%d' parameters",
				request->id, ZSTR_VAL(request->method), (flags & YAR_PROTOCOL_PERSISTENT)? 'p' : 'r', ZSTR_VAL(entry->uri),
				zend_hash_num_elements(Z_ARRVAL(request->parameters)));

		/* hand the request to the transport */
		if (!transport->send(transport, request, &msg)) {
			php_yar_client_trigger_error(1, YAR_ERR_TRANSPORT, msg);
			php_yar_request_destroy(request);
			transport->close(transport);
			factory->destroy(transport);
			efree(msg);
			multi->close(multi);
			return 0;
		}

		/* remember the call data on the transport for use in the response callback */
		transport->calldata(transport, entry);
		/* register the transport with the scheduler */
		multi->add(multi, transport);
		php_yar_request_destroy(request);
	} ZEND_HASH_FOREACH_END();

	/* send, receive and invoke the callbacks */
	if (!multi->exec(multi, php_yar_concurrent_client_callback)) {
		multi->close(multi);
		return 0;
	}

	/* release the scheduler */
	multi->close(multi);
	return 1;
} /* }}} */
并發調度器
上面的流程大體上和之前介紹的同步調用的流程類似,但是多了一個multi = factory->multi->init();和該multi變量的相關調用。
multi相關結構如下:
//yar_transport.h
// Concurrent scheduler: an object-like struct holding private state plus its method table
typedef struct _yar_transport_multi_interface {
// instance state; the concrete layout is chosen by the implementation (the curl one stores a yar_curl_multi_data_t)
void *data;
// instance methods
// add: register one transport with the scheduler
int (*add)(struct _yar_transport_multi_interface *self, yar_transport_interface_t *cp);
// exec: run all registered transports, reporting through `callback`
int (*exec)(struct _yar_transport_multi_interface *self, yar_concurrent_client_callback *callback);
// close: release the scheduler
void (*close)(struct _yar_transport_multi_interface *self);
} yar_transport_multi_interface_t;
// Factory for concurrent schedulers
typedef struct _yar_transport_multi {
// init: creates a new scheduler instance (invoked as factory->multi->init())
struct _yar_transport_multi_interface * (*init)();
} yar_transport_multi_t;
我們稱呼上面的那個(yar_transport_multi_interface_t)類型的變量為并發調度器。
看過傳輸器章節的結構關系分析,這幾個結構理解起來應該也毫無壓力。
由于C語言中實現“繼承”所用的“父類”的結構和“子類實例”的結構是完全一致的,我們不能像PHP/JAVA一樣隨意往子類中添加成員變量(其實還是有辦法的,參考PHP內核的zend_function相關結構的實現,思路不一樣),所以上面的(void *)data變量并不是對應我們理解的對象中的某個變量,而是所有變量:子類對象通過存放不同類型的指針來“持有”不同數量和不同類型的成員變量。
curl的并發傳輸器實例中,該data指針的類型為yar_curl_multi_data_t:
//curl.c
/* Private state of the curl concurrent scheduler (stored in the scheduler's `data` pointer). */
typedef struct _yar_curl_multi_data_t {
CURLM *cm;// libcurl multi handle, the core of the curl concurrent scheduler
yar_transport_interface_t *chs;// head of the linked list of registered curl transports (chained through yar_curl_data_t.next)
} yar_curl_multi_data_t;
另外yar_transport_interface_t也有一個data用于同樣的用途,curl“子類”實例中其類型為yar_curl_data_t:
typedef struct _yar_curl_data_t {
CURL *cp;//libcurl-CURL普通句柄
char persistent;//是否長(zhǎng)連接
smart_str buf;.//返回報(bào)文存放用的buf
smart_str postfield;//post報(bào)文
php_url *host;
yar_call_data_t *calldata;//異步任務(wù)
yar_curl_plink_t *plink;//長(zhǎng)連接用的
struct curl_slist *headers;//http header頭
yar_transport_interface_t *next;//傳輸器鏈表
#if LIBCURL_VERSION_NUM < 0x071100
zend_string *address;
#endif
} yar_curl_data_t;
相關數據結構介紹完了,這里看下curl傳輸器實現下multi的成員方法。
Init()
yar_transport_curl_multi->init()代表并發調度器的初始化,即構造器,除了內存分配和成員方法設置,核心就是調用libcurl的curl_multi_init生成一個libcurl-CURLM句柄
/*
 * Constructor of the curl concurrent scheduler: allocates the scheduler and
 * its private state, creates the libcurl multi handle and wires up the
 * add/exec/close methods. Returns the new scheduler.
 */
yar_transport_multi_interface_t * php_yar_curl_multi_init() /* {{{ */ {
	yar_transport_multi_interface_t *scheduler;
	yar_curl_multi_data_t *state;

	scheduler = emalloc(sizeof(*scheduler));
	state = ecalloc(1, sizeof(*state));

	/* the multi handle drives every transfer registered later */
	state->cm = curl_multi_init();

	scheduler->data  = state;
	scheduler->add   = php_yar_curl_multi_add_handle;
	scheduler->exec  = php_yar_curl_multi_exec;
	scheduler->close = php_yar_curl_multi_close;

	return scheduler;
} /* }}} */
Add()
yar_transport_multi_interface_t->add()用于向異步調度器注冊一個異步調用。
/*
 * Registers one curl transport with the scheduler: applies the libcurl
 * configuration, adds the easy handle to the multi handle and pushes the
 * transport onto the front of the scheduler's transport list. Always returns 1.
 */
int php_yar_curl_multi_add_handle(yar_transport_multi_interface_t *self, yar_transport_interface_t *handle) /* {{{ */ {
	yar_curl_multi_data_t *sched = (yar_curl_multi_data_t *)self->data;
	yar_curl_data_t *easy = (yar_curl_data_t *)handle->data;

	/* libcurl options for this transfer */
	php_yar_curl_prepare(handle);

	/* attach the easy handle to the multi handle */
	curl_multi_add_handle(sched->cm, easy->cp);

	/* the newest transport becomes the list head; link it to the old head if there is one */
	if (sched->chs != NULL) {
		easy->next = sched->chs;
	}
	sched->chs = handle;

	return 1;
} /* }}} */
Exec()
yar_transport_multi_interface_t->exec()執行異步調度器,基于select事件模型,使用curl_multi_perform收發所有數據。
其實Yar差不多從初版開始就有一個epoll事件模型實現的異步調度器,不過啟用需要自行在config.h開啟ENABLE_EPOLL宏。
/*
 * Runs the curl concurrent scheduler with a select()-based loop.
 *
 * The callback `f` is invoked at two moments:
 *   - once with (NULL, YAR_ERR_OKEY, NULL) right after the first
 *     curl_multi_perform() round, so user code can do other work while the
 *     requests are in flight;
 *   - once per finished request, through php_yar_curl_multi_parse_response().
 *
 * Returns 1 on success and 0 on error (select failure/timeout, or a pending
 * exception after a callback). When a callback reports a bailout the
 * scheduler is closed and zend_bailout() is raised.
 */
int php_yar_curl_multi_exec(yar_transport_multi_interface_t *self, yar_concurrent_client_callback *f) /* {{{ */ {
	int running_count, rest_count;
	yar_curl_multi_data_t *multi;

	multi = (yar_curl_multi_data_t *)self->data;
	while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));

	/* the "all requests issued" callback, called with empty arguments */
	if (!f(NULL, YAR_ERR_OKEY, NULL)) {
		goto bailout;
	}

	/* the user callback may have thrown */
	if (EG(exception)) {
		goto onerror;
	}

	/* keep calling curl_multi_perform() until every handle has finished */
	if (running_count) {
		rest_count = running_count;
		do {
			int max_fd, return_code;
			struct timeval tv;
			fd_set readfds;
			fd_set writefds;
			fd_set exceptfds;

			FD_ZERO(&readfds);
			FD_ZERO(&writefds);
			FD_ZERO(&exceptfds);

			curl_multi_fdset(multi->cm, &readfds, &writefds, &exceptfds, &max_fd);
			if (max_fd == -1) {
				/* libcurl has no descriptor to wait on yet: sleep briefly, then
				 * call curl_multi_perform() anyway. Fall through to the
				 * completion check below so that requests finishing during this
				 * perform still get their callbacks. */
				tv.tv_sec = 0;
				tv.tv_usec = 50000; /* sleep 50ms */
				select(1, &readfds, &writefds, &exceptfds, &tv);
				while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));
			} else {
				/* YAR_G(timeout) is in milliseconds: split it into seconds and
				 * microseconds (curl_multi_timeout() could refine this wait) */
				tv.tv_sec = (ulong)(YAR_G(timeout) / 1000);
				tv.tv_usec = (ulong)((YAR_G(timeout) % 1000) * 1000);

				return_code = select(max_fd + 1, &readfds, &writefds, &exceptfds, &tv);
				if (return_code > 0) {
					while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));
				} else if (-1 == return_code) {
					php_error_docref(NULL, E_WARNING, "select error '%s'", strerror(errno));
					goto onerror;
				} else {
					/* timeout */
					php_error_docref(NULL, E_WARNING, "select timeout %ldms reached", YAR_G(timeout));
					goto onerror;
				}
			}

			/* at least one request finished since the last check: dispatch callbacks */
			if (rest_count > running_count) {
				int ret = php_yar_curl_multi_parse_response(multi, f);
				if (ret == -1) {
					goto bailout;
				} else if (ret == 0) {
					goto onerror;
				}
				rest_count = running_count;
			}
		} while (running_count);
	} else {
		/* everything already completed in the first perform round */
		int ret = php_yar_curl_multi_parse_response(multi, f);
		if (ret == -1) {
			goto bailout;
		} else if (ret == 0) {
			goto onerror;
		}
	}

	return 1;
onerror:
	return 0;
bailout:
	self->close(self);
	zend_bailout();
	return 0;
} /* }}} */
Close()
yar_transport_multi_interface_t->close()用于各種資源的清理。
/*
 * Destructor of the curl concurrent scheduler: detaches and closes every
 * transport still on the scheduler's list, destroys the libcurl multi handle
 * and frees both the private state and the scheduler itself.
 */
void php_yar_curl_multi_close(yar_transport_multi_interface_t *self) /* {{{ */ {
	yar_curl_multi_data_t *sched = (yar_curl_multi_data_t *)self->data;
	yar_transport_interface_t *cur = sched->chs;

	while (cur) {
		yar_curl_data_t *easy = (yar_curl_data_t *)cur->data;
		/* read the link first: close() is called on `cur` right below */
		yar_transport_interface_t *following = easy->next;

		curl_multi_remove_handle(sched->cm, easy->cp);
		cur->close(cur);
		cur = following;
	}

	curl_multi_cleanup(sched->cm);
	efree(sched);
	efree(self);
} /* }}} */
數據提取和反序列化
php_yar_curl_multi_parse_response()用于從收包完成后的CURLM中解析出response并執行回調。
/*
 * Drains the completion messages of the multi handle. For every finished
 * transfer it unlinks the matching transport from the scheduler's list,
 * builds a response (unpacked payload or an error) and invokes `f`.
 *
 * Returns  1 when all available messages were processed,
 *          0 when an exception is pending after a callback,
 *         -1 when the callback returned 0 (which the callback does after a bailout).
 * The transport and the response are released before returning in all cases.
 */
static int php_yar_curl_multi_parse_response(yar_curl_multi_data_t *multi, yar_concurrent_client_callback *f) /* {{{ */ {
	int msg_in_sequence;
	CURLMsg *msg;

	/* walk every available transfer result */
	do {
		yar_transport_interface_t *handle, *prev = NULL;
		yar_curl_data_t *data;
		yar_response_t *response;
		CURLcode result;
		long http_code = 200;
		int status;
		int outcome = 1;

		msg = curl_multi_info_read(multi->cm, &msg_in_sequence);
		if (!msg || msg->msg != CURLMSG_DONE) {
			continue;
		}

		/* find the transport that owns this easy handle */
		for (handle = multi->chs; handle; prev = handle, handle = ((yar_curl_data_t *)handle->data)->next) {
			if (msg->easy_handle == ((yar_curl_data_t *)handle->data)->cp) {
				break;
			}
		}
		if (!handle) {
			php_error_docref(NULL, E_WARNING, "unexpected transport info missed");
			continue;
		}

		/* unlink it from the scheduler's list */
		data = (yar_curl_data_t *)handle->data;
		if (prev) {
			((yar_curl_data_t *)prev->data)->next = data->next;
		} else {
			multi->chs = data->next;
		}

		/* libcurl invalidates `msg` once the handle is removed, so copy the
		 * result first; the handle is detached on the success and the error
		 * path alike because the transport is closed below in both cases */
		result = msg->data.result;
		curl_multi_remove_handle(multi->cm, data->cp);

		response = php_yar_response_instance();

		if (result != CURLE_OK) {
			/* transfer-level failure */
			char *err = (char *)curl_easy_strerror(result);
			php_yar_response_set_error(response, YAR_ERR_TRANSPORT, err, strlen(err));
			status = YAR_ERR_TRANSPORT;
		} else if (curl_easy_getinfo(data->cp, CURLINFO_RESPONSE_CODE, &http_code) == CURLE_OK && http_code != 200) {
			/* the server answered, but not with HTTP 200 */
			char buf[128];
			uint len = snprintf(buf, sizeof(buf), "server responsed non-200 code '%ld'", http_code);
			php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
			status = YAR_ERR_TRANSPORT;
		} else {
			if (data->buf.s) {
				char *unpack_err = NULL;
				zval *retval, rret;
				yar_header_t *header;
				char *payload;
				size_t payload_len;

				smart_str_0(&data->buf);

				payload = ZSTR_VAL(data->buf.s);
				payload_len = ZSTR_LEN(data->buf.s);

				/* parse the protocol header; a body shorter than the header can
				 * never be valid and must not be read past its end */
				if (payload_len < sizeof(yar_header_t) || !(header = php_yar_protocol_parse(payload))) {
					php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
				} else {
					/* skip over the leading header */
					payload += sizeof(yar_header_t);
					payload_len -= sizeof(yar_header_t);
					/* unpack the remaining body */
					if (!(retval = php_yar_packager_unpack(payload, payload_len, &unpack_err, &rret))) {
						php_yar_response_set_error(response, YAR_ERR_PACKAGER, unpack_err, strlen(unpack_err));
					} else {
						/* map the unpacked array onto the response struct */
						php_yar_response_map_retval(response, retval);
						DEBUG_C(ZEND_ULONG_FMT": server response content packaged by '%.*s', len '%ld', content '%.32s'", response->id, 7, payload, header->body_len, payload + 8);
						zval_ptr_dtor(retval);
					}
					if (unpack_err) {
						efree(unpack_err);
					}
				}
			} else {
				php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
			}
			status = response->status;
		}

		/* dispatch to the user callback */
		if (!f(data->calldata, status, response)) {
			outcome = -1;
		} else if (EG(exception)) {
			outcome = 0;
		}

		handle->close(handle);
		php_yar_response_destroy(response);

		if (outcome != 1) {
			return outcome;
		}
	} while (msg_in_sequence);

	return 1;
}
/* }}} */
回調用戶注冊方法
最后一個核心方法php_yar_concurrent_client_callback()用于回調用戶提供的回調函數
/*
 * Invokes the user-supplied callback for one event of the concurrent client.
 *
 * calldata != NULL: a request finished. The per-call callback takes
 *   precedence over the one stored by Yar_Concurrent_Client::loop(); the
 *   success callback receives ($retval, $callinfo), the error callback
 *   ($code, $error, $callinfo). Without any usable callback the error is
 *   reported, or the return value printed, directly.
 * calldata == NULL: the "all requests issued" notification; the loop-level
 *   callback is called with two NULL arguments.
 *
 * Returns 0 only when the callback caused a bailout, 1 otherwise.
 */
int php_yar_concurrent_client_callback(yar_call_data_t *calldata, int status, yar_response_t *response) /* {{{ */ {
	zval code, retval, retval_ptr;
	zval callinfo, *callback, *func_params;
	zend_bool bailout = 0;
	zend_bool call_failed = 0;
	uint params_count, i;

	/* defined state even if the call below never stores a return value */
	ZVAL_UNDEF(&retval_ptr);

	if (calldata) {
		/* data callback: pick the success or error callback, per-call first */
		if (status == YAR_ERR_OKEY) {
			if (!Z_ISUNDEF(calldata->callback)) {
				callback = &calldata->callback;
			} else {
				callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), 0);
			}
			params_count = 2;
		} else {
			if (!Z_ISUNDEF(calldata->ecallback)) {
				callback = &calldata->ecallback;
			} else {
				callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_error_callback"), 0);
			}
			params_count = 3;
		}

		/* no callback available: report the error or print the remote return value */
		if (Z_ISNULL_P(callback)) {
			if (status != YAR_ERR_OKEY) {
				if (!Z_ISUNDEF(response->err)) {
					php_yar_client_handle_error(0, response);
				} else {
					php_error_docref(NULL, E_WARNING, "[%d]:unknown Error", status);
				}
			} else if (!Z_ISUNDEF(response->retval)) {
				zend_print_zval(&response->retval, 1);
			}
			return 1;
		}

		if (status == YAR_ERR_OKEY) {
			if (Z_ISUNDEF(response->retval)) {
				php_yar_client_trigger_error(0, YAR_ERR_REQUEST, "%s", "server responsed empty response");
				return 1;
			}
			ZVAL_COPY(&retval, &response->retval);
		} else {
			ZVAL_LONG(&code, status);
			ZVAL_COPY(&retval, &response->err);
		}

		/* last callback argument $callinfo: request id, uri and remote method name */
		array_init(&callinfo);
		add_assoc_long_ex(&callinfo, "sequence", sizeof("sequence") - 1, calldata->sequence);
		add_assoc_str_ex(&callinfo, "uri", sizeof("uri") - 1, zend_string_copy(calldata->uri));
		add_assoc_str_ex(&callinfo, "method", sizeof("method") - 1, zend_string_copy(calldata->method));
	} else {
		callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), 0);
		if (Z_ISNULL_P(callback)) {
			return 1;
		}
		params_count = 2;
	}

	if (calldata && (status != YAR_ERR_OKEY)) {
		/* error callback: function($code, $error, $callinfo) */
		func_params = safe_emalloc(sizeof(zval), 3, 0);
		ZVAL_COPY_VALUE(&func_params[0], &code);
		ZVAL_COPY_VALUE(&func_params[1], &retval);
		ZVAL_COPY_VALUE(&func_params[2], &callinfo);
	} else if (calldata) {
		/* success callback: function($retval, $callinfo) */
		func_params = safe_emalloc(sizeof(zval), 2, 0);
		ZVAL_COPY_VALUE(&func_params[0], &retval);
		ZVAL_COPY_VALUE(&func_params[1], &callinfo);
	} else {
		/* the "all requests issued" callback gets two NULL arguments */
		func_params = safe_emalloc(sizeof(zval), 2, 0);
		ZVAL_NULL(&func_params[0]);
		ZVAL_NULL(&func_params[1]);
	}

	/* Only record the outcome inside zend_try: returning from inside the block
	 * would skip zend_end_try() and leave EG(bailout) pointing at this frame. */
	zend_try {
		if (call_user_function_ex(EG(function_table), NULL, callback,
					&retval_ptr, params_count, func_params, 0, NULL) != SUCCESS) {
			call_failed = 1;
		}
	} zend_catch {
		bailout = 1;
	} zend_end_try();

	/* release the return value and the argument vector */
	if (!Z_ISUNDEF(retval_ptr)) {
		zval_ptr_dtor(&retval_ptr);
	}
	for (i = 0; i < params_count; i++) {
		zval_ptr_dtor(&func_params[i]);
	}
	efree(func_params);

	if (call_failed) {
		if (calldata) {
			php_error_docref(NULL, E_WARNING, "call to callback failed for request: '%s'", ZSTR_VAL(calldata->method));
		} else {
			php_error_docref(NULL, E_WARNING, "call to initial callback failed");
		}
		return 1;
	}

	return bailout? 0 : 1;
} /* }}} */