brpc server端異步回包源碼分析(二)

大家好,我是dandyhuang。上回給大家介紹了brpc從客戶端到服務端整個收包的邏輯,詳情可見《Brpc 服務端收包源碼分析(一)》。本次咱們介紹server端異步回包的邏輯;同步場景下直接往response寫數據即可。

server異步處理

// Asynchronous callback: passed as the `done` of the downstream call made in echo()
// Completion callback for the downstream call issued by echo().
//
// cntl:     heap-allocated client controller of the downstream call; deleted
//           when this function returns.
// response: heap-allocated downstream response; its content is swapped into
//           response_ and the object is deleted when this function returns.
//
// Finishes by running done_, which replies to the original client.
//
// NOTE(review): done_ and response_ are written by echo() for every incoming
// request and are not passed through the callback arguments, so overlapping
// requests on the same service object would share them — confirm this service
// is never called concurrently.
void helloServiceImpl::ServiceCb(brpc::Controller* cntl, ::hello::CommonResponse* response) {
  // Take ownership up front; both objects are released at scope exit.
  std::unique_ptr<brpc::Controller> owned_cntl(cntl);
  std::unique_ptr<::hello::CommonResponse> owned_resp(response);

  LOG(INFO) << "response: " << owned_resp->ByteSize();

  const int downstream_error = owned_cntl->ErrorCode();
  if (downstream_error != 0) {
    LOG(ERROR) << " hello error code:" << downstream_error << " msg:" << owned_cntl->ErrorText();
  }

  // Hand the downstream payload to the server-side response without copying.
  response_->Swap(owned_resp.get());

  // Reply to the client.
  done_->Run();
}

// Server-side handler
// Server-side handler for `echo`: forwards the request to a downstream
// CommonService asynchronously and defers the reply to ServiceCb().
//
// controller: server-side RPC controller (downcast to brpc::Controller).
// request:    forwarded unchanged to the downstream stub.
// response:   server-side response; stored in response_ and filled by
//             ServiceCb().
// done:       reply closure; stored in done_ and run by ServiceCb(), or run by
//             done_guard if this function returns before release().
//
// NOTE(review): done_, response_ and start_time_ are not locals, so every
// request overwrites them; overlapping requests on the same service object
// would reply through the wrong closure — confirm, or carry them through the
// callback instead.
// NOTE(review): a brpc::Channel is created and initialized on every request
// and goes out of scope right after the asynchronous call is issued — confirm
// against the brpc client documentation that this is intended.
void helloServiceImpl::echo(::google::protobuf::RpcController* controller,
                               const ::hello::CommonRequest* request,
                               ::hello::CommonResponse* response,
                               ::google::protobuf::Closure* done) {
  // Until release() below, leaving this function runs `done` and replies.
  brpc::ClosureGuard done_guard(done);
  brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
  start_time_ = butil::gettimeofday_us();

  // Set up the channel to the downstream service.
  brpc::Channel channel;
  brpc::ChannelOptions options;
  options.protocol = FLAGS_protocol;
  options.connection_type = FLAGS_connection_type;
  options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
  options.max_retry = FLAGS_max_retry;
  if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
      LOG(ERROR) << "Fail to initialize channel";
      // This function returns void, so the failure is reported through the
      // controller; done_guard then replies with the error on return.
      cntl->SetFailed(brpc::EINTERNAL, "Fail to initialize channel");
      return;
  }

  // Publish the reply state only once the downstream call will really be
  // issued, so the early-return path above never leaves it pointing at a
  // closure that done_guard has already run.
  done_ = done;
  response_ = response;

  // Heap-allocated because they must outlive this function; ServiceCb()
  // deletes both.
  auto cli_cntl = new brpc::Controller();
  ::hello::CommonResponse* cli_resp = new ::hello::CommonResponse();

  // Call the downstream service asynchronously.
  hello::CommonService_Stub stub(channel);
  auto cli_done = brpc::NewCallback(this, &helloServiceImpl::ServiceCb, cli_cntl, cli_resp);
  stub.echo(cli_cntl, request, cli_resp, cli_done);

  // Give up the guard without running `done`: the reply is sent later by
  // ServiceCb() through done_.
  done_guard.release();
}

// Runs the guarded closure on scope exit, unless _done is NULL (which is what
// release() leaves behind).
~ClosureGuard() {
  if (_done == NULL) {
    return;
  }
  _done->Run();
}

