muduo源碼 ---ThreadPool介紹

muduo源碼分析系列 線程池的實(shí)現(xiàn)

分析線程池之前,先介紹線程
畢竟線程池里保存著每個(gè)線程
先分析Thread類

class Thread : noncopyable
{
 public:
  typedef std::function<void ()> ThreadFunc;

  explicit Thread(ThreadFunc, const string& name = string());
  // FIXME: make it movable in C++11
  ~Thread();

  void start();
  int join(); // return pthread_join()

  bool started() const { return started_; }
  // pthread_t pthreadId() const { return pthreadId_; }
  pid_t tid() const { return tid_; }
  const string& name() const { return name_; }

  static int numCreated() { return numCreated_.get(); }

 private:
  void setDefaultName();

  bool       started_;
  bool       joined_;
  pthread_t  pthreadId_;
  pid_t      tid_;
  ThreadFunc func_;
  string     name_;
  CountDownLatch latch_;

  static AtomicInt32 numCreated_;
};

仔細(xì)觀察其實(shí)就是把C11的thread相關(guān)的方法進(jìn)行了進(jìn)一步的封裝
但是有個(gè)地方 CountDownLatch是什么呢
舉個(gè)例子:在考試的時(shí)候 收卷老師必須要等到所有考生的卷子都收拾好了,才能離開教室。這就是latch的含義,意思是某個(gè)線程 必須要等待其他線程完成 才能執(zhí)行。直接看CountDownLatch的源碼

class CountDownLatch :    
{
 public:

  explicit CountDownLatch(int count);

  void wait();

  void countDown();

  int getCount() const;

 private:
  mutable MutexLock mutex_;
  Condition condition_ GUARDED_BY(mutex_);
  int count_ GUARDED_BY(mutex_);
};

看成員: 一個(gè)mutex_,一個(gè)條件變量condtion,一個(gè)公共count_

看方法

CountDownLatch::CountDownLatch(int count)
  : mutex_(),
    condition_(mutex_),
    count_(count)
{
}

void CountDownLatch::wait()
{
  MutexLockGuard lock(mutex_); //上鎖
  while (count_ > 0) //等待count變成0
  {
    condition_.wait();
  }
}

void CountDownLatch::countDown()
{
  MutexLockGuard lock(mutex_); //上鎖
  --count_; //減少count值
  if (count_ == 0)
  {
    condition_.notifyAll(); //count值為0了 喚醒等待的condtion_變量
  }
}

int CountDownLatch::getCount() const
{
  MutexLockGuard lock(mutex_);
  return count_;
}

看了實(shí)現(xiàn)其實(shí)很簡(jiǎn)單:本質(zhì)就是維護(hù)一個(gè)共享變量count_,這個(gè)count_理解成上面那個(gè)例子的學(xué)生,每次學(xué)生離開教室 那么就調(diào)用一次countDown方法,該方法將count-1,如果最后一個(gè)學(xué)生離開了那么count為0,則調(diào)用condtion_的notify方法,喚醒在wait里阻塞的的線程。

知道了latch的功能,就可以開始看Thread相關(guān)的方法了。

構(gòu)造函數(shù)

Thread::Thread(ThreadFunc func, const string& n)
  : started_(false),
    joined_(false),
    pthreadId_(0),
    tid_(0),
    func_(std::move(func)),
    name_(n),
    latch_(1) //latch為1 說(shuō)明只需要等待一個(gè)線程countDown即可
{
  setDefaultName();
}

來(lái)看最關(guān)鍵的start方法

void Thread::start()
{
  assert(!started_);
  started_ = true;
  // FIXME: move(func_)
  detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);
  if (pthread_create(&pthreadId_, NULL, &detail::startThread, data))
  {
    started_ = false;
    delete data; // or no delete?
    LOG_SYSFATAL << "Failed in pthread_create";
  }
  else
  {
    latch_.wait();
    assert(tid_ > 0);
  }
}

這里構(gòu)造了一個(gè) ThreadData類,然后調(diào)用系統(tǒng)api,創(chuàng)建出一個(gè)新的線程,且這個(gè)線程執(zhí)行的函數(shù)是ThreadData里的detail::startThread()方法(執(zhí)行用戶的func)。
如果創(chuàng)建失敗,就delete掉,成功就等待latch里的計(jì)數(shù)器變?yōu)?(如果latch是大于0的話),否則就一直阻塞。

