A Brief Look at the Design and Implementation of PulseAudio

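PulseAudio is a sound server system for POSIX operating systems, which means it acts as a proxy between our audio applications and the system's audio devices. It is an integral part of all relevant modern Linux distributions and is used by many vendors in a wide range of mobile devices. It can perform advanced operations on audio data as it passes between applications and hardware: transferring audio to a different machine, changing the sample format or channel count, or mixing several streams into one input/output are all easy to do with PulseAudio. (Source: the PulseAudio homepage.)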

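Many people who have studied programming languages, operating systems, computer networks, standard libraries and operating system APIs run into the same problem. They feel they have learned a lot, and the books they have read might stack up as tall as they are, yet they remain unsure what this knowledge is good for, how to build a useful application on top of it, and how an application actually reaches hardware such as audio or video devices. At the same time, they have no idea how the tools they use every day, such as audio and video players and recorders, are implemented.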

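In practice, many textbooks fall short, for various reasons, of covering what is needed to build a useful application. Operating system textbooks, for example, tend to focus on fundamental theory and algorithms so that the book stays relevant for a long time, and skip the features of the operating systems in wide use today; even after reading several of them, a real operating system can still feel unfamiliar. Books on the Linux system API may lag behind the development of the operating system, or may leave out the more advanced features, deliberately or not. On top of that, application-specific domain knowledge is indispensable when implementing an application, and that is what most textbooks lack.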

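Studying the design and implementation of PulseAudio shows us the many system-level topics one needs to understand to build practical software on POSIX systems, and on Linux in particular: device discovery and device state monitoring based on udev; various inter-process communication mechanisms, such as TCP over IPv4 and IPv6, Unix domain sockets, D-Bus and shared memory; the ALSA architecture for accessing audio devices; and so on.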

This article takes a brief look at the design and implementation of PulseAudio.

Downloading and building the PulseAudio code

Building a debug version of the code and running it under a debugger makes studying a software project much easier. Here we build PulseAudio on Ubuntu 20.04 and get it running.

We can fetch the code with Git. The latest PulseAudio release at the time of writing is 14.2, so we switch to the corresponding stable branch:

$ git clone https://github.com/pulseaudio/pulseaudio.git
$ cd pulseaudio
$ git checkout -t origin/stable-14.x

PulseAudio is built with meson + ninja; for more about the meson build system, see the meson homepage. Before building PulseAudio, install meson:

$ sudo apt install meson

We also need to install the libraries that PulseAudio itself and its test cases depend on at build time:

$ sudo apt-get install libltdl-dev libsndfile1-dev cmake libsbc-dev check libbluetooth-dev  libtdb-dev  libavahi-client-dev libdbus-1-dev libdbus-c++-dev doxygen libglib2.0-dev libasound2-dev libx11-dev libssl-dev

Building PulseAudio

$ cd pulseaudio
$ mkdir build
$ cd build/
$ meson ..
$ ninja -C .
. . . . . . . . . 
    Database:                      tdb
    Legacy Database Entry Support: true
    module-stream-restore:
      Clear old devices:           false
    Running from build tree:       true
    System User:                   pulse
    System Group:                  pulse
    Access Group:                  pulse-access
meson.build:899: WARNING: 
You do not have speex support enabled. It is strongly recommended
that you enable speex support if your platform supports it as it is
the primary method used for audio resampling and is thus a critical
part of PulseAudio on that platform.
Build targets in project: 167

Found ninja-1.8.2 at ~/bin/depot_tools/ninja

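PulseAudio uses a client/server architecture. A daemon process, pulseaudio, runs in the system, and every application that needs to play or capture audio acts as its client, exchanging messages and data with the pulseaudio service through inter-process communication. On Ubuntu 20.04 the pulseaudio daemon is already running by default, so to run the one we just built we first have to stop the existing one: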

systemctl --user stop pulseaudio.socket
systemctl --user stop pulseaudio.service

Note that these commands do not need superuser privileges (sudo).

When debugging is finished, the system's pulseaudio service can be started again with:

systemctl --user start pulseaudio.socket
systemctl --user start pulseaudio.service

The pulseaudio service we built can be started with the following command:

~/data/opensource/pulseaudio$ build/src/daemon/pulseaudio -n -F build/src/daemon/default.pa -p $(pwd)/build/src/modules/ 
W: [pulseaudio] caps.c: Normally all extra capabilities would be dropped now, but that's impossible because PulseAudio was built without capabilities support.
N: [pulseaudio] daemon-conf.c: Detected that we are run from the build tree, fixing search path.
N: [pulseaudio] alsa-util.c: Disabling timer-based scheduling because running inside a VM.
E: [alsa-sink-ES1371/1] alsa-sink.c: ALSA woke us up to write new data to the device, but there was actually nothing to write.
E: [alsa-sink-ES1371/1] alsa-sink.c: Most likely this is a bug in the ALSA driver 'snd_ens1371'. Please report this issue to the ALSA developers.
E: [alsa-sink-ES1371/1] alsa-sink.c: We were woken up with POLLOUT set -- however a subsequent snd_pcm_avail() returned 0 or another value < min_avail.
N: [pulseaudio] alsa-util.c: Disabling timer-based scheduling because running inside a VM.
E: [pulseaudio] stdin-util.c: Unable to read or parse data from client.
E: [pulseaudio] module.c: Failed to load module "module-gsettings" (argument: ""): initialization failed.

Basic usage of the PulseAudio client API

The PulseAudio project contains many test cases that help us understand its design and implementation. Many of them are unit tests for the APIs of PulseAudio's basic components, but some run a complete recording or playback flow, so we can actually hear audio played through PulseAudio. These include lo-latency-test (source: pulseaudio/src/tests/lo-latency-test.c, binary: pulseaudio/build/src/tests/lo-latency-test) and sync-playback (source: pulseaudio/src/tests/sync-playback.c, binary: pulseaudio/build/src/tests/sync-playback).

Here we observe the interaction between sync-playback and the pulseaudio service to understand how PulseAudio is implemented.

The main part of the sync-playback code is as follows:

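#include <stdio.h>   /* fprintf(), snprintf() */
#include <math.h>    /* sin(), M_PI */

#include <check.h>   /* START_TEST, fail_unless(), ck_abort() */

#include <pulse/pulseaudio.h>
#include <pulse/mainloop.h>
#include <pulsecore/macro.h>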

#define NSTREAMS 4
#define SINE_HZ 440
#define SAMPLE_HZ 8000

static pa_context *context = NULL;
static pa_stream *streams[NSTREAMS];
static pa_mainloop_api *mainloop_api = NULL;
static const char *bname = NULL;

static float data[SAMPLE_HZ]; /* one second space */

static int n_streams_ready = 0;

static const pa_sample_spec sample_spec = {
    .format = PA_SAMPLE_FLOAT32,
    .rate = SAMPLE_HZ,
    .channels = 1
};

static const pa_buffer_attr buffer_attr = {
    .maxlength = SAMPLE_HZ*sizeof(float)*NSTREAMS, /* exactly space for the entire play time */
    .tlength = (uint32_t) -1,
    .prebuf = 0, /* Setting prebuf to 0 guarantees us the streams will run synchronously, no matter what */
    .minreq = (uint32_t) -1,
    .fragsize = 0
};

static void nop_free_cb(void *p) {}

static void underflow_cb(struct pa_stream *s, void *userdata) {
    int i = PA_PTR_TO_INT(userdata);

    fprintf(stderr, "Stream %i finished\n", i);

    if (++n_streams_ready >= 2*NSTREAMS) {
        fprintf(stderr, "We're done\n");
        mainloop_api->quit(mainloop_api, 0);
    }
}

/* This routine is called whenever the stream state changes */
static void stream_state_callback(pa_stream *s, void *userdata) {
    fail_unless(s != NULL);

    switch (pa_stream_get_state(s)) {
        case PA_STREAM_UNCONNECTED:
        case PA_STREAM_CREATING:
        case PA_STREAM_TERMINATED:
            break;

        case PA_STREAM_READY: {

            int r, i = PA_PTR_TO_INT(userdata);

            fprintf(stderr, "Writing data to stream %i.\n", i);

            r = pa_stream_write(s, data, sizeof(data), nop_free_cb, (int64_t) sizeof(data) * (int64_t) i, PA_SEEK_ABSOLUTE);
            fail_unless(r == 0);

            /* Be notified when this stream is drained */
            pa_stream_set_underflow_callback(s, underflow_cb, userdata);

            /* All streams have been set up, let's go! */
            if (++n_streams_ready >= NSTREAMS) {
                fprintf(stderr, "Uncorking\n");
                pa_operation_unref(pa_stream_cork(s, 0, NULL, NULL));
            }

            break;
        }

        default:
        case PA_STREAM_FAILED:
            fprintf(stderr, "Stream error: %s\n", pa_strerror(pa_context_errno(pa_stream_get_context(s))));
            ck_abort();
    }
}

/* This is called whenever the context status changes */
static void context_state_callback(pa_context *c, void *userdata) {
    fail_unless(c != NULL);

    switch (pa_context_get_state(c)) {
        case PA_CONTEXT_CONNECTING:
        case PA_CONTEXT_AUTHORIZING:
        case PA_CONTEXT_SETTING_NAME:
            break;

        case PA_CONTEXT_READY: {

            int i;
            fprintf(stderr, "Connection established.\n");

            for (i = 0; i < NSTREAMS; i++) {
                char name[64];

                fprintf(stderr, "Creating stream %i\n", i);

                snprintf(name, sizeof(name), "stream #%i", i);

                streams[i] = pa_stream_new(c, name, &sample_spec, NULL);
                fail_unless(streams[i] != NULL);
                pa_stream_set_state_callback(streams[i], stream_state_callback, PA_INT_TO_PTR(i));
                pa_stream_connect_playback(streams[i], NULL, &buffer_attr, PA_STREAM_START_CORKED, NULL, i == 0 ? NULL : streams[0]);
            }

            break;
        }

        case PA_CONTEXT_TERMINATED:
            mainloop_api->quit(mainloop_api, 0);
            break;

        case PA_CONTEXT_FAILED:
        default:
            fprintf(stderr, "Context error: %s\n", pa_strerror(pa_context_errno(c)));
            ck_abort();
    }
}

START_TEST (sync_playback_test) {
    pa_mainloop* m = NULL;
    int i, ret = 0;

    for (i = 0; i < SAMPLE_HZ; i++)
        data[i] = (float) sin(((double) i/SAMPLE_HZ)*2*M_PI*SINE_HZ)/2;

    for (i = 0; i < NSTREAMS; i++)
        streams[i] = NULL;

    /* Set up a new main loop */
    m = pa_mainloop_new();
    fail_unless(m != NULL);

    mainloop_api = pa_mainloop_get_api(m);

    context = pa_context_new(mainloop_api, bname);
    fail_unless(context != NULL);

    pa_context_set_state_callback(context, context_state_callback, NULL);

    /* Connect the context */
    if (pa_context_connect(context, NULL, 0, NULL) < 0) {
        fprintf(stderr, "pa_context_connect() failed.\n");
        goto quit;
    }

    if (pa_mainloop_run(m, &ret) < 0)
        fprintf(stderr, "pa_mainloop_run() failed.\n");

quit:
    pa_context_unref(context);

    for (i = 0; i < NSTREAMS; i++)
        if (streams[i])
            pa_stream_unref(streams[i]);

    pa_mainloop_free(m);

    fail_unless(ret == 0);
}
END_TEST
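
To make the buffer layout in this test concrete, here is a small arithmetic sketch of the sizes and offsets involved. It only reuses the constants from the listing (NSTREAMS, SAMPLE_HZ, one float per sample); the helper names block_bytes(), stream_offset_bytes(), stream_start_seconds() and max_length_bytes() are made up for illustration and are not part of PulseAudio. It assumes sizeof(float) is 4, as on common platforms.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define NSTREAMS 4
#define SAMPLE_HZ 8000

/* Size in bytes of the one-second float buffer `data` in the test. */
static size_t block_bytes(void) {
    return (size_t) SAMPLE_HZ * sizeof(float);
}

/* Absolute byte offset that stream i passes to pa_stream_write(). */
static int64_t stream_offset_bytes(int i) {
    assert(i >= 0 && i < NSTREAMS);
    return (int64_t) block_bytes() * (int64_t) i;
}

/* The same offset expressed in whole seconds of audio. */
static int64_t stream_start_seconds(int i) {
    return stream_offset_bytes(i) / (int64_t) block_bytes();
}

/* buffer_attr.maxlength: room for NSTREAMS one-second blocks. */
static size_t max_length_bytes(void) {
    return block_bytes() * NSTREAMS;
}
```

With these numbers, stream 0 writes at byte offset 0 and stream 3 at byte offset 96000, so each stream's one-second tone is placed one second later than the previous one inside a buffer sized for the whole four seconds of play time.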

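The PulseAudio client playback API is used roughly as follows:

  1. Create a pa_mainloop object;
  2. Obtain the pa_mainloop_api object from the pa_mainloop object;
  3. Create a pa_context object;
  4. Set the state-change callback on the pa_context object;
  5. Connect the pa_context;
  6. Run the pa_mainloop;
  7. Whenever the pa_context state changes, an event is delivered and handled in the pa_mainloop. When the state becomes PA_CONTEXT_READY, create the streams, set a state-change callback on each stream, and connect them for playback;
  8. Whenever a stream's state changes, an event is delivered and handled in the pa_mainloop. When the pa_stream state becomes PA_STREAM_READY, write data to the stream;
  9. When playback has finished, destroy the pa_context, the streams that were created, and the pa_mainloop.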
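
The callback-driven flow in steps 6 to 8 can be illustrated without libpulse by a toy model. The sketch below is not PulseAudio code: ctx_state, struct app, on_context_state() and run_mock_loop() are hypothetical stand-ins, and the "main loop" simply replays a fixed list of state changes. It only shows the shape of the pattern, in which the application does nothing until the callback reports the ready state.

```c
#include <assert.h>
#include <stddef.h>

#define NSTREAMS 4

/* Hypothetical stand-ins for a few pa_context_state_t values. */
typedef enum {
    CTX_CONNECTING,
    CTX_AUTHORIZING,
    CTX_SETTING_NAME,
    CTX_READY,
    CTX_TERMINATED
} ctx_state;

struct app {
    int streams_created;   /* incremented where real code would call pa_stream_new() */
    int quit_requested;    /* set where real code would quit the main loop */
};

typedef void (*ctx_state_cb)(ctx_state state, struct app *a);

/* Same shape as context_state_callback(): act only on READY and TERMINATED. */
static void on_context_state(ctx_state state, struct app *a) {
    assert(a != NULL);
    switch (state) {
        case CTX_CONNECTING:
        case CTX_AUTHORIZING:
        case CTX_SETTING_NAME:
            break;                          /* still handshaking */
        case CTX_READY:
            for (int i = 0; i < NSTREAMS; i++)
                a->streams_created++;
            break;
        case CTX_TERMINATED:
            a->quit_requested = 1;
            break;
    }
}

/* Toy main loop: deliver each state change to the callback until quit. */
static void run_mock_loop(const ctx_state *events, size_t n, ctx_state_cb cb, struct app *a) {
    for (size_t i = 0; i < n && !a->quit_requested; i++)
        cb(events[i], a);
}
```

Feeding the loop the sequence CONNECTING, AUTHORIZING, SETTING_NAME, READY, TERMINATED creates the four streams only once the READY event arrives, which is the ordering that sync-playback relies on.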

Design and implementation of PulseAudio

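From a process point of view, pulseaudio uses a client/server architecture. It has a daemon process resident in the system that receives audio-related commands, manages the audio devices, and exchanges data with them. Applications that link against the pulseaudio library and access audio devices through the pulseaudio API are clients of the pulseaudio service: they write data to a playback device through the service to produce sound, and read captured audio from a recording device through the service. The service and the applications pass messages and data through inter-process communication.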

The pulseaudio service and its clients pass commands and data between processes through several mechanisms. Commands can travel over Unix domain sockets, IPv4 sockets, IPv6 sockets, D-Bus on Linux, and so on; large blocks of PCM audio data are passed through shared memory.

Let us look at the basic flow of data transfer in PulseAudio.

  1. Socket listening in the pulseaudio service process
#0  pa_socket_server_new_unix ()
    at ../src/pulsecore/socket-server.c:175
#1  module_native_protocol_unix_LTX_pa__init () at ../src/modules/module-protocol-stub.c:330
#2  pa_module_load ()
    at ../src/pulsecore/module.c:191
#3  pa_cli_command_load ()
    at ../src/pulsecore/cli-command.c:437
#4  pa_cli_command_execute_line_stateful () at ../src/pulsecore/cli-command.c:2141
#5  pa_cli_command_execute_file_stream ()
    at ../src/pulsecore/cli-command.c:2181
#6  pa_cli_command_execute_file ()
    at ../src/pulsecore/cli-command.c:2212
#7  pa_cli_command_execute_line_stateful () at ../src/pulsecore/cli-command.c:2106
#8  pa_cli_command_execute ()
    at ../src/pulsecore/cli-command.c:2238
#9  main () at ../src/daemon/main.c:1112

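When the pulseaudio daemon starts, it executes the commands in the configuration file passed to it, build/src/daemon/default.pa. For example, the line load-module module-native-protocol-unix makes the daemon load the module-native-protocol-unix module. In build/src/daemon/default.pa this is the only module-native-protocol-* module that is loaded, and the only *-protocol-* module loaded unconditionally: module-esound-protocol-unix is loaded only if its .so file exists, and the TCP variants are commented out: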

### Load several protocols
.ifexists module-esound-protocol-unix.so
load-module module-esound-protocol-unix
.endif
load-module module-native-protocol-unix

### Network access (may be configured with paprefs, so leave this commented
### here if you plan to use paprefs)
#load-module module-esound-protocol-tcp
#load-module module-native-protocol-tcp
#load-module module-zeroconf-publish

The file pulseaudio/src/modules/meson.build shows which source files this module consists of and how it is built:

  [ 'module-cli-protocol-tcp', 'module-protocol-stub.c', [], ['-DUSE_PROTOCOL_CLI', '-DUSE_TCP_SOCKETS'], [], libprotocol_cli ],
  [ 'module-cli-protocol-unix', 'module-protocol-stub.c', [], ['-DUSE_PROTOCOL_CLI', '-DUSE_UNIX_SOCKETS'], [], libprotocol_cli ],
 . . . . . .
  [ 'module-http-protocol-tcp', 'module-protocol-stub.c', [], ['-DUSE_PROTOCOL_HTTP', '-DUSE_TCP_SOCKETS'], [], libprotocol_http ],
  [ 'module-http-protocol-unix', 'module-protocol-stub.c', [], ['-DUSE_PROTOCOL_HTTP', '-DUSE_UNIX_SOCKETS'], [], libprotocol_http ],
 . . . . . .
  [ 'module-native-protocol-fd', 'module-native-protocol-fd.c', [], [], [], libprotocol_native ],
  [ 'module-native-protocol-tcp', 'module-protocol-stub.c', [], ['-DUSE_PROTOCOL_NATIVE', '-DUSE_TCP_SOCKETS'], [], libprotocol_native ],
  [ 'module-native-protocol-unix', 'module-protocol-stub.c', [], ['-DUSE_PROTOCOL_NATIVE', '-DUSE_UNIX_SOCKETS'], [], libprotocol_native ],
 . . . . . .
  [ 'module-simple-protocol-tcp', 'module-protocol-stub.c', [], ['-DUSE_PROTOCOL_SIMPLE', '-DUSE_TCP_SOCKETS'], [], libprotocol_simple ],
  [ 'module-simple-protocol-unix', 'module-protocol-stub.c', [], ['-DUSE_PROTOCOL_SIMPLE', '-DUSE_UNIX_SOCKETS'], [], libprotocol_simple ],
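
The entries above pair one source file, module-protocol-stub.c, with different -D flags. As a minimal sketch of that technique, the file below picks a protocol and a transport purely from preprocessor macros. The macro names are the ones in the meson.build entries; PROTOCOL_NAME, TRANSPORT_NAME and module_name() are hypothetical names used only for this illustration, and the fallback to "esound" when no protocol macro is set mirrors the final #else branch of pa__init().

```c
#include <assert.h>
#include <string.h>

/* Transport is chosen at compile time, e.g. cc -DUSE_TCP_SOCKETS ... */
#if defined(USE_TCP_SOCKETS)
#  define TRANSPORT_NAME "tcp"
#else
#  define TRANSPORT_NAME "unix"
#endif

/* Protocol is chosen the same way, e.g. cc -DUSE_PROTOCOL_NATIVE ... */
#if defined(USE_PROTOCOL_SIMPLE)
#  define PROTOCOL_NAME "simple"
#elif defined(USE_PROTOCOL_CLI)
#  define PROTOCOL_NAME "cli"
#elif defined(USE_PROTOCOL_HTTP)
#  define PROTOCOL_NAME "http"
#elif defined(USE_PROTOCOL_NATIVE)
#  define PROTOCOL_NAME "native"
#else
#  define PROTOCOL_NAME "esound"
#endif

/* Name of the module this translation unit would become. */
static const char *module_name(void) {
    static const char name[] = "module-" PROTOCOL_NAME "-protocol-" TRANSPORT_NAME;

    assert(strncmp(name, "module-", 7) == 0);
    return name;
}
```

Compiled with no flags, module_name() yields "module-esound-protocol-unix"; compiling the same file with -DUSE_PROTOCOL_NATIVE gives the name of the native Unix-socket variant instead.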

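module-native-protocol-unix and several other modules are built from the same source file, with different macro definitions and different library dependencies. The initialization function in pulseaudio/src/modules/module-protocol-stub.c is defined as follows: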

int pa__init(pa_module*m) {
    pa_modargs *ma = NULL;
    struct userdata *u = NULL;

#if defined(USE_TCP_SOCKETS)
    uint32_t port = IPV4_PORT;
    bool port_fallback = true;
    const char *listen_on;
#else
    int r;
#endif

#if defined(USE_PROTOCOL_NATIVE) || defined(USE_PROTOCOL_HTTP)
    char t[256];
#endif

    pa_assert(m);

    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
        pa_log("Failed to parse module arguments");
        goto fail;
    }

    m->userdata = u = pa_xnew0(struct userdata, 1);
    u->module = m;

#if defined(USE_PROTOCOL_SIMPLE)
    u->simple_protocol = pa_simple_protocol_get(m->core);

    u->simple_options = pa_simple_options_new();
    if (pa_simple_options_parse(u->simple_options, m->core, ma) < 0)
        goto fail;
    u->simple_options->module = m;
#elif defined(USE_PROTOCOL_CLI)
    u->cli_protocol = pa_cli_protocol_get(m->core);
#elif defined(USE_PROTOCOL_HTTP)
    u->http_protocol = pa_http_protocol_get(m->core);
#elif defined(USE_PROTOCOL_NATIVE)
    u->native_protocol = pa_native_protocol_get(m->core);

    u->native_options = pa_native_options_new();
    if (pa_native_options_parse(u->native_options, m->core, ma) < 0)
        goto fail;
    u->native_options->module = m;
#else
    u->esound_protocol = pa_esound_protocol_get(m->core);

    u->esound_options = pa_esound_options_new();
    if (pa_esound_options_parse(u->esound_options, m->core, ma) < 0)
        goto fail;
    u->esound_options->module = m;
#endif

#if defined(USE_TCP_SOCKETS)

    if (pa_in_system_mode() || pa_modargs_get_value(ma, "port", NULL))
        port_fallback = false;

    if (pa_modargs_get_value_u32(ma, "port", &port) < 0 || port < 1 || port > 0xFFFF) {
        pa_log("port= expects a numerical argument between 1 and 65535.");
        goto fail;
    }

    listen_on = pa_modargs_get_value(ma, "listen", NULL);

    if (listen_on) {
#  ifdef HAVE_IPV6
        u->socket_server_ipv6 = pa_socket_server_new_ipv6_string(m->core->mainloop, listen_on, (uint16_t) port, port_fallback, TCPWRAP_SERVICE);
#  endif
        u->socket_server_ipv4 = pa_socket_server_new_ipv4_string(m->core->mainloop, listen_on, (uint16_t) port, port_fallback, TCPWRAP_SERVICE);
    } else {
#  ifdef HAVE_IPV6
        u->socket_server_ipv6 = pa_socket_server_new_ipv6_any(m->core->mainloop, (uint16_t) port, port_fallback, TCPWRAP_SERVICE);
#  endif
        u->socket_server_ipv4 = pa_socket_server_new_ipv4_any(m->core->mainloop, (uint16_t) port, port_fallback, TCPWRAP_SERVICE);
    }

#  ifdef HAVE_IPV6
    if (!u->socket_server_ipv4 && !u->socket_server_ipv6)
#  else
    if (!u->socket_server_ipv4)
#  endif
        goto fail;

    if (u->socket_server_ipv4)
        pa_socket_server_set_callback(u->socket_server_ipv4, socket_server_on_connection_cb, u);
#  ifdef HAVE_IPV6
    if (u->socket_server_ipv6)
        pa_socket_server_set_callback(u->socket_server_ipv6, socket_server_on_connection_cb, u);
#  endif

#else

#  if defined(USE_PROTOCOL_ESOUND)

    /* Windows doesn't support getuid(), so we ignore the per-user Esound socket compile flag.
     * Moreover, Esound Unix sockets haven't been supported on Windows historically. */
#    if defined(USE_PER_USER_ESOUND_SOCKET) && !defined(OS_IS_WIN32)
    u->socket_path = pa_sprintf_malloc("/tmp/.esd-%lu/socket", (unsigned long) getuid());
#    else
    u->socket_path = pa_xstrdup("/tmp/.esd/socket");
#    endif

    /* This socket doesn't reside in our own runtime dir but in
     * /tmp/.esd/, hence we have to create the dir first */

    if (pa_make_secure_parent_dir(u->socket_path, pa_in_system_mode() ? 0755U : 0700U, (uid_t)-1, (gid_t)-1, false) < 0) {
        pa_log("Failed to create socket directory '%s': %s\n", u->socket_path, pa_cstrerror(errno));
        goto fail;
    }

#  else
    if (!(u->socket_path = pa_runtime_path(pa_modargs_get_value(ma, "socket", UNIX_SOCKET)))) {
        pa_log("Failed to generate socket path.");
        goto fail;
    }
#  endif

    if ((r = pa_unix_socket_remove_stale(u->socket_path)) < 0) {
        pa_log("Failed to remove stale UNIX socket '%s': %s", u->socket_path, pa_cstrerror(errno));
        goto fail;
    } else if (r > 0)
        pa_log_info("Removed stale UNIX socket '%s'.", u->socket_path);

    if (!(u->socket_server_unix = pa_socket_server_new_unix(m->core->mainloop, u->socket_path)))
        goto fail;

    pa_socket_server_set_callback(u->socket_server_unix, socket_server_on_connection_cb, u);

#endif

#if defined(USE_PROTOCOL_NATIVE)
#  if defined(USE_TCP_SOCKETS)
    if (u->socket_server_ipv4)
        if (pa_socket_server_get_address(u->socket_server_ipv4, t, sizeof(t)))
            pa_native_protocol_add_server_string(u->native_protocol, t);

#    ifdef HAVE_IPV6
    if (u->socket_server_ipv6)
        if (pa_socket_server_get_address(u->socket_server_ipv6, t, sizeof(t)))
            pa_native_protocol_add_server_string(u->native_protocol, t);
#    endif
#  else
    if (pa_socket_server_get_address(u->socket_server_unix, t, sizeof(t)))
        pa_native_protocol_add_server_string(u->native_protocol, t);

#  endif
#endif

#if defined(USE_PROTOCOL_HTTP)
#if defined(USE_TCP_SOCKETS)
    if (u->socket_server_ipv4)
        if (pa_socket_server_get_address(u->socket_server_ipv4, t, sizeof(t)))
            pa_http_protocol_add_server_string(u->http_protocol, t);

#ifdef HAVE_IPV6
    if (u->socket_server_ipv6)
        if (pa_socket_server_get_address(u->socket_server_ipv6, t, sizeof(t)))
            pa_http_protocol_add_server_string(u->http_protocol, t);
#endif /* HAVE_IPV6 */
#else /* USE_TCP_SOCKETS */
    if (pa_socket_server_get_address(u->socket_server_unix, t, sizeof(t)))
        pa_http_protocol_add_server_string(u->http_protocol, t);

#endif /* USE_TCP_SOCKETS */
#endif /* USE_PROTOCOL_HTTP */

    if (ma)
        pa_modargs_free(ma);

    return 0;

fail:

    if (ma)
        pa_modargs_free(ma);

    pa__done(m);

    return -1;
}

For command transport, PulseAudio supports five protocols (PROTOCOL_SIMPLE, PROTOCOL_CLI, PROTOCOL_HTTP, PROTOCOL_NATIVE and PROTOCOL_ESOUND), and beneath the protocol it supports three transport channels: IPv4, IPv6 and Unix domain sockets.

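While module-native-protocol-unix is being loaded, it starts listening on a Unix domain socket at /run/user/1000/pulse/native (1000 is the UID of the user running the daemon); the detailed call path is the stack trace shown above. The module also registers callbacks for events such as connection establishment, for example socket_server_on_connection_cb(). The Unix domain socket is managed by module-native-protocol-unix.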
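
For readers unfamiliar with Unix domain sockets, the following is a minimal POSIX sketch of what "listening on a socket path" involves. It is not PulseAudio's pa_socket_server_new_unix(): listen_unix() is a hypothetical helper, and the plain unlink() is only a crude stand-in for the stale-socket check that pa_unix_socket_remove_stale() performs.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/* Bind a listening stream socket to `path`; returns the fd, or -1 on error. */
static int listen_unix(const char *path) {
    struct sockaddr_un sa;
    int fd;

    assert(path != NULL);
    if (strlen(path) >= sizeof(sa.sun_path))
        return -1;                  /* path does not fit into sockaddr_un */

    unlink(path);                   /* remove a leftover socket file, if any */

    if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
        return -1;

    memset(&sa, 0, sizeof(sa));
    sa.sun_family = AF_UNIX;
    strcpy(sa.sun_path, path);      /* safe: length checked above */

    if (bind(fd, (struct sockaddr *) &sa, sizeof(sa)) < 0 || listen(fd, 5) < 0) {
        close(fd);
        return -1;
    }

    return fd;                      /* ready for accept() or poll() */
}
```

A server would then hand the returned descriptor to its event loop and call accept() whenever the descriptor becomes readable, which is the role the main loop and the socket-server callback play in the daemon.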

  2. The client establishes a connection
#0  pa_socket_client_new_unix ()
    at ../src/pulsecore/socket-client.c:227
#1  pa_socket_client_new_string () at ../src/pulsecore/socket-client.c:459
#2  try_next_connection () at ../src/pulse/context.c:884
#3  pa_context_connect () at ../src/pulse/context.c:1046
#4  sync_playback_test () at ../src/tests/sync-playback.c:165
#5  main () at ../src/tests/sync-playback.c:184

When a client application calls pa_context_connect(), it connects to the Unix domain socket that the pulseaudio service is listening on.
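
The client side of such a connection can be sketched with plain POSIX calls as well. The code below is an illustration only, not the code path inside pa_socket_client_new_unix(): connect_unix() and roundtrip() are hypothetical helpers, and roundtrip() plays both server and client inside one process so that the example is self-contained.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

static void fill_addr(struct sockaddr_un *sa, const char *path) {
    assert(strlen(path) < sizeof(sa->sun_path));
    memset(sa, 0, sizeof(*sa));
    sa->sun_family = AF_UNIX;
    strcpy(sa->sun_path, path);
}

/* Client side: connect a stream socket to the server's socket path. */
static int connect_unix(const char *path) {
    struct sockaddr_un sa;
    int fd = socket(AF_UNIX, SOCK_STREAM, 0);

    if (fd < 0)
        return -1;
    fill_addr(&sa, path);
    if (connect(fd, (struct sockaddr *) &sa, sizeof(sa)) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}

/* Single-process demo: listen, connect, accept, then send one byte from the
 * client end to the server end. Returns the byte received, or -1 on error. */
static int roundtrip(const char *path, unsigned char byte) {
    struct sockaddr_un sa;
    unsigned char got = 0;
    int lfd, cfd = -1, sfd = -1, result = -1;

    unlink(path);
    if ((lfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
        return -1;
    fill_addr(&sa, path);

    if (bind(lfd, (struct sockaddr *) &sa, sizeof(sa)) == 0 &&
        listen(lfd, 1) == 0 &&
        (cfd = connect_unix(path)) >= 0 &&
        (sfd = accept(lfd, NULL, NULL)) >= 0 &&
        write(cfd, &byte, 1) == 1 &&
        read(sfd, &got, 1) == 1)
        result = got;

    if (sfd >= 0) close(sfd);
    if (cfd >= 0) close(cfd);
    close(lfd);
    unlink(path);
    return result;
}
```

In the real system the two ends live in different processes, and the accepted descriptor is what the server wraps in a pa_iochannel.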

  3. The pulseaudio service associates the command handlers with the IO channel

When the pulseaudio service receives a client connection, socket-server, in its callback, creates a pa_iochannel from the file descriptor of the new connection and invokes socket_server_on_connection_cb(). That callback uses pa_native_protocol_connect() to create the resources the connection needs, such as pa_client, pa_native_connection and the packet stream pa_pstream, sets callbacks for the various pa_pstream events, and attaches the command handlers to the connection:

#0  pa_native_protocol_connect () at ../src/pulsecore/protocol-native.c:5194
#1  socket_server_on_connection_cb ()
    at ../src/modules/module-protocol-stub.c:202
#2  callback ()
    at ../src/pulsecore/socket-server.c:143
#3  dispatch_pollfds () at ../src/pulse/mainloop.c:655
#4  pa_mainloop_dispatch () at ../src/pulse/mainloop.c:896
#5  pa_mainloop_iterate () at ../src/pulse/mainloop.c:927
#6  pa_mainloop_run () at ../src/pulse/mainloop.c:942
#7  main () at ../src/daemon/main.c:1170

Part of the code of pa_native_protocol_connect() is as follows:

    c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
    pa_pstream_set_receive_packet_callback(c->pstream, pstream_packet_callback, c);
    pa_pstream_set_receive_memblock_callback(c->pstream, pstream_memblock_callback, c);
    pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
    pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
    pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
    pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);

    c->pdispatch = pa_pdispatch_new(p->core->mainloop, true, command_table, PA_COMMAND_MAX);

    c->record_streams = pa_idxset_new(NULL, NULL);
    c->output_streams = pa_idxset_new(NULL, NULL);

  4. The client sends commands asking the pulseaudio service to create resources, and the service handles them

As shown above, the pulseaudio service registers pstream_packet_callback() as the handler for packets received on the pa_pstream, and creates a pa_pdispatch object based on command_table. The command table command_table is defined in pulseaudio/src/pulsecore/protocol-native.c as follows:

static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
    [PA_COMMAND_ERROR] = NULL,
    [PA_COMMAND_TIMEOUT] = NULL,
    [PA_COMMAND_REPLY] = NULL,
    [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
    [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
    [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
    [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
    [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
    [PA_COMMAND_AUTH] = command_auth,
    [PA_COMMAND_REQUEST] = NULL,
    [PA_COMMAND_EXIT] = command_exit,
    [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
    [PA_COMMAND_LOOKUP_SINK] = command_lookup,
    [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
    [PA_COMMAND_STAT] = command_stat,
    [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
    [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
    [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
    [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
    [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
    [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
    [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
    [PA_COMMAND_GET_SINK_INFO] = command_get_info,
    [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
    [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
    [PA_COMMAND_GET_CARD_INFO] = command_get_info,
    [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
    [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
    [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
    [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
    [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
    [PA_COMMAND_SUBSCRIBE] = command_subscribe,

    [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
    [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
    [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
    [PA_COMMAND_SET_SOURCE_OUTPUT_VOLUME] = command_set_volume,

    [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
    [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
    [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
    [PA_COMMAND_SET_SOURCE_OUTPUT_MUTE] = command_set_mute,

    [PA_COMMAND_SUSPEND_SINK] = command_suspend,
    [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,

    [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
    [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
    [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
    [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,

    [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
    [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,

    [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
    [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
    [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
    [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
    [PA_COMMAND_KILL_CLIENT] = command_kill,
    [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
    [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
    [PA_COMMAND_LOAD_MODULE] = command_load_module,
    [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,

    [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
    [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
    [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
    [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,

    [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
    [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,

    [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
    [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,

    [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
    [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,

    [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
    [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
    [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,

    [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
    [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
    [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,

    [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,

    [PA_COMMAND_SET_SINK_PORT] = command_set_sink_or_source_port,
    [PA_COMMAND_SET_SOURCE_PORT] = command_set_sink_or_source_port,

    [PA_COMMAND_SET_PORT_LATENCY_OFFSET] = command_set_port_latency_offset,

    [PA_COMMAND_ENABLE_SRBCHANNEL] = command_enable_srbchannel,

    [PA_COMMAND_REGISTER_MEMFD_SHMID] = command_register_memfd_shmid,

    [PA_COMMAND_SEND_OBJECT_MESSAGE] = command_send_object_message,

    [PA_COMMAND_EXTENSION] = command_extension
};

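The command table has room for 105 commands in total (it is sized by PA_COMMAND_MAX); slots that are set to NULL or not listed, such as PA_COMMAND_REQUEST, have no handler on the server side. The pa_pdispatch_new() function used above (in pulseaudio/src/pulsecore/pdispatch.c) is defined as follows: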

pa_pdispatch* pa_pdispatch_new(pa_mainloop_api *mainloop, bool use_rtclock, const pa_pdispatch_cb_t *table, unsigned entries) {
    pa_pdispatch *pd;

    pa_assert(mainloop);
    pa_assert((entries && table) || (!entries && !table));

    pd = pa_xnew0(pa_pdispatch, 1);
    PA_REFCNT_INIT(pd);
    pd->mainloop = mainloop;
    pd->callback_table = table;
    pd->n_commands = entries;
    PA_LLIST_HEAD_INIT(struct reply_info, pd->replies);
    pd->use_rtclock = use_rtclock;

    return pd;
}

Later the command table can be reached through the callback_table field of the pa_pdispatch object. Received packets are handled by the pstream_packet_callback() callback, which is defined (in pulseaudio/src/pulsecore/protocol-native.c) as follows:

static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata) {
    pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);

    pa_assert(p);
    pa_assert(packet);
    pa_native_connection_assert_ref(c);

    if (pa_pdispatch_run(c->pdispatch, packet, ancil_data, c) < 0) {
        pa_log("invalid packet.");
        native_connection_unlink(c);
    }
}

pstream_packet_callback() handles the various commands through pa_pdispatch_run(), which is defined (in pulseaudio/src/pulsecore/pdispatch.c) as follows:

int pa_pdispatch_run(pa_pdispatch *pd, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata) {
    uint32_t tag, command;
    pa_tagstruct *ts = NULL;
    int ret = -1;
    const void *pdata;
    size_t plen;

    pa_assert(pd);
    pa_assert(PA_REFCNT_VALUE(pd) >= 1);
    pa_assert(packet);

    pa_pdispatch_ref(pd);

    pdata = pa_packet_data(packet, &plen);
    if (plen <= 8)
        goto finish;

    ts = pa_tagstruct_new_fixed(pdata, plen);

    if (pa_tagstruct_getu32(ts, &command) < 0 ||
        pa_tagstruct_getu32(ts, &tag) < 0)
        goto finish;

#ifdef DEBUG_OPCODES
{
    char t[256];
    char const *p = NULL;

    if (command >= PA_COMMAND_MAX || !(p = command_names[command]))
        pa_snprintf((char*) (p = t), sizeof(t), "%u", command);

    pa_log("[%p] Received opcode <%s>", pd, p);
}
#endif

    pd->ancil_data = ancil_data;

    if (command == PA_COMMAND_ERROR || command == PA_COMMAND_REPLY) {
        struct reply_info *r;

        PA_LLIST_FOREACH(r, pd->replies)
            if (r->tag == tag)
                break;

        if (r)
            run_action(pd, r, command, ts);

    } else if (pd->callback_table && (command < pd->n_commands) && pd->callback_table[command]) {
        const pa_pdispatch_cb_t *cb = pd->callback_table+command;

        (*cb)(pd, command, tag, ts, userdata);
    } else {
        pa_log("Received unsupported command %u", command);
        goto finish;
    }

    ret = 0;

finish:
    pd->ancil_data = NULL;

    if (ts)
        pa_tagstruct_free(ts);

    pa_pdispatch_unref(pd);

    return ret;
}

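pa_pdispatch_run() parses the packet to obtain the command index and the tag, looks up the handler for that command by index, and calls it. PA_COMMAND_ERROR and PA_COMMAND_REPLY are treated separately: they are matched by tag against the list of pending replies.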
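
The table-driven dispatch used here can be reduced to a few lines. The sketch below is a simplified model, not PulseAudio's implementation: the CMD_* enum, struct last_call, record_call() and dispatch() are hypothetical, and the wire format (two big-endian 32-bit integers for command and tag) is an assumption made for the example rather than the real tagstruct encoding.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum { CMD_ERROR, CMD_AUTH, CMD_SET_CLIENT_NAME, CMD_REQUEST, CMD_MAX };

typedef void (*cmd_cb)(uint32_t command, uint32_t tag, void *userdata);

struct last_call { uint32_t command, tag; int calls; };

/* Example handler: just record what it was called with. */
static void record_call(uint32_t command, uint32_t tag, void *userdata) {
    struct last_call *lc = userdata;
    lc->command = command;
    lc->tag = tag;
    lc->calls++;
}

/* Designated initializers; slots not listed (CMD_REQUEST) stay NULL. */
static const cmd_cb table[CMD_MAX] = {
    [CMD_ERROR] = NULL,
    [CMD_AUTH] = record_call,
    [CMD_SET_CLIENT_NAME] = record_call,
};

static uint32_t read_be32(const uint8_t *p) {
    return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
           ((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

/* Returns 0 if a handler ran, -1 for short packets or unsupported commands. */
static int dispatch(const uint8_t *packet, size_t len, void *userdata) {
    uint32_t command, tag;

    assert(packet != NULL);
    if (len < 8)
        return -1;

    command = read_be32(packet);
    tag = read_be32(packet + 4);

    if (command >= (uint32_t) CMD_MAX || !table[command])
        return -1;

    table[command](command, tag, userdata);
    return 0;
}
```

The bounds check before indexing and the NULL check on the slot correspond to the `command < pd->n_commands` and `pd->callback_table[command]` conditions in pa_pdispatch_run().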

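After the connection is established, the client sends a series of commands over it, asking the pulseaudio service to allocate resources. The handlers invoked on the server side are mainly the following: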

command_get_info
command_get_info
command_auth
command_register_memfd_shmid
command_set_client_name
command_get_info
command_get_info
command_enable_srbchannel
command_create_playback_stream
command_create_playback_stream
command_create_playback_stream
command_create_playback_stream
command_get_info
command_get_info
command_get_info
command_get_info
command_get_info
command_get_info
command_cork_playback_stream
command_get_info
command_get_info
command_get_info

We ignore the less important command_get_info calls here.

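(1). As mentioned earlier, the client and the pulseaudio service pass large blocks of audio data to each other through shared memory. The client's shared memory object is created when the pa_context object is created: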

#0  sharedmem_create ()
    at ../src/pulsecore/shm.c:141
#1  pa_shm_create_rw () at ../src/pulsecore/shm.c:241
#2  pa_mempool_new () at ../src/pulsecore/memblock.c:849
#3  pa_context_new_with_proplist () at ../src/pulse/context.c:185
#4  pa_context_new () at ../src/pulse/context.c:103
#5  sync_playback_test () at ../src/tests/sync-playback.c:160
#6  main () at ../src/tests/sync-playback.c:184
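
The general pattern behind such a shared region is: obtain a file descriptor, size it with ftruncate(), and map it with mmap() using MAP_SHARED. The sketch below shows that pattern under a loudly stated assumption: it uses an unlinked mkstemp() temporary file as a portable stand-in for the shm_open() or memfd_create() descriptor that PulseAudio uses. region_create(), region_map() and region_shares_writes() are hypothetical helpers, not PulseAudio functions.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <unistd.h>

/* Create a file-backed region of `size` bytes; returns its fd, or -1. */
static int region_create(size_t size) {
    char path[] = "/tmp/shm-sketch-XXXXXX";
    int fd = mkstemp(path);

    if (fd < 0)
        return -1;
    unlink(path);                       /* the open fd keeps the file alive */
    if (ftruncate(fd, (off_t) size) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}

/* Map the region read/write and shared; returns NULL on failure. */
static void *region_map(int fd, size_t size) {
    void *p = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    return p == MAP_FAILED ? NULL : p;
}

/* Write msg through one mapping and check it is visible through a second
 * mapping of the same fd. Returns 1 if the write was seen, 0 otherwise. */
static int region_shares_writes(size_t size, const char *msg) {
    char *a, *b;
    int fd, ok = 0;

    assert(msg != NULL && strlen(msg) < size);
    if ((fd = region_create(size)) < 0)
        return 0;

    a = region_map(fd, size);
    b = region_map(fd, size);
    if (a && b) {
        strcpy(a, msg);                 /* "producer" writes */
        ok = strcmp(b, msg) == 0;       /* "consumer" sees the same bytes */
    }

    if (a) munmap(a, size);
    if (b) munmap(b, size);
    close(fd);
    return ok;
}
```

In the two-process case the second mapping would be made by the other process after it has received the descriptor or the segment identifier, so that audio data written by one side becomes visible to the other without being copied through the socket.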

pa_shm_create_rw() delegates to sharedmem_create(), as the stack trace shows; the relevant code (in pulseaudio/src/pulsecore/shm.c) is as follows:

#ifdef HAVE_SHM_OPEN
static char *segment_name(char *fn, size_t l, unsigned id) {
    pa_snprintf(fn, l, "/pulse-shm-%u", id);
    return fn;
}
#endif

static int privatemem_create(pa_shm *m, size_t size) {
    pa_assert(m);
    pa_assert(size > 0);

    m->type = PA_MEM_TYPE_PRIVATE;
    m->id = 0;
    m->size = size;
    m->do_unlink = false;
    m->fd = -1;

#ifdef MAP_ANONYMOUS
    if ((m->ptr = mmap(NULL, m->size, PROT_READ|PROT_WRITE, MAP_ANONYMOUS|MAP_PRIVATE, -1, (off_t) 0)) == MAP_FAILED) {
        pa_log("mmap() failed: %s", pa_cstrerror(errno));
        return -1;
    }
#elif defined(HAVE_POSIX_MEMALIGN)
    {
        int r;

        if ((r = posix_memalign(&m->ptr, pa_page_size(), size)) < 0) {
            pa_log("posix_memalign() failed: %s", pa_cstrerror(r));
            return r;
        }
    }
#else
    m->ptr = pa_xmalloc(m->size);
#endif

    return 0;
}


static int sharedmem_create(pa_shm *m, pa_mem_type_t type, size_t size, mode_t mode) {
#if defined(HAVE_SHM_OPEN) || defined(HAVE_MEMFD)
    char fn[32];
    int fd = -1;
    struct shm_marker *marker;
    bool do_unlink = false;

    /* Each time we create a new SHM area, let's first drop all stale
     * ones */
    pa_shm_cleanup();

    pa_random(&m->id, sizeof(m->id));

    switch (type) {
#ifdef HAVE_SHM_OPEN
    case PA_MEM_TYPE_SHARED_POSIX:
        segment_name(fn, sizeof(fn), m->id);
        fd = shm_open(fn, O_RDWR|O_CREAT|O_EXCL, mode);
        do_unlink = true;
        break;
#endif
#ifdef HAVE_MEMFD
    case PA_MEM_TYPE_SHARED_MEMFD:
        fd = memfd_create("pulseaudio", MFD_ALLOW_SEALING);
        break;
#endif
    default:
        goto fail;
    }

    if (fd < 0) {
        pa_log("%s open() failed: %s", pa_mem_type_to_string(type), pa_cstrerror(errno));
        goto fail;
    }

    m->type = type;
    m->size = size + shm_marker_size(type);
    m->do_unlink = do_unlink;

    if (ftruncate(fd, (off_t) m->size) < 0) {
        pa_log("ftruncate() failed: %s", pa_cstrerror(errno));
        goto fail;
    }

#ifndef MAP_NORESERVE
#define MAP_NORESERVE 0
#endif

    if ((m->ptr = mmap(NULL, PA_PAGE_ALIGN(m->size), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_NORESERVE, fd, (off_t) 0)) == MAP_FAILED) {
        pa_log("mmap() failed: %s", pa_cstrerror(errno));
        goto fail;
    }

    if (type == PA_MEM_TYPE_SHARED_POSIX) {
        /* We store our PID at the end of the shm block, so that we
         * can check for dead shm segments later */
        marker = (struct shm_marker*) ((uint8_t*) m->ptr + m->size - shm_marker_size(type));
        pa_atomic_store(&marker->pid, (int) getpid());
        pa_atomic_store(&marker->marker, SHM_MARKER);
    }

    /* For memfds, we keep the fd open until we pass it
     * to the other PA endpoint over unix domain socket. */
    if (type != PA_MEM_TYPE_SHARED_MEMFD) {
        pa_assert_se(pa_close(fd) == 0);
        m->fd = -1;
    }
#ifdef HAVE_MEMFD
    else
        m->fd = fd;
#endif

    return 0;

fail:
    if (fd >= 0) {
#ifdef HAVE_SHM_OPEN
        if (type == PA_MEM_TYPE_SHARED_POSIX)
            shm_unlink(fn);
#endif
        pa_close(fd);
    }
#endif /* defined(HAVE_SHM_OPEN) || defined(HAVE_MEMFD) */

    return -1;
}

int pa_shm_create_rw(pa_shm *m, pa_mem_type_t type, size_t size, mode_t mode) {
    pa_assert(m);
    pa_assert(size > 0);
    pa_assert(size <= MAX_SHM_SIZE);
    pa_assert(!(mode & ~0777));
    pa_assert(mode >= 0600);

    /* Round up to make it page aligned */
    size = PA_PAGE_ALIGN(size);

    if (type == PA_MEM_TYPE_PRIVATE)
        return privatemem_create(m, size);

    return sharedmem_create(m, type, size, mode);
}

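pa_shm_create_rw() creates the memory used for exchanging audio data. Depending on the requested type, it creates either process-private memory or memory shared across processes. Private memory is obtained through mmap(), posix_memalign(), or malloc(), depending on what the platform provides. Shared memory is created either through the POSIX shared memory API shm_open() or through memfd. Which one is used depends on the configuration file and on which of these APIs the system supports, with memfd preferred. A shared memory block is described by struct pa_shm, defined in pulseaudio/src/pulsecore/shm.h: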

typedef struct pa_shm {
    pa_mem_type_t type;
    unsigned id;
    void *ptr;
    size_t size;

    /* Only for type = PA_MEM_TYPE_SHARED_POSIX */
    bool do_unlink:1;

    /* Only for type = PA_MEM_TYPE_SHARED_MEMFD
     *
     * To avoid fd leaks, we keep this fd open only until we pass it
     * to the other PA endpoint over unix domain socket.
     *
     * When we don't have ownership for the memfd fd in question (e.g.
     * pa_shm_attach()), or the file descriptor has now been closed,
     * this is set to -1.
     *
     * For the special case of a global mempool, we keep this fd
     * always open. Check comments on top of pa_mempool_new() for
     * rationale. */
    int fd;
} pa_shm;

When shared memory is created, the block is mapped into the current process with mmap().
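
As a rough illustration of the shared path, the sketch below creates a POSIX shared memory object with shm_open(), sizes it with ftruncate(), and maps it twice with mmap(), so that a write through one mapping becomes visible through the other. The function name shm_roundtrip() and the object name are made up for this example, and the two mappings stand in for two processes mapping the same object. Depending on the C library, linking may require -lrt.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

/* Writes msg through one mapping of a shared memory object and reads it
 * back through a second mapping. Returns 0 on success, -1 on failure. */
static int shm_roundtrip(const char *msg) {
    char name[64];
    size_t size = (size_t) sysconf(_SC_PAGESIZE);
    char *writer = MAP_FAILED, *reader = MAP_FAILED;
    int fd, ret = -1;

    assert(msg && strlen(msg) < size);

    /* Hypothetical object name; O_EXCL makes creation fail if it exists. */
    snprintf(name, sizeof(name), "/sketch-shm-%ld", (long) getpid());
    if ((fd = shm_open(name, O_RDWR | O_CREAT | O_EXCL, 0600)) < 0)
        return -1;

    /* The name is not needed once we hold the descriptor. */
    shm_unlink(name);

    /* A new object has size 0; give it one page (zero-filled). */
    if (ftruncate(fd, (off_t) size) < 0)
        goto finish;

    writer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    reader = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
    if (writer == MAP_FAILED || reader == MAP_FAILED)
        goto finish;

    strcpy(writer, msg);
    ret = strcmp(reader, msg) == 0 ? 0 : -1;

finish:
    if (writer != MAP_FAILED)
        munmap(writer, size);
    if (reader != MAP_FAILED)
        munmap(reader, size);
    close(fd);
    return ret;
}
```

Unlike sharedmem_create(), this sketch unlinks the name immediately and stores no PID marker, so it has no stale segments to clean up.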

(2). The pulseaudio server creates its shared memory object in the PA_COMMAND_AUTH handler, after the client has sent the PA_COMMAND_AUTH command. The client sends PA_COMMAND_AUTH as soon as the connection is established:

#0  pa_pstream_send_packet () at ../src/pulsecore/pstream.c:442
#1  pa_pstream_send_tagstruct_with_ancil_data ()
    at ../src/pulsecore/pstream-util.c:45
#2  pa_pstream_send_tagstruct_with_creds ()
    at ../src/pulsecore/pstream-util.c:58
#3  setup_context () at ../src/pulse/context.c:650
#4  on_connection () at ../src/pulse/context.c:926
#5  do_call () at ../src/pulsecore/socket-client.c:159
#6  connect_defer_cb () at ../src/pulsecore/socket-client.c:172
#7  dispatch_defer () at ../src/pulse/mainloop.c:680
#8  pa_mainloop_dispatch () at ../src/pulse/mainloop.c:887
#9  pa_mainloop_iterate () at ../src/pulse/mainloop.c:927
#10  pa_mainloop_run () at ../src/pulse/mainloop.c:942
#11  sync_playback_test () at ../src/tests/sync-playback.c:170
#12  main () at ../src/tests/sync-playback.c:184

(3). In the PA_COMMAND_AUTH handler, the pulseaudio server creates a pa_srbchannel object and creates the shared memory object:

#0  sharedmem_create ()
    at ../src/pulsecore/shm.c:141
#1  pa_shm_create_rw () at ../src/pulsecore/shm.c:241
#2  pa_mempool_new () at ../src/pulsecore/memblock.c:849
#3  setup_srbchannel () at ../src/pulsecore/protocol-native.c:2502
#4  command_auth ()
    at ../src/pulsecore/protocol-native.c:2740
#5  pa_pdispatch_run ()
    at ../src/pulsecore/pdispatch.c:346
#6  pstream_packet_callback ()
    at ../src/pulsecore/protocol-native.c:5027
#7  do_read () at ../src/pulsecore/pstream.c:1020
#8  do_pstream_read_write () at ../src/pulsecore/pstream.c:260
#9  io_callback () at ../src/pulsecore/pstream.c:312
#10 callback ()
    at ../src/pulsecore/iochannel.c:158
#11 dispatch_pollfds () at ../src/pulse/mainloop.c:655
#12 pa_mainloop_dispatch () at ../src/pulse/mainloop.c:896
#13 pa_mainloop_iterate () at ../src/pulse/mainloop.c:927
#14 pa_mainloop_run () at ../src/pulse/mainloop.c:942
#15 main () at ../src/daemon/main.c:1170

After creating the shared memory object while handling PA_COMMAND_AUTH, the pulseaudio server also passes the object's file descriptor to the client through pa_pstream_register_memfd_mempool(), as the definition of setup_srbchannel() shows:

static void setup_srbchannel(pa_native_connection *c, pa_mem_type_t shm_type) {
    pa_srbchannel_template srbt;
    pa_srbchannel *srb;
    pa_memchunk mc;
    pa_tagstruct *t;
    int fdlist[2];

#ifndef HAVE_CREDS
    pa_log_debug("Disabling srbchannel, reason: No fd passing support");
    return;
#endif

    if (!c->options->srbchannel) {
        pa_log_debug("Disabling srbchannel, reason: Must be enabled by module parameter");
        return;
    }

    if (c->version < 30) {
        pa_log_debug("Disabling srbchannel, reason: Protocol too old");
        return;
    }

    if (!pa_pstream_get_shm(c->pstream)) {
        pa_log_debug("Disabling srbchannel, reason: No SHM support");
        return;
    }

    if (c->rw_mempool) {
        pa_log_debug("Ignoring srbchannel setup, reason: received COMMAND_AUTH "
                     "more than once");
        return;
    }

    if (!(c->rw_mempool = pa_mempool_new(shm_type, c->protocol->core->shm_size, true))) {
        pa_log_warn("Disabling srbchannel, reason: Failed to allocate shared "
                    "writable memory pool.");
        return;
    }

    if (shm_type == PA_MEM_TYPE_SHARED_MEMFD) {
        const char *reason;
        if (pa_pstream_register_memfd_mempool(c->pstream, c->rw_mempool, &reason)) {
            pa_log_warn("Disabling srbchannel, reason: Failed to register memfd mempool: %s", reason);
            goto fail;
        }
    }
    pa_mempool_set_is_remote_writable(c->rw_mempool, true);

    srb = pa_srbchannel_new(c->protocol->core->mainloop, c->rw_mempool);
    if (!srb) {
        pa_log_debug("Failed to create srbchannel");
        goto fail;
    }
    pa_log_debug("Enabling srbchannel...");
    pa_srbchannel_export(srb, &srbt);

    /* Send enable command to client */
    t = pa_tagstruct_new();
    pa_tagstruct_putu32(t, PA_COMMAND_ENABLE_SRBCHANNEL);
    pa_tagstruct_putu32(t, (size_t) srb); /* tag */
    fdlist[0] = srbt.readfd;
    fdlist[1] = srbt.writefd;
    pa_pstream_send_tagstruct_with_fds(c->pstream, t, 2, fdlist, false);

    /* Send ringbuffer memblock to client */
    mc.memblock = srbt.memblock;
    mc.index = 0;
    mc.length = pa_memblock_get_length(srbt.memblock);
    pa_pstream_send_memblock(c->pstream, 0, 0, 0, &mc);

    c->srbpending = srb;
    return;

fail:
    if (c->rw_mempool) {
        pa_mempool_unref(c->rw_mempool);
        c->rw_mempool = NULL;
    }
}

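setup_srbchannel() also sends the PA_COMMAND_ENABLE_SRBCHANNEL command to the client, together with the srbchannel's two file descriptors, and then sends the ring-buffer memblock, which lives in the shared memory.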
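
Passing a file descriptor between processes over a Unix domain socket is done with SCM_RIGHTS ancillary data. The sketch below shows that mechanism in isolation. It is not PulseAudio's code: send_fd(), recv_fd(), and fd_passing_demo() are names invented for this example, and a socketpair within one process stands in for the client/server connection.

```c
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Control buffer sized and aligned for exactly one descriptor. */
union fd_control {
    char buf[CMSG_SPACE(sizeof(int))];
    struct cmsghdr align;
};

/* Sends fd over sock as SCM_RIGHTS ancillary data plus one payload byte. */
static int send_fd(int sock, int fd) {
    char byte = 'F';
    struct iovec iov = { .iov_base = &byte, .iov_len = 1 };
    union fd_control control;
    struct msghdr msg;
    struct cmsghdr *cmsg;

    memset(&msg, 0, sizeof(msg));
    memset(&control, 0, sizeof(control));
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = control.buf;
    msg.msg_controllen = sizeof(control.buf);

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(int));
    memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));

    return sendmsg(sock, &msg, 0) == 1 ? 0 : -1;
}

/* Receives one descriptor from sock; returns it, or -1 on failure. */
static int recv_fd(int sock) {
    char byte;
    struct iovec iov = { .iov_base = &byte, .iov_len = 1 };
    union fd_control control;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    int fd;

    memset(&msg, 0, sizeof(msg));
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = control.buf;
    msg.msg_controllen = sizeof(control.buf);

    if (recvmsg(sock, &msg, 0) != 1)
        return -1;

    cmsg = CMSG_FIRSTHDR(&msg);
    if (!cmsg || cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS)
        return -1;

    memcpy(&fd, CMSG_DATA(cmsg), sizeof(int));
    return fd;
}

/* Passes the read end of a pipe across a socketpair, then reads a byte
 * through the received descriptor. Returns 0 on success, -1 on failure. */
static int fd_passing_demo(void) {
    int sv[2], p[2], got = -1, ok = 0;
    char c = 0;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
        return -1;
    if (pipe(p) < 0) {
        close(sv[0]);
        close(sv[1]);
        return -1;
    }

    if (send_fd(sv[0], p[0]) == 0 && (got = recv_fd(sv[1])) >= 0)
        ok = write(p[1], "x", 1) == 1 && read(got, &c, 1) == 1 && c == 'x';

    if (got >= 0)
        close(got);
    close(p[0]);
    close(p[1]);
    close(sv[0]);
    close(sv[1]);
    return ok ? 0 : -1;
}
```

The receiver gets a new descriptor number that refers to the same open file, which is what lets the other endpoint mmap() a memfd it did not create.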

(4). The client then receives the shared memory file descriptor sent by the pulseaudio server and attaches to that shared memory object:

#0  shm_attach () at ../src/pulsecore/shm.c:347
#1  pa_shm_attach ()
    at ../src/pulsecore/shm.c:424
#2  segment_attach ()
    at ../src/pulsecore/memblock.c:1123
#3  pa_memimport_attach_memfd () at ../src/pulsecore/memblock.c:1213
#4  pa_pstream_attach_memfd_shmid () at ../src/pulsecore/pstream.c:377
#5  pa_pstream_register_memfd_mempool ()
    at ../src/pulsecore/pstream-util.c:174
#6  setup_complete_callback ()
    at ../src/pulse/context.c:554
#7  run_action () at ../src/pulsecore/pdispatch.c:288
#8  pa_pdispatch_run ()
    at ../src/pulsecore/pdispatch.c:341
#9  pstream_packet_callback ()
    at ../src/pulse/context.c:353
#10 do_read () at ../src/pulsecore/pstream.c:1020
#11 do_pstream_read_write () at ../src/pulsecore/pstream.c:260
#12 io_callback () at ../src/pulsecore/pstream.c:312
#13 callback ()
    at ../src/pulsecore/iochannel.c:158
#14 dispatch_pollfds () at ../src/pulse/mainloop.c:655
#15 pa_mainloop_dispatch () at ../src/pulse/mainloop.c:896
#16 pa_mainloop_iterate () at ../src/pulse/mainloop.c:927
#17 pa_mainloop_run () at ../src/pulse/mainloop.c:942
#18 sync_playback_test () at ../src/tests/sync-playback.c:170
#19 main () at ../src/tests/sync-playback.c:184

(5). Besides attaching to the shared memory object whose file descriptor it received from the pulseaudio server, the client sends the PA_COMMAND_REGISTER_MEMFD_SHMID command to register the file descriptor of its own shared memory object with the server:

#0  pa_pstream_send_packet () at ../src/pulsecore/pstream.c:442
#1  pa_pstream_send_tagstruct_with_ancil_data ()
    at ../src/pulsecore/pstream-util.c:45
#2  pa_pstream_send_tagstruct_with_fds ()
    at ../src/pulsecore/pstream-util.c:81
#3  pa_pstream_register_memfd_mempool ()
    at ../src/pulsecore/pstream-util.c:186
#4  setup_complete_callback ()
    at ../src/pulse/context.c:554

(6). The pulseaudio server handles the client's request to register its shared memory file descriptor:

#0  shm_attach () at ../src/pulsecore/shm.c:347
#1  pa_shm_attach ()
    at ../src/pulsecore/shm.c:424
#2  segment_attach ()
    at ../src/pulsecore/memblock.c:1123
#3  pa_memimport_attach_memfd () at ../src/pulsecore/memblock.c:1213
#4  pa_pstream_attach_memfd_shmid () at ../src/pulsecore/pstream.c:377
#5  pa_common_command_register_memfd_shmid ()
    at ../src/pulsecore/native-common.c:67
#6  command_register_memfd_shmid ()
    at ../src/pulsecore/protocol-native.c:2750
#7  pa_pdispatch_run ()
    at ../src/pulsecore/pdispatch.c:346

(7). The client sends the PA_COMMAND_ENABLE_SRBCHANNEL command:

#0  handle_srbchannel_memblock () at ../src/pulse/context.c:359
#1  pstream_memblock_callback () at ../src/pulse/context.c:413
#2  do_read () at ../src/pulsecore/pstream.c:1066
#3  do_pstream_read_write () at ../src/pulsecore/pstream.c:260
#4  io_callback () at ../src/pulsecore/pstream.c:312

After handling the shared memory file descriptor registered by the client, the pulseaudio server writes data into the shared memory. On receiving that data, the client sends the PA_COMMAND_ENABLE_SRBCHANNEL command to the pulseaudio server.

(8). The pulseaudio server handles the PA_COMMAND_ENABLE_SRBCHANNEL command:

#0  pa_pstream_set_srbchannel () at ../src/pulsecore/pstream.c:1272
#1  command_enable_srbchannel ()
    at ../src/pulsecore/protocol-native.c:2559
#2  pa_pdispatch_run ()
    at ../src/pulsecore/pdispatch.c:346
#3  pstream_packet_callback ()
    at ../src/pulsecore/protocol-native.c:5027

The PA_COMMAND_ENABLE_SRBCHANNEL handler associates the srbchannel with the pa_pstream.

(9). The client sends the PA_COMMAND_CREATE_PLAYBACK_STREAM command:

#0  pa_pstream_send_packet () at ../src/pulsecore/pstream.c:442
#1  pa_pstream_send_tagstruct_with_ancil_data ()
    at ../src/pulsecore/pstream-util.c:45
#2  pa_pstream_send_tagstruct_with_creds () at ../src/pulsecore/pstream-util.c:61
#3  create_stream () at ../src/pulse/stream.c:1382
#4  pa_stream_connect_playback () at ../src/pulse/stream.c:1402
#5  context_state_callback () at ../src/tests/sync-playback.c:129

The client creates a pa_stream object and, when it calls pa_stream_connect_playback(), sends the PA_COMMAND_CREATE_PLAYBACK_STREAM command to ask the pulseaudio server to create a playback stream.

(10). The pulseaudio server handles the PA_COMMAND_CREATE_PLAYBACK_STREAM command:

#0  playback_stream_new () at ../src/pulsecore/protocol-native.c:964
#1  command_create_playback_stream ()
    at ../src/pulsecore/protocol-native.c:2067
#2  pa_pdispatch_run ()
    at ../src/pulsecore/pdispatch.c:346

To handle PA_COMMAND_CREATE_PLAYBACK_STREAM, the pulseaudio server creates a playback_stream object and installs a number of callbacks on the stream's sink_input, for example:

    s->sink_input->parent.process_msg = sink_input_process_msg;
    s->sink_input->pop = sink_input_pop_cb;
    s->sink_input->process_underrun = sink_input_process_underrun_cb;
    s->sink_input->process_rewind = sink_input_process_rewind_cb;
    s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
    s->sink_input->update_max_request = sink_input_update_max_request_cb;
    s->sink_input->kill = sink_input_kill_cb;
    s->sink_input->moving = sink_input_moving_cb;
    s->sink_input->suspend = sink_input_suspend_cb;
    s->sink_input->send_event = sink_input_send_event_cb;
    s->sink_input->userdata = s;

    start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;

    fix_playback_buffer_attr(s);

    pa_sink_input_get_silence(sink_input, &silence);
    memblockq_name = pa_sprintf_malloc("native protocol playback stream memblockq [%u]", s->sink_input->index);
    s->memblockq = pa_memblockq_new(
            memblockq_name,
            start_index,
            s->buffer_attr.maxlength,
            s->buffer_attr.tlength,
            &sink_input->sample_spec,
            s->buffer_attr.prebuf,
            s->buffer_attr.minreq,
            0,
            &silence);
    pa_xfree(memblockq_name);

(11). The client sends the PA_COMMAND_CORK_PLAYBACK_STREAM command:

#0  pa_pstream_send_packet () at ../src/pulsecore/pstream.c:442
#1  pa_pstream_send_tagstruct_with_ancil_data ()
    at ../src/pulsecore/pstream-util.c:45
#2  pa_pstream_send_tagstruct_with_creds () at ../src/pulsecore/pstream-util.c:61
#3  pa_stream_cork () at ../src/pulse/stream.c:2293
#4  stream_state_callback () at ../src/tests/sync-playback.c:95

The client calls pa_stream_cork() to send the PA_COMMAND_CORK_PLAYBACK_STREAM command to the pulseaudio server. This interface pauses or resumes a playback or recording stream.

(12). The pulseaudio server handles the PA_COMMAND_CORK_PLAYBACK_STREAM command:

#0  pa_sink_input_cork () at ../src/pulsecore/sink-input.c:1577
#1  command_cork_playback_stream ()
    at ../src/pulsecore/protocol-native.c:3988
#2  pa_pdispatch_run ()
    at ../src/pulsecore/pdispatch.c:346
#3  pstream_packet_callback ()
    at ../src/pulsecore/protocol-native.c:5027

The pulseaudio server updates the state of the sink_input.

The client also sends the command_set_client_name command and several command_get_info commands to the pulseaudio server. How these are sent, received, and handled is not covered here.

With this exchange of commands the client and the pulseaudio server complete their negotiation, and they can start sending audio data to each other.

  1. The client sends data

The client writes data into the playback stream by calling pa_stream_write(), which sends the playback data to the pulseaudio server:

#0  pa_pstream_send_memblock () at ../src/pulsecore/pstream.c:477
#1  pa_stream_write_ext_free () at ../src/pulse/stream.c:1549
#2  pa_stream_write () at ../src/pulse/stream.c:1616
#3  stream_state_callback () at ../src/tests/sync-playback.c:87

pa_stream_write_ext_free() cuts the data to be sent into blocks and sends each block with pa_pstream_send_memblock():

        while (t_length > 0) {
            pa_memchunk chunk;

            chunk.index = 0;

            if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
                chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);
                chunk.length = t_length;
            } else {
                void *d;
                size_t blk_size_max;

                /* Break large audio streams into _aligned_ blocks or the
                 * other endpoint will happily discard them upon arrival. */
                blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec);
                chunk.length = PA_MIN(t_length, blk_size_max);
                chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);

                d = pa_memblock_acquire(chunk.memblock);
                memcpy(d, t_data, chunk.length);
                pa_memblock_release(chunk.memblock);
            }

            pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);

            t_offset = 0;
            t_seek = PA_SEEK_RELATIVE;

            t_data = (const uint8_t*) t_data + chunk.length;
            t_length -= chunk.length;

            pa_memblock_unref(chunk.memblock);
        }

pa_pstream_send_memblock() sends data asynchronously: it splits each block into items as needed and pushes them onto the send queue:

void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
    size_t length, idx;
    size_t bsm;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(channel != (uint32_t) -1);
    pa_assert(chunk);

    if (p->dead)
        return;

    idx = 0;
    length = chunk->length;

    bsm = pa_mempool_block_size_max(p->mempool);

    while (length > 0) {
        struct item_info *i;
        size_t n;

        if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
            i = pa_xnew(struct item_info, 1);
        i->type = PA_PSTREAM_ITEM_MEMBLOCK;

        n = PA_MIN(length, bsm);
        i->chunk.index = chunk->index + idx;
        i->chunk.length = n;
        i->chunk.memblock = pa_memblock_ref(chunk->memblock);

        i->channel = channel;
        i->offset = offset;
        i->seek_mode = seek_mode;
#ifdef HAVE_CREDS
        i->with_ancil_data = false;
#endif

        pa_queue_push(p->send_queue, i);

        idx += n;
        length -= n;
    }

    p->mainloop->defer_enable(p->defer_event, 1);
}
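
The slicing loop in pa_pstream_send_memblock() can be reduced to the following sketch, which splits a length into slices no larger than a maximum block size. struct slice and split_into_slices() are hypothetical stand-ins for struct item_info and the queueing logic; no reference counting or queue is modelled.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for one queued item: an offset and a length. */
struct slice {
    size_t index;
    size_t length;
};

/* Splits [0, length) into slices of at most block_max bytes.
 * Stores up to max_slices entries in out and returns the number of
 * slices the input requires. */
static size_t split_into_slices(size_t length, size_t block_max,
                                struct slice *out, size_t max_slices) {
    size_t idx = 0, n_slices = 0;

    assert(block_max > 0);

    while (length > 0) {
        size_t n = length < block_max ? length : block_max;

        if (n_slices < max_slices) {
            out[n_slices].index = idx;
            out[n_slices].length = n;
        }
        n_slices++;

        idx += n;
        length -= n;
    }

    return n_slices;
}
```

With a 10-byte chunk and a 4-byte maximum, this yields three slices of 4, 4, and 2 bytes at offsets 0, 4, and 8.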

The data is then sent out through the srbchannel in the main event loop:

#0  pa_memexport_put () at ../src/pulsecore/memblock.c:1455
#1  prepare_next_write_item () at ../src/pulsecore/pstream.c:664
#2  do_write () at ../src/pulsecore/pstream.c:751
#3  do_pstream_read_write () at ../src/pulsecore/pstream.c:266
#4  srb_callback () at ../src/pulsecore/pstream.c:295
#5  srbchannel_rwloop () at ../src/pulsecore/srbchannel.c:190
#6  semread_cb ()
    at ../src/pulsecore/srbchannel.c:210
#7  dispatch_pollfds () at ../src/pulse/mainloop.c:655

How does pa_pstream_send_memblock() cause the srbchannel callback to run? The client creates its pa_srbchannel through the following path:

#0  pa_srbchannel_new_from_template () at ../src/pulsecore/srbchannel.c:291
#1  handle_srbchannel_memblock () at ../src/pulse/context.c:380
#2  pstream_memblock_callback () at ../src/pulse/context.c:413
#3  do_read () at ../src/pulsecore/pstream.c:1066
#4  do_pstream_read_write () at ../src/pulsecore/pstream.c:260
#5  io_callback () at ../src/pulsecore/pstream.c:312
#6  callback ()
    at ../src/pulsecore/iochannel.c:158
#7  dispatch_pollfds () at ../src/pulse/mainloop.c:655

A callback is also installed on the srbchannel, which sets up its defer event:

#0  pa_srbchannel_set_callback () at ../src/pulsecore/srbchannel.c:340
#1  check_srbpending () at ../src/pulsecore/pstream.c:738
#2  do_write () at ../src/pulsecore/pstream.c:755
#3  do_pstream_read_write () at ../src/pulsecore/pstream.c:266
#4  io_callback () at ../src/pulsecore/pstream.c:312

pa_srbchannel_set_callback() is implemented as follows:

void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata) {
    if (sr->callback)
        pa_fdsem_after_poll(sr->sem_read);

    sr->callback = callback;
    sr->cb_userdata = userdata;

    if (sr->callback) {
        /* If there are events to be read already in the ringbuffer, we will not get any IO event for that,
           because that's how pa_fdsem works. Therefore check the ringbuffer in a defer event instead. */
        if (!sr->defer_event)
            sr->defer_event = sr->mainloop->defer_new(sr->mainloop, defer_cb, sr);
        sr->mainloop->defer_enable(sr->defer_event, 1);
    }
}

pa_srbchannel_new_from_template() is implemented as follows:

pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel_template *t)
{
    int temp;
    struct srbheader *srh;
    pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel));

    sr->mainloop = m;
    sr->memblock = t->memblock;
    pa_memblock_ref(sr->memblock);
    srh = pa_memblock_acquire(sr->memblock);

    sr->rb_read.capacity = sr->rb_write.capacity = srh->capacity;
    sr->rb_read.count = &srh->read_count;
    sr->rb_write.count = &srh->write_count;

    sr->rb_read.memory = (uint8_t*) srh + srh->readbuf_offset;
    sr->rb_write.memory = (uint8_t*) srh + srh->writebuf_offset;

    sr->sem_read = pa_fdsem_open_shm(&srh->read_semdata, t->readfd);
    if (!sr->sem_read)
        goto fail;

    sr->sem_write = pa_fdsem_open_shm(&srh->write_semdata, t->writefd);
    if (!sr->sem_write)
        goto fail;

    pa_srbchannel_swap(sr);
    temp = t->readfd; t->readfd = t->writefd; t->writefd = temp;

#ifdef DEBUG_SRBCHANNEL
    pa_log("Enabling io event on fd %d", t->readfd);
#endif

    sr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
    m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);

    return sr;

fail:
    pa_srbchannel_free(sr);

    return NULL;
}

Here a callback (semread_cb) is attached to the IO event (read_event).
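
The core of such a channel is a ring buffer whose fill count is shared between the two endpoints, while each endpoint keeps its own index. The sketch below shows that idea with C11 atomics for one writer and one reader. It is a simplified illustration under those assumptions, not the pa_srbchannel implementation: struct ring_shared, struct ring_view, and the fixed 8-byte capacity are invented for the example, and the semaphore used for wakeups is left out.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define RING_CAPACITY 8

/* Shared part: in a real setup this would live in the shared memory block. */
struct ring_shared {
    atomic_int count;               /* bytes currently stored */
    uint8_t memory[RING_CAPACITY];
};

/* Per-endpoint view: the writer keeps a write index, the reader a read index. */
struct ring_view {
    struct ring_shared *shared;
    size_t index;
};

/* Copies up to len bytes into the ring; returns how many bytes fit. */
static size_t ring_write(struct ring_view *w, const uint8_t *data, size_t len) {
    size_t free_bytes = RING_CAPACITY - (size_t) atomic_load(&w->shared->count);
    size_t n = len < free_bytes ? len : free_bytes;

    for (size_t i = 0; i < n; i++) {
        w->shared->memory[w->index] = data[i];
        w->index = (w->index + 1) % RING_CAPACITY;
    }

    /* Publish the new bytes only after they have been copied. */
    atomic_fetch_add(&w->shared->count, (int) n);
    return n;
}

/* Copies up to len bytes out of the ring; returns how many were read. */
static size_t ring_read(struct ring_view *r, uint8_t *out, size_t len) {
    size_t avail = (size_t) atomic_load(&r->shared->count);
    size_t n = len < avail ? len : avail;

    for (size_t i = 0; i < n; i++) {
        out[i] = r->shared->memory[r->index];
        r->index = (r->index + 1) % RING_CAPACITY;
    }

    /* Release the space only after the bytes have been consumed. */
    atomic_fetch_sub(&r->shared->count, (int) n);
    return n;
}
```

Because only the count is shared, the writer never touches the read index and the reader never touches the write index, which is what makes the structure safe for exactly one producer and one consumer.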

pa_pstream_send_memblock() wakes the main loop by calling p->mainloop->defer_enable(p->defer_event, 1). The function actually behind this callback is mainloop_defer_enable():

static void mainloop_defer_enable(pa_defer_event *e, int b) {
    pa_assert(e);
    pa_assert(!e->dead);

    if (e->enabled && !b) {
        pa_assert(e->mainloop->n_enabled_defer_events > 0);
        e->mainloop->n_enabled_defer_events--;
    } else if (!e->enabled && b) {
        e->mainloop->n_enabled_defer_events++;
        pa_mainloop_wakeup(e->mainloop);
    }

    e->enabled = b;
}

mainloop_defer_enable() updates the n_enabled_defer_events counter and wakes the main event loop. When the woken loop reaches pa_mainloop_dispatch() and sees that the counter is non-zero, it dispatches to every enabled defer event:

static unsigned dispatch_defer(pa_mainloop *m) {
    pa_defer_event *e;
    unsigned r = 0;

    if (m->n_enabled_defer_events <= 0)
        return 0;

    PA_LLIST_FOREACH(e, m->defer_events) {

        if (m->quit)
            break;

        if (e->dead || !e->enabled)
            continue;

        pa_assert(e->callback);
        e->callback(&m->api, e, e->userdata);
        r++;
    }

    return r;
}
. . . . . .
int pa_mainloop_dispatch(pa_mainloop *m) {
    unsigned dispatched = 0;

    pa_assert(m);
    pa_assert(m->state == STATE_POLLED);

    if (m->quit)
        goto quit;

    if (m->n_enabled_defer_events)
        dispatched += dispatch_defer(m);
    else {

Besides causing the defer event callbacks to run, mainloop_defer_enable() also leads to the srbchannel's read event being invoked.
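
Waking a poll()-based loop from elsewhere is commonly done with the self-pipe technique: the loop polls the read end of a pipe, and whoever needs to wake it writes a byte to the write end. The sketch below assumes that this is the mechanism behind the wakeup and shows it together with a counter of pending deferred events. struct mini_loop and its functions are hypothetical names, and the "callbacks" are reduced to a count.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical miniature event loop. */
struct mini_loop {
    int wakeup_pipe[2];     /* [0] is polled by the loop, [1] is written to wake it */
    int n_enabled_defer;    /* number of pending deferred events */
};

static int mini_loop_init(struct mini_loop *m) {
    m->n_enabled_defer = 0;
    return pipe(m->wakeup_pipe);
}

/* Marks one deferred event as pending and pokes the wakeup pipe. */
static int mini_loop_defer_enable(struct mini_loop *m) {
    char c = 'W';

    m->n_enabled_defer++;
    return write(m->wakeup_pipe[1], &c, 1) == 1 ? 0 : -1;
}

/* Runs one iteration: poll, drain the wakeup pipe, run deferred events.
 * Returns the number of deferred events dispatched, or -1 on error. */
static int mini_loop_iterate(struct mini_loop *m, int timeout_ms) {
    struct pollfd pfd = { .fd = m->wakeup_pipe[0], .events = POLLIN };
    int dispatched = 0;

    /* Do not block if deferred work is already pending. */
    if (poll(&pfd, 1, m->n_enabled_defer > 0 ? 0 : timeout_ms) < 0)
        return -1;

    if (pfd.revents & POLLIN) {
        char buf[16];

        if (read(m->wakeup_pipe[0], buf, sizeof(buf)) < 0)
            return -1;
    }

    while (m->n_enabled_defer > 0) {
        m->n_enabled_defer--;   /* a real loop would call the callback here */
        dispatched++;
    }

    return dispatched;
}
```

An iteration with nothing pending simply times out in poll(), while an iteration that follows mini_loop_defer_enable() returns at once and dispatches the pending events.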

  2. The server receives data

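On Linux, the pulseaudio server talks to the system's audio hardware through ALSA. The ALSA project's home page is ALSA. The project's document A Tutorial on Using the ALSA Audio API briefly explains how to use the ALSA library interface. For the library interface, see also ALSA Library API and the ALSA library API reference, in particular the PCM interface section.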

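There are two ways to transfer audio samples. The first is the standard read/write interface; the second uses a direct audio buffer to communicate with the device. The standard read/write interface consists of snd_pcm_writei() / snd_pcm_readi() and snd_pcm_writen() / snd_pcm_readn(). Direct transfer moves data through an mmap'ed area, using snd_pcm_mmap_begin() / snd_pcm_mmap_commit(). PulseAudio supports both and normally uses direct transfer.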

alsa-sink runs an internal thread that fetches playback data when needed and writes it to the device (pulseaudio/src/modules/alsa/alsa-sink.c):

static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, bool polled, bool on_timeout) {
    bool work_done = false;
. . . . . .
        for (;;) {
            pa_memchunk chunk;
            void *p;
            int err;
            const snd_pcm_channel_area_t *areas;
            snd_pcm_uframes_t offset, frames;
            snd_pcm_sframes_t sframes;
            size_t written;

            frames = (snd_pcm_uframes_t) (n_bytes / u->frame_size);
/*             pa_log_debug("%lu frames to write", (unsigned long) frames); */

            if (PA_UNLIKELY((err = pa_alsa_safe_mmap_begin(u->pcm_handle, &areas, &offset, &frames, u->hwbuf_size, &u->sink->sample_spec)) < 0)) {

                if (!after_avail && err == -EAGAIN)
                    break;

                if ((r = try_recover(u, "snd_pcm_mmap_begin", err)) == 0)
                    continue;

                if (r == 1)
                    break;

                return r;
            }

            /* Make sure that if these memblocks need to be copied they will fit into one slot */
            frames = PA_MIN(frames, u->frames_per_block);

            if (!after_avail && frames == 0)
                break;

            pa_assert(frames > 0);
            after_avail = false;

            /* Check these are multiples of 8 bit */
            pa_assert((areas[0].first & 7) == 0);
            pa_assert((areas[0].step & 7) == 0);

            /* We assume a single interleaved memory buffer */
            pa_assert((areas[0].first >> 3) == 0);
            pa_assert((areas[0].step >> 3) == u->frame_size);

            p = (uint8_t*) areas[0].addr + (offset * u->frame_size);

            written = frames * u->frame_size;
            chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, written, true);
            chunk.length = pa_memblock_get_length(chunk.memblock);
            chunk.index = 0;

            pa_sink_render_into_full(u->sink, &chunk);
            pa_memblock_unref_fixed(chunk.memblock);

            if (PA_UNLIKELY((sframes = snd_pcm_mmap_commit(u->pcm_handle, offset, frames)) < 0)) {

                if ((int) sframes == -EAGAIN)
                    break;

                if ((r = try_recover(u, "snd_pcm_mmap_commit", (int) sframes)) == 0)
                    continue;

                if (r == 1)
                    break;

                return r;
            }

As shown above, the alsa-sink module hands the audio data it received to the audio device through snd_pcm_mmap_commit().

The alsa-sink thread fetches data through the following path:

#0  sink_input_pop_cb () at ../src/pulsecore/protocol-native.c:1504
#1  pa_sink_input_peek ()
    at ../src/pulsecore/sink-input.c:931
#2  fill_mix_info () at ../src/pulsecore/sink.c:1100
#3  pa_sink_render_into () at ../src/pulsecore/sink.c:1340
#4  pa_sink_render_into_full () at ../src/pulsecore/sink.c:1424
#5  mmap_write () at ../src/modules/alsa/alsa-sink.c:737
#6  thread_func () at ../src/modules/alsa/alsa-sink.c:1921
#7  internal_thread_func () at ../src/pulsecore/thread-posix.c:81
#8  start_thread () at pthread_create.c:477

pulseaudio's ALSA module requests audio data through sink_input_pop_cb():

/* Called from thread context */
static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
    playback_stream *s;

    pa_sink_input_assert_ref(i);
    s = PLAYBACK_STREAM(i->userdata);
    playback_stream_assert_ref(s);
    pa_assert(chunk);

#ifdef PROTOCOL_NATIVE_DEBUG
    pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq));
#endif

    if (!handle_input_underrun(s, false))
        s->is_underrun = false;

    /* This call will not fail with prebuf=0, hence we check for
       underrun explicitly in handle_input_underrun */
    if (pa_memblockq_peek(s->memblockq, chunk) < 0)
        return -1;

    chunk->length = PA_MIN(nbytes, chunk->length);

    if (i->thread_info.underrun_for > 0)
        pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);

    pa_memblockq_drop(s->memblockq, chunk->length);
    playback_stream_request_bytes(s);

    return 0;
}

sink_input_pop_cb() first handles underrun, that is, the case where the buffer does not hold enough data:

#0  playback_stream_request_bytes () at ../src/pulsecore/protocol-native.c:1116
#1  handle_input_underrun () at ../src/pulsecore/protocol-native.c:1488
#2  sink_input_pop_cb () at ../src/pulsecore/protocol-native.c:1516
#3  pa_sink_input_peek ()
    at ../src/pulsecore/sink-input.c:931

In that case playback_stream_request_bytes() sends the client a message asking it to send data. Next, some data is taken from the memory block queue:

#0  pa_memblockq_peek () at ../src/pulsecore/memblockq.c:474
#1  sink_input_pop_cb () at ../src/pulsecore/protocol-native.c:1521
#2  pa_sink_input_peek ()
    at ../src/pulsecore/sink-input.c:931

Finally, sink_input_pop_cb() calls playback_stream_request_bytes() again to ask the client for data. playback_stream_request_bytes() is defined as follows:

static void playback_stream_request_bytes(playback_stream *s) {
    size_t m;

    playback_stream_assert_ref(s);

    m = pa_memblockq_pop_missing(s->memblockq);

    if (m <= 0)
        return;

#ifdef PROTOCOL_NATIVE_DEBUG
    pa_log("request_bytes(%lu)", (unsigned long) m);
#endif

    if (pa_atomic_add(&s->missing, (int) m) <= 0)
        pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
}

This eventually makes the pulseaudio server send the client a command requesting data (pulseaudio/src/pulsecore/protocol-native.c):

static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
    playback_stream *s = PLAYBACK_STREAM(o);
    playback_stream_assert_ref(s);

    if (!s->connection)
        return -1;

    switch (code) {

        case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
            pa_tagstruct *t;
            int l = 0;

            for (;;) {
                if ((l = pa_atomic_load(&s->missing)) <= 0)
                    return 0;

                if (pa_atomic_cmpxchg(&s->missing, l, 0))
                    break;
            }

            t = pa_tagstruct_new();
            pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
            pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
            pa_tagstruct_putu32(t, s->index);
            pa_tagstruct_putu32(t, (uint32_t) l);
            pa_pstream_send_tagstruct(s->connection->pstream, t);
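
The interplay between playback_stream_request_bytes() and the PLAYBACK_STREAM_MESSAGE_REQUEST_DATA handler is a request-coalescing pattern: the IO thread accumulates missing bytes in an atomic counter and posts a message only when the counter was not already positive, and the main thread takes the whole accumulated amount in one step. The sketch below reproduces that pattern with C11 atomics. struct stream_sketch and both function names are invented for the example, and posting a message is reduced to incrementing a counter.

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical reduction of a playback stream to the two fields that matter here. */
struct stream_sketch {
    atomic_int missing;     /* bytes still to be requested from the client */
    int posted;             /* how many request messages were posted */
};

/* IO-thread side: record m more missing bytes and post a message only
 * if no request was already pending. */
static void request_bytes(struct stream_sketch *s, int m) {
    if (m <= 0)
        return;

    /* atomic_fetch_add() returns the value held before the addition. */
    if (atomic_fetch_add(&s->missing, m) <= 0)
        s->posted++;
}

/* Main-thread side: atomically take everything accumulated so far.
 * Returns the number of bytes to request, or 0 if nothing is pending. */
static int take_missing(struct stream_sketch *s) {
    for (;;) {
        int l = atomic_load(&s->missing);

        if (l <= 0)
            return 0;

        /* Retry if the IO thread added more bytes in the meantime. */
        if (atomic_compare_exchange_weak(&s->missing, &l, 0))
            return l;
    }
}
```

Two requests of 100 and 50 bytes made before the main thread runs therefore produce a single message, and the handler then asks the client for 150 bytes at once.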

Playback data handling is a typical producer-consumer model. Everything shown so far is the consumer side; how does the producer put data into the queue?

sink_input_process_msg() puts the data that was read into the queue when it handles the SINK_INPUT_MESSAGE_SEEK and SINK_INPUT_MESSAGE_POST_DATA messages. The backtrace shows that this happens in the IO thread, that is, the alsa-sink thread:

#0  sink_input_process_msg () at ../src/pulsecore/protocol-native.c:1318
#1  pa_asyncmsgq_dispatch ()
    at ../src/pulsecore/asyncmsgq.c:323
#2  asyncmsgq_read_work () at ../src/pulsecore/rtpoll.c:566
#3  pa_rtpoll_run () at ../src/pulsecore/rtpoll.c:238
#4  thread_func () at ../src/modules/alsa/alsa-sink.c:2003

The SINK_INPUT_MESSAGE_POST_DATA message is sent when data from the client has been read. do_read() in pstream.c obtains the memory block sent by the client through pa_memimport_get():

#0  pa_memimport_get () at ../src/pulsecore/memblock.c:1231
#1  do_read () at ../src/pulsecore/pstream.c:1042
#2  do_pstream_read_write () at ../src/pulsecore/pstream.c:253
#3  srb_callback () at ../src/pulsecore/pstream.c:295
#4  srbchannel_rwloop () at ../src/pulsecore/srbchannel.c:190
#5  semread_cb () at ../src/pulsecore/srbchannel.c:210
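
The idea behind importing a memory block is that the sender transmits a reference into shared memory instead of the audio bytes themselves, and the receiver turns that reference back into a pointer. The sketch below illustrates only this lookup step, under the assumption that a reference consists of a segment id, an offset and a length. It is not the implementation of pa_memimport_get(); all demo_* names are hypothetical, and mapping the segment (for example with mmap()) is assumed to have happened already.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_MAX_SEGMENTS 4

/* One shared-memory segment that the receiver has already mapped. */
typedef struct {
    uint32_t shm_id;
    uint8_t *base;
    size_t size;
} demo_segment;

/* Table of all segments known to the receiver. */
typedef struct {
    demo_segment segments[DEMO_MAX_SEGMENTS];
    size_t n_segments;
} demo_import;

/* Register a mapped segment; returns 0 on success, -1 if the table is full. */
int demo_import_attach(demo_import *i, uint32_t shm_id, uint8_t *base, size_t size) {
    assert(base != NULL);
    if (i->n_segments == DEMO_MAX_SEGMENTS)
        return -1;
    i->segments[i->n_segments++] = (demo_segment){ shm_id, base, size };
    return 0;
}

/* Turn a (shm_id, offset, length) reference into a pointer. Returns NULL
 * when the segment is unknown or the range does not fit inside it. */
const uint8_t *demo_import_get(const demo_import *i, uint32_t shm_id,
                               size_t offset, size_t length) {
    for (size_t k = 0; k < i->n_segments; k++) {
        const demo_segment *seg = &i->segments[k];
        if (seg->shm_id != shm_id)
            continue;
        /* Written so that offset + length cannot overflow. */
        if (offset > seg->size || length > seg->size - offset)
            return NULL;
        return seg->base + offset;
    }
    return NULL;
}
```

Because the reference comes from another process, the bounds check matters: the receiver must never trust an offset or length that points outside the segment it mapped.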

pstream_memblock_callback() then sends out the SINK_INPUT_MESSAGE_POST_DATA message:

#0  pstream_memblock_callback () at ../src/pulsecore/protocol-native.c:5033
#1  do_read () at ../src/pulsecore/pstream.c:1066
#2  do_pstream_read_write () at ../src/pulsecore/pstream.c:253

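The commit history of the master branch of the PulseAudio Git repo shows that the first commit was made in 2004, nearly 20 years ago. PulseAudio is a project with a long history, and much of its code has presumably reached its present form through a long evolution. A project like this is hard to understand thoroughly in a day or two, and the description given here is still quite superficial.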

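PulseAudio aims to be more than a data channel between applications and audio devices; it wants to be an audio framework. This article offers only one angle on PulseAudio's design and implementation. A deeper understanding requires familiarity with some technical foundations: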

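  • Detailed usage of the IPC library interfaces, including D-Bus, the Unix domain socket API, the IPv4/IPv6 TCP socket API, and shared memory interfaces such as memfd_create() / shm_open().
  • Detailed usage of the udev library interface and its related mechanisms.
  • Detailed usage of the ALSA library interface.
  • Usage of timerfd and eventfd.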
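
As a small illustration of the last item, here is a minimal sketch of the Linux eventfd counter semantics, which make an eventfd usable as a wakeup signal that can be watched with poll() alongside other file descriptors. It is not taken from PulseAudio; the demo_* helpers are hypothetical wrappers around the real eventfd(), write() and read() calls.

```c
#include <assert.h>
#include <stdint.h>
#include <sys/eventfd.h>
#include <unistd.h>

/* Create a non-blocking eventfd whose counter starts at 0.
 * Returns the file descriptor, or -1 on error. */
int demo_wakeup_new(void) {
    return eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
}

/* Add n to the counter. A thread poll()ing the descriptor for
 * readability wakes up once the counter is non-zero. */
int demo_signal(int efd, uint64_t n) {
    assert(n > 0);
    return write(efd, &n, sizeof(n)) == (ssize_t) sizeof(n) ? 0 : -1;
}

/* Read the counter and reset it to 0. Returns 0 when nothing was
 * pending (the non-blocking read fails instead of waiting). */
uint64_t demo_drain(int efd) {
    uint64_t value = 0;
    if (read(efd, &value, sizeof(value)) != (ssize_t) sizeof(value))
        return 0;
    return value;
}
```

Several signals sent before the reader wakes up are merged into a single readable event whose value is their sum, so the reader drains once per wakeup rather than once per signal.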

Many PulseAudio topics are not covered here:

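  • PulseAudio's threading model and threading infrastructure, such as pa_mainloop, pa_mainloop_api and pa_threaded_mainloop.
  • The semantics of the concepts and abstractions PulseAudio introduces, such as pa_context, pa_stream, pa_iochannel, pa_pstream, pa_srbchannel, pa_sink, pa_sink_input, pa_source, pa_source_output, pa_module, pa_card and so on.
  • How the pulseaudio server manages audio device files.
  • The modular architecture of the pulseaudio server.
  • How the audio pipeline is set up.
  • The detailed design of message and data exchange between clients and the pulseaudio server.
  • And so on.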