// Detaches the closure from this guard and returns it.
// After this call _done is NULL, so the destructor no longer runs the closure
// (i.e. no reply is sent when the guard goes out of scope).
google::protobuf::Closure* release() {
  google::protobuf::Closure* const detached = _done;
  _done = NULL;
  return detached;
}

注意:服務端收到數據時,我們先用brpc::ClosureGuard把done保存起來;調用下游服務後,再調用done_guard.release(),讓guard放棄done,這樣echo返回時不會回包。等下游處理完、異步回調被觸發時,再調用done_->Run()對客戶端進行回包,從而達到異步的效果。

ProcessHttpRequest回包邏輯

// Abridged excerpt: shows only how the controller, the response message and
// the `done` closure are wired together before the service method is invoked.
// `svc`, `method` and `req` come from code elided at the `···` marker.
void ProcessHttpRequest(InputMessageBase *msg) {
    Controller* cntl = new (std::nothrow) Controller;
    // resp_sender is constructed from cntl.
    HttpResponseSender resp_sender(cntl);
    // Create the response message and hand it to resp_sender.
    google::protobuf::Message* res = svc->GetResponsePrototype(method).New();
    resp_sender.own_response(res);
    // resp_sender ends up inside the HttpResponseSenderAsDone closure.
    google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
   ···
    // Invoke the user's service method; `res` is the message given to
    // resp_sender above and `done` is the closure the user runs to reply.
    svc->CallMethod(method, cntl, req, res, done);
  
}

以http為例,回包用的res和done最終都會存儲到HttpResponseSenderAsDone中,所以業務邏輯代碼中的done_->Run()調用的就是它的Run()。下面我們看它是如何回包的。

HttpResponseSenderAsDone->Run()

// Closure wrapping an HttpResponseSender. Running it deletes the closure,
// which destroys the embedded sender; the sender's destructor is where the
// HTTP response is built and written.
class HttpResponseSenderAsDone : public google::protobuf::Closure {
public:
    // Moves the state of `*s` into this closure.
    HttpResponseSenderAsDone(HttpResponseSender* s)
        : _sender(std::move(*s)) {
    }

    // Self-deleting: destroying `_sender` triggers ~HttpResponseSender().
    void Run() override {
        delete this;
    }

private:
    // Destroyed together with this closure.
    HttpResponseSender _sender;
};

