Android 中 libnbaio 庫的設計和實現(xiàn)

Android 中的 libnbaio (Non-Blocking Audio I/O 的縮寫) 庫主要是為非阻塞的音頻 I/O 設計的,但現(xiàn)在它也包含了一些接口的阻塞實現(xiàn),因而它的主要作用也變成了為各種音頻 I/O 設施提供統(tǒng)一的讀寫接口。libnbaio 庫主要用在 audioflinger 和一些音頻 HAL 模塊的實現(xiàn)中。libnbaio 庫提供的主要組件是 PipeMonoPipe,其中 Pipe 支持單個寫者,和 N 個讀者,MonoPipe 支持單個寫者和單個讀者。

Pipe 支持的特性如下:

沒有互斥量,在 SCHED_NORMAL 和 SCHED_FIFO 的線程之間使用是安全的。

  • 寫入:

    • 非阻塞
    • 如果沒有足夠的數(shù)據(jù)則返回實際傳輸?shù)膸瑪?shù)
    • 如果消費數(shù)據(jù)的速度不夠塊,則覆蓋數(shù)據(jù)
  • 讀?。?/p>

    • 非阻塞
    • 如果沒有足夠的數(shù)據(jù)則返回實際傳輸?shù)膸瑪?shù)
    • 如果讀取速度跟不上,將會丟失數(shù)據(jù)

MonoPipe 支持的特性如下:

沒有互斥量,在 SCHED_NORMAL 和 SCHED_FIFO 的線程之間使用是安全的。

  • 寫入:

    • 是否阻塞是可選的
    • 如果配置為阻塞,則在返回前將等待,直到空間可用
    • 如果配置為非阻塞,則將返回實際傳輸?shù)膸瑪?shù),并且不會覆蓋數(shù)據(jù)
  • 讀?。?/p>

    • 非阻塞
    • 如果沒有足夠的數(shù)據(jù)則返回實際傳輸?shù)膸瑪?shù)
    • 從不丟失數(shù)據(jù)

這里看一下 libnbaio 庫的設計和實現(xiàn),代碼分析基于 android-12.1.0_r27 版進行。

libnbaio 庫提供的主要抽象是 NBAIO_SinkNBAIO_Source,它們分別用于寫入音頻數(shù)據(jù)和讀取音頻數(shù)據(jù)。libnbaio 庫各個部分的繼承層次結構如下圖:

NBAIO Objects

LibsndfileSourceLibsndfileSink 分別封裝了 libsndfileSFM_READSFM_WRITE 模式打開的文件來實現(xiàn) NBAIO_SourceNBAIO_Sink。這兩個組件,由于 libsndfile 許可證不兼容,默認不會被編進 Android 系統(tǒng)中。在整個 Android 系統(tǒng)中,找不到使用它們的地方,隨著 Android 系統(tǒng)的發(fā)展,它們大概已處于年久失修不可用的狀態(tài)。

AudioStreamInSourceAudioStreamOutSink 分別封裝了 Audio HAL 的 StreamInHalInterfaceStreamOutHalInterface 接口實現(xiàn) NBAIO_SourceNBAIO_Sink,它們不是多線程安全的。

PipePipeReader 配合使用,分別用于寫入音頻數(shù)據(jù)和讀取音頻數(shù)據(jù)。MonoPipeMonoPipeReader 配合使用,分別用于寫入音頻數(shù)據(jù)和讀取音頻數(shù)據(jù)。

SourceAudioBufferProvider 封裝 NBAIO_Source 并最終繼承自 AudioBufferProvider,它主要用于將 libnbaio 庫的那些組件接進 Android 系統(tǒng)的其它音頻模塊里,如混音器等。

MonoPipe、MonoPipeReader、PipePipeReader 實現(xiàn)相關組件的結構如下圖:

NBAIO Pipe

