一起來(lái)寫web server 05 -- 多線程進(jìn)階版本


這個(gè)版本的web server比第4版稍微做了一點(diǎn)改進(jìn),那就是由主線程統(tǒng)一接收連接,然后連接的處理由子線程來(lái)完成.因此,這里就引入了條件變量以及同步互斥的問(wèn)題.

同步機(jī)制

muduo庫(kù)中有一個(gè)關(guān)于同步機(jī)制的封裝,我這里就直接采用了.我這里來(lái)介紹一下這個(gè)封裝吧.

下面是Conditon這個(gè)類的代碼:

class Condition : noncopyable
{
    private:
        MutexLock& mutex_; /* 之前的鎖的一個(gè)引用 */
        pthread_cond_t pcond_; /* 系統(tǒng)定義的條件變量的類型 */
        ... ...
}

這個(gè)類的構(gòu)造函數(shù)用于初始化同步變量:

explicit Condition(MutexLock& mutex)
        : mutex_(mutex)
    {
        pthread_cond_init(&pcond_, NULL); /* 初始化同步變量 */
    }

析構(gòu)函數(shù)就銷毀掉同步變量:

~Condition()
    {
        pthread_cond_destroy(&pcond_); /* 銷毀條件變量 */
    }

等待某個(gè)條件:

void wait()
    {
        MutexLock::UnassignGuard ug(mutex_);
        pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()); /* 等待Mutex */
    }

通知單個(gè)線程:

void notify()
    {
        pthread_cond_signal(&pcond_); /* 喚醒一個(gè)線程 */
    }

條件變量只有一種正確的使用方式,幾乎不可能用錯(cuò),對(duì)于wait端:

  1. 必須與mutex一起使用,該布爾表達(dá)式的讀寫需受此mutex保護(hù).
  2. mutex已經(jīng)上鎖的時(shí)候才能調(diào)用wait().
  3. 把判斷布爾條件和wait()放到while循環(huán)中.
    寫成代碼是這個(gè)樣子的:
MutexLock mutex;
Condition cond(mutex);
std::deque<int> queue;

int dequeue() {
    MutexLockGuard lock(mutex); /* 加鎖 */
    while (queue.empty()) {
        cond.wait(); 
    }
    assert(!queue.empty());
    int top = queue.front();
    queue.pop_front();
    return top;
}

對(duì)于sinal/broadcast端:

  1. 不一定要在mutex已經(jīng)上鎖的情況下調(diào)用signal(理論上).
  2. signal之前一般要修改布爾表達(dá)式.
  3. 修改布爾表達(dá)式通常要用mutex保護(hù).
  4. 注意區(qū)分signalbroadcast:"broadcast"通常用于表明狀態(tài)變化,而signal表示資源可用.
    寫成代碼是:
void enqueue(int x) 
{
    MutexLockGuard lock(mutex); // 加鎖
    queue.push_back(x);
    cond.signal(); // 可以移出臨界區(qū)之外
}

以上引自linux多線程服務(wù)端編程.

我來(lái)談一下我的理解:

cond中之所以需要mutex,是因?yàn)樵趫?zhí)行到

while (condition) {
 cond.wait();
}

時(shí),需要將cond中持有的mutex解鎖.一旦接收到signal,它需要重新?lián)寠Z這個(gè)mutex,搶到了,才能從wait函數(shù)中返回.

為什么cond.wait()要放入while循環(huán)中呢?一方面是因?yàn)?code>spurious wakeup,之所以會(huì)有這個(gè)東西,是速度的考量,一般來(lái)說(shuō),即使沒有spurious wakeup,你也要這么寫代碼,舉個(gè)栗子.

在生產(chǎn)者消費(fèi)者模型之中,消費(fèi)者1獲得鎖,發(fā)現(xiàn)queue為空,wait,消費(fèi)者2獲得鎖,發(fā)現(xiàn)queue為空,wait,生產(chǎn)者3獲得鎖,將生產(chǎn)的產(chǎn)品放入queue,調(diào)用signal,并且釋放了mutex,t1,t2被喚醒,可以預(yù)見的是,這兩者只會(huì)有一個(gè)獲得鎖,消費(fèi)完這個(gè)產(chǎn)品,然后另一個(gè)獲得鎖,發(fā)現(xiàn)為空,還是得繼續(xù)等待,這就是while的由來(lái),當(dāng)然,至于signal為什么會(huì)喚醒多個(gè)線程,man手冊(cè)上就是這么說(shuō)的.

我們的代碼

```cpp
/*-
* 線程池的加強(qiáng)版本.主要是主線程統(tǒng)一接收連接,其余都是工作者線程,這里的布局非常類似于一個(gè)生產(chǎn)者.
* 多個(gè)消費(fèi)者.
*/

#define MAXNCLI 100

MutexLock mutex; /* 全局的鎖 */
Condition cond(mutex); /* 全局的條件變量 */
int clifd[MAXNCLI], iget, iput;

int main(int argc, char *argv[])
{
    int listenfd = Open_listenfd(8080); /* 8080號(hào)端口監(jiān)聽 */
    signal(SIGPIPE, SIG_IGN);
    pthread_t tids[10];
    void* thread_main(void *);

    for (int i = 0; i < 10; ++i) {
        int *arg = (int *)Malloc(sizeof(int));
        *arg = i;
        Pthread_create(&tids[i], NULL, thread_main, (void *)arg);
    }
    struct sockaddr cliaddr; /* 用于存儲(chǔ)對(duì)方的ip信息 */
    socklen_t clilen;
    for (; ; ) {
        int connfd = Accept(listenfd, &cliaddr, &clilen);
        {
            MutexLockGuard lock(mutex); /* 加鎖 */
            clifd[iput] = connfd; /* 涉及到對(duì)共享變量的修改,要加鎖 */
            if (++iput == MAXNCLI) iput = 0;
            if (iput == iget) unix_error("clifd is not big enough!\n");
        }
        cond.notify(); /* 通知一個(gè)線程有數(shù)據(jù)啦! */
    }
    return 0;
}

線程的代碼是這樣的:

void*
thread_main(void *arg)
{
    int connfd;
    printf("thread %d starting\n", *(int *)arg);
    Free(arg);
    for ( ; ;) {
        {
            MutexLockGuard lock(mutex); /* 加鎖 */
            while (iget == iput) { /* 沒有新的連接到來(lái) */
                /*-
                * 代碼必須用while循環(huán)來(lái)等待條件變量,原因是spurious wakeup
                */
                cond.wait(); /* 這一步會(huì)原子地unlock mutex并進(jìn)入等待,wait執(zhí)行完畢會(huì)自動(dòng)重新加鎖 */
            }
            connfd = clifd[iget]; /* 獲得連接套接字 */
            if (++iget == MAXNCLI) iget = 0;
        }
        doit(connfd);
        close(connfd);
    }
}

總結(jié)

這個(gè)版本在原來(lái)的版本上增加了同步互斥操作,在某種程度上增加了難度.

具體代碼還是看這里吧!:https://github.com/lishuhuakai/Spweb

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容