Android 中的 libnbaio (Non-Blocking Audio I/O 的縮寫) 庫主要是為非阻塞的音頻 I/O 設計的,但現(xiàn)在它也包含了一些接口的阻塞實現(xiàn),因而它的主要作用也變成了為各種音頻 I/O 設施提供統(tǒng)一的讀寫接口。libnbaio 庫主要用在 audioflinger 和一些音頻 HAL 模塊的實現(xiàn)中。libnbaio 庫提供的主要組件是 Pipe 和 MonoPipe,其中 Pipe 支持單個寫者,和 N 個讀者,MonoPipe 支持單個寫者和單個讀者。
Pipe 支持的特性如下:
沒有互斥量,在 SCHED_NORMAL 和 SCHED_FIFO 的線程之間使用是安全的。
-
寫入:
- 非阻塞
- 如果沒有足夠的數(shù)據(jù)則返回實際傳輸?shù)膸瑪?shù)
- 如果消費數(shù)據(jù)的速度不夠塊,則覆蓋數(shù)據(jù)
-
讀?。?/p>
- 非阻塞
- 如果沒有足夠的數(shù)據(jù)則返回實際傳輸?shù)膸瑪?shù)
- 如果讀取速度跟不上,將會丟失數(shù)據(jù)
MonoPipe 支持的特性如下:
沒有互斥量,在 SCHED_NORMAL 和 SCHED_FIFO 的線程之間使用是安全的。
-
寫入:
- 是否阻塞是可選的
- 如果配置為阻塞,則在返回前將等待,直到空間可用
- 如果配置為非阻塞,則將返回實際傳輸?shù)膸瑪?shù),并且不會覆蓋數(shù)據(jù)
-
讀?。?/p>
- 非阻塞
- 如果沒有足夠的數(shù)據(jù)則返回實際傳輸?shù)膸瑪?shù)
- 從不丟失數(shù)據(jù)
這里看一下 libnbaio 庫的設計和實現(xiàn),代碼分析基于 android-12.1.0_r27 版進行。
libnbaio 庫提供的主要抽象是 NBAIO_Sink 和 NBAIO_Source,它們分別用于寫入音頻數(shù)據(jù)和讀取音頻數(shù)據(jù)。libnbaio 庫各個部分的繼承層次結構如下圖:

LibsndfileSource 和 LibsndfileSink 分別封裝了 libsndfile 以 SFM_READ 和 SFM_WRITE 模式打開的文件來實現(xiàn) NBAIO_Source 和 NBAIO_Sink。這兩個組件,由于 libsndfile 許可證不兼容,默認不會被編進 Android 系統(tǒng)中。在整個 Android 系統(tǒng)中,找不到使用它們的地方,隨著 Android 系統(tǒng)的發(fā)展,它們大概已處于年久失修不可用的狀態(tài)。
AudioStreamInSource 和 AudioStreamOutSink 分別封裝了 Audio HAL 的 StreamInHalInterface 和 StreamOutHalInterface 接口實現(xiàn) NBAIO_Source 和 NBAIO_Sink,它們不是多線程安全的。
Pipe 和 PipeReader 配合使用,分別用于寫入音頻數(shù)據(jù)和讀取音頻數(shù)據(jù)。MonoPipe 和 MonoPipeReader 配合使用,分別用于寫入音頻數(shù)據(jù)和讀取音頻數(shù)據(jù)。
SourceAudioBufferProvider 封裝 NBAIO_Source 并最終繼承自 AudioBufferProvider,它主要用于將 libnbaio 庫的那些組件接進 Android 系統(tǒng)的其它音頻模塊里,如混音器等。
MonoPipe、MonoPipeReader、Pipe 和 PipeReader 實現(xiàn)相關組件的結構如下圖:

在基本結構方面,MonoPipe 和 MonoPipeReader 及 Pipe 和 PipeReader 基本相同,但它們分別實現(xiàn)不同的讀寫策略。Pipe 分配一塊內存來給 audio_utils_fifo 管理,用于交換數(shù)據(jù);讀寫數(shù)據(jù)最終都通過 audio_utils_fifo 進行;Pipe 包含一個 audio_utils_fifo_writer 對象,audio_utils_fifo_writer 對象持有對 audio_utils_fifo 對象的引用,audio_utils_fifo_writer 用于向 audio_utils_fifo 寫入音頻數(shù)據(jù);PipeReader 持有 Pipe 對象的引用,且包含一個 audio_utils_fifo_reader 對象;audio_utils_fifo_reader 對象持有對 Pipe 的 audio_utils_fifo 對象的引用,用于從 audio_utils_fifo 讀取音頻數(shù)據(jù);Pipe 和 PipeReader 通過共用的 audio_utils_fifo 對象交換數(shù)據(jù),它們分別扮演音頻數(shù)據(jù)的寫者和讀者角色。MonoPipe 和 MonoPipeReader 與 Pipe 和 PipeReader 有著相同的基本結構和數(shù)據(jù)交換方式。從數(shù)據(jù)流的角度來看,這些組件有如下這樣的結構:

來看一下 MonoPipe 和 MonoPipeReader 與 Pipe 和 PipeReader 等的具體實現(xiàn)。Pipe 類的定義 (位于 frameworks/av/media/libnbaio/include/media/nbaio/Pipe.h) 如下:
class Pipe : public NBAIO_Sink {
friend class PipeReader;
public:
// maxFrames will be rounded up to a power of 2, and all slots are available. Must be >= 2.
// buffer is an optional parameter specifying the virtual address of the pipe buffer,
// which must be of size roundup(maxFrames) * Format_frameSize(format) bytes.
Pipe(size_t maxFrames, const NBAIO_Format& format, void *buffer = NULL);
// If a buffer was specified in the constructor, it is not automatically freed by destructor.
virtual ~Pipe();
// NBAIO_Port interface
//virtual ssize_t negotiate(const NBAIO_Format offers[], size_t numOffers,
// NBAIO_Format counterOffers[], size_t& numCounterOffers);
//virtual NBAIO_Format format() const;
// NBAIO_Sink interface
//virtual int64_t framesWritten() const;
//virtual int64_t framesUnderrun() const;
//virtual int64_t underruns() const;
// The write side of a pipe permits overruns; flow control is the caller's responsibility.
// It doesn't return +infinity because that would guarantee an overrun.
virtual ssize_t availableToWrite() { return mMaxFrames; }
virtual ssize_t write(const void *buffer, size_t count);
//virtual ssize_t writeVia(writeVia_t via, size_t total, void *user, size_t block);
private:
const size_t mMaxFrames; // always a power of 2
void * const mBuffer;
audio_utils_fifo mFifo;
audio_utils_fifo_writer mFifoWriter;
volatile int32_t mReaders; // number of PipeReader clients currently attached to this Pipe
const bool mFreeBufferInDestructor;
};
Pipe 對象在構造時,需要傳入最大幀數(shù),音頻數(shù)據(jù)格式,和可選的一塊用于交換數(shù)據(jù)的緩沖區(qū)。最大幀數(shù)將被向上對齊到 2 的次冪。數(shù)據(jù)緩沖區(qū)的大小必須是 向上對齊之后的最大幀數(shù) roundup(maxFrames) * 幀大小 Format_frameSize(format) 個字節(jié)。Pipe 對象構造時,如果沒有傳分配好的內存塊進來,則會分配一塊內存,在 Pipe 對象析構時,釋放分配的內存。Pipe 的構造和析構函數(shù)定義 (位于 frameworks/av/media/libnbaio/Pipe.cpp) 如下:
Pipe::Pipe(size_t maxFrames, const NBAIO_Format& format, void *buffer) :
NBAIO_Sink(format),
// TODO fifo now supports non-power-of-2 buffer sizes, so could remove the roundup
mMaxFrames(roundup(maxFrames)),
mBuffer(buffer == NULL ? malloc(mMaxFrames * Format_frameSize(format)) : buffer),
mFifo(mMaxFrames, Format_frameSize(format), mBuffer, false /*throttlesWriter*/),
mFifoWriter(mFifo),
mReaders(0),
mFreeBufferInDestructor(buffer == NULL)
{
}
Pipe::~Pipe()
{
ALOG_ASSERT(android_atomic_acquire_load(&mReaders) == 0);
if (mFreeBufferInDestructor) {
free(mBuffer);
}
}
Pipe 和 PipeReader 的讀寫操作的線程安全語義,主要由底層的 audio_utils_fifo_reader、audio_utils_fifo_writer 和 audio_utils_fifo 實現(xiàn)。Pipe 的 write() 函數(shù)實現(xiàn) (位于 frameworks/av/media/libnbaio/Pipe.cpp) 如下:
ssize_t Pipe::write(const void *buffer, size_t count)
{
// count == 0 is unlikely and not worth checking for
if (CC_UNLIKELY(!mNegotiated)) {
return NEGOTIATE;
}
ssize_t actual = mFifoWriter.write(buffer, count);
ALOG_ASSERT(actual <= count);
if (actual <= 0) {
return actual;
}
mFramesWritten += (size_t) actual;
return actual;
}
用于從 Pipe 寫入數(shù)據(jù)的緩沖區(qū)中讀取數(shù)據(jù)的 PipeReader 類的定義 (位于 frameworks/av/media/libnbaio/include/media/nbaio/PipeReader.h) 如下:
class PipeReader : public NBAIO_Source {
public:
// Construct a PipeReader and associate it with a Pipe
// FIXME make this constructor a factory method of Pipe.
PipeReader(Pipe& pipe);
virtual ~PipeReader();
// NBAIO_Port interface
//virtual ssize_t negotiate(const NBAIO_Format offers[], size_t numOffers,
// NBAIO_Format counterOffers[], size_t& numCounterOffers);
//virtual NBAIO_Format format() const;
// NBAIO_Source interface
//virtual size_t framesRead() const;
virtual int64_t framesOverrun() { return mFramesOverrun; }
virtual int64_t overruns() { return mOverruns; }
virtual ssize_t availableToRead();
virtual ssize_t read(void *buffer, size_t count);
virtual ssize_t flush();
// NBAIO_Source end
#if 0 // until necessary
Pipe& pipe() const { return mPipe; }
#endif
private:
Pipe& mPipe;
audio_utils_fifo_reader mFifoReader;
int64_t mFramesOverrun;
int64_t mOverruns;
};
PipeReader 只對單個線程是安全的。不同的線程可以通過不同的 PipeReader 對象讀取相同 Pipe 的數(shù)據(jù),但不同線程不能通過相同的 PipeReader 對象讀取數(shù)據(jù)。PipeReader 的各個成員函數(shù)定義 (位于 frameworks/av/media/libnbaio/PipeReader.cpp) 如下:
PipeReader::PipeReader(Pipe& pipe) :
NBAIO_Source(pipe.mFormat),
mPipe(pipe), mFifoReader(mPipe.mFifo, false /*throttlesWriter*/, false /*flush*/),
mFramesOverrun(0),
mOverruns(0)
{
android_atomic_inc(&pipe.mReaders);
}
PipeReader::~PipeReader()
{
#if !LOG_NDEBUG
int32_t readers =
#else
(void)
#endif
android_atomic_dec(&mPipe.mReaders);
ALOG_ASSERT(readers > 0);
}
ssize_t PipeReader::availableToRead()
{
if (CC_UNLIKELY(!mNegotiated)) {
return NEGOTIATE;
}
size_t lost;
ssize_t avail = mFifoReader.available(&lost);
if (avail == -EOVERFLOW || lost > 0) {
mFramesOverrun += lost;
++mOverruns;
avail = OVERRUN;
}
return avail;
}
ssize_t PipeReader::read(void *buffer, size_t count)
{
size_t lost;
ssize_t actual = mFifoReader.read(buffer, count, NULL /*timeout*/, &lost);
ALOG_ASSERT(actual <= count);
if (actual == -EOVERFLOW || lost > 0) {
mFramesOverrun += lost;
++mOverruns;
actual = OVERRUN;
}
if (actual <= 0) {
return actual;
}
mFramesRead += (size_t) actual;
return actual;
}
ssize_t PipeReader::flush()
{
if (CC_UNLIKELY(!mNegotiated)) {
return NEGOTIATE;
}
size_t lost;
ssize_t flushed = mFifoReader.flush(&lost);
if (flushed == -EOVERFLOW || lost > 0) {
mFramesOverrun += lost;
++mOverruns;
flushed = OVERRUN;
}
if (flushed <= 0) {
return flushed;
}
mFramesRead += (size_t) flushed; // we consider flushed frames as read, but not lost frames
return flushed;
}
PipeReader 的讀取操作發(fā)現(xiàn)寫入數(shù)據(jù)有溢出時,即使讀取到了一部分數(shù)據(jù),也會返回錯誤,它獲得可讀取數(shù)據(jù)量和 flush() 操作也有類似語義。
MonoPipe 的音頻數(shù)據(jù)寫入語義在其 write() 操作中實現(xiàn),該函數(shù)定義 (位于 frameworks/av/media/libnbaio/MonoPipe.cpp) 如下:
ssize_t MonoPipe::write(const void *buffer, size_t count)
{
if (CC_UNLIKELY(!mNegotiated)) {
return NEGOTIATE;
}
size_t totalFramesWritten = 0;
while (count > 0) {
ssize_t actual = mFifoWriter.write(buffer, count);
ALOG_ASSERT(actual <= count);
if (actual < 0) {
if (totalFramesWritten == 0) {
return actual;
}
break;
}
size_t written = (size_t) actual;
totalFramesWritten += written;
if (!mWriteCanBlock || mIsShutdown) {
break;
}
count -= written;
buffer = (char *) buffer + (written * mFrameSize);
// TODO Replace this whole section by audio_util_fifo's setpoint feature.
// Simulate blocking I/O by sleeping at different rates, depending on a throttle.
// The throttle tries to keep the mean pipe depth near the setpoint, with a slight jitter.
uint32_t ns;
if (written > 0) {
ssize_t avail = mFifoWriter.available();
ALOG_ASSERT(avail <= mMaxFrames);
if (avail < 0) {
// don't return avail as status, because totalFramesWritten > 0
break;
}
size_t filled = mMaxFrames - (size_t) avail;
// FIXME cache these values to avoid re-computation
if (filled <= mSetpoint / 2) {
// pipe is (nearly) empty, fill quickly
ns = written * ( 500000000 / Format_sampleRate(mFormat));
} else if (filled <= (mSetpoint * 3) / 4) {
// pipe is below setpoint, fill at slightly faster rate
ns = written * ( 750000000 / Format_sampleRate(mFormat));
} else if (filled <= (mSetpoint * 5) / 4) {
// pipe is at setpoint, fill at nominal rate
ns = written * (1000000000 / Format_sampleRate(mFormat));
} else if (filled <= (mSetpoint * 3) / 2) {
// pipe is above setpoint, fill at slightly slower rate
ns = written * (1150000000 / Format_sampleRate(mFormat));
} else if (filled <= (mSetpoint * 7) / 4) {
// pipe is overflowing, fill slowly
ns = written * (1350000000 / Format_sampleRate(mFormat));
} else {
// pipe is severely overflowing
ns = written * (1750000000 / Format_sampleRate(mFormat));
}
} else {
ns = count * (1350000000 / Format_sampleRate(mFormat));
}
if (ns > 999999999) {
ns = 999999999;
}
struct timespec nowTs;
bool nowTsValid = !clock_gettime(CLOCK_MONOTONIC, &nowTs);
// deduct the elapsed time since previous write() completed
if (nowTsValid && mWriteTsValid) {
time_t sec = nowTs.tv_sec - mWriteTs.tv_sec;
long nsec = nowTs.tv_nsec - mWriteTs.tv_nsec;
ALOGE_IF(sec < 0 || (sec == 0 && nsec < 0),
"clock_gettime(CLOCK_MONOTONIC) failed: was %ld.%09ld but now %ld.%09ld",
mWriteTs.tv_sec, mWriteTs.tv_nsec, nowTs.tv_sec, nowTs.tv_nsec);
if (nsec < 0) {
--sec;
nsec += 1000000000;
}
if (sec == 0) {
if ((long) ns > nsec) {
ns -= nsec;
} else {
ns = 0;
}
}
}
if (ns > 0) {
const struct timespec req = {0, static_cast<long>(ns)};
nanosleep(&req, NULL);
}
// record the time that this write() completed
if (nowTsValid) {
mWriteTs = nowTs;
if ((mWriteTs.tv_nsec += ns) >= 1000000000) {
mWriteTs.tv_nsec -= 1000000000;
++mWriteTs.tv_sec;
}
}
mWriteTsValid = nowTsValid;
}
mFramesWritten += totalFramesWritten;
return totalFramesWritten;
}
如果是同步寫入,沒有足夠的空間寫入全部數(shù)據(jù)時,會根據(jù)當前緩沖區(qū)中已經(jīng)寫入的數(shù)據(jù)的水位計算休眠時間,并休眠等待,否則返回實際寫入的音頻幀個數(shù)。MonoPipeReader 的讀取操作不關心寫入數(shù)據(jù)的溢出,相關操作實現(xiàn) (位于 frameworks/av/media/libnbaio/MonoPipeReader.cpp) 如下:
ssize_t MonoPipeReader::availableToRead()
{
if (CC_UNLIKELY(!mNegotiated)) {
return NEGOTIATE;
}
ssize_t ret = mFifoReader.available();
ALOG_ASSERT(ret <= mPipe->mMaxFrames);
return ret;
}
ssize_t MonoPipeReader::read(void *buffer, size_t count)
{
// count == 0 is unlikely and not worth checking for explicitly; will be handled automatically
ssize_t actual = mFifoReader.read(buffer, count);
ALOG_ASSERT(actual <= count);
if (CC_UNLIKELY(actual <= 0)) {
return actual;
}
mFramesRead += (size_t) actual;
return actual;
}
AudioStreamInSource 是對 StreamInHalInterface 的封裝,它通過后者讀取數(shù)據(jù),讀取的數(shù)據(jù)格式會按照從 StreamInHalInterface 獲得的數(shù)據(jù)格式進行。AudioStreamInSource 各個成員函數(shù)的定義 (位于 frameworks/av/media/libnbaio/AudioStreamInSource.cpp) 如下:
AudioStreamInSource::AudioStreamInSource(sp<StreamInHalInterface> stream) :
NBAIO_Source(),
mStream(stream),
mStreamBufferSizeBytes(0),
mFramesOverrun(0),
mOverruns(0)
{
ALOG_ASSERT(stream != 0);
}
AudioStreamInSource::~AudioStreamInSource()
{
mStream.clear();
}
ssize_t AudioStreamInSource::negotiate(const NBAIO_Format offers[], size_t numOffers,
NBAIO_Format counterOffers[], size_t& numCounterOffers)
{
if (!Format_isValid(mFormat)) {
status_t result;
result = mStream->getBufferSize(&mStreamBufferSizeBytes);
if (result != OK) return result;
audio_config_base_t config = AUDIO_CONFIG_BASE_INITIALIZER;
result = mStream->getAudioProperties(&config);
if (result != OK) return result;
mFormat = Format_from_SR_C(config.sample_rate,
audio_channel_count_from_in_mask(config.channel_mask), config.format);
mFrameSize = Format_frameSize(mFormat);
}
return NBAIO_Source::negotiate(offers, numOffers, counterOffers, numCounterOffers);
}
int64_t AudioStreamInSource::framesOverrun()
{
uint32_t framesOverrun;
status_t result = mStream->getInputFramesLost(&framesOverrun);
if (result == OK && framesOverrun > 0) {
mFramesOverrun += framesOverrun;
// FIXME only increment for contiguous ranges
++mOverruns;
} else if (result != OK) {
ALOGE("Error when retrieving lost frames count from HAL: %d", result);
}
return mFramesOverrun;
}
ssize_t AudioStreamInSource::read(void *buffer, size_t count)
{
if (CC_UNLIKELY(!Format_isValid(mFormat))) {
return NEGOTIATE;
}
size_t bytesRead;
status_t result = mStream->read(buffer, count * mFrameSize, &bytesRead);
if (result == OK && bytesRead > 0) {
size_t framesRead = bytesRead / mFrameSize;
mFramesRead += framesRead;
return framesRead;
} else {
ALOGE_IF(result != OK, "Error while reading data from HAL: %d", result);
return bytesRead;
}
}
AudioStreamInSource 提供接口從 Audio HAL 獲得溢出數(shù)據(jù)的數(shù)量。AudioStreamInSource 的可寫入數(shù)據(jù)量總是 Audio HAL 流整個緩沖區(qū)的大小。
AudioStreamOutSink 是對 StreamOutHalInterface 的封裝,它通過后者寫入數(shù)據(jù),寫入的數(shù)據(jù)格式會按照從 StreamOutHalInterface 獲得的數(shù)據(jù)格式進行。AudioStreamOutSink 各個成員函數(shù)的定義 (位于 frameworks/av/media/libnbaio/AudioStreamOutSink.cpp) 如下:
AudioStreamOutSink::AudioStreamOutSink(sp<StreamOutHalInterface> stream) :
NBAIO_Sink(),
mStream(stream),
mStreamBufferSizeBytes(0)
{
ALOG_ASSERT(stream != 0);
}
AudioStreamOutSink::~AudioStreamOutSink()
{
mStream.clear();
}
ssize_t AudioStreamOutSink::negotiate(const NBAIO_Format offers[], size_t numOffers,
NBAIO_Format counterOffers[], size_t& numCounterOffers)
{
if (!Format_isValid(mFormat)) {
status_t result;
result = mStream->getBufferSize(&mStreamBufferSizeBytes);
if (result != OK) return result;
audio_config_base_t config = AUDIO_CONFIG_BASE_INITIALIZER;
result = mStream->getAudioProperties(&config);
if (result != OK) return result;
mFormat = Format_from_SR_C(config.sample_rate,
audio_channel_count_from_out_mask(config.channel_mask), config.format);
mFrameSize = Format_frameSize(mFormat);
}
return NBAIO_Sink::negotiate(offers, numOffers, counterOffers, numCounterOffers);
}
ssize_t AudioStreamOutSink::write(const void *buffer, size_t count)
{
if (!mNegotiated) {
return NEGOTIATE;
}
ALOG_ASSERT(Format_isValid(mFormat));
size_t written;
status_t ret = mStream->write(buffer, count * mFrameSize, &written);
if (ret == OK && written > 0) {
written /= mFrameSize;
mFramesWritten += written;
return written;
} else {
// FIXME verify HAL implementations are returning the correct error codes e.g. WOULD_BLOCK
ALOGE_IF(ret != OK, "Error while writing data to HAL: %d", ret);
return ret;
}
}
status_t AudioStreamOutSink::getTimestamp(ExtendedTimestamp ×tamp)
{
uint64_t position64;
struct timespec time;
if (mStream->getPresentationPosition(&position64, &time) != OK) {
return INVALID_OPERATION;
}
timestamp.mPosition[ExtendedTimestamp::LOCATION_KERNEL] = position64;
timestamp.mTimeNs[ExtendedTimestamp::LOCATION_KERNEL] = audio_utils_ns_from_timespec(&time);
return OK;
}
AudioStreamInSource 和 AudioStreamOutSink 提供了 negotiate() 操作,它主要用來完成一些初始化動作,并與使用者協(xié)商具體使用的音頻數(shù)據(jù)的格式。
Done.