Live555源碼解析(3) - 服務(wù)開啟,愿者上鉤

上一篇Live555源碼解析(2) - RTSP協(xié)議概述對RTSP進(jìn)行了整體介紹,對會話交互過程及通常應(yīng)用場景做了示例。接下來,我們就從媒體服務(wù)器的本職工作服務(wù)開始談起。

1. 從服務(wù)器說起

要服務(wù),就必須有服務(wù)器,有開放給外界客戶端訪問的地址和端口。先放源碼:

TaskScheduler* scheduler = BasicTaskScheduler::createNew();
UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);

UserAuthenticationDatabase* authDB = NULL;

portNumBits rtspServerPortNum = 554;
//@1.1 建立RTSP服務(wù)器
RTSPServer* rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB);
if(rtspServer == NULL)
{
    rtspServerPortNum = 8554;
    rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB);
}
if(rtspServer == NULL)
{
    *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
    exit(1);
}

@1.1 建立RTSP服務(wù)器

createNew()函數(shù)有三個參數(shù),分別說明如下:

  • *env
    env對應(yīng)的UsageEnvironment、BasicUsageEnvironment主要為后臺運行機(jī)制,提供基礎(chǔ)支持。具體已在本系列Live555源碼解析(1) - Main 尋根問祖,留其筋骨中詳細(xì)說明,這里不再贅述。

  • rtspServerPortNum
    RTSP默認(rèn)知名端口為554,如被占用,則使用8554。如仍不能成功創(chuàng)建,則提示錯誤后退出程序。

  • authDB
    authDB對應(yīng)UserAuthenticationDatabase,默認(rèn)情況下不會定義ACCESS_CONTROL,即關(guān)閉認(rèn)證機(jī)制。因此此處不進(jìn)行分析,后續(xù)如有需求,可專門整理下互聯(lián)網(wǎng)認(rèn)證機(jī)制概況。

本句代碼中出現(xiàn)了兩個class,分別為RTSPServerDynamicRTSPServer。第一眼看應(yīng)該是繼承關(guān)系,確實也是繼承關(guān)系,只不過希望先留個更清晰的印象。

RTSPServer Class Hierarchy

2. DynamicRTSPServer::createNew()

DynamicRTSPServer* DynamicRTSPServer::createNew(UsageEnvironment& env, Port ourPort, UserAuthenticationDatabase* authDatabase, unsigned reclamationTestSeconds)
{
    //@2.1 創(chuàng)建Socket
    int ourSocket = setUpOurSocket(env, ourPort);
    if (ourSocket == -1) return NULL;

    //@2.2 DynamicRTSPServer構(gòu)造函數(shù)
    return new DynamicRTSPServer(env, ourSocket, ourPort, authDatabase,     reclamationTestSeconds);
}

@2.1 創(chuàng)建Socket

由于RTSPServer、RTSPServerSupportingHTTPStreaming、DynamicRTSPServer均未實現(xiàn)setUpOurSocket()函數(shù),因此這里實際調(diào)用的是父類GenericMediaServer中的setUpOurSocket()

int GenericMediaServer::setUpOurSocket(UsageEnvironment& env, Port& ourPort)
{
    int ourSocket = -1;
    do
    {
        //如果當(dāng)前Socket已被本地服務(wù)器占用,則不允許重復(fù)使用
        NoReuse dummy(env);
        
        //@2.1.1 創(chuàng)建TCPSocket
        ourSocket = setupStreamSocket(env, ourPort);
        if(ourSocket < 0) break;
            
        //@2.1.2 調(diào)整Socket發(fā)送Buffer大小
        if(!increaseSendBufferTo(env, ourSocket, 50*1024)) break;

        //@2.1.3 切換Socket模式為LISTEN
        if(listen(ourSocket, LISTEN_BACKLOG_SIZE) < 0)
        {
            env.setResultErrMsg("listen() failed: ");
            break;
        }
            
        //@2.1.4 校驗port值
        if(ourPort.num() == 0)
            if(!getSourcePort(env, ourSocket, ourPort)) break;
            return ourSocket;
    } while(0);
        
    //@2.1.5 異常情況,清場退出
    if(ourSocket != -1) ::closeSocket(ourSocket);
    return -1;
}

@2.1.1 創(chuàng)建TCPSocket