來(lái)看看這個(gè)類的聲明

ThreadData(ThreadFunc func,
             const string& name,
             pid_t* tid,
             CountDownLatch* latch)
    : func_(std::move(func)),
      name_(name),
      tid_(tid),
      latch_(latch)
  { }

  void runInThread()
  {
    *tid_ = muduo::CurrentThread::tid();
    tid_ = NULL;
    latch_->countDown(); //將latch_計(jì)數(shù)器-1 并且如果等于0的時(shí)候 喚醒 latch_.wait線程
    latch_ = NULL;

    muduo::CurrentThread::t_threadName = name_.empty() ? "muduoThread" : name_.c_str();
    ::prctl(PR_SET_NAME, muduo::CurrentThread::t_threadName);
    try
    {
      func_(); //執(zhí)行func
      muduo::CurrentThread::t_threadName = "finished";
    }
    catch (const Exception& ex)
    {
      muduo::CurrentThread::t_threadName = "crashed";
      fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
      fprintf(stderr, "reason: %s\n", ex.what());
      fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
      abort();
    }
    catch (const std::exception& ex)
    {
      muduo::CurrentThread::t_threadName = "crashed";
      fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
      fprintf(stderr, "reason: %s\n", ex.what());
      abort();
    }
    catch (...)
    {
      muduo::CurrentThread::t_threadName = "crashed";
      fprintf(stderr, "unknown exception caught in Thread %s\n", name_.c_str());
      throw; // rethrow
    }
  }

上面一堆函數(shù),本質(zhì)上最重要的還是最終執(zhí)行了用戶傳遞的func

void* startThread(void* obj)
{
  ThreadData* data = static_cast<ThreadData*>(obj);
  data->runInThread();
  delete data;
  return NULL;
}

threadDetail類里的一個(gè)方法,通過(guò)萬(wàn)能指針void*進(jìn)行強(qiáng)制轉(zhuǎn)化成目標(biāo)ThreadData類,然后執(zhí)行runInThread,執(zhí)行完delete掉這個(gè)data。

看完這些設(shè)計(jì)其實(shí)可以明白,為什么要這么設(shè)計(jì)呢?個(gè)人認(rèn)為還是考慮到線程子資源復(fù)用的問(wèn)題。把線程里執(zhí)行的函數(shù)封裝成data類,這樣每次執(zhí)行完畢只需要將data刪除掉,而不需要去重復(fù)分配和刪除掉線程。


image.png

并且 也能說(shuō)明了。為什么runInThread這里tid需要為空了,因?yàn)閳?zhí)行完這個(gè)data之后就不需要這個(gè)data對(duì)象的數(shù)據(jù)了,latch_置為null也是一樣的。

感嘆陳碩大佬的代碼功底

講完Thread可以開始講ThreadPool了

先上代碼

class ThreadPool : noncopyable
{
 public:
  typedef std::function<void ()> Task;

  explicit ThreadPool(const string& nameArg = string("ThreadPool"));
  ~ThreadPool();

  // Must be called before start().
  void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }
  void setThreadInitCallback(const Task& cb)
  { threadInitCallback_ = cb; }

  void start(int numThreads);
  void stop();

  const string& name() const
  { return name_; }

  size_t queueSize() const;

  void run(Task f);

 private:
  bool isFull() const REQUIRES(mutex_);
  void runInThread();
  Task take();

  mutable MutexLock mutex_;
  Condition notEmpty_ GUARDED_BY(mutex_);
  Condition notFull_ GUARDED_BY(mutex_);
  string name_;
  Task threadInitCallback_;
  std::vector<std::unique_ptr<muduo::Thread>> threads_;
  std::deque<Task> queue_ GUARDED_BY(mutex_);
  size_t maxQueueSize_;
  bool running_;
};

以上是類的聲明

接下來(lái)是對(duì)每個(gè)成員變量的解釋:
mutable MutexLock mutex_
這是作者把mutex互斥鎖進(jìn)行了一個(gè)封裝

class CAPABILITY("mutex") MutexLock : noncopyable
{
 public:
  MutexLock()
    : holder_(0)
  {
    MCHECK(pthread_mutex_init(&mutex_, NULL)); //初始化調(diào)用系統(tǒng)函數(shù)init
  }

  ~MutexLock()
  {
    assert(holder_ == 0);
    MCHECK(pthread_mutex_destroy(&mutex_)); //銷毀調(diào)用系統(tǒng)函數(shù) destroy
  }