在基本結構方面,MonoPipeMonoPipeReaderPipePipeReader 基本相同,但它們分別實現(xiàn)不同的讀寫策略。Pipe 分配一塊內存來給 audio_utils_fifo 管理,用于交換數(shù)據(jù);讀寫數(shù)據(jù)最終都通過 audio_utils_fifo 進行;Pipe 包含一個 audio_utils_fifo_writer 對象,audio_utils_fifo_writer 對象持有對 audio_utils_fifo 對象的引用,audio_utils_fifo_writer 用于向 audio_utils_fifo 寫入音頻數(shù)據(jù);PipeReader 持有 Pipe 對象的引用,且包含一個 audio_utils_fifo_reader 對象;audio_utils_fifo_reader 對象持有對 Pipeaudio_utils_fifo 對象的引用,用于從 audio_utils_fifo 讀取音頻數(shù)據(jù);PipePipeReader 通過共用的 audio_utils_fifo 對象交換數(shù)據(jù),它們分別扮演音頻數(shù)據(jù)的寫者和讀者角色。MonoPipeMonoPipeReaderPipePipeReader 有著相同的基本結構和數(shù)據(jù)交換方式。從數(shù)據(jù)流的角度來看,這些組件有如下這樣的結構:

Data stream in pipe and pipe reader

來看一下 MonoPipeMonoPipeReaderPipePipeReader 等的具體實現(xiàn)。Pipe 類的定義 (位于 frameworks/av/media/libnbaio/include/media/nbaio/Pipe.h) 如下:

class Pipe : public NBAIO_Sink {

    friend class PipeReader;

public:
    // maxFrames will be rounded up to a power of 2, and all slots are available. Must be >= 2.
    // buffer is an optional parameter specifying the virtual address of the pipe buffer,
    // which must be of size roundup(maxFrames) * Format_frameSize(format) bytes.
    Pipe(size_t maxFrames, const NBAIO_Format& format, void *buffer = NULL);

    // If a buffer was specified in the constructor, it is not automatically freed by destructor.
    virtual ~Pipe();

    // NBAIO_Port interface

    //virtual ssize_t negotiate(const NBAIO_Format offers[], size_t numOffers,
    //                          NBAIO_Format counterOffers[], size_t& numCounterOffers);
    //virtual NBAIO_Format format() const;

    // NBAIO_Sink interface

    //virtual int64_t framesWritten() const;
    //virtual int64_t framesUnderrun() const;
    //virtual int64_t underruns() const;

    // The write side of a pipe permits overruns; flow control is the caller's responsibility.
    // It doesn't return +infinity because that would guarantee an overrun.
    virtual ssize_t availableToWrite() { return mMaxFrames; }

    virtual ssize_t write(const void *buffer, size_t count);
    //virtual ssize_t writeVia(writeVia_t via, size_t total, void *user, size_t block);

private:
    const size_t    mMaxFrames;     // always a power of 2
    void * const    mBuffer;
    audio_utils_fifo        mFifo;
    audio_utils_fifo_writer mFifoWriter;
    volatile int32_t mReaders;      // number of PipeReader clients currently attached to this Pipe
    const bool      mFreeBufferInDestructor;
};

Pipe 對象在構造時,需要傳入最大幀數(shù),音頻數(shù)據(jù)格式,和可選的一塊用于交換數(shù)據(jù)的緩沖區(qū)。最大幀數(shù)將被向上對齊到 2 的次冪。數(shù)據(jù)緩沖區(qū)的大小必須是 向上對齊之后的最大幀數(shù) roundup(maxFrames) * 幀大小 Format_frameSize(format) 個字節(jié)。Pipe 對象構造時,如果沒有傳分配好的內存塊進來,則會分配一塊內存,在 Pipe 對象析構時,釋放分配的內存。Pipe 的構造和析構函數(shù)定義 (位于 frameworks/av/media/libnbaio/Pipe.cpp) 如下:

Pipe::Pipe(size_t maxFrames, const NBAIO_Format& format, void *buffer) :
        NBAIO_Sink(format),
        // TODO fifo now supports non-power-of-2 buffer sizes, so could remove the roundup
        mMaxFrames(roundup(maxFrames)),
        mBuffer(buffer == NULL ? malloc(mMaxFrames * Format_frameSize(format)) : buffer),
        mFifo(mMaxFrames, Format_frameSize(format), mBuffer, false /*throttlesWriter*/),
        mFifoWriter(mFifo),
        mReaders(0),
        mFreeBufferInDestructor(buffer == NULL)
{
}

