Nginx stream(UDP)模塊分析

Nginx stream(UDP)模塊分析


ngx_stream_handler.c

<i class="icon-file"></i> ngx_stream_init_connection函數

代碼解讀:

  • 在ngx_stream_optimize_servers里設置有連接發(fā)生時的回調函數ngx_stream_init_connection.
  • 創(chuàng)建一個處理tcp的會話對象.
  • 創(chuàng)建ctx數組,用于存儲模塊的ctx數據,調用handler,處理tcp數據,收發(fā)等等,讀事件處理函數,執(zhí)行處理引擎.
  • 按階段執(zhí)行處理引擎ngx_stream_core_run_phases,調用各個模塊的handler.

ngx_stream_proxy_module.c

<i class="icon-file"></i> ngx_stream_proxy_pass函數

代碼解讀:

  • 解析proxy_pass指令,設置處理handler=ngx_stream_proxy_handler,在init建立連接之后會調用.
  • 獲取一個upstream{}塊的配置信息.

<i class="icon-file"></i> ngx_stream_proxy_handler函數

核心代碼解讀:

  • ngx_stream_init_connection->ngx_stream_init_session之后調用,處理請求.
static void
ngx_stream_proxy_handler(ngx_stream_session_t *s)
{
    u_char                           *p;
    ngx_str_t                        *host;
    ngx_uint_t                        i;
    ngx_connection_t                 *c;
    ngx_resolver_ctx_t               *ctx, temp;
    ngx_stream_upstream_t            *u;
    ngx_stream_core_srv_conf_t       *cscf;
    ngx_stream_proxy_srv_conf_t      *pscf;
    ngx_stream_upstream_srv_conf_t   *uscf, **uscfp;
    ngx_stream_upstream_main_conf_t  *umcf;

    // 獲取連接對象
    c = s->connection;

    pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);

    ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
                   "proxy connection handler");

    // 創(chuàng)建連接上游的結構體
    // 里面有如何獲取負載均衡server、上下游buf等
    u = ngx_pcalloc(c->pool, sizeof(ngx_stream_upstream_t));
    if (u == NULL) {
        ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
        return;
    }

    // 關聯(lián)到會話對象
    s->upstream = u;

    s->log_handler = ngx_stream_proxy_log_error;

    u->peer.log = c->log;
    u->peer.log_error = NGX_ERROR_ERR;

    if (ngx_stream_proxy_set_local(s, u, pscf->local) != NGX_OK) {
        ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
        return;
    }

    // 連接的類型,tcp or udp
    u->peer.type = c->type;

    // 開始連接后端的時間
    // 準備開始連接,設置開始時間,秒數,沒有毫秒
    u->start_sec = ngx_time();

    // 連接的讀寫事件都設置為ngx_stream_proxy_downstream_handler
    // 注意這個連接是客戶端發(fā)起的連接,即下游
    // 當客戶端連接可讀或可寫時就會調用ngx_stream_proxy_downstream_handler
    c->write->handler = ngx_stream_proxy_downstream_handler;
    c->read->handler = ngx_stream_proxy_downstream_handler;

    // 使用數組,可能會連接多個上游服務器
    s->upstream_states = ngx_array_create(c->pool, 1,
                                          sizeof(ngx_stream_upstream_state_t));
    if (s->upstream_states == NULL) {
        ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
        return;
    }

    // 如果是tcp連接,那么創(chuàng)建一個緩沖區(qū),用來接收數據
    if (c->type == SOCK_STREAM) {
        p = ngx_pnalloc(c->pool, pscf->buffer_size);
        if (p == NULL) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }

        // 注意是給下游使用的緩沖區(qū)
        u->downstream_buf.start = p;
        u->downstream_buf.end = p + pscf->buffer_size;
        u->downstream_buf.pos = p;
        u->downstream_buf.last = p;

        // 連接可讀,表示客戶端有數據發(fā)過來
        // 加入到&ngx_posted_events
        // 稍后由ngx_stream_proxy_downstream_handler來處理
        if (c->read->ready) {
            ngx_post_event(c->read, &ngx_posted_events);
        }
    }

    // udp不需要,始終用一個固定大小的數組接收數據

    // proxy_pass支持復雜變量
    // 如果使用了"proxy_pass $xxx",那么就要解析復雜變量
    if (pscf->upstream_value) {
        if (ngx_stream_proxy_eval(s, pscf) != NGX_OK) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    // 檢查proxy_pass的目標地址
    if (u->resolved == NULL) {

        uscf = pscf->upstream;

    } else {

#if (NGX_STREAM_SSL)
        u->ssl_name = u->resolved->host;
#endif

        host = &u->resolved->host;

        // 獲取上游的配置結構體
        // 在ngx_stream_proxy_pass里設置的
        //uscf = pscf->upstream;

        umcf = ngx_stream_get_module_main_conf(s, ngx_stream_upstream_module);

        uscfp = umcf->upstreams.elts;

        for (i = 0; i < umcf->upstreams.nelts; i++) {

            uscf = uscfp[i];

            if (uscf->host.len == host->len
                && ((uscf->port == 0 && u->resolved->no_port)
                     || uscf->port == u->resolved->port)
                && ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0)
            {
                goto found;
            }
        }

        if (u->resolved->sockaddr) {

            if (u->resolved->port == 0
                && u->resolved->sockaddr->sa_family != AF_UNIX)
            {
                ngx_log_error(NGX_LOG_ERR, c->log, 0,
                              "no port in upstream \"%V\"", host);
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }

            if (ngx_stream_upstream_create_round_robin_peer(s, u->resolved)
                != NGX_OK)
            {
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }

            ngx_stream_proxy_connect(s);

            return;
        }

        if (u->resolved->port == 0) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "no port in upstream \"%V\"", host);
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }

        temp.name = *host;

        cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);

        ctx = ngx_resolve_start(cscf->resolver, &temp);
        if (ctx == NULL) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }

        if (ctx == NGX_NO_RESOLVER) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "no resolver defined to resolve %V", host);
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }

        ctx->name = *host;
        ctx->handler = ngx_stream_proxy_resolve_handler;
        ctx->data = s;
        ctx->timeout = cscf->resolver_timeout;

        u->resolved->ctx = ctx;

        if (ngx_resolve_name(ctx) != NGX_OK) {
            u->resolved->ctx = NULL;
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }

        return;
    }