  // must be called when locked, i.e. for assertion
  bool isLockedByThisThread() const
  {
    return holder_ == CurrentThread::tid(); //判斷這個(gè)鎖是否是當(dāng)前線程鎖住的
  }

  void assertLocked() const ASSERT_CAPABILITY(this)
  {
    assert(isLockedByThisThread());
  }

  // internal usage

  void lock() ACQUIRE()
  {
    MCHECK(pthread_mutex_lock(&mutex_)); //加鎖
    assignHolder();
  }

  void unlock() RELEASE()
  {
    unassignHolder();
    MCHECK(pthread_mutex_unlock(&mutex_)); //解鎖
  }

  pthread_mutex_t* getPthreadMutex() /* non-const */
  {
    return &mutex_;  /
  }

 private:
  friend class Condition;

  class UnassignGuard : noncopyable
  {
   public:
    explicit UnassignGuard(MutexLock& owner)
      : owner_(owner)
    {
      owner_.unassignHolder();
    }

    ~UnassignGuard()
    {
      owner_.assignHolder();
    }

   private:
    MutexLock& owner_;
  };

  void unassignHolder()
  {
    holder_ = 0;
  }

  void assignHolder()
  {
    holder_ = CurrentThread::tid();
  }

  pthread_mutex_t mutex_;
  pid_t holder_;
};

直接看這個(gè)Mutex類的成員對(duì)象:
pthread_mutex_t mutex_;
pid_t holder_;

也就是說(shuō) 這個(gè)Mutex類對(duì)pthread_mutex_t 和pid進(jìn)行了封裝,每個(gè)mutex對(duì)象都有一個(gè)鎖和 掌握該鎖的線程pid。

再看接下來(lái)的ThreadPool成員
Condition notEmpty_ GUARDED_BY(mutex_);
Condition notFull_ GUARDED_BY(mutex_);
兩個(gè)條件變量
string name_;
名稱
Task threadInitCallback_;
Task是 typedef std::function<void ()> Task;
本質(zhì)是function封裝的函數(shù)

線程池初始化執(zhí)行的函數(shù)
std::vector<std::unique_ptr<muduo::Thread>> threads_;
線程vector
std::deque<Task> queue_ GUARDED_BY(mutex_);
任務(wù)隊(duì)列
size_t maxQueueSize_;
隊(duì)列最大任務(wù)數(shù)量
bool running_;
是否在執(zhí)行

開始分析這個(gè)線程池的各個(gè)函數(shù)

ThreadPool::ThreadPool(const string& nameArg)
  : mutex_(),
    notEmpty_(mutex_),
    notFull_(mutex_),
    name_(nameArg),
    maxQueueSize_(0),
    running_(false)
{
}

成員初始化

void ThreadPool::start(int numThreads)
{
  assert(threads_.empty());
  running_ = true; //設(shè)置為運(yùn)行狀態(tài)
  threads_.reserve(numThreads); //為線程數(shù)量分配vector大小
  for (int i = 0; i < numThreads; ++i) 
  {
    char id[32];
    snprintf(id, sizeof id, "%d", i+1);
    threads_.emplace_back(new muduo::Thread(
          std::bind(&ThreadPool::runInThread, this), name_+id));  //new一個(gè)thread對(duì)象 加入到 vector中
    threads_[i]->start(); //執(zhí)行thread
  }
  if (numThreads == 0 && threadInitCallback_)
  {
    threadInitCallback_();  //僅當(dāng)傳參是0且初始化函數(shù)是非空 執(zhí)行初始化函數(shù)
  }
}

start 方法 具體thread類后續(xù)分析

void ThreadPool::stop()
{
  {
  MutexLockGuard lock(mutex_); // 當(dāng)前線程池加鎖 防止別的線程使用
  running_ = false;  //設(shè)置為結(jié)束
  notEmpty_.notifyAll(); //喚醒條件
  notFull_.notifyAll();//喚醒條件
  }
  for (auto& thr : threads_)
  {
    thr->join();  //將所有thread結(jié)束掉
  }
}

stop方法
上面的
notEmpty_.notifyAll(); //喚醒所有等待 當(dāng)前條件變量的線程
notFull_.notifyAll();//喚醒所有等待當(dāng)前條件變量的線程

實(shí)際上就是 調(diào)用系統(tǒng)api

  void notifyAll()
  {
    MCHECK(pthread_cond_broadcast(&pcond_));  //pcond就是condtion中的成員你變量
  }