Pipe::~Pipe()
{
    ALOG_ASSERT(android_atomic_acquire_load(&mReaders) == 0);
    if (mFreeBufferInDestructor) {
        free(mBuffer);
    }
}

PipePipeReader 的讀寫操作的線程安全語義,主要由底層的 audio_utils_fifo_reader、audio_utils_fifo_writeraudio_utils_fifo 實現(xiàn)。Pipewrite() 函數(shù)實現(xiàn) (位于 frameworks/av/media/libnbaio/Pipe.cpp) 如下:

ssize_t Pipe::write(const void *buffer, size_t count)
{
    // count == 0 is unlikely and not worth checking for
    if (CC_UNLIKELY(!mNegotiated)) {
        return NEGOTIATE;
    }
    ssize_t actual = mFifoWriter.write(buffer, count);
    ALOG_ASSERT(actual <= count);
    if (actual <= 0) {
        return actual;
    }
    mFramesWritten += (size_t) actual;
    return actual;
}

用于從 Pipe 寫入數(shù)據(jù)的緩沖區(qū)中讀取數(shù)據(jù)的 PipeReader 類的定義 (位于 frameworks/av/media/libnbaio/include/media/nbaio/PipeReader.h) 如下:

class PipeReader : public NBAIO_Source {

public:

    // Construct a PipeReader and associate it with a Pipe
    // FIXME make this constructor a factory method of Pipe.
    PipeReader(Pipe& pipe);
    virtual ~PipeReader();

    // NBAIO_Port interface

    //virtual ssize_t negotiate(const NBAIO_Format offers[], size_t numOffers,
    //                          NBAIO_Format counterOffers[], size_t& numCounterOffers);
    //virtual NBAIO_Format format() const;

    // NBAIO_Source interface

    //virtual size_t framesRead() const;
    virtual int64_t framesOverrun() { return mFramesOverrun; }
    virtual int64_t overruns()  { return mOverruns; }

    virtual ssize_t availableToRead();

    virtual ssize_t read(void *buffer, size_t count);

    virtual ssize_t flush();

    // NBAIO_Source end

#if 0   // until necessary
    Pipe& pipe() const { return mPipe; }
#endif

private:
    Pipe&       mPipe;
    audio_utils_fifo_reader mFifoReader;
    int64_t     mFramesOverrun;
    int64_t     mOverruns;
};

PipeReader 只對單個線程是安全的。不同的線程可以通過不同的 PipeReader 對象讀取相同 Pipe 的數(shù)據(jù),但不同線程不能通過相同的 PipeReader 對象讀取數(shù)據(jù)。PipeReader 的各個成員函數(shù)定義 (位于 frameworks/av/media/libnbaio/PipeReader.cpp) 如下:

PipeReader::PipeReader(Pipe& pipe) :
        NBAIO_Source(pipe.mFormat),
        mPipe(pipe), mFifoReader(mPipe.mFifo, false /*throttlesWriter*/, false /*flush*/),
        mFramesOverrun(0),
        mOverruns(0)
{
    android_atomic_inc(&pipe.mReaders);
}

PipeReader::~PipeReader()
{
#if !LOG_NDEBUG
    int32_t readers =
#else
    (void)
#endif
            android_atomic_dec(&mPipe.mReaders);
    ALOG_ASSERT(readers > 0);
}

ssize_t PipeReader::availableToRead()
{
    if (CC_UNLIKELY(!mNegotiated)) {
        return NEGOTIATE;
    }
    size_t lost;
    ssize_t avail = mFifoReader.available(&lost);
    if (avail == -EOVERFLOW || lost > 0) {
        mFramesOverrun += lost;
        ++mOverruns;
        avail = OVERRUN;
    }
    return avail;
}

ssize_t PipeReader::read(void *buffer, size_t count)
{
    size_t lost;
    ssize_t actual = mFifoReader.read(buffer, count, NULL /*timeout*/, &lost);
    ALOG_ASSERT(actual <= count);
    if (actual == -EOVERFLOW || lost > 0) {
        mFramesOverrun += lost;
        ++mOverruns;
        actual = OVERRUN;
    }
    if (actual <= 0) {
        return actual;
    }
    mFramesRead += (size_t) actual;
    return actual;
}