GroupsockHelper中定義了Socket的一些全局幫助函數(shù),使用時inlcude "GroupsockHelper.hh"即可,如此處的setupStreamSocket。

int setupStreamSocket(UsageEnvironment& env, Port port, Boolean makeNonBlocking)
{
    //@2.1.1.1 winsock使用前初始化
    if(!initializeWinsockIfNecessary())
    {
        socketErr(env, "Failed to initialize 'winsock': ");
        return -1;
    }

    //@2.1.1.2 創(chuàng)建IPv4 Socket
    int newSocket = createSocket(SOCK_STREAM);
    if(newSocket < 0)
    {
        socketErr(env, "unable to create stream socket: ");
        return newSocket;
    }

    //@2.1.1.3 SO_REUSEADDR
    int reuseFlag = groupsockPriv(env)->reuseFlag;
    reclaimGroupsockPriv(env);
    if(setsockopt(newSocket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuseFlag, sizeof  reuseFlag) < 0)
    {
        socketErr(env, "setsockopt(SO_REUSEADDR) error: ");
        closeSocket(newSocket);
        return -1;
    }

    //@2.1.1.4 SO_REUSEPORT
    if(setsockopt(newSocket, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuseFlag, sizeof reuseFlag) < 0)
    {
        socketErr(env, "setsockopt(SO_REUSEPORT) error: ");
        closeSocket(newSocket);
        return -1;
    }

    // 如已制定Port或Addr,則需顯式bind()至創(chuàng)建的socket
    if(port.num() != 0 || ReceivingInterfaceAddr != INADDR_ANY)
    {
        MAKE_SOCKADDR_IN(name, ReceivingInterfaceAddr, port.num());
        if(bind(newSocket, (struct sockaddr*)&name, sizeof name) != 0)
        {
            char tmpBuffer[100];
            sprintf(tmpBuffer, "bind() error(port number: %d): ", ntohs(port.num());
            socketErr(env, tmpBuffer);
            closeSocket(newSocket);
            return -1;
        }
    }

    //@2.1.1.5 non-blocking
    if(makeNonBlocking)
    {
        if(!makeSocketNonBlocking(newSocket))
        {
            socketErr(env, "failed to make non-blocking: ");
            closeSocket(newSocket);
            return -1;
        }
    }
    return newSocket;
}

@2.1.1.1 winsock使用前初始化

winsock使用前必須調(diào)用初始化命令,通過WSAStartup()設(shè)置Socket最高版本。這里嘗試2.2和1.1版本,成功即可。
源碼位于groupsock/inet.c中。

@2.1.1.2 創(chuàng)建IPv4 Socket

調(diào)用socket()函數(shù)創(chuàng)建指定類型的IPv4 Socket,此處為SOCK_STREAM即TCP Socket。

sock = socket(AF_INET, type|SOCK_CLOEXEC, 0);

注意到此處嘗試設(shè)置SOCK_CLOEXEC屬性。

@2.1.1.3 SO_REUSEADDR

設(shè)置SO_REUSEADDR Flag,TCP的最主要目標(biāo)是保證數(shù)據(jù)傳輸?shù)目煽啃?,?dāng)TCP連接中出現(xiàn)分組丟失時,丟失的分組在2RTT后未收到ACK則會重傳以保證數(shù)據(jù)可靠。完成連接任務(wù)后,TCP開始進(jìn)行四路斷開,不幸的是,此時原迷路(被路由到其他網(wǎng)絡(luò))的分組又被路由回來了。那么TCP應(yīng)該如何對待這個分組?

如果不允許重用地址,也就是說不設(shè)置SO_REUSEADDR標(biāo)志,那么TCP連接將保持處于TIME_WAIT狀態(tài)2MSL時間以避免前述分組重復(fù)情況。2MSL(通常為30~120秒)內(nèi),所有重復(fù)分組直接被丟棄。如重復(fù)分組在2MSL時長內(nèi)仍未到達(dá),則將自然消亡,因此連接安全退出。

看起來,這是一種安全機(jī)制,保證了重復(fù)分組可以得到正確處理,但實踐中發(fā)生上述情況的概率很小,而如果均采取上述操作會引入新的麻煩。比如服務(wù)器所在主機(jī)異常重啟,服務(wù)進(jìn)程被動重啟,此時如不允許地址重用,則需等待2MSL,也就是說30~120秒,這個時長內(nèi),無法提供正常服務(wù)。這個代價是沒有必要的。