通俗的說(shuō)就是:在線程池中,多個(gè)線程可能會(huì)同時(shí)等待同一個(gè)條件變量 ,此時(shí)在等待的時(shí)候 會(huì)有多個(gè)線程被掛起,所以調(diào)用notifyAll把所有阻塞的線程喚醒,這樣才能進(jìn)行后續(xù)的join操作。
但是,在當(dāng)前線程池線程中,實(shí)際上這兩個(gè)條件變量更多的表示一個(gè)當(dāng)前的狀態(tài)。
notEmpty_在wait的情況 :說(shuō)明當(dāng)前的線程池并不處于非空的情況 ==》 當(dāng)前線程池是空的(queue是空的)
notEmpty在notify的情況:當(dāng)前線程池里的queue是非空,說(shuō)明有task任務(wù)需要執(zhí)行

同理notFull也是一樣

如果還不理解可以仔細(xì)google一下條件變量的用法

void ThreadPool::run(Task task)
{
  if (threads_.empty())
  {
    task(); //如果當(dāng)前線程池子沒(méi)有線程,直接使用線程池所在的線程執(zhí)行任務(wù)
  }
  else
  {
    MutexLockGuard lock(mutex_);
    while (isFull() && running_) //如果當(dāng)前的線程池子里所有線程都被占用了
    {
      notFull_.wait();  //說(shuō)明當(dāng)前的線程池處于滿任務(wù)狀態(tài) 阻塞
    }
    if (!running_) return;
    assert(!isFull());

    queue_.push_back(std::move(task)); //任務(wù)隊(duì)列入隊(duì)
    notEmpty_.notify(); //喚醒所有使用notEmpty條件變量的線程
  }
}

run方法
實(shí)際上就是在運(yùn)行start方法之后,暴露給用戶使用的接口。
start方法:設(shè)置當(dāng)前線程池的線程數(shù)量。run方法,用戶通過(guò)封裝task傳遞給run方法,run方法里將task存入queue中,等待runInThread對(duì)task進(jìn)行Take()

ThreadPool::Task ThreadPool::take()
{
  MutexLockGuard lock(mutex_);
  // always use a while-loop, due to spurious wakeup
  while (queue_.empty() && running_)
  {
    notEmpty_.wait();
  }
  Task task;
  if (!queue_.empty())
  {
    task = queue_.front();
    queue_.pop_front();
    if (maxQueueSize_ > 0)
    {
      notFull_.notify(); 
    }
  }
  return task;
}

take方法,實(shí)際上就是從queue隊(duì)列中取出任務(wù),并且返回任務(wù)

void ThreadPool::runInThread()
{
  try
  {
    if (threadInitCallback_)
    {
      threadInitCallback_(); //初始化函數(shù)
    }
    while (running_)
    {
      Task task(take());
      if (task)
      {
        task();
      }
    }
  }
  catch (const Exception &ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
    abort();
  }
  catch (const std::exception &ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    abort();
  }
  catch (...)
  {
    fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
    throw; // rethrow
  }
}

runInThread方法,執(zhí)行take方法,將task真正執(zhí)行

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 概述 CPU、內(nèi)存和 IO 之間存在著巨大性能差異[https://zhuanlan.zhihu.com/p/58...
    bowen_wu閱讀 478評(píng)論 0 0
  • layout: posttitle: 《Java并發(fā)編程的藝術(shù)》筆記categories: Javaexcerpt...
    xiaogmail閱讀 6,018評(píng)論 1 19
  • Java 給多線程編程提供了內(nèi)置的支持。 一條線程指的是進(jìn)程中一個(gè)單一順序的控制流,一個(gè)進(jìn)程中可以并發(fā)多個(gè)線程,每...
    tanghomvee閱讀 5,087評(píng)論 0 3
  • 12 線程池原理13 阻塞隊(duì)列14 鎖接口和類15 并發(fā)集合容器簡(jiǎn)介16 CopyOnWrite17 通信工具類1...
    碼代碼的小矮子閱讀 446評(píng)論 0 0
  • 引出系統(tǒng)編程 ============= 1.實(shí)際開發(fā)中:需要多個(gè)程序能夠"同時(shí)"運(yùn)行(一心多用) ...
    stalker丨閱讀 1,464評(píng)論 0 0

友情鏈接更多精彩內(nèi)容