這個(gè)版本的web server比第4版稍微做了一點(diǎn)改進(jìn),那就是由主線程統(tǒng)一接收連接,然后連接的處理由子線程來(lái)完成.因此,這里就引入了條件變量以及同步互斥的問(wèn)題.
同步機(jī)制
muduo庫(kù)中有一個(gè)關(guān)于同步機(jī)制的封裝,我這里就直接采用了.我這里來(lái)介紹一下這個(gè)封裝吧.
下面是Conditon這個(gè)類的代碼:
class Condition : noncopyable
{
private:
MutexLock& mutex_; /* 之前的鎖的一個(gè)引用 */
pthread_cond_t pcond_; /* 系統(tǒng)定義的條件變量的類型 */
... ...
}
這個(gè)類的構(gòu)造函數(shù)用于初始化同步變量:
explicit Condition(MutexLock& mutex)
: mutex_(mutex)
{
pthread_cond_init(&pcond_, NULL); /* 初始化同步變量 */
}
析構(gòu)函數(shù)就銷毀掉同步變量:
~Condition()
{
pthread_cond_destroy(&pcond_); /* 銷毀條件變量 */
}
等待某個(gè)條件:
void wait()
{
MutexLock::UnassignGuard ug(mutex_);
pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()); /* 等待Mutex */
}
通知單個(gè)線程:
void notify()
{
pthread_cond_signal(&pcond_); /* 喚醒一個(gè)線程 */
}
條件變量只有一種正確的使用方式,幾乎不可能用錯(cuò),對(duì)于
wait端:
- 必須與
mutex一起使用,該布爾表達(dá)式的讀寫需受此mutex保護(hù). - 在
mutex已經(jīng)上鎖的時(shí)候才能調(diào)用wait(). - 把判斷布爾條件和
wait()放到while循環(huán)中.
寫成代碼是這個(gè)樣子的:
MutexLock mutex;
Condition cond(mutex);
std::deque<int> queue;
int dequeue() {
MutexLockGuard lock(mutex); /* 加鎖 */
while (queue.empty()) {
cond.wait();
}
assert(!queue.empty());
int top = queue.front();
queue.pop_front();
return top;
}
對(duì)于
sinal/broadcast端:
- 不一定要在
mutex已經(jīng)上鎖的情況下調(diào)用signal(理論上). - 在
signal之前一般要修改布爾表達(dá)式. - 修改布爾表達(dá)式通常要用
mutex保護(hù). - 注意區(qū)分
signal和broadcast:"broadcast"通常用于表明狀態(tài)變化,而signal表示資源可用.
寫成代碼是:
void enqueue(int x)
{
MutexLockGuard lock(mutex); // 加鎖
queue.push_back(x);
cond.signal(); // 可以移出臨界區(qū)之外
}
以上引自linux多線程服務(wù)端編程.
我來(lái)談一下我的理解:
cond中之所以需要mutex,是因?yàn)樵趫?zhí)行到
while (condition) {
cond.wait();
}
時(shí),需要將cond中持有的mutex解鎖.一旦接收到signal,它需要重新?lián)寠Z這個(gè)mutex,搶到了,才能從wait函數(shù)中返回.
為什么cond.wait()要放入while循環(huán)中呢?一方面是因?yàn)?code>spurious wakeup,之所以會(huì)有這個(gè)東西,是速度的考量,一般來(lái)說(shuō),即使沒有spurious wakeup,你也要這么寫代碼,舉個(gè)栗子.
在生產(chǎn)者消費(fèi)者模型之中,消費(fèi)者1獲得鎖,發(fā)現(xiàn)queue為空,wait,消費(fèi)者2獲得鎖,發(fā)現(xiàn)queue為空,wait,生產(chǎn)者3獲得鎖,將生產(chǎn)的產(chǎn)品放入queue,調(diào)用signal,并且釋放了mutex,t1,t2被喚醒,可以預(yù)見的是,這兩者只會(huì)有一個(gè)獲得鎖,消費(fèi)完這個(gè)產(chǎn)品,然后另一個(gè)獲得鎖,發(fā)現(xiàn)為空,還是得繼續(xù)等待,這就是while的由來(lái),當(dāng)然,至于signal為什么會(huì)喚醒多個(gè)線程,man手冊(cè)上就是這么說(shuō)的.
我們的代碼
```cpp
/*-
* 線程池的加強(qiáng)版本.主要是主線程統(tǒng)一接收連接,其余都是工作者線程,這里的布局非常類似于一個(gè)生產(chǎn)者.
* 多個(gè)消費(fèi)者.
*/
#define MAXNCLI 100
MutexLock mutex; /* 全局的鎖 */
Condition cond(mutex); /* 全局的條件變量 */
int clifd[MAXNCLI], iget, iput;
int main(int argc, char *argv[])
{
int listenfd = Open_listenfd(8080); /* 8080號(hào)端口監(jiān)聽 */
signal(SIGPIPE, SIG_IGN);
pthread_t tids[10];
void* thread_main(void *);
for (int i = 0; i < 10; ++i) {
int *arg = (int *)Malloc(sizeof(int));
*arg = i;
Pthread_create(&tids[i], NULL, thread_main, (void *)arg);
}
struct sockaddr cliaddr; /* 用于存儲(chǔ)對(duì)方的ip信息 */
socklen_t clilen;
for (; ; ) {
int connfd = Accept(listenfd, &cliaddr, &clilen);
{
MutexLockGuard lock(mutex); /* 加鎖 */
clifd[iput] = connfd; /* 涉及到對(duì)共享變量的修改,要加鎖 */
if (++iput == MAXNCLI) iput = 0;
if (iput == iget) unix_error("clifd is not big enough!\n");
}
cond.notify(); /* 通知一個(gè)線程有數(shù)據(jù)啦! */
}
return 0;
}
線程的代碼是這樣的:
void*
thread_main(void *arg)
{
int connfd;
printf("thread %d starting\n", *(int *)arg);
Free(arg);
for ( ; ;) {
{
MutexLockGuard lock(mutex); /* 加鎖 */
while (iget == iput) { /* 沒有新的連接到來(lái) */
/*-
* 代碼必須用while循環(huán)來(lái)等待條件變量,原因是spurious wakeup
*/
cond.wait(); /* 這一步會(huì)原子地unlock mutex并進(jìn)入等待,wait執(zhí)行完畢會(huì)自動(dòng)重新加鎖 */
}
connfd = clifd[iget]; /* 獲得連接套接字 */
if (++iget == MAXNCLI) iget = 0;
}
doit(connfd);
close(connfd);
}
}
總結(jié)
這個(gè)版本在原來(lái)的版本上增加了同步互斥操作,在某種程度上增加了難度.
具體代碼還是看這里吧!:https://github.com/lishuhuakai/Spweb