異步日志
該類
AsyncLogging
成員變量
- MutexLock mutex_;
互斥量,線程安全的添加日志。 - Condition cond_;
條件變量,當(dāng)有日志添加進(jìn)來的時候通知。當(dāng)日志緩沖區(qū)沒準(zhǔn)備好的時候等待 - Thread thread_;
為添加日志單獨(dú)建一個線程。 - string basename_;
日志文件頭部文件名 - size_t rollSize_;
更換新日志文件的間隔 - bool running_;
確定日志文件是否在運(yùn)行。
其實(shí)主要是條件變量的循環(huán)的判斷條件。 - const int flushInterval_;
沖刷間隔 - CountDownLatch latch_;
當(dāng)線程開始運(yùn)行以后,才繼續(xù)下面的程序
應(yīng)該是怕日志線程沒準(zhǔn)備好,但卻添加日志? - BufferVector buffers_;
已更換下來的日志文件的存儲區(qū)。
FixedBuffer<muduo::detail::kLargeBuffer> Buffer
boost::ptr_vector<Buffer> BufferVector;
使用的仍然是一種指針的容器。 - BufferPtr nextBuffer_,BufferPtr currentBuffer_;
下一塊日志的緩沖區(qū),和日志的緩沖區(qū)
構(gòu)造函數(shù)
AsyncLogging(const string& basename,size_t rollSize,int flushInterval = 3)
: flushInterval_(flushInterval),
running_(false),
basename_(basename),
rollSize_(rollSize),
thread_(boost::bind(&AsyncLogging::threadFunc, this), "Logging"),
//直接初始化日志線程
latch_(1),
//日志線程中的函數(shù)運(yùn)行起來以后再返回
mutex_(),
//MutexLock默認(rèn)初始化
cond_(mutex_),
//條件變量,初始化,需要在mutex_后面
currentBuffer_(new Buffer),
//LogStream中FixedBuffer里面的
nextBuffer_(new Buffer),
//就是一個char []
buffers_()
//boost的指針容器
{
currentBuffer_->bzero();
//置空char []
nextBuffer_->bzero();
buffers_.reserve(16);
//調(diào)整日志容器的大小
}
append
添加日志的函數(shù)
void AsyncLogging::append(const char* logline, int len)
{
muduo::MutexLockGuard lock(mutex_);
//線程安全的添加日志
if (currentBuffer_->avail() > len)
//如果緩沖區(qū)剩余大小足夠在寫入
{
currentBuffer_->append(logline, len);
}
else
{
buffers_.push_back(currentBuffer_.release());
//如果緩沖區(qū)大小不夠,那么當(dāng)前指針指向的緩沖區(qū)存入容器,
//同時釋放當(dāng)前指針的指向
if (nextBuffer_)
//如果下一個指針指向的緩沖區(qū)存在,那么指向給當(dāng)前指針
{
currentBuffer_ = boost::ptr_container::move(nextBuffer_);
}
else
{
currentBuffer_.reset(new Buffer); // Rarely happens
//如果下一塊緩沖區(qū)沒準(zhǔn)備好,那么當(dāng)前指針新建一個緩沖區(qū)
}
currentBuffer_->append(logline, len);
//經(jīng)過上面這些操作,就是準(zhǔn)備好了緩沖區(qū),添加
cond_.notify();
//不太清楚作用
}
}
threadFunc()
運(yùn)行在日志線程中的函數(shù)。
主要是:
- 循環(huán)向磁盤中寫入文件。
- 如果上次寫入到這次寫入之間沒有添加日志,那么就等待固定時間。
- 進(jìn)入循環(huán)之先初始化了兩個緩沖區(qū),用來賦值給當(dāng)前緩沖區(qū)指針和下一個緩沖區(qū)指針。
這兩個緩沖區(qū)將一直存在,寫入在緩沖區(qū)容器的前兩個元素,交換以后,也是在待寫入緩沖區(qū)的前兩個元素,然后被pop出來。所以這兩個緩沖區(qū)一直存在 - 兩個緩沖區(qū),一個類的成員變量,一個是將要寫入到文件的待寫入緩沖區(qū)容器。
緩沖區(qū)容器交還給待寫入緩沖區(qū)容器,然后待寫入緩沖區(qū)容器在清空的時候留兩個緩沖區(qū)pop出來給兩個緩沖區(qū)指針,省去初始化的時間?
void AsyncLogging::threadFunc()
//在日志線程中運(yùn)行的函數(shù)
{
assert(running_ == true);
latch_.countDown();
//當(dāng)線程運(yùn)行起來以后,Thread的start函數(shù)才能繼續(xù)往下執(zhí)行
LogFile output(basename_, rollSize_, false); //初始化日志文件
BufferPtr newBuffer1(new Buffer); //兩個互換的緩沖區(qū)
BufferPtr newBuffer2(new Buffer);
newBuffer1->bzero(); //重置
newBuffer2->bzero();
BufferVector buffersToWrite; //緩沖區(qū)容器要被寫入文件
buffersToWrite.reserve(16); //大小
while (running_) //循環(huán)
{
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());
{
muduo::MutexLockGuard lock(mutex_);
if (buffers_.empty()) // unusual usage!
//緩沖容器如果是空的話,也就切換容器這段時間沒添加日志
{
cond_.waitForSeconds(flushInterval_); //就是定期寫入
}
//等一會,然后不管有沒有添加日志,都放入緩沖區(qū)
buffers_.push_back(currentBuffer_.release());
//不是空的話,之前寫過,直接將當(dāng)前緩沖的緩沖存到緩沖容器中
currentBuffer_ = boost::ptr_container::move(newBuffer1);
//將兩個互換緩沖一個賦值給單錢指針
buffersToWrite.swap(buffers_);
//緩沖容器和寫入緩沖容器互換
if (!nexttBuffer_)
//如果下一個指針為空,那么賦值
{
nextBuffer_ = boost::ptr_container::move(newBuffer2);
}
}
assert(!buffersToWrite.empty()); //不是空,繼續(xù)
if (buffersToWrite.size() > 25)
//如果待寫入緩沖容器太多,那么直接扔掉記錄
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
//但是為什么保存了前2個??
}
for (size_t i = 0; i < buffersToWrite.size(); ++i)
//待緩沖容器中日志寫入到文件
{
output.append(buffersToWrite[i].data(), buffersToWrite[i].length());
}
if (buffersToWrite.size() > 2)
//如果之前沒扔記錄應(yīng)該多余2,否則就兩個
{
// drop non-bzero-ed buffers, avoid trashing
buffersToWrite.resize(2);
//保留兩個元素,其他銷毀
}
if (!newBuffer1)
//互換緩沖區(qū)1不存在
{
assert(!buffersToWrite.empty());
//這個是省去了創(chuàng)建新緩沖區(qū)的時間,直接使用舊的緩沖區(qū),
newBuffer1 = buffersToWrite.pop_back();
newBuffer1->reset();
//重置,使用舊的緩沖區(qū)
}
if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = buffersToWrite.pop_back();
newBuffer2->reset();
}
buffersToWrite.clear();
//清除緩沖區(qū),寫入到文件
output.flush();
}
output.flush();
//停止運(yùn)行以后寫入一次
}
另外還有一個start(),stop()函數(shù),啟動和停止。
嗯,思想很厲害。
連個一直存在的緩沖區(qū)。
兩個一直一直在不停交換的緩沖區(qū)容器。
兩個緩沖區(qū)容器不停交換,這樣可以縮短互斥量鎖住的時間,交換完了就可以解鎖互斥量