ssize_t PipeReader::flush()
{
    if (CC_UNLIKELY(!mNegotiated)) {
        return NEGOTIATE;
    }
    size_t lost;
    ssize_t flushed = mFifoReader.flush(&lost);
    if (flushed == -EOVERFLOW || lost > 0) {
        mFramesOverrun += lost;
        ++mOverruns;
        flushed = OVERRUN;
    }
    if (flushed <= 0) {
        return flushed;
    }
    mFramesRead += (size_t) flushed;  // we consider flushed frames as read, but not lost frames
    return flushed;
}

PipeReader 的讀取操作發(fā)現(xiàn)寫入數(shù)據(jù)有溢出時,即使讀取到了一部分數(shù)據(jù),也會返回錯誤,它獲得可讀取數(shù)據(jù)量和 flush() 操作也有類似語義。

MonoPipe 的音頻數(shù)據(jù)寫入語義在其 write() 操作中實現(xiàn),該函數(shù)定義 (位于 frameworks/av/media/libnbaio/MonoPipe.cpp) 如下:

ssize_t MonoPipe::write(const void *buffer, size_t count)
{
    if (CC_UNLIKELY(!mNegotiated)) {
        return NEGOTIATE;
    }
    size_t totalFramesWritten = 0;
    while (count > 0) {
        ssize_t actual = mFifoWriter.write(buffer, count);
        ALOG_ASSERT(actual <= count);
        if (actual < 0) {
            if (totalFramesWritten == 0) {
                return actual;
            }
            break;
        }
        size_t written = (size_t) actual;
        totalFramesWritten += written;
        if (!mWriteCanBlock || mIsShutdown) {
            break;
        }
        count -= written;
        buffer = (char *) buffer + (written * mFrameSize);
        // TODO Replace this whole section by audio_util_fifo's setpoint feature.
        // Simulate blocking I/O by sleeping at different rates, depending on a throttle.
        // The throttle tries to keep the mean pipe depth near the setpoint, with a slight jitter.
        uint32_t ns;
        if (written > 0) {
            ssize_t avail = mFifoWriter.available();
            ALOG_ASSERT(avail <= mMaxFrames);
            if (avail < 0) {
                // don't return avail as status, because totalFramesWritten > 0
                break;
            }
            size_t filled = mMaxFrames - (size_t) avail;
            // FIXME cache these values to avoid re-computation
            if (filled <= mSetpoint / 2) {
                // pipe is (nearly) empty, fill quickly
                ns = written * ( 500000000 / Format_sampleRate(mFormat));
            } else if (filled <= (mSetpoint * 3) / 4) {
                // pipe is below setpoint, fill at slightly faster rate
                ns = written * ( 750000000 / Format_sampleRate(mFormat));
            } else if (filled <= (mSetpoint * 5) / 4) {
                // pipe is at setpoint, fill at nominal rate
                ns = written * (1000000000 / Format_sampleRate(mFormat));
            } else if (filled <= (mSetpoint * 3) / 2) {
                // pipe is above setpoint, fill at slightly slower rate
                ns = written * (1150000000 / Format_sampleRate(mFormat));
            } else if (filled <= (mSetpoint * 7) / 4) {
                // pipe is overflowing, fill slowly
                ns = written * (1350000000 / Format_sampleRate(mFormat));
            } else {
                // pipe is severely overflowing
                ns = written * (1750000000 / Format_sampleRate(mFormat));
            }
        } else {
            ns = count * (1350000000 / Format_sampleRate(mFormat));
        }
        if (ns > 999999999) {
            ns = 999999999;
        }
        struct timespec nowTs;
        bool nowTsValid = !clock_gettime(CLOCK_MONOTONIC, &nowTs);
        // deduct the elapsed time since previous write() completed
        if (nowTsValid && mWriteTsValid) {
            time_t sec = nowTs.tv_sec - mWriteTs.tv_sec;
            long nsec = nowTs.tv_nsec - mWriteTs.tv_nsec;
            ALOGE_IF(sec < 0 || (sec == 0 && nsec < 0),
                    "clock_gettime(CLOCK_MONOTONIC) failed: was %ld.%09ld but now %ld.%09ld",
                    mWriteTs.tv_sec, mWriteTs.tv_nsec, nowTs.tv_sec, nowTs.tv_nsec);
            if (nsec < 0) {
                --sec;
                nsec += 1000000000;
            }
            if (sec == 0) {
                if ((long) ns > nsec) {
                    ns -= nsec;
                } else {
                    ns = 0;
                }
            }
        }
        if (ns > 0) {
            const struct timespec req = {0, static_cast<long>(ns)};
            nanosleep(&req, NULL);
        }
        // record the time that this write() completed
        if (nowTsValid) {
            mWriteTs = nowTs;
            if ((mWriteTs.tv_nsec += ns) >= 1000000000) {
                mWriteTs.tv_nsec -= 1000000000;
                ++mWriteTs.tv_sec;
            }
        }
        mWriteTsValid = nowTsValid;
    }
    mFramesWritten += totalFramesWritten;
    return totalFramesWritten;
}