found:

    if (uscf == NULL) {
        ngx_log_error(NGX_LOG_ALERT, c->log, 0, "no upstream configuration");
        ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
        return;
    }

    u->upstream = uscf;

#if (NGX_STREAM_SSL)
    u->ssl_name = uscf->host;
#endif

    // 負載均衡算法初始化
    if (uscf->peer.init(s, uscf) != NGX_OK) {
        ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
        return;
    }

    // 準備開始連接,設置開始時間,毫秒
    u->peer.start_time = ngx_current_msec;

    // 設置負載均衡的重試次數
    if (pscf->next_upstream_tries
        && u->peer.tries > pscf->next_upstream_tries)
    {
        u->peer.tries = pscf->next_upstream_tries;
    }

    //u->proxy_protocol = pscf->proxy_protocol;

    // 最后啟動連接
    // 使用ngx_peer_connection_t連接上游服務器
    // 連接失敗,需要嘗試下一個上游server
    // 連接成功要調用init初始化上游
    ngx_stream_proxy_connect(s);
}

// 連接上游
// 使用ngx_peer_connection_t連接上游服務器
// 連接失敗,需要嘗試下一個上游server
// 連接成功要調用init初始化上游
static void
ngx_stream_proxy_connect(ngx_stream_session_t *s)
{
    ngx_int_t                     rc;
    ngx_connection_t             *c, *pc;
    ngx_stream_upstream_t        *u;
    ngx_stream_proxy_srv_conf_t  *pscf;

    // 獲取連接對象
    c = s->connection;

    c->log->action = "connecting to upstream";

    pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);

    // 連接上游的結構體
    // 里面有如何獲取負載均衡server、上下游buf等
    u = s->upstream;

    u->connected = 0;
    u->proxy_protocol = pscf->proxy_protocol;

    // 何時會執(zhí)行這個?
    if (u->state) {
        u->state->response_time = ngx_current_msec - u->state->response_time;
    }

    // 把一個上游的狀態(tài)添加進會話的數組
    u->state = ngx_array_push(s->upstream_states);
    if (u->state == NULL) {
        ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
        return;
    }

    ngx_memzero(u->state, sizeof(ngx_stream_upstream_state_t));

    // 這兩個值置為-1,表示未初始化
    u->state->connect_time = (ngx_msec_t) -1;
    u->state->first_byte_time = (ngx_msec_t) -1;

    // 用來計算響應時間,保存當前的毫秒值
    // 之后連接成功后就會兩者相減
    u->state->response_time = ngx_current_msec;

    // 連接上游
    // 使用ngx_peer_connection_t連接上游服務器
    // 從upstream{}里獲取一個上游server地址
    // 從cycle的連接池獲取一個空閑連接
    // 設置連接的數據收發(fā)接口函數
    // 向epoll添加連接,即同時添加讀寫事件
    // 當與上游服務器有任何數據收發(fā)時都會觸發(fā)epoll
    // socket api調用連接上游服務器
    // 寫事件ready,即可以立即向上游發(fā)送數據
    rc = ngx_event_connect_peer(&u->peer);

    ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, "proxy connect: %i", rc);

    if (rc == NGX_ERROR) {
        ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
        return;
    }

    u->state->peer = u->peer.name;

    // 所有上游都busy
    if (rc == NGX_BUSY) {
        ngx_log_error(NGX_LOG_ERR, c->log, 0, "no live upstreams");
        ngx_stream_proxy_finalize(s, NGX_STREAM_BAD_GATEWAY);
        return;
    }

    // 連接失敗,需要嘗試下一個上游server
    if (rc == NGX_DECLINED) {
        ngx_stream_proxy_next_upstream(s);
        return;
    }

    /* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */

    // 連接“成功”,again/done表示正在連接過程中

    pc = u->peer.connection;

    pc->data = s;
    pc->log = c->log;
    pc->pool = c->pool;
    pc->read->log = c->log;
    pc->write->log = c->log;

    // 連接成功
    // 分配供上游讀取數據的緩沖區(qū)
    // 修改上游讀寫事件,不再測試連接,改為ngx_stream_proxy_upstream_handler
    // 實際是ngx_stream_proxy_process_connection(ev, !ev->write);
    if (rc != NGX_AGAIN) {
        ngx_stream_proxy_init_upstream(s);
        return;
    }

    // again,要再次嘗試連接

    // 設置上游的讀寫事件處理函數是ngx_stream_proxy_connect_handler
    // 第一次連接上游不成功后的handler
    // 當上游連接再次有讀寫事件發(fā)生時測試連接
    // 測試連接是否成功,失敗就再試下一個上游
    // 最后還是要調用init初始化上游
    pc->read->handler = ngx_stream_proxy_connect_handler;
    pc->write->handler = ngx_stream_proxy_connect_handler;

    // 連接上游先寫,所以設置寫事件的超時時間
    ngx_add_timer(pc->write, pscf->connect_timeout);
}
// 分配供上游讀取數據的緩沖區(qū)
// 進入此函數,肯定已經成功連接了上游服務器
// 修改上游讀寫事件,不再測試連接,改為ngx_stream_proxy_upstream_handler
// 實際是ngx_stream_proxy_process_connection(ev, !ev->write);
static void
ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
{
    int                           tcp_nodelay;
    u_char                       *p;
    ngx_chain_t                  *cl;
    ngx_connection_t             *c, *pc;
    ngx_log_handler_pt            handler;
    ngx_stream_upstream_t        *u;
    ngx_stream_core_srv_conf_t   *cscf;
    ngx_stream_proxy_srv_conf_t  *pscf;

    // u保存了上游相關的信息
    u = s->upstream;

    // pc是上游的連接對象
    pc = u->peer.connection;

    cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);

    if (pc->type == SOCK_STREAM
        && cscf->tcp_nodelay
        && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET)
    {
        ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "tcp_nodelay");

        tcp_nodelay = 1;

        if (setsockopt(pc->fd, IPPROTO_TCP, TCP_NODELAY,
                       (const void *) &tcp_nodelay, sizeof(int)) == -1)
        {
            ngx_connection_error(pc, ngx_socket_errno,
                                 "setsockopt(TCP_NODELAY) failed");
            ngx_stream_proxy_next_upstream(s);
            return;
        }

        pc->tcp_nodelay = NGX_TCP_NODELAY_SET;
    }

    pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);