// Builds the HTTP / HTTP2 / gRPC response from the controller and the
// response message held by this sender, then writes it to the sending socket.
// Does nothing when the sender holds no controller.
HttpResponseSender::~HttpResponseSender() {
    // Controller of this RPC; nothing to send without one.
    Controller* cntl = _cntl.get();
    if (cntl == NULL) {
        return;
    }
    ControllerPrivateAccessor accessor(cntl);
    Span* span = accessor.span();
    if (span) {
        span->set_start_send_us(butil::cpuwide_time_us());
    }
    ConcurrencyRemover concurrency_remover(_method_status, cntl, _received_us);
    // Socket the response is written to.
    Socket* socket = accessor.get_sending_socket();
    // Response protobuf message filled in by the service (may be NULL).
    const google::protobuf::Message* res = _res.get();
    // When the controller is flagged to close the connection, fail the socket
    // and send no response.
    if (cntl->IsCloseConnection()) {
        socket->SetFailed();
        return;
    }
    // Request header.
    const HttpHeader* req_header = &cntl->http_request();
    // Response header; its version mirrors the request's.
    HttpHeader* res_header = &cntl->http_response();
    res_header->set_version(req_header->major_version(),
                            req_header->minor_version());
    // The rest of the function fills in the response headers and body.
    const std::string* content_type_str = &res_header->content_type();
    if (content_type_str->empty()) {
        // Use request's content_type if response's is not set.
        content_type_str = &req_header->content_type();
        res_header->set_content_type(*content_type_str);
    }
    bool is_grpc_ct = false;
    const HttpContentType content_type = ParseContentType(*content_type_str, &is_grpc_ct);
    const bool is_http2 = req_header->is_http2();
    const bool is_grpc = (is_http2 && is_grpc_ct);

    // Serialize the response message into the attachment only when the
    // attachment is still empty, the message type has fields and the RPC has
    // not failed.
    if (res != NULL &&
        cntl->response_attachment().empty() && res->GetDescriptor()->field_count() > 0 &&!cntl->Failed()) { 
        butil::IOBufAsZeroCopyOutputStream wrapper(&cntl->response_attachment());
        // Body encoding: protobuf binary for HTTP_CONTENT_PROTO, JSON otherwise.
        if (content_type == HTTP_CONTENT_PROTO) {
            if (!res->SerializeToZeroCopyStream(&wrapper)) {
                cntl->SetFailed(ERESPONSE, "Fail to serialize %s", res->GetTypeName().c_str());
            }
        } else {
            std::string err;
            json2pb::Pb2JsonOptions opt;
            opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
            opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
            opt.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
            opt.enum_option = (FLAGS_pb_enum_as_number
                               ? json2pb::OUTPUT_ENUM_BY_NUMBER
                               : json2pb::OUTPUT_ENUM_BY_NAME);
            if (!json2pb::ProtoMessageToJson(*res, &wrapper, opt, &err)) {
                cntl->SetFailed(ERESPONSE, "Fail to convert response to json, %s", err.c_str());
            }
        }
    }
    // Protocol-specific headers: Connection handling for non-HTTP/2 requests,
    // fixed status code for gRPC.
    if (!is_http2) {
        const std::string* res_conn = res_header->GetHeader(common->CONNECTION);
        if (res_conn == NULL || strcasecmp(res_conn->c_str(), "close") != 0) {
            const std::string* req_conn =
                req_header->GetHeader(common->CONNECTION);
            if (req_header->before_http_1_1()) {
                if (req_conn != NULL &&
                    strcasecmp(req_conn->c_str(), "keep-alive") == 0) {
                    res_header->SetHeader(common->CONNECTION, common->KEEP_ALIVE);
                }
            } else {
                if (req_conn != NULL &&
                    strcasecmp(req_conn->c_str(), "close") == 0) {
                    res_header->SetHeader(common->CONNECTION, common->CLOSE);
                }
            }
        } // else user explicitly set Connection:close, clients of
        // HTTP 1.1/1.0/0.9 should all close the connection.
    } else if (is_grpc) {
        // status code is always 200 according to grpc protocol
        res_header->set_status_code(HTTP_STATUS_OK);
    }
    
    bool grpc_compressed = false;
    // Failure handling: drop the body; for non-gRPC, send the error text as
    // the body instead.
    if (cntl->Failed()) {
        cntl->response_attachment().clear();
        if (!is_grpc) {
            // Set status-code with default value(converted from error code)
            // if user did not set it.
            if (res_header->status_code() == HTTP_STATUS_OK) {
                res_header->set_status_code(ErrorCodeToStatusCode(cntl->ErrorCode()));
            }
            // Fill ErrorCode into header
            res_header->SetHeader(common->ERROR_CODE,
                                  butil::string_printf("%d", cntl->ErrorCode()));
            res_header->RemoveHeader(common->CONTENT_ENCODING);
            res_header->set_content_type(common->CONTENT_TYPE_TEXT);
            cntl->response_attachment().append(cntl->ErrorText());
        }
    // Progressive (chunked) response handling.
    } else if (cntl->has_progressive_writer()) {
        // Transfer-Encoding is supported since HTTP/1.1
        if (res_header->major_version() < 2 && !res_header->before_http_1_1()) {
            res_header->SetHeader("Transfer-Encoding", "chunked");
        }
        if (!cntl->response_attachment().empty()) {
            LOG(ERROR) << "response_attachment(size="
                       << cntl->response_attachment().size() << ") will be"
                " ignored when CreateProgressiveAttachment() was called";
        }
    // Gzip compression of the body.
    } else if (cntl->response_compress_type() == COMPRESS_TYPE_GZIP) {
        const size_t response_size = cntl->response_attachment().size();
        if (response_size >= (size_t)FLAGS_http_body_compress_threshold
            && (is_http2 || SupportGzip(cntl))) {
            TRACEPRINTF("Compressing response=%lu", (unsigned long)response_size);
            butil::IOBuf tmpbuf;
            if (GzipCompress(cntl->response_attachment(), &tmpbuf, NULL)) {
                cntl->response_attachment().swap(tmpbuf);
                if (is_grpc) {
                    grpc_compressed = true;
                    res_header->SetHeader(common->GRPC_ENCODING, common->GZIP);
                } else {
                    res_header->SetHeader(common->CONTENT_ENCODING, common->GZIP);
                }
            } else {
                LOG(ERROR) << "Fail to gzip the http response, skip compression.";
            }
        }
    } else {
        // TODO(gejun): Support snappy (grpc)
        LOG_IF(ERROR, cntl->response_compress_type() != COMPRESS_TYPE_NONE)
            << "Unknown compress_type=" << cntl->response_compress_type()
            << ", skip compression.";
    }

    int rc = -1;
    // Have the risk of unlimited pending responses, in which case, tell
    // users to set max_concurrency.
    Socket::WriteOptions wopt;
    wopt.ignore_eovercrowded = true;
    // HTTP/2 (including gRPC) path.
    if (is_http2) {
        if (is_grpc) {
            // Append compressed and length before body
            AddGrpcPrefix(&cntl->response_attachment(), grpc_compressed);
        }
        // Build the unsent HTTP/2 response message.
        SocketMessagePtr<H2UnsentResponse> h2_response(
                H2UnsentResponse::New(cntl, _h2_stream_id, is_grpc));
        if (h2_response == NULL) {
            LOG(ERROR) << "Fail to make http2 response";
            errno = EINVAL;
            rc = -1;
        } else {
            if (FLAGS_http_verbose) {
                LOG(INFO) << '\n' << *h2_response;
            }
            if (span) {
                span->set_response_size(h2_response->EstimatedByteSize());
            }
            // Send the response.
            rc = socket->Write(h2_response, &wopt);
        }
    } else {
        // HTTP/1.x path. The body is passed along unless a progressive writer
        // is in use on a successful RPC.
        butil::IOBuf* content = NULL;
        if (cntl->Failed() || !cntl->has_progressive_writer()) {
            content = &cntl->response_attachment();
        }
        butil::IOBuf res_buf;
        // Build the raw HTTP response: headers + body.
        MakeRawHttpResponse(&res_buf, res_header, content);
        if (FLAGS_http_verbose) {
            PrintMessage(res_buf, false, !!content);
        }
        if (span) {
            span->set_response_size(res_buf.size());
        }
        // Send the response.
        rc = socket->Write(&res_buf, &wopt);
    }

    if (rc != 0) {
        // EPIPE is common in pooled connections + backup requests.
        const int errcode = errno;
        PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket;
        cntl->SetFailed(errcode, "Fail to write into %s", socket->description().c_str());
        return;
    }
    if (span) {
        // TODO: this is not sent
        span->set_sent_us(butil::cpuwide_time_us());
    }
}