因此,不管是BSD socket還是Windows Winsock,通常都會設(shè)置SO_REUSEADDR標(biāo)志,當(dāng)然必須在bind()前設(shè)置才能生效。

@2.1.1.4 SO_REUSEPORT

SO_REUSEPORT支持多個進(jìn)程或者線程綁定到同一端口,以提高服務(wù)器程序的性能,它解決了如下問題:

  • 允許多個套接字bind()/listen() 同一個TCP/UDP端口
  • 每一個線程擁有自己的服務(wù)器套接字
  • 在服務(wù)器套接字上沒有了鎖的競爭
  • 內(nèi)核層面實現(xiàn)負(fù)載均衡
  • 安全層面,監(jiān)聽同一個端口的套接字只能位于同一個用戶下面

@2.1.1.5 non-blocking

默認(rèn)情況下,TCP socket是阻塞(blocking)模式的,例如當(dāng)你調(diào)用recv()來讀取流數(shù)據(jù)時,該函數(shù)會保持阻塞,直到至少有一個字節(jié)數(shù)據(jù)可讀或出現(xiàn)異常錯誤才會返回。而非阻塞(non-blocking)模式時,操作不必等到有結(jié)果才返回,而是以異步的形式在滿足條件后返回,這對于處理多個socket時非常有效。

所以至此,如成功從setupStreamSocket()返回,那么我們得到的一定是一個具有CLOEXEC、REUSEADDR、REUSEPORT且non-blocking的Socket。

@2.1.2 調(diào)整Socket發(fā)送Buffer大小

為了確保Socket發(fā)送Buffer足夠大,這里設(shè)置為50KB。同樣內(nèi)部是通過GroupsockHelper提供的幫助函數(shù)完成的。這里先說明下為什么要有緩存的存在。

眾所周知,TCP的賣點在可靠性,可靠不意味著不出錯,否則IP協(xié)議就足夠傳輸了??煽恐傅氖侨蒎e的能力很強(qiáng),既然要容錯,就必須有備份,否則無法支持重傳、滑動窗口、擁塞控制等機(jī)制運行。既然有備份,那就必須有緩存,也就是說buffer的存在。

事實上,每個Socket都擁有自己的Receive Buffer和Send Buffer,對應(yīng)Read和Write操作。順便說明一下,Read/Write返回僅僅是指和Buffer的交互過程完成,比如Write所有數(shù)據(jù)到Send Buffer后即可返回成功,而從Send Buffer到對端Receive Buffer,則需要由TCP協(xié)議內(nèi)部眾多機(jī)制參與完成。

@2.1.3 切換Socket模式為LISTEN

socket創(chuàng)建后默認(rèn)為主動狀態(tài),而listen()用于將socket切換為被動監(jiān)聽狀態(tài)。所謂被動監(jiān)聽,是指當(dāng)無客戶端請求時,socket處于睡眠狀態(tài),直到出現(xiàn)請求,才會被喚醒來響應(yīng)請求。

需要注意的是第二個參數(shù)LISTEN_BACKLOG_SIZE,它代表著內(nèi)核應(yīng)該為相應(yīng)socket排隊的最大連接個數(shù)。通常內(nèi)核為每個socket維護(hù)兩個隊列:

  • 未完成連接隊列
  • 已完成連接隊列


此處圖片及內(nèi)容參考或截取自《Unix網(wǎng)絡(luò)編程卷1》p107,這是本好書,建議閱讀。

@2.1.4 校驗port值

如未指定port值,則有內(nèi)核確認(rèn)port端口,此時應(yīng)顯式獲取,更新變量ourPort以便后續(xù)使用。自此,第一部分setUpOurSocket操作完成,收獲的是端口號為554或8554,具有CLOEXEC、REUSEADDR、REUSEPORT且non-blocking標(biāo)志,發(fā)送緩存為50KB,已切換至Listen狀態(tài)的Socket。

@2.2 DynamicRTSPServer構(gòu)造函數(shù)

有了靜態(tài)的Socket后,就有了對外提供服務(wù)的渠道或窗口,但此時服務(wù)本身并未建設(shè)完成,因此第二部分的工作就是鋪設(shè)基礎(chǔ)服務(wù)。