#if (NGX_STREAM_SSL)

    if (pc->type == SOCK_STREAM && pscf->ssl) {

        if (u->proxy_protocol) {
            if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) {
                return;
            }

            u->proxy_protocol = 0;
        }

        if (pc->ssl == NULL) {
            ngx_stream_proxy_ssl_init_connection(s);
            return;
        }
    }

#endif

    // c是到客戶端即下游的連接對象
    c = s->connection;

    if (c->log->log_level >= NGX_LOG_INFO) {
        ngx_str_t  str;
        u_char     addr[NGX_SOCKADDR_STRLEN];

        str.len = NGX_SOCKADDR_STRLEN;
        str.data = addr;

        if (ngx_connection_local_sockaddr(pc, &str, 1) == NGX_OK) {
            handler = c->log->handler;
            c->log->handler = NULL;

            ngx_log_error(NGX_LOG_INFO, c->log, 0,
                          "%sproxy %V connected to %V",
                          pc->type == SOCK_DGRAM ? "udp " : "",
                          &str, u->peer.name);

            c->log->handler = handler;
        }
    }

    // 計算連接使用的時間,毫秒值
    u->state->connect_time = ngx_current_msec - u->state->response_time;

    if (u->peer.notify) {
        u->peer.notify(&u->peer, u->peer.data,
                       NGX_STREAM_UPSTREAM_NOTIFY_CONNECT);
    }

    c->log->action = "proxying connection";

    // 檢查給上游使用的緩沖區(qū)
    if (u->upstream_buf.start == NULL) {

        // 分配供上游讀取數據的緩沖區(qū)
        p = ngx_pnalloc(c->pool, pscf->buffer_size);
        if (p == NULL) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }

        u->upstream_buf.start = p;
        u->upstream_buf.end = p + pscf->buffer_size;
        u->upstream_buf.pos = p;
        u->upstream_buf.last = p;
    }

    // 此時u里面上下游都有緩沖區(qū)了

    // udp處理
    // if (c->type == SOCK_DGRAM) {
    //     // 使用客戶端連接的buffer計算收到的字節(jié)數
    //     s->received = c->buffer->last - c->buffer->pos;

    //     // nginx 1.11.x刪除了此行代碼??!
    //     // downstream_buf直接就是客戶端連接的buffer
    //     u->downstream_buf = *c->buffer;


    // 客戶端里已經發(fā)來了一些數據
    if (c->buffer && c->buffer->pos < c->buffer->last) {
        ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
                       "stream proxy add preread buffer: %uz",
                       c->buffer->last - c->buffer->pos);

        // 拿一個鏈表節(jié)點
        cl = ngx_chain_get_free_buf(c->pool, &u->free);
        if (cl == NULL) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }

        // 把連接的緩沖區(qū)關聯(lián)到鏈表節(jié)點里
        *cl->buf = *c->buffer;

        cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
        cl->buf->flush = 1;

        // udp特殊處理,直接是最后一塊數據,所以Nginx暫不支持udp會話
        cl->buf->last_buf = (c->type == SOCK_DGRAM);

        // 把數據掛到upstream_out里,要發(fā)給上游
        cl->next = u->upstream_out;
        u->upstream_out = cl;
    }

    if (u->proxy_protocol) {
        ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
                       "stream proxy add PROXY protocol header");

        cl = ngx_chain_get_free_buf(c->pool, &u->free);
        if (cl == NULL) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }

        p = ngx_pnalloc(c->pool, NGX_PROXY_PROTOCOL_MAX_HEADER);
        if (p == NULL) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }

        cl->buf->pos = p;

        p = ngx_proxy_protocol_write(c, p, p + NGX_PROXY_PROTOCOL_MAX_HEADER);
        if (p == NULL) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }

        cl->buf->last = p;
        cl->buf->temporary = 1;
        cl->buf->flush = 0;
        cl->buf->last_buf = 0;
        cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;

        cl->next = u->upstream_out;
        u->upstream_out = cl;

        u->proxy_protocol = 0;
    }

    if (c->type == SOCK_DGRAM && pscf->responses == 0) {
        pc->read->ready = 0;
        pc->read->eof = 1;
    }

    // 進入此函數,肯定已經成功連接了上游服務器
    u->connected = 1;

    // 修改上游讀寫事件,不再測試連接,改為ngx_stream_proxy_upstream_handler
    // 實際是ngx_stream_proxy_process_connection(ev, !ev->write);
    pc->read->handler = ngx_stream_proxy_upstream_handler;
    pc->write->handler = ngx_stream_proxy_upstream_handler;

    if (pc->read->ready || pc->read->eof) {
        ngx_post_event(pc->read, &ngx_posted_events);
    }

    // 參數表示上游連接,上游可寫
    ngx_stream_proxy_process(s, 0, 1);
}
// 處理上下游的數據收發(fā)
// from_upstream參數標記是否是上游,使用的是ev->write
// 上游下游的可讀可寫回調函數都調用了該函數
// 下游可寫事件,from_ups =1 表示從上游讀寫到下游
// 下游可讀事件,from_ups =0 表示從下游讀寫到上游
// 上游可寫事件,from_ups =0 表示從下游讀寫到上游
// 上游可讀事件,from_ups =1  表示從上游讀寫到下游
// 這個ev 其實是不一樣的。分表代表了上游和下游 
static void
ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream)
{
    ngx_connection_t             *c, *pc;
    ngx_stream_session_t         *s;
    ngx_stream_upstream_t        *u;
    ngx_stream_proxy_srv_conf_t  *pscf;

    // 連接、會話、上游
    c = ev->data;
    s = c->data;
    u = s->upstream;

    // c是下游連接,pc是上游連接
    c = s->connection;
    pc = u->peer.connection;

    pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);

    // 超時處理,沒有delay則失敗
    if (ev->timedout) {
        ev->timedout = 0;

        if (ev->delayed) {
            ev->delayed = 0;

            if (!ev->ready) {
                if (ngx_handle_read_event(ev, 0) != NGX_OK) {
                    ngx_stream_proxy_finalize(s,
                                              NGX_STREAM_INTERNAL_SERVER_ERROR);
                    return;
                }

                if (u->connected && !c->read->delayed && !pc->read->delayed) {
                    ngx_add_timer(c->write, pscf->timeout);
                }

                return;
            }

        } else {
            if (s->connection->type == SOCK_DGRAM) {
                if (pscf->responses == NGX_MAX_INT32_VALUE) {

                    /*
                     * successfully terminate timed out UDP session
                     * with unspecified number of responses
                     */

                    pc->read->ready = 0;
                    pc->read->eof = 1;

                    ngx_stream_proxy_process(s, 1, 0);
                    return;
                }

                if (u->received == 0) {
                    ngx_stream_proxy_next_upstream(s);
                    return;
                }
            }

            ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
            ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
            return;
        }

    } else if (ev->delayed) {

        ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
                       "stream connection delayed");

        if (ngx_handle_read_event(ev, 0) != NGX_OK) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
        }

        return;
    }

    if (from_upstream && !u->connected) {
        return;
    }

    // 核心處理函數,處理兩個連接的數據收發(fā)
    ngx_stream_proxy_process(s, from_upstream, ev->write);
}