根據controller和response message的信息構造http回包數據,最終通過套接字調用socket->Write發送。

Socket::Write寫數據

// Queues `data` for writing on this socket.
//
// data:       bytes to send; must be non-empty. On the success path its
//             content is swapped into the write request.
// options_in: optional write options; defaults are used when NULL.
//
// Returns whatever StartWrite() returns once the request is queued; on the
// validation failures below, returns the result of SetError()/ConductError().
int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) {
    WriteOptions wopts;
    if (options_in != NULL) {
        wopts = *options_in;
    }

    // Refuse to send an empty message.
    if (data->empty()) {
        return SetError(wopts.id_wait, EINVAL);
    }

    if (wopts.pipelined_count > MAX_PIPELINED_COUNT) {
        LOG(ERROR) << "pipelined_count=" << wopts.pipelined_count << " is too large";
        return SetError(wopts.id_wait, EOVERFLOW);
    }

    if (Failed()) {
        const int conducted = ConductError(wopts.id_wait);
        if (conducted <= 0) {
            return conducted;
        }
    }

    if (!wopts.ignore_eovercrowded && _overcrowded) {
        return SetError(wopts.id_wait, EOVERCROWDED);
    }

    // Obtain a WriteRequest object.
    WriteRequest* wreq = butil::get_object<WriteRequest>();
    if (wreq == NULL) {
        return SetError(wopts.id_wait, ENOMEM);
    }

    // Move the payload into the request.
    wreq->data.swap(*data);
    wreq->next = WriteRequest::UNCONNECTED;
    wreq->id_wait = wopts.id_wait;
    // Record the pipelined count and auth flag on the request (pipelining is
    // used e.g. for redis pipeline calls, per the accompanying article).
    wreq->set_pipelined_count_and_user_message(
        wopts.pipelined_count, DUMMY_USER_MESSAGE, wopts.with_auth);
    return StartWrite(wreq, wopts);
}

構造WriteRequest,調用StartWrite

StartWrite

