EMQX源碼閱讀(三)

本次主要走一下客戶端創(chuàng)建連接和接收數(shù)據(jù)的流程。

接上篇,創(chuàng)建Socket成功后,回調(diào)函數(shù):emqx_connection, start_link, [Options -- SockOpts]。

emqx_connection.erl:
本模塊為一個gen_server模塊,所以它會給每一個客戶端啟動一個進程,并在初始化時,從acceptor接管Socket套接字。
init callback:

init(Parent, Transport, RawSocket, Options) ->
    case Transport:wait(RawSocket) of
        {ok, Socket} ->
            run_loop(Parent, init_state(Transport, Socket, Options));
        {error, Reason} ->
            ok = Transport:fast_close(RawSocket),
            exit_on_sock_error(Reason)
    end.

這里重點函數(shù)有兩個:init_state/3和run_loop/2
init_state,顧名思義,是將進程state中的數(shù)據(jù)或?qū)ο蟪跏蓟渲兄饕刑捉幼中畔?、Frame、Parse、Channel等相關(guān)幫助模塊的初始化、GC初始化等等。
run_loop,處理Socket數(shù)據(jù)的輪詢函數(shù):

  1. 通過activate_socket,設(shè)置允許接收數(shù)據(jù)包的個數(shù),有效的控制接收流量
  2. 調(diào)用hibernate/2
hibernate(Parent, State) ->
    proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]).

這里重點查了一下proc_lib:hibeernate的用法,類似erlang:hibernate,意思是,使調(diào)用進程處于等待狀態(tài),當(dāng)有數(shù)據(jù)接收時,喚醒并調(diào)用MFA。

  1. wakeup_from_hib/2
    其中是一個receive的流程,處理收到的數(shù)據(jù):
receive
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
        {'EXIT', Parent, Reason} ->
            terminate(Reason, State);
        Msg ->
            process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State))
    after
        IdleTimeout ->
            hibernate(Parent, cancel_stats_timer(State))
    end.
  1. process_msg:
    主要處理分包,解MQTT協(xié)議包。并將完整的解析后數(shù)據(jù),交給channel處理。
%%--------------------------------------------------------------------
%% Handle incoming packet

handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
    ok = inc_incoming_stats(Packet),
    ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
    with_channel(handle_in, [Packet], State);

handle_incoming(FrameError, State) ->
    with_channel(handle_in, [FrameError], State).

%%--------------------------------------------------------------------
%% With Channel

with_channel(Fun, Args, State = #state{channel = Channel}) ->
    case erlang:apply(emqx_channel, Fun, Args ++ [Channel]) of
        ok -> {ok, State};
        {ok, NChannel} ->
            {ok, State#state{channel = NChannel}};
        {ok, Replies, NChannel} ->
            {ok, next_msgs(Replies), State#state{channel = NChannel}};
        {shutdown, Reason, NChannel} ->
            shutdown(Reason, State#state{channel = NChannel});
        {shutdown, Reason, Packet, NChannel} ->
            NState = State#state{channel = NChannel},
            ok = handle_outgoing(Packet, NState),
            shutdown(Reason, NState)
    end.

emqx_channel.erl & emqx_session.erl:
為什么將這兩個放一起說呢,是因為他們倆是配合做事的。
主要是處理MQTT的各種協(xié)議包了:CONNECT,SUBSCRIBE,PUBLISH,UNSUB,DISCONN等等。有興趣的同學(xué)可以深入進去,看看每一個協(xié)議包的處理流程,本次就不再贅述了。

截取connect的流程性代碼片段:
handle_in:

handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
    case pipeline([fun enrich_conninfo/2,
                   fun check_connect/2,
                   fun enrich_client/2,
                   fun set_logger_meta/2,
                   fun check_banned/2,
                   fun auth_connect/2
                  ], ConnPkt, Channel#channel{conn_state = connecting}) of
        {ok, NConnPkt, NChannel} ->
            process_connect(NConnPkt, NChannel);
        {error, ReasonCode, NChannel} ->
            handle_out(connack, {ReasonCode, ConnPkt}, NChannel)
    end;

process_connect:

%%--------------------------------------------------------------------
%% Process Connect
%%--------------------------------------------------------------------

process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
                Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
    case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
        {ok, #{session := Session, present := false}} ->
            NChannel = Channel#channel{session = Session},
            handle_out(connack, {?RC_SUCCESS, sp(false), ConnPkt}, NChannel);
        {ok, #{session := Session, present := true, pendings := Pendings}} ->
            %%TODO: improve later.
            Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
            NChannel = Channel#channel{session  = Session,
                                       resuming = true,
                                       pendings = Pendings1
                                      },
            handle_out(connack, {?RC_SUCCESS, sp(true), ConnPkt}, NChannel);
        {error, client_id_unavailable} ->
            handle_out(connack, {?RC_CLIENT_IDENTIFIER_NOT_VALID, ConnPkt}, Channel);
        {error, Reason} ->
            ?LOG(error, "Failed to open session due to ~p", [Reason]),
            handle_out(connack, {?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel)
    end.

可以簡單看出來:

  1. 收到connect請求后,會嘗試建立session數(shù)據(jù)
  2. connect的返回結(jié)果,是調(diào)用handle_out
-spec(handle_out(atom(), term(), channel())
      -> {ok, channel()}
       | {ok, replies(), channel()}
       | {shutdown, Reason :: term(), channel()}
       | {shutdown, Reason :: term(), replies(), channel()}).
handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel) ->
    AckProps = run_fold([fun enrich_connack_caps/2,
                         fun enrich_server_keepalive/2,
                         fun enrich_assigned_clientid/2
                        ], #{}, Channel),
    AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
    return_connack(AckPacket,
                   ensure_keepalive(AckProps,
                                    ensure_connected(ConnPkt, Channel)));

handle_out(connack, {ReasonCode, _ConnPkt},
           Channel = #channel{conninfo   = ConnInfo,
                              clientinfo = ClientInfo}) ->
    ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
    AckPacket = ?CONNACK_PACKET(
                   case maps:get(proto_ver, ConnInfo) of
                       ?MQTT_PROTO_V5 -> ReasonCode;
                       _Other -> emqx_reason_codes:compat(connack, ReasonCode)
                   end),
    shutdown(emqx_reason_codes:name(ReasonCode), AckPacket, Channel);
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容