DynamicRTSPServer::DynamicRTSPServer(UsageEnvironment& env, int ourSocket, Port ourPort,
        UserAuthenticationDatabase* authDatabase, unsigned reclamtionTestSeconds)
    //@2.2.1 RTSPServerSupportingHTTPStreaming構(gòu)造函數(shù)
    : RTSPServerSupportingHTTPStreaming(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds)
{}

第一小節(jié)中,已經(jīng)給出了DynamicRTSPServerRTSPServer等類間繼承關(guān)系圖,結(jié)合這段代碼,也可以看出除了調(diào)用基類RTSPServerSupportingHTTPStreaming構(gòu)造函數(shù)外,并無其他內(nèi)容。

@2.2.1 RTSPServerSupportingHTTPStreaming構(gòu)造函數(shù)

類似地,除調(diào)用基類構(gòu)造函數(shù)外,無其他初始化內(nèi)容。

RTSPServerSupportingHTTPStreaming::RTSPServerSupportingHTTPStreaming(UsageEnvironment& env,                 int ourSocket, Port rtspPort, UserAuthenticationDatabase* authDatabase, 
        unsigned reclamationTestSeconds)
//@2.2.1.1 RTSPServer構(gòu)造函數(shù)
: RTSPServer(env, ourSocket, rtspPort, authDatabase, reclamationTestSeconds) 
{}

@2.2.1.1 RTSPServer構(gòu)造函數(shù)

RTSPServer::RTSPServer(UsageEnvironment& env, int ourSocket, Port ourPort,  
            UserAuthenticationDatabase* authDatabase,
            unsigned reclamationSeconds)
    //@2.2.1.1.1 GenericMediaServer構(gòu)造函數(shù)
    : GenericMediaServer(env, ourSocket, ourPort, reclamationSeconds)
    , fHTTPServerSocket(-1), fHTTPServerPort(0)
    , fAllowStreamingRTPOverTCP(True)
    , fClientConnectionsForHTTPTunneling(NULL) // will get created if needed
    , fTCPStreamingDatabase(HashTable::create(ONE_WORD_HASH_KEYS))
    , fPendingRegisterOrDeregisterRequests(HashTable::create(ONE_WORD_HASH_KEYS))
    , fRegisterOrDeregisterRequestCounter(0)
    //@2.2.1.1.2 認(rèn)證機(jī)制
    , fAuthDB(authDatabase)
{
}
@2.2.1.1.1 GenericMediaServer構(gòu)造函數(shù)
GenericMediaServer::GenericMediaServer(UsageEnvironment& env, int ourSocket,                                        Port ourPort, unsigned reclamationSeconds)
    //@2.2.1.1.1.1 Medium構(gòu)造函數(shù)
    : Medium(env)
    , fServerSocket(ourSocket), fServerPort(ourPort) 
    , fReclamationSeconds(reclamationSeconds)
    //@2.2.1.1.1.2 HASH Tables
    , fServerMediaSessions(HashTable::create(STRING_HASH_KEYS))
    , fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS))
    , fClientSessions(HashTable::create(STRING_HASH_KEYS))
{
    //@2.2.1.1.1.3 ignore SIGPIPE信號
    ignoreSigPipeOnSocket(fServerSocket); 
    //@2.2.1.1.1.4 鋪設(shè)服務(wù)-incomingConnectionHandler
    env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket,     
                                incomingConnectionHandler, this); 
}
@2.2.1.1.1.1 Medium構(gòu)造函數(shù)
Medium::Medium(UsageEnvironment& env)
    :fEnviron(env), fNextTask(NULL) 
{
    MediaLookupTable::ourMedia(env)->generateNewName(fMediumName, mediumNameMaxLen);
    env.setResultMsg(fMediumName);
    MediaLookupTable::ourMedia(env)->addNew(this, fMediumName);
}

根父類,是Media Server中大多數(shù)類的父類。MediaLookupTable是一個類,主要用于方便根據(jù)名稱進(jìn)行Medium的查找,內(nèi)部使用,不進(jìn)行詳細(xì)說明。

@2.2.1.1.1.2 HASH Tables

顧名思義,三張表分別保存著服務(wù)器媒體會話、客戶端連接以及客戶端會話。各表內(nèi)容變動API分別為:

  • ServerMediaSessions

    • addServerMediaSession()
    • removeServerMediaSession()
  • ClientConnections

    • ClientConnection()構(gòu)造函數(shù)
    • ~ClientConnection()析構(gòu)函數(shù)
  • ClientSessions

    • createNewClientSessionWithId()
    • closeAllClientSessionsForServerMediaSession()