// Publishes `req` as the new head of the socket's write list. If another
// request was already at the head, `req` is linked in front of it and the
// function returns 0 at once. Otherwise this caller writes once in place and
// hands any remaining work to KeepWrite in a background bthread.
//
// Returns 0 when the request was queued or written, -1 (with errno set) when
// connecting or writing failed.
int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
    // Release fence makes sure the thread getting request sees *req
    WriteRequest* const prev_head =
        _write_head.exchange(req, butil::memory_order_release);
    if (prev_head != NULL) {
        req->next = prev_head;
        return 0;
    }

    int saved_errno = 0;
    bthread_t th;
    SocketUniquePtr ptr_for_keep_write;
    ssize_t nw = 0;

    // We've got the right to write.
    req->next = NULL;
    
    // Make sure the socket is connected before writing.
    // NOTE(review): ret == 1 returns success without writing here — presumably
    // the connect is in progress and the write continues later; confirm
    // against ConnectIfNot.
    int ret = ConnectIfNot(opt.abstime, req);
    if (ret < 0) {
        saved_errno = errno;
        SetFailed(errno, "Fail to connect %s directly: %m", description().c_str());
        goto FAIL_TO_WRITE;
    } else if (ret == 1) {
        return 0;
    }

    // Prepare the request for this socket. Per the accompanying article this
    // records unsent size / pipelined count so sending can resume — confirm
    // against WriteRequest::Setup.
    req->Setup(this);
    if (_conn) {
        butil::IOBuf* data_arr[1] = { &req->data };
        nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
    } else {
        // Write directly to the fd (the article states this uses writev over
        // multiple non-contiguous buffers).
        nw = req->data.cut_into_file_descriptor(fd());
    }
    if (nw < 0) {
        // EAGAIN and EOVERCROWDED are not treated as failures; anything else
        // fails the socket.
        if (errno != EAGAIN && errno != EOVERCROWDED) {
            saved_errno = errno;
            // EPIPE is common in pooled connections + backup requests.
            PLOG_IF(WARNING, errno != EPIPE) << "Fail to write into " << *this;
            SetFailed(saved_errno, "Fail to write into %s: %s", 
                      description().c_str(), berror(saved_errno));
            goto FAIL_TO_WRITE;
        }
    } else {
        // Account for the bytes that were written.
        AddOutputBytes(nw);
    }
    // Done if IsWriteComplete reports nothing left to write; otherwise fall
    // through and keep writing in the background.
    if (IsWriteComplete(req, true, NULL)) {
        ReturnSuccessfulWriteRequest(req);
        return 0;
    }

KEEPWRITE_IN_BACKGROUND:
    ReAddress(&ptr_for_keep_write);
    req->socket = ptr_for_keep_write.release();
    // Continue in a background bthread; if it cannot be started, run
    // KeepWrite in the current thread instead.
    if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
                                 KeepWrite, req) != 0) {
        LOG(FATAL) << "Fail to start KeepWrite";
        KeepWrite(req);
    }
    return 0;

FAIL_TO_WRITE:
    // `SetFailed' before `ReturnFailedWriteRequest' (which will calls
    // `on_reset' callback inside the id object) so that we immediately
    // know this socket has failed inside the `on_reset' callback
    ReleaseAllFailedWriteRequests(req);
    errno = saved_errno;
    return -1;
}

cut_into_file_descriptor()中,先通過_ref_at把數據分好,再調用系統函數writev寫多個非連續緩沖區的數據。寫成功的數據會從iobuf中清掉;如果寫失敗,但屬於正常的錯誤(EAGAIN、EOVERCROWDED),則交給KeepWrite繼續寫。IsWriteComplete判斷當前請求是否已寫完、以及是否有新的請求:都沒有則直接返回,否則調用KeepWrite在後台繼續寫。KeepWrite和StartWrite的邏輯大致相同。

總結

整個處理邏輯還是比較清晰的:業務邏輯處理完後觸發回調,校驗包的合法性、設置好包頭等,然後直接調用writev回包給客戶端。至此,整個服務端的處理邏輯我們都分析完了,下期我們來分析一下brpc客戶端是如何調用的。

大家可以添加我的wx一起探討

我是dandyhuang_,碼字不易,點個小贊,只希望大家能更加明白。

©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳並發布,文章內容僅代表作者本人觀點,簡書係信息發布平臺,僅提供信息存儲服務。

相關閱讀 更多精彩內容

友情鏈接 更多精彩內容