// 核心處理函數,處理兩個連接的數據收發(fā)
// 可以處理上下游的數據收發(fā)
// 參數標記是否是上游、是否寫數據
// 最終都會調用到這個處理函數
// 無論是上游可讀可寫事件還是下游可讀可寫事件都會調用該函數
// from_ups == 1 可能是上游的可讀事件(從上游讀內容),也可能是下游的可寫事件(寫的內容來自上游)
static void
ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
    ngx_uint_t do_write)
{
    off_t                        *received, limit;
    size_t                        size, limit_rate;
    ssize_t                       n;
    ngx_buf_t                    *b;
    ngx_int_t                     rc;
    ngx_uint_t                    flags;
    ngx_msec_t                    delay;
    ngx_chain_t                  *cl, **ll, **out, **busy;
    ngx_connection_t             *c, *pc, *src, *dst;
    ngx_log_handler_pt            handler;
    ngx_stream_upstream_t        *u;
    ngx_stream_proxy_srv_conf_t  *pscf;

    // u是上游結構體
    u = s->upstream;

    // c是下游的連接
    c = s->connection;

    // pc是上游的連接,如果連接失敗就是nullptr
    pc = u->connected ? u->peer.connection : NULL;

    // nginx處于即將停止狀態(tài),連接是udp
    // 使用連接的log記錄日志
    if (c->type == SOCK_DGRAM && (ngx_terminate || ngx_exiting)) {

        /* socket is already closed on worker shutdown */

        handler = c->log->handler;
        c->log->handler = NULL;

        ngx_log_error(NGX_LOG_INFO, c->log, 0, "disconnected on shutdown");

        c->log->handler = handler;

        ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
        return;
    }

    // 取proxy模塊的配置
    pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);

    // 根據上下游狀態(tài)決定來源和目標
    // 以及緩沖區(qū)、限速等
    // 注意使用的緩沖區(qū)指針
    if (from_upstream) {
        // 數據下行
        src = pc;
        dst = c;

        // 緩沖區(qū)是upstream_buf,即上游來的數據
        b = &u->upstream_buf;

        limit_rate = pscf->download_rate;
        received = &u->received;
        out = &u->downstream_out;
        busy = &u->downstream_busy;

    } else {
        // 數據上行
        src = c;
        dst = pc;

        // 緩沖區(qū)是downstream_buf,即下游來的數據
        // 早期downstream_buf直接就是客戶端連接的buffer
        // 現在是一個正常分配的buffer
        b = &u->downstream_buf;

        limit_rate = pscf->upload_rate;
        received = &s->received;
        out = &u->upstream_out;
        busy = &u->upstream_busy;
    }

    // b指向當前需要操作的緩沖區(qū)
    // 死循環(huán)操作,直到出錯或者again
    for ( ;; ) {

        // 如果要求寫,那么把緩沖區(qū)里的數據發(fā)到dst
        if (do_write && dst) {

            // 條件是有數據,且dst連接是可寫的
            if (*out || *busy || dst->buffered) {

                // 調用filter過濾鏈表,過濾數據最后發(fā)出去
                rc = ngx_stream_top_filter(s, *out, from_upstream);

                if (rc == NGX_ERROR) {
                    if (c->type == SOCK_DGRAM && !from_upstream) {
                        ngx_stream_proxy_next_upstream(s);
                        return;
                    }

                    ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
                    return;
                }

                // 調整緩沖區(qū)鏈表,節(jié)約內存使用
                ngx_chain_update_chains(c->pool, &u->free, busy, out,
                                      (ngx_buf_tag_t) &ngx_stream_proxy_module);

                if (*busy == NULL) {
                    b->pos = b->start;
                    b->last = b->start;
                }

                // n = ngx_again,需要等待可寫才能再次發(fā)送
            }
        }

        // size是緩沖區(qū)的剩余可用空間
        size = b->end - b->last;

        // 如果緩沖區(qū)有剩余,且src還可以讀數據
        if (size && src->read->ready && !src->read->delayed
            && !src->read->error)
        {

            // 限速處理
            if (limit_rate) {
                limit = (off_t) limit_rate * (ngx_time() - u->start_sec + 1)
                        - *received;

                if (limit <= 0) {
                    src->read->delayed = 1;
                    delay = (ngx_msec_t) (- limit * 1000 / limit_rate + 1);
                    ngx_add_timer(src->read, delay);
                    break;
                }

                if ((off_t) size > limit) {
                    size = (size_t) limit;
                }
            }

            // 盡量讀滿緩沖區(qū)
            n = src->recv(src, b->last, size);

            // nginx 1.11.x代碼不同,只判斷NGX_AGAIN

            // 如果不可讀,或者已經讀完
            // break結束for循環(huán)
            if (n == NGX_AGAIN) {
                break;
            }

            // 出錯,標記為eof
            if (n == NGX_ERROR) {
                if (c->type == SOCK_DGRAM && u->received == 0) {
                    ngx_stream_proxy_next_upstream(s);
                    return;
                }

                src->read->eof = 1;
                n = 0;
            }

            // nginx 1.11.x代碼不同,判斷n >= 0
            // 讀取了n字節(jié)的數據
            if (n >= 0) {

                // 限速
                if (limit_rate) {
                    delay = (ngx_msec_t) (n * 1000 / limit_rate);

                    if (delay > 0) {
                        src->read->delayed = 1;
                        ngx_add_timer(src->read, delay);
                    }
                }

                if (from_upstream) {
                    if (u->state->first_byte_time == (ngx_msec_t) -1) {
                        u->state->first_byte_time = ngx_current_msec
                                                    - u->state->response_time;
                    }
                }

                // udp處理
                if (c->type == SOCK_DGRAM && ++u->responses == pscf->responses)
                {
                    src->read->ready = 0;
                    src->read->eof = 1;
                }

                // 找到鏈表末尾
                for (ll = out; *ll; ll = &(*ll)->next) { /* void */ }

                // 把讀到的數據掛到鏈表末尾
                cl = ngx_chain_get_free_buf(c->pool, &u->free);
                if (cl == NULL) {
                    ngx_stream_proxy_finalize(s,
                                              NGX_STREAM_INTERNAL_SERVER_ERROR);
                    return;
                }

                *ll = cl;

                cl->buf->pos = b->last;
                cl->buf->last = b->last + n;
                cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;

                cl->buf->temporary = (n ? 1 : 0);
                cl->buf->last_buf = src->read->eof;
                cl->buf->flush = 1;

                // 增加接收的數據字節(jié)數
                *received += n;

                // 緩沖區(qū)的末尾指針移動,表示收到了n字節(jié)新數據
                b->last += n;

                // 有數據,那么就可以繼續(xù)向dst發(fā)送
                do_write = 1;

                // 回到for循環(huán)開頭,繼續(xù)發(fā)送數據
                continue;
            }
        }   // 讀數據部分結束

        break;
    }   // for循環(huán)結束

    // 這時應該是src已經讀完,數據也發(fā)送完
    // 讀取出錯也會有eof標志
    if (src->read->eof && dst && (dst->read->eof || !dst->buffered)) {
        handler = c->log->handler;
        c->log->handler = NULL;

        ngx_log_error(NGX_LOG_INFO, c->log, 0,
                      "%s%s disconnected"
                      ", bytes from/to client:%O/%O"
                      ", bytes from/to upstream:%O/%O",
                      src->type == SOCK_DGRAM ? "udp " : "",
                      from_upstream ? "upstream" : "client",
                      s->received, c->sent, u->received, pc ? pc->sent : 0);

        c->log->handler = handler;

        // 在這里記錄訪問日志
        ngx_stream_proxy_finalize(s, NGX_STREAM_OK);

        return;
    }

    // 如果eof就要關閉讀事件
    flags = src->read->eof ? NGX_CLOSE_EVENT : 0;

    if (!src->shared && ngx_handle_read_event(src->read, flags) != NGX_OK) {
        ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
        return;
    }

    if (dst) {
        if (!dst->shared && ngx_handle_write_event(dst->write, 0) != NGX_OK) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }

        if (!c->read->delayed && !pc->read->delayed) {
            ngx_add_timer(c->write, pscf->timeout);

        } else if (c->write->timer_set) {
            ngx_del_timer(c->write);
        }
    }
}


?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容