Hi everyone, I'm dandyhuang. Last time I walked through the whole receive path in brpc, from the client all the way to the server; see Brpc 服務(wù)端收包源碼分析(一). This time we cover how the server replies asynchronously; in the synchronous case you simply fill in the response and return.
Server-side asynchronous handling
// Asynchronous callback, invoked when the downstream call completes
void helloServiceImpl::ServiceCb(brpc::Controller* cntl, ::hello::CommonResponse* response){
LOG(INFO) << "response: " << response->ByteSize();
if (cntl->ErrorCode() != 0) {
LOG(ERROR) << " hello error code:" << cntl->ErrorCode() << " msg:" << cntl->ErrorText();
}
// std::unique_ptr makes sure cntl/response will be deleted before returning.
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<::hello::CommonResponse> response_guard(response);
response_->Swap(response);
// Reply to the client
done_->Run();
}
// Server-side service implementation
void helloServiceImpl::echo(::google::protobuf::RpcController* controller,
const ::hello::CommonRequest* request,
::hello::CommonResponse* response,
::google::protobuf::Closure* done) {
// Save the state of this request so the callback can reply later
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
start_time_ = butil::gettimeofday_us();
done_ = done;
response_ = response;
// Initialize the channel to the downstream service
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;
if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return;
}
auto cli_cntl = new brpc::Controller();
::hello::CommonResponse* cli_resp = new ::hello::CommonResponse();
// Call the downstream service
hello::CommonService_Stub stub(channel);
auto cli_done = brpc::NewCallback(this, &helloServiceImpl::ServiceCb, cli_cntl, cli_resp);
stub.echo(cli_cntl, request, cli_resp, cli_done);
// Release the guard first: the reply to the client is NOT sent yet
done_guard.release();
}
~ClosureGuard() {
// If _done is NULL, Run() is not executed
if (_done) {
_done->Run();
}
}
google::protobuf::Closure* release() {
google::protobuf::Closure* const prev_done = _done;
// Clear _done up front, so the destructor will not send the reply
_done = NULL;
return prev_done;
}
Note: when the server receives the request, brpc::ClosureGuard holds the done closure. After issuing the downstream call we invoke done_guard.release(), which detaches done from the guard, so no reply is sent when the guard is destroyed. Once the business logic finishes, the asynchronous callback calls done_->Run(), which replies to the client. That is what makes the handler asynchronous.
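One caveat worth flagging in the sample above: done_ and response_ are members of the service object, so two concurrent requests would overwrite each other's state. A minimal sketch of a per-request variant, assuming a hypothetical AsyncCtx struct and OnDownstreamDone free function (my own illustration, not brpc API):

#include <memory>  // std::unique_ptr

struct AsyncCtx {
    brpc::Controller cli_cntl;            // downstream controller
    ::hello::CommonResponse cli_resp;     // downstream response
    ::hello::CommonResponse* response;    // server response to fill
    ::google::protobuf::Closure* done;    // server done closure
};

static void OnDownstreamDone(AsyncCtx* ctx) {
    std::unique_ptr<AsyncCtx> guard(ctx);  // frees all per-request state on return
    if (ctx->cli_cntl.Failed()) {
        LOG(ERROR) << "downstream failed: " << ctx->cli_cntl.ErrorText();
    } else {
        ctx->response->Swap(&ctx->cli_resp);
    }
    ctx->done->Run();  // reply to the client now
}

Inside echo() you would then set ctx->response = response; ctx->done = done_guard.release(); and pass google::protobuf::NewCallback(&OnDownstreamDone, ctx) as the downstream closure.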
ProcessHttpRequest reply logic
void ProcessHttpRequest(InputMessageBase *msg) {
Controller* cntl = new (std::nothrow) Controller;
// The sender takes ownership of cntl
HttpResponseSender resp_sender(cntl);
// Create the response message; the sender owns it as well
google::protobuf::Message* res = svc->GetResponsePrototype(method).New();
resp_sender.own_response(res);
// Ultimately everything is stored inside HttpResponseSenderAsDone
google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
···
// res ends up stored in resp_sender
svc->CallMethod(method, cntl, req, res, done);
}
Taking HTTP as the example, both res and done are stored inside HttpResponseSenderAsDone. So when the business code calls done_->Run(), let's see how the reply actually goes out.
HttpResponseSenderAsDone->Run()
class HttpResponseSenderAsDone : public google::protobuf::Closure {
public:
HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {}
void Run() override { delete this; }
private:
// Deleting this destroys _sender; the destructor sends the reply
HttpResponseSender _sender;
};
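Note the trick: Run() does nothing but delete this, and the actual sending happens in ~HttpResponseSender when the _sender member is destroyed. A stripped-down illustration of this "reply on destruction" idiom (the names are mine, not brpc's):

#include <cstdio>

struct Sender {                    // stands in for HttpResponseSender
    ~Sender() { std::puts("write the response to the socket here"); }
};

class SenderAsDone {               // stands in for HttpResponseSenderAsDone
public:
    void Run() { delete this; }    // destroys _s, which sends the reply
private:
    Sender _s;
};

int main() {
    auto* done = new SenderAsDone;
    done->Run();                   // prints exactly once
}

Because sending is tied to destruction, the reply goes out exactly once, whether Run() fires synchronously inside CallMethod or later from an asynchronous callback.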
HttpResponseSender::~HttpResponseSender() {
// The controller for this request
Controller* cntl = _cntl.get();
if (cntl == NULL) {
return;
}
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
ConcurrencyRemover concurrency_remover(_method_status, cntl, _received_us);
// Get the socket the reply will be written to
Socket* socket = accessor.get_sending_socket();
// The pb message of the reply, i.e. the protobuf data we filled into the response
const google::protobuf::Message* res = _res.get();
// If the client asked to close the connection, fail the socket and return
if (cntl->IsCloseConnection()) {
socket->SetFailed();
return;
}
// Request header
const HttpHeader* req_header = &cntl->http_request();
// HTTP response header
HttpHeader* res_header = &cntl->http_response();
res_header->set_version(req_header->major_version(),
req_header->minor_version());
// From here on we set up the HTTP response headers
const std::string* content_type_str = &res_header->content_type();
if (content_type_str->empty()) {
// Use request's content_type if response's is not set.
content_type_str = &req_header->content_type();
res_header->set_content_type(*content_type_str);
}
bool is_grpc_ct = false;
const HttpContentType content_type = ParseContentType(*content_type_str, &is_grpc_ct);
const bool is_http2 = req_header->is_http2();
const bool is_grpc = (is_http2 && is_grpc_ct);
if (res != NULL &&
cntl->response_attachment().empty() && res->GetDescriptor()->field_count() > 0 && !cntl->Failed()) {
butil::IOBufAsZeroCopyOutputStream wrapper(&cntl->response_attachment());
// Serialize the body as protobuf, or convert it to JSON
if (content_type == HTTP_CONTENT_PROTO) {
if (!res->SerializeToZeroCopyStream(&wrapper)) {
cntl->SetFailed(ERESPONSE, "Fail to serialize %s", res->GetTypeName().c_str());
}
} else {
std::string err;
json2pb::Pb2JsonOptions opt;
opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
opt.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
opt.enum_option = (FLAGS_pb_enum_as_number
? json2pb::OUTPUT_ENUM_BY_NUMBER
: json2pb::OUTPUT_ENUM_BY_NAME);
if (!json2pb::ProtoMessageToJson(*res, &wrapper, opt, &err)) {
cntl->SetFailed(ERESPONSE, "Fail to convert response to json, %s", err.c_str());
}
}
}
// Set the appropriate headers for HTTP/1.0 vs 1.1, or gRPC, etc.
if (!is_http2) {
const std::string* res_conn = res_header->GetHeader(common->CONNECTION);
if (res_conn == NULL || strcasecmp(res_conn->c_str(), "close") != 0) {
const std::string* req_conn =
req_header->GetHeader(common->CONNECTION);
if (req_header->before_http_1_1()) {
if (req_conn != NULL &&
strcasecmp(req_conn->c_str(), "keep-alive") == 0) {
res_header->SetHeader(common->CONNECTION, common->KEEP_ALIVE);
}
} else {
if (req_conn != NULL &&
strcasecmp(req_conn->c_str(), "close") == 0) {
res_header->SetHeader(common->CONNECTION, common->CLOSE);
}
}
} // else user explicitly set Connection:close, clients of
// HTTP 1.1/1.0/0.9 should all close the connection.
} else if (is_grpc) {
// status code is always 200 according to grpc protocol
res_header->set_status_code(HTTP_STATUS_OK);
}
bool grpc_compressed = false;
// Failure handling
if (cntl->Failed()) {
cntl->response_attachment().clear();
if (!is_grpc) {
// Set status-code with default value(converted from error code)
// if user did not set it.
if (res_header->status_code() == HTTP_STATUS_OK) {
res_header->set_status_code(ErrorCodeToStatusCode(cntl->ErrorCode()));
}
// Fill ErrorCode into header
res_header->SetHeader(common->ERROR_CODE,
butil::string_printf("%d", cntl->ErrorCode()));
res_header->RemoveHeader(common->CONTENT_ENCODING);
res_header->set_content_type(common->CONTENT_TYPE_TEXT);
cntl->response_attachment().append(cntl->ErrorText());
}
// Chunked transfer (progressive) handling
} else if (cntl->has_progressive_writer()) {
// Transfer-Encoding is supported since HTTP/1.1
if (res_header->major_version() < 2 && !res_header->before_http_1_1()) {
res_header->SetHeader("Transfer-Encoding", "chunked");
}
if (!cntl->response_attachment().empty()) {
LOG(ERROR) << "response_attachment(size="
<< cntl->response_attachment().size() << ") will be"
" ignored when CreateProgressiveAttachment() was called";
}
// gzip compression
} else if (cntl->response_compress_type() == COMPRESS_TYPE_GZIP) {
const size_t response_size = cntl->response_attachment().size();
if (response_size >= (size_t)FLAGS_http_body_compress_threshold
&& (is_http2 || SupportGzip(cntl))) {
TRACEPRINTF("Compressing response=%lu", (unsigned long)response_size);
butil::IOBuf tmpbuf;
if (GzipCompress(cntl->response_attachment(), &tmpbuf, NULL)) {
cntl->response_attachment().swap(tmpbuf);
if (is_grpc) {
grpc_compressed = true;
res_header->SetHeader(common->GRPC_ENCODING, common->GZIP);
} else {
res_header->SetHeader(common->CONTENT_ENCODING, common->GZIP);
}
} else {
LOG(ERROR) << "Fail to gzip the http response, skip compression.";
}
}
} else {
// TODO(gejun): Support snappy (grpc)
LOG_IF(ERROR, cntl->response_compress_type() != COMPRESS_TYPE_NONE)
<< "Unknown compress_type=" << cntl->response_compress_type()
<< ", skip compression.";
}
int rc = -1;
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
// HTTP/2 path
if (is_http2) {
if (is_grpc) {
// Append compressed and length before body
AddGrpcPrefix(&cntl->response_attachment(), grpc_compressed);
}
// Build the HTTP/2 response (also used for gRPC)
SocketMessagePtr<H2UnsentResponse> h2_response(
H2UnsentResponse::New(cntl, _h2_stream_id, is_grpc));
if (h2_response == NULL) {
LOG(ERROR) << "Fail to make http2 response";
errno = EINVAL;
rc = -1;
} else {
if (FLAGS_http_verbose) {
LOG(INFO) << '\n' << *h2_response;
}
if (span) {
span->set_response_size(h2_response->EstimatedByteSize());
}
// Write the data out
rc = socket->Write(h2_response, &wopt);
}
} else {
// HTTP/1.x path
butil::IOBuf* content = NULL;
if (cntl->Failed() || !cntl->has_progressive_writer()) {
content = &cntl->response_attachment();
}
butil::IOBuf res_buf;
// Build the raw HTTP reply: headers + body
MakeRawHttpResponse(&res_buf, res_header, content);
if (FLAGS_http_verbose) {
PrintMessage(res_buf, false, !!content);
}
if (span) {
span->set_response_size(res_buf.size());
}
// Write the data out
rc = socket->Write(&res_buf, &wopt);
}
if (rc != 0) {
// EPIPE is common in pooled connections + backup requests.
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket;
cntl->SetFailed(errcode, "Fail to write into %s", socket->description().c_str());
return;
}
if (span) {
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
}
}
From the controller and the response message, the HTTP packet is built and finally written to the socket via socket->Write().
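MakeRawHttpResponse is brpc's own serializer; as a rough mental model (my sketch, not the real implementation), the wire format it produces is just a status line, the headers, a blank line, then the body:

#include <string>

std::string make_raw_http_response(int status, const std::string& reason,
                                   const std::string& content_type,
                                   const std::string& body) {
    std::string out;
    out += "HTTP/1.1 " + std::to_string(status) + " " + reason + "\r\n";
    out += "Content-Type: " + content_type + "\r\n";
    out += "Content-Length: " + std::to_string(body.size()) + "\r\n";
    out += "\r\n";   // blank line terminates the header section
    out += body;     // exactly Content-Length bytes
    return out;
}

The real version also honors Transfer-Encoding: chunked for progressive responses, in which case Content-Length is omitted.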
Socket::Write: writing the data
int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) {
WriteOptions opt;
if (options_in) {
opt = *options_in;
}
// Reject empty data
if (data->empty()) {
return SetError(opt.id_wait, EINVAL);
}
if (opt.pipelined_count > MAX_PIPELINED_COUNT) {
LOG(ERROR) << "pipelined_count=" << opt.pipelined_count
<< " is too large";
return SetError(opt.id_wait, EOVERFLOW);
}
if (Failed()) {
const int rc = ConductError(opt.id_wait);
if (rc <= 0) {
return rc;
}
}
if (!opt.ignore_eovercrowded && _overcrowded) {
return SetError(opt.id_wait, EOVERCROWDED);
}
// Get a WriteRequest from the object pool
WriteRequest* req = butil::get_object<WriteRequest>();
if (!req) {
return SetError(opt.id_wait, ENOMEM);
}
// Move the payload into the request
req->data.swap(*data);
req->next = WriteRequest::UNCONNECTED;
req->id_wait = opt.id_wait;
// Record the pipelined count etc., e.g. for redis pipeline calls
req->set_pipelined_count_and_user_message(
opt.pipelined_count, DUMMY_USER_MESSAGE, opt.with_auth);
return StartWrite(req, opt);
}
A WriteRequest is constructed and StartWrite is called; the sketch below previews the queueing trick StartWrite relies on.
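The first thing StartWrite does is publish the request onto _write_head with an atomic exchange: the thread that swaps onto a NULL head wins the right to write, everyone else just links in and returns. A minimal sketch of the pattern with made-up names (brpc's WriteRequest::UNCONNECTED plays the role of the UNLINKED sentinel here):

#include <atomic>

struct Req {
    std::atomic<Req*> next{nullptr};
    // payload lives here
};

// Sentinel meaning "pushed, but next is not linked yet".
static Req* const UNLINKED = reinterpret_cast<Req*>(1);

std::atomic<Req*> write_head{nullptr};

// Returns true if the caller became the writer.
bool push(Req* r) {
    r->next.store(UNLINKED, std::memory_order_relaxed);
    Req* prev = write_head.exchange(r, std::memory_order_release);
    if (prev != nullptr) {
        // Someone is already writing; expose our predecessor and leave.
        r->next.store(prev, std::memory_order_release);
        return false;
    }
    r->next.store(nullptr, std::memory_order_relaxed);
    return true;
}

The list built this way is newest-first; the writer reverses it and spins briefly when it meets a node whose next is still UNLINKED, which is part of what IsWriteComplete checks.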
StartWrite
int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
// Release fence makes sure the thread getting request sees *req
WriteRequest* const prev_head =
_write_head.exchange(req, butil::memory_order_release);
if (prev_head != NULL) {
req->next = prev_head;
return 0;
}
int saved_errno = 0;
bthread_t th;
SocketUniquePtr ptr_for_keep_write;
ssize_t nw = 0;
// We've got the right to write.
req->next = NULL;
// Establish the connection first if the fd is not connected yet
int ret = ConnectIfNot(opt.abstime, req);
if (ret < 0) {
saved_errno = errno;
SetFailed(errno, "Fail to connect %s directly: %m", description().c_str());
goto FAIL_TO_WRITE;
} else if (ret == 1) {
return 0;
}
// Record the unsent data size and pipelined count, so a failed write can resume
req->Setup(this);
if (_conn) {
butil::IOBuf* data_arr[1] = { &req->data };
nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
} else {
// Send with writev, which writes multiple non-contiguous buffers
nw = req->data.cut_into_file_descriptor(fd());
}
if (nw < 0) {
// Check whether the write failure is fatal
if (errno != EAGAIN && errno != EOVERCROWDED) {
saved_errno = errno;
// EPIPE is common in pooled connections + backup requests.
PLOG_IF(WARNING, errno != EPIPE) << "Fail to write into " << *this;
SetFailed(saved_errno, "Fail to write into %s: %s",
description().c_str(), berror(saved_errno));
goto FAIL_TO_WRITE;
}
} else {
// Account for the bytes that were written
AddOutputBytes(nw);
}
// If no new request arrived, we're done; otherwise keep writing
if (IsWriteComplete(req, true, NULL)) {
ReturnSuccessfulWriteRequest(req);
return 0;
}
KEEPWRITE_IN_BACKGROUND:
ReAddress(&ptr_for_keep_write);
req->socket = ptr_for_keep_write.release();
// If the write fell short, or more requests queued up, keep writing in the background
if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
KeepWrite, req) != 0) {
LOG(FATAL) << "Fail to start KeepWrite";
KeepWrite(req);
}
return 0;
FAIL_TO_WRITE:
// `SetFailed' before `ReturnFailedWriteRequest' (which will calls
// `on_reset' callback inside the id object) so that we immediately
// know this socket has failed inside the `on_reset' callback
ReleaseAllFailedWriteRequests(req);
errno = saved_errno;
return -1;
}
In cut_into_file_descriptor(), _ref_at slices the IOBuf into iovecs and the writev syscall writes the non-contiguous buffers in one shot. On success the written bytes are popped from the IOBuf; on a recoverable error, KeepWrite retries. IsWriteComplete checks whether new requests arrived in the meantime: if not, we return; if so, KeepWrite continues in the background, and its logic is largely the same as StartWrite's.
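For reference, this is what the underlying writev call looks like, sketched outside of IOBuf (the two buffers stand in for IOBuf's non-contiguous blocks):

#include <sys/uio.h>   // writev, struct iovec

// Write two non-contiguous buffers with a single syscall.
ssize_t write_two_blocks(int fd, char* hdr, size_t hlen,
                         char* body, size_t blen) {
    struct iovec vec[2];
    vec[0].iov_base = hdr;
    vec[0].iov_len  = hlen;
    vec[1].iov_base = body;
    vec[1].iov_len  = blen;
    // May write fewer bytes than requested; the caller must retry with
    // the remainder, which is exactly what KeepWrite does.
    return writev(fd, vec, 2);
}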
Summary
The overall flow is fairly clear: once the callback fires and the business logic is done, the packet is validated, the headers are assembled, and writev sends the reply back to the client. That completes our walk through the server side; next time we'll look at how the brpc client issues a call.
Feel free to add me on WeChat to discuss.
I'm dandyhuang_. Writing these up isn't easy; a small like is appreciated. I just hope this makes things clearer for everyone.