傳輸器結(jié)構(gòu)
yar底層用一個_yar_transport_interface結(jié)構(gòu)表示一個傳輸器,處理網(wǎng)絡(luò)IO相關(guān)事宜。
typedef struct _yar_transport_interface {
void *data;//這個變量在并發(fā)調(diào)用章節(jié)才會講到,暫時不用管
int (*open)(struct _yar_transport_interface *self, zend_string *address, long options, char **msg);
int (*send)(struct _yar_transport_interface *self, struct _yar_request *request, char **msg);
struct _yar_response * (*exec)(struct _yar_transport_interface *self, struct _yar_request *request);
int (*setopt)(struct _yar_transport_interface *self, long type, void *value, void *addition);
int (*calldata)(struct _yar_transport_interface *self, yar_call_data_t *calldata);
void (*close)(struct _yar_transport_interface *self);
} yar_transport_interface_t;
yar_transport_interface_t由yar_transport_t生成
typedef struct _yar_transport {
const char *name;
struct _yar_transport_interface * (*init)();
void (*destroy)(yar_transport_interface_t *self);
yar_transport_multi_t *multi;
} yar_transport_t;
兩者關(guān)系如圖。

前面提過Yar是典型的OO風(fēng)格的C代碼。雖然說是C語言的實現(xiàn),但要理解這里幾個相關(guān)結(jié)構(gòu)必須要用面向?qū)ο蟮难酃狻?/p>
yar_transport_t結(jié)構(gòu)體類型相當(dāng)于一個工廠接口,聲明了一個工廠骨架。
yar_transport_t類型的兩個變量相當(dāng)于 兩個 工廠接口的實現(xiàn)對象,不同的變量使用不同的函數(shù)指針形成了多態(tài)。yar_transport_t管理著yar_transport_interface結(jié)構(gòu)體變量的構(gòu)造和析構(gòu),產(chǎn)生具體的yar_transport_interface類型的實例,每個yar_transport_interface實例都有自己的成員變量。
還有一種相似的理解思路。
yar_transport_t是一個抽象類。
yar_transport_t類型的兩個變量是該抽象類的兩個實現(xiàn)子類,yar_transport_t 聲明的方法都是類方法和類變量,對應(yīng)PHP中的 __construct()和__destruct()
yar_transport_interface則是yar_transport_t這個抽象類的實例,其結(jié)構(gòu)體上的成員對應(yīng)過的是實例的成員變量和成員方法。
但無論是哪一種,理解的核心都是以下兩點:
1.yar_transport_interface是一個傳輸器對象,有自己的獨立數(shù)據(jù)和狀態(tài)。
2.yar_transport_t類型的變量和yar_transport_t類型構(gòu)成了多態(tài),用于生成不同類型的yar_transport_interface對象。
后文統(tǒng)一使用第一種方式去描述yar的傳輸器模塊。
傳輸器工廠類
yar_transport_t->name表示傳輸器工廠的名字,下文會用傳輸器工廠的名字借代該工廠構(gòu)造的傳輸器。
Yar目前支持Http,Tcp,UnixSock(僅單機內(nèi)調(diào)用可用)三種傳輸方式/協(xié)議,Http目前的傳輸器使用Curl實現(xiàn),傳輸器(工廠)的名字為curl,另外兩種兩種使用socket實現(xiàn),傳輸器名字為sock,不過由于PHP目前沒有Tcp/UnixSock的Yar Rpc Server,所以這個sock傳輸器很少會用到。
init()和destroy()是函數(shù)指針,用于構(gòu)造和銷毀 yar_transport_interface_t實例。
而multi()是另外一個工廠,用于構(gòu)造一個并發(fā)調(diào)度器實例,后續(xù)并發(fā)調(diào)用章節(jié)我們也會提到。
傳輸器類
傳輸器有sock和curl兩種,前者應(yīng)用場景較少。
但是考慮其比較簡單,沒有并發(fā)調(diào)用模式,這里將以其為示例先講述傳輸器類中同步RPC調(diào)用模式的相關(guān)方法 ,至于curl傳輸器以及 并發(fā)RPC調(diào)用 在后面會用另外一篇文章單獨分析。實際上兩種傳輸器的接口方法語義和調(diào)用方式都有一定差異。
Open()
//socket.c
int php_yar_socket_open(yar_transport_interface_t *self, zend_string *address, long options, char **err_msg) /* {{{ */ {
yar_socket_data_t *data = (yar_socket_data_t *)self->data;
struct timeval tv;
php_stream *stream = NULL;
zend_string *errstr = NULL;
char *persistent_key = NULL;
int err;
tv.tv_sec = (ulong)(YAR_G(connect_timeout) / 1000);
tv.tv_usec = (ulong)((YAR_G(connect_timeout) % 1000)? (YAR_G(connect_timeout) % 1000) * 1000 : 0);
//直到我看到這塊源碼,我才知道原來Yar是支持長連接的,長連接開關(guān)的來源為ini文件中的`yar.allow_persistent`,這個配置在php.net上也是找不到的,curl傳輸器同樣支持該配置。
if (options & YAR_PROTOCOL_PERSISTENT) {
data->persistent = 1;
spprintf(&persistent_key, 0, "yar_%s", ZSTR_VAL(address));
} else {
data->persistent = 0;
}
//發(fā)起連接或根據(jù)uri獲取可復(fù)用的舊連接
stream = php_stream_xport_create(ZSTR_VAL(address), ZSTR_LEN(address), 0, STREAM_XPORT_CLIENT|STREAM_XPORT_CONNECT, persistent_key, &tv, NULL, &errstr, &err);
if (persistent_key) {
efree(persistent_key);
}
if (stream == NULL) {
spprintf(err_msg, 0, "Unable to connect to %s (%s)", ZSTR_VAL(address), strerror(errno));
efree(errstr);
return 0;
}
//sock傳輸器是同步阻塞的,不支持并行調(diào)用
php_stream_set_option(stream, PHP_STREAM_OPTION_BLOCKING, 0, NULL);
#if ZEND_DEBUG
stream->__exposed++;
#endif
data->stream = stream;
return 1;
} /* }}} */
yar_transport_interface->open()用于發(fā)起連接或初始化網(wǎng)絡(luò)IO相關(guān)的資源。
雖然這個socket傳輸器的名字叫'sock',但是socket方式也沒有直接使用socket()系函數(shù),而是使用了PHP提供的 流 機制封裝的統(tǒng)一接口,簡化相關(guān)通信細(xì)節(jié)。
Send()
//socket.c
int php_yar_socket_send(yar_transport_interface_t* self, yar_request_t *request, char **msg) /* {{{ */ {
fd_set rfds;
zend_string *payload;
struct timeval tv;
int ret = -1, fd, retval;
char buf[SEND_BUF_SIZE];
yar_header_t header = {0};
//self變量相當(dāng)于python方法的self參數(shù),也類似于手動傳遞一個PHP 的$this指針供方法使用,面向?qū)ο箫L(fēng)格的C常見做法。
yar_socket_data_t *data = (yar_socket_data_t *)self->data;
FD_ZERO(&rfds);
//php的流轉(zhuǎn)文件描述符
if (SUCCESS == php_stream_cast(data->stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&fd, 1) && fd >= 0) {
PHP_SAFE_FD_SET(fd, &rfds);
} else {
spprintf(msg, 0, "Unable cast socket fd form stream (%s)", strerror(errno));
return 0;
}
//request變量序列化
if (!(payload = php_yar_request_pack(request, msg))) {
return 0;
}
DEBUG_C(ZEND_ULONG_FMT": pack request by '%.*s', result len '%ld', content: '%.32s'",
request->id, 7, ZSTR_VAL(payload), ZSTR_LEN(payload), ZSTR_VAL(payload) + 8);
//構(gòu)建header變量
/* for tcp/unix RPC, we need another way to supports auth */
php_yar_protocol_render(&header, request->id, "Yar PHP Client", NULL, ZSTR_LEN(payload), data->persistent? YAR_PROTOCOL_PERSISTENT : 0);
memcpy(buf, (char *)&header, sizeof(yar_header_t));
tv.tv_sec = (ulong)(YAR_G(timeout) / 1000);
tv.tv_usec = (ulong)((YAR_G(timeout) % 1000)? (YAR_G(timeout) & 1000) * 1000 : 0);
//php_select()是select()的別名宏,等待socket文件描述符可寫
retval = php_select(fd+1, NULL, &rfds, NULL, &tv);
if (retval == -1) {
zend_string_release(payload);
spprintf(msg, 0, "select error '%s'", strerror(errno));
return 0;
} else if (retval == 0) {
zend_string_release(payload);
spprintf(msg, 0, "select timeout '%ld' seconds reached", YAR_G(timeout));
return 0;
}
//通過buff緩沖區(qū)循環(huán)發(fā)送所有報文(協(xié)議頭+載荷)
if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
size_t bytes_left = 0, bytes_sent = 0;
if (ZSTR_LEN(payload) > (sizeof(buf) - sizeof(yar_header_t))) {
memcpy(buf + sizeof(yar_header_t), ZSTR_VAL(payload), sizeof(buf) - sizeof(yar_header_t));
if ((ret = php_stream_xport_sendto(data->stream, buf, sizeof(buf), 0, NULL, 0)) < 0) {
zend_string_release(payload);
spprintf(msg, 0, "unable to send data");
return 0;
}
} else {
memcpy(buf + sizeof(yar_header_t), ZSTR_VAL(payload), ZSTR_LEN(payload));
if ((ret = php_stream_xport_sendto(data->stream, buf, sizeof(yar_header_t) + ZSTR_LEN(payload), 0, NULL, 0)) < 0) {
zend_string_release(payload);
spprintf(msg, 0, "unable to send data");
return 0;
}
}
bytes_sent = ret - sizeof(yar_header_t);
bytes_left = ZSTR_LEN(payload) - bytes_sent;
wait_io:
if (bytes_left) {
retval = php_select(fd+1, NULL, &rfds, NULL, &tv);
if (retval == -1) {
zend_string_release(payload);
spprintf(msg, 0, "select error '%s'", strerror(errno));
return 0;
} else if (retval == 0) {
zend_string_release(payload);
spprintf(msg, 0, "select timeout %ldms reached", YAR_G(timeout));
return 0;
}
if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
if ((ret = php_stream_xport_sendto(data->stream, ZSTR_VAL(payload) + bytes_sent, bytes_left, 0, NULL, 0)) > 0) {
bytes_left -= ret;
bytes_sent += ret;
}
}
goto wait_io;
}
}
zend_string_release(payload);
return ret < 0? 0 : 1;
} /* }}} */
yar_transport_interface->send()接受一個request參數(shù),用于實際的IO發(fā)包。
備注:對于curl傳輸器,由于libcurl的api限制因為其send()并未進(jìn)行實際的發(fā)包,實際上發(fā)包收包都在exec()中
Exec()
//socket.c
yar_response_t * php_yar_socket_exec(yar_transport_interface_t* self, yar_request_t *request) /* {{{ */ {
fd_set rfds;
struct timeval tv;
yar_header_t *header;
yar_response_t *response;
int fd, retval, recvd;
size_t len = 0, total_recvd = 0;
char *msg, buf[RECV_BUF_SIZE], *payload = NULL;
yar_socket_data_t *data = (yar_socket_data_t *)self->data;
response = ecalloc(1, sizeof(yar_response_t));
//php stram轉(zhuǎn)socket fd
FD_ZERO(&rfds);
if (SUCCESS == php_stream_cast(data->stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&fd, 1) && fd >= 0) {
PHP_SAFE_FD_SET(fd, &rfds);
} else {
len = snprintf(buf, sizeof(buf), "Unable cast socket fd form stream (%s)", strerror(errno));
php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
return response;
}
tv.tv_sec = (ulong)(YAR_G(timeout) / 1000);
tv.tv_usec = (ulong)((YAR_G(timeout) % 1000)? (YAR_G(timeout) & 1000) * 1000 : 0);
//根據(jù)協(xié)議頭長度循環(huán)收包
wait_io:
retval = php_select(fd+1, &rfds, NULL, NULL, &tv);
if (retval == -1) {
len = snprintf(buf, sizeof(buf), "Unable to select %d '%s'", fd, strerror(errno));
php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
return response;
} else if (retval == 0) {
len = snprintf(buf, sizeof(buf), "select timeout %ldms reached", YAR_G(timeout));
php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
return response;
}
if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
zval *retval, rret;
//尚未解析出載荷信息/未解析出協(xié)議頭
if (!payload) {
if ((recvd = php_stream_xport_recvfrom(data->stream, buf, sizeof(buf), 0, NULL, NULL, NULL)) > 0) {
//解析出協(xié)議頭
if (!(header = php_yar_protocol_parse(buf))) {
php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
return response;
}
//根據(jù)協(xié)議頭獲取載荷長度并分配相應(yīng)內(nèi)存接受載荷數(shù)據(jù)
payload = emalloc(header->body_len);
len = header->body_len;
total_recvd = recvd - sizeof(yar_header_t);
memcpy(payload, buf + sizeof(yar_header_t), total_recvd);
if (recvd < (sizeof(yar_header_t) + len)) {
goto wait_io;
}
} else if (recvd < 0) {
/* this should never happen */
goto wait_io;
}
} else {
if ((recvd = php_stream_xport_recvfrom(data->stream, payload + total_recvd, len - total_recvd, 0, NULL, NULL, NULL)) > 0) {
total_recvd += recvd;
}
if (total_recvd < len) {
goto wait_io;
}
}
//此處已經(jīng)收包結(jié)束
if (len) {
//調(diào)用相應(yīng)打包器反序列化載荷數(shù)據(jù)成一個PHP數(shù)組變量到`rret`和`retval`中,具體參考response章節(jié)
if (!(retval = php_yar_packager_unpack(payload, len, &msg, &rret))) {
php_yar_response_set_error(response, YAR_ERR_PACKAGER, msg, strlen(msg));
efree(msg);
return response;
}
//isore數(shù)組變量轉(zhuǎn)response結(jié)構(gòu)
php_yar_response_map_retval(response, retval);
DEBUG_C(ZEND_ULONG_FMT": server response content packaged by '%.*s', len '%ld', content '%.32s'",
response->id, 7, payload, header->body_len, payload + 8);
efree(payload);
zval_ptr_dtor(retval);
} else {
php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
}
return response;
} else {
goto wait_io;
}
} /* }}} */
yar_transport_interface->exec()用于IO收包,并將返回的數(shù)據(jù)封裝成response變量返回。
close()
//socket.c
void php_yar_socket_close(yar_transport_interface_t* self) /* {{{ */ {
yar_socket_data_t *data = (yar_socket_data_t *)self->data;
if (!data) {
return;
}
//長連接模式不關(guān)閉連接
if (!data->persistent && data->stream) {
php_stream_close(data->stream);
}
efree(data);
efree(self);
return;
}
/* }}} */
yar_transport_interface->exec()->close()用于相關(guān)資源的回收和釋放。
yar_transport_interface的其他接口方法
yar_transport_interface->exec()->setopt()用于相關(guān)配置項的配置,socket傳輸器沒有實現(xiàn)并使用該接口,curl在exec()方法中用于設(shè)置超時信息。
yar_transport_interface->exec()->call_data()目前僅并發(fā)調(diào)用會用到,這個我們在并發(fā)調(diào)用章節(jié)再提及。