如果是同步寫入,沒有足夠的空間寫入全部數(shù)據(jù)時,會根據(jù)當前緩沖區(qū)中已經(jīng)寫入的數(shù)據(jù)的水位計算休眠時間,并休眠等待,否則返回實際寫入的音頻幀個數(shù)。MonoPipeReader 的讀取操作不關心寫入數(shù)據(jù)的溢出,相關操作實現(xiàn) (位于 frameworks/av/media/libnbaio/MonoPipeReader.cpp) 如下:

ssize_t MonoPipeReader::availableToRead()
{
    if (CC_UNLIKELY(!mNegotiated)) {
        return NEGOTIATE;
    }
    ssize_t ret = mFifoReader.available();
    ALOG_ASSERT(ret <= mPipe->mMaxFrames);
    return ret;
}

ssize_t MonoPipeReader::read(void *buffer, size_t count)
{
    // count == 0 is unlikely and not worth checking for explicitly; will be handled automatically
    ssize_t actual = mFifoReader.read(buffer, count);
    ALOG_ASSERT(actual <= count);
    if (CC_UNLIKELY(actual <= 0)) {
        return actual;
    }
    mFramesRead += (size_t) actual;
    return actual;
}

AudioStreamInSource 是對 StreamInHalInterface 的封裝,它通過后者讀取數(shù)據(jù),讀取的數(shù)據(jù)格式會按照從 StreamInHalInterface 獲得的數(shù)據(jù)格式進行。AudioStreamInSource 各個成員函數(shù)的定義 (位于 frameworks/av/media/libnbaio/AudioStreamInSource.cpp) 如下:

AudioStreamInSource::AudioStreamInSource(sp<StreamInHalInterface> stream) :
        NBAIO_Source(),
        mStream(stream),
        mStreamBufferSizeBytes(0),
        mFramesOverrun(0),
        mOverruns(0)
{
    ALOG_ASSERT(stream != 0);
}

AudioStreamInSource::~AudioStreamInSource()
{
    mStream.clear();
}

ssize_t AudioStreamInSource::negotiate(const NBAIO_Format offers[], size_t numOffers,
                                      NBAIO_Format counterOffers[], size_t& numCounterOffers)
{
    if (!Format_isValid(mFormat)) {
        status_t result;
        result = mStream->getBufferSize(&mStreamBufferSizeBytes);
        if (result != OK) return result;
        audio_config_base_t config = AUDIO_CONFIG_BASE_INITIALIZER;
        result = mStream->getAudioProperties(&config);
        if (result != OK) return result;
        mFormat = Format_from_SR_C(config.sample_rate,
                audio_channel_count_from_in_mask(config.channel_mask), config.format);
        mFrameSize = Format_frameSize(mFormat);
    }
    return NBAIO_Source::negotiate(offers, numOffers, counterOffers, numCounterOffers);
}

int64_t AudioStreamInSource::framesOverrun()
{
    uint32_t framesOverrun;
    status_t result = mStream->getInputFramesLost(&framesOverrun);
    if (result == OK && framesOverrun > 0) {
        mFramesOverrun += framesOverrun;
        // FIXME only increment for contiguous ranges
        ++mOverruns;
    } else if (result != OK) {
        ALOGE("Error when retrieving lost frames count from HAL: %d", result);
    }
    return mFramesOverrun;
}

ssize_t AudioStreamInSource::read(void *buffer, size_t count)
{
    if (CC_UNLIKELY(!Format_isValid(mFormat))) {
        return NEGOTIATE;
    }
    size_t bytesRead;
    status_t result = mStream->read(buffer, count * mFrameSize, &bytesRead);
    if (result == OK && bytesRead > 0) {
        size_t framesRead = bytesRead / mFrameSize;
        mFramesRead += framesRead;
        return framesRead;
    } else {
        ALOGE_IF(result != OK, "Error while reading data from HAL: %d", result);
        return bytesRead;
    }
}

AudioStreamInSource 提供接口從 Audio HAL 獲得溢出數(shù)據(jù)的數(shù)量。AudioStreamInSource 的可寫入數(shù)據(jù)量總是 Audio HAL 流整個緩沖區(qū)的大小。

AudioStreamOutSink 是對 StreamOutHalInterface 的封裝,它通過后者寫入數(shù)據(jù),寫入的數(shù)據(jù)格式會按照從 StreamOutHalInterface 獲得的數(shù)據(jù)格式進行。AudioStreamOutSink 各個成員函數(shù)的定義 (位于 frameworks/av/media/libnbaio/AudioStreamOutSink.cpp) 如下:

AudioStreamOutSink::AudioStreamOutSink(sp<StreamOutHalInterface> stream) :
        NBAIO_Sink(),
        mStream(stream),
        mStreamBufferSizeBytes(0)
{
    ALOG_ASSERT(stream != 0);
}

AudioStreamOutSink::~AudioStreamOutSink()
{
    mStream.clear();
}

ssize_t AudioStreamOutSink::negotiate(const NBAIO_Format offers[], size_t numOffers,
                                      NBAIO_Format counterOffers[], size_t& numCounterOffers)
{
    if (!Format_isValid(mFormat)) {
        status_t result;
        result = mStream->getBufferSize(&mStreamBufferSizeBytes);
        if (result != OK) return result;
        audio_config_base_t config = AUDIO_CONFIG_BASE_INITIALIZER;
        result = mStream->getAudioProperties(&config);
        if (result != OK) return result;
        mFormat = Format_from_SR_C(config.sample_rate,
                audio_channel_count_from_out_mask(config.channel_mask), config.format);
        mFrameSize = Format_frameSize(mFormat);
    }
    return NBAIO_Sink::negotiate(offers, numOffers, counterOffers, numCounterOffers);
}

ssize_t AudioStreamOutSink::write(const void *buffer, size_t count)
{
    if (!mNegotiated) {
        return NEGOTIATE;
    }
    ALOG_ASSERT(Format_isValid(mFormat));
    size_t written;
    status_t ret = mStream->write(buffer, count * mFrameSize, &written);
    if (ret == OK && written > 0) {
        written /= mFrameSize;
        mFramesWritten += written;
        return written;
    } else {
        // FIXME verify HAL implementations are returning the correct error codes e.g. WOULD_BLOCK
        ALOGE_IF(ret != OK, "Error while writing data to HAL: %d", ret);
        return ret;
    }
}

status_t AudioStreamOutSink::getTimestamp(ExtendedTimestamp &timestamp)
{
    uint64_t position64;
    struct timespec time;
    if (mStream->getPresentationPosition(&position64, &time) != OK) {
        return INVALID_OPERATION;
    }
    timestamp.mPosition[ExtendedTimestamp::LOCATION_KERNEL] = position64;
    timestamp.mTimeNs[ExtendedTimestamp::LOCATION_KERNEL] = audio_utils_ns_from_timespec(&time);
    return OK;
}

AudioStreamInSourceAudioStreamOutSink 提供了 negotiate() 操作,它主要用來完成一些初始化動作,并與使用者協(xié)商具體使用的音頻數(shù)據(jù)的格式。

Done.

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容