@2.2.1.1.1.3 ignore SIGPIPE信號

先說明下SIGPIPE信號如何產(chǎn)生:當(dāng)socket對端已調(diào)用close進(jìn)行了完全關(guān)閉時,本地Write發(fā)出的保溫會導(dǎo)致對端回復(fù)RST報文,假如本地再次調(diào)用Write,則會生成SIGPIPE信號,導(dǎo)致進(jìn)程退出。
為了避免進(jìn)程退出,既可以捕獲SIGPIPE信號對其進(jìn)行處理,也可以簡單忽略。這樣第二次調(diào)用Write時會返回-1,且錯誤值設(shè)為SIGPIPE,以便程序知道對端已經(jīng)關(guān)閉。

@2.2.1.1.1.4 鋪設(shè)服務(wù)-incomingConnectionHandler

實際調(diào)用UsageEnvironment中TaskScheduler類的成員函數(shù)。

void turnOnBackgroundReadHandling(int socketNum,
                BackgroundHandlerProc* handlerProc, void* clientData) 
{
    setBackgroundHandling(socketNum, SOCKET_READABLE, handlerProc, clientData);
}

該函數(shù)又在BasicTaskScheduler類中實現(xiàn)。

void BasicTaskScheduler::setBackgroundHandling(int socketNum,
    int conditionSet, BackgroundHandlerProc* handlerProc, void* clientData)
{
    if(socketNum < 0) return;
    
    FD_CLR((unsigned)socketNum, &fReadSet);
    FD_CLR((unsigned)socketNum, &fWriteSet);
    FD_CLR((unsigned)socketNum, &fExceptionSet);
    if(conditionSet == 0)
    {
        fHandlers->clearHandler(socketNum);
        if(socketNum+1 == fMaxNumSockets)
            --fMaxNumSockets;
    }
    else
    {
        fHandlers->assignHandler(socketNum, conditionSet, handlerProc, clientData);
        if(socketNum+1 > fMaxNumSockets)
            fMaxNumSockets = socketNum + 1;
        if(conditionSet&SOCKET_READABLE) FD_SET((unsigned)socketNum, &fReadSet);
        if(conditionSet&SOCKET_WRITABLE) FD_SET((unsigned)socketNum, &fWriteSet);
        if(conditionSet&SOCKET_EXCEPTION) FD_SET((unsigned)socketNum, &fExceptionSet);
    }                           
}

如果你還能記得Live555源碼解析(1) - Main 尋根問祖,留其筋骨提到main()函數(shù)中的Loop SingleStep(),那么ReadSet、WriteSet、ExceptionSet一定還有印象。事實上,這三個Set用于多Sockets Select。

還需要注意的一句是assignHandler(),作用對象為fHandlers,該對象類型為HandlerSet,同樣在Live555源碼解析(1) - Main 尋根問祖,留其筋骨中也有詳細(xì)說明,此處不再贅述。

只需要記住,Set和fHandlers兩個變量配合的結(jié)果是為了當(dāng)socket滿足指定條件時,調(diào)用設(shè)定的處理函數(shù)。對應(yīng)到此處,是指當(dāng)Socket可讀時,調(diào)用incomingConnectionHandler函數(shù)。完成判斷、調(diào)用邏輯的實現(xiàn),就在BasicTaskScheduler::SingleStep()中Socket I/O循環(huán)部分。

@2.2.1.1.2 認(rèn)證機(jī)制

雖然本篇并不討論認(rèn)證機(jī)制,但仍然需要注意的是,認(rèn)證機(jī)制在這一層完成。

3. 總結(jié)

從前面兩節(jié)的代碼分析來看,暫時只鋪設(shè)了一項服務(wù)incomingConnectionHandler,同時準(zhǔn)備了一些哈希表以供程序運行中使用,分別為ServerMediaSessions、ClientConnections、ClientSessions,也在@2.2.1.1.1.2 HASH Tables小節(jié)中列出了會修改表內(nèi)容的API。這就是能從DynamicRTSPServer create所發(fā)掘的所有動線。

下一篇中,將基于這項服務(wù),尋找服務(wù)的使用動機(jī)、時機(jī),以及所引起的其他連鎖反應(yīng)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容