音視頻開發(fā)之旅(49)-邊緩存邊播放之AndroidVideoCache

目錄

  1. 背景
  2. AndroidVideoCache簡單使用
  3. 實(shí)現(xiàn)原理
  4. 源碼分析
  5. AndroidVideoCache的不足
  6. 資料
  7. 收獲

一、背景

播放音視頻時,播放器數(shù)據(jù)的請求是由播放器內(nèi)部發(fā)起的,我們只是提供了一個url,而不能控制數(shù)據(jù)的請求過程,
都是要先進(jìn)行下載,下載到一定量之后播放器再開始播放,當(dāng)下載進(jìn)度減去播放進(jìn)度小于一定閥值,進(jìn)入緩沖狀態(tài)。
比如MediaPlayer的最小緩存大小是4M,最大20M

//framework/av/media/libdatasource/include/datasource/NuCachedSource2.h:30
 
enum {
        kPageSize                       = 65536,
        //緩沖 最大閥值 20M
        kDefaultHighWaterThreshold      = 20 * 1024 * 1024,
        //緩沖 最小閥值 4M
        kDefaultLowWaterThreshold       = 4 * 1024 * 1024,

        // Read data after a 15 sec timeout whether we're actively
        // fetching or not.
        kDefaultKeepAliveIntervalUs     = 15000000,
    };

這樣的設(shè)計(jì)有如下兩個弊端:

  1. 造成首幀時長、卡頓恢復(fù)時長,都會比較高,影響用戶體驗(yàn)。
  2. 每次都要重新跟進(jìn)url重新下載視頻,造成了嚴(yán)重的流量(真金白銀)浪費(fèi)。

這就需要一種自定義播放器結(jié)合邊下邊播的策略,對下載、解碼、播放進(jìn)行控制。我們今天分析的開源項(xiàng)目AndroidVideoCache給我們提供了一種很好的思路,我們一起來分析學(xué)習(xí)吧。

二、AndroidVideoCache簡單使用

 public void setDataSource(String path ){  
   ...   
    // 獲取APP單例的proxy
   HttpProxyCacheServer proxy = MyApplication.getProxy();
    //把網(wǎng)絡(luò)的url轉(zhuǎn)為代理的url
   String proxyUrl = proxy.getProxyUrl(path);
    //內(nèi)部觸發(fā)請求,socketServer根據(jù)host和port監(jiān)聽有socket連接進(jìn)行代理請求下載音視頻流數(shù)據(jù)
   mediaPlayer.setDataSource(proxyUrl);
   ...
}


public class MyApplication extend Application
   
   public static HttpProxyCacheServer getProxy() {
        return getInstance().proxy == null ? (getInstance().proxy = getInstance().newProxy()) : getInstance().proxy;
    }


    private HttpProxyCacheServer newProxy() {
        return new HttpProxyCacheServer.Builder(mContext)
                //設(shè)置緩存路徑
                .cacheDirectory(CacheUtils.getVideoCacheDir(mContext))
                //設(shè)置緩存的名稱
                .fileNameGenerator(new MyMd5FileNameGenerator())
                .build();
    }
}

三、實(shí)現(xiàn)原理

在業(yè)務(wù)層和播放器層直接加入本地代理,通過Socket的的方式,首先建立本地的socketServer,監(jiān)聽local host和指定(bind的時候指定讓系統(tǒng)來分配一個可用的)端口的請求。每次數(shù)據(jù)的請求都發(fā)給local host,socketSrever監(jiān)聽到有Socket連接時,由 socketServer來代理視頻數(shù)據(jù)的請求,請求到的數(shù)據(jù)不返回給播放器,而是直接寫入到文件緩存中,再從改文件緩存中讀取buffer數(shù)據(jù)給到播放器。

圖片來自:Android主流視頻播放及緩存實(shí)現(xiàn)原理調(diào)研

四、源碼分析

主流程圖

下面我們結(jié)合源碼進(jìn)行分析,我們從HttpProxyCacheServer獲取本地代理以及轉(zhuǎn)換請求地址的getProxyUrl方法開始入手具體分析下。

1. HttpProxyCacheServer.Builder通過構(gòu)造器來生成本地代理服務(wù)器。

        public HttpProxyCacheServer build() {
            Config config = buildConfig();
            return new HttpProxyCacheServer(config);
        }


        private Config buildConfig() {
            //cacheRoot: 設(shè)置緩存路徑
            //fileNameGenerator: 設(shè)置文件名,一般用url的md5或者唯一表示的業(yè)務(wù)id/hash
            //diskUsage: 緩存的lru策略,有個touch方法,用于更新文件的修改時間(這個的實(shí)現(xiàn)也很有意思)。
            //           支持設(shè)置緩存總大小以及緩存總個數(shù)的閥值。也可以自行擴(kuò)展比如設(shè)置緩存的有效期
            //sourceInfoStorage : 緩存信息的存儲,根據(jù)唯一表示存儲/查詢對應(yīng)的緩存路徑等信息
            
            return new Config(cacheRoot, fileNameGenerator, diskUsage, sourceInfoStorage);
        }

2. HttpProxyCacheServer構(gòu)造方法

private static final String PROXY_HOST = "127.0.0.1";

private HttpProxyCacheServer(Config config) {
        this.config = checkNotNull(config);
        try {
            //根據(jù)host生成本地代理服務(wù)器的地址
            InetAddress inetAddress = InetAddress.getByName(PROXY_HOST);
            //創(chuàng)建ServerSocket,最大可于8個client進(jìn)行連接
            this.serverSocket = new ServerSocket(0, 8, inetAddress);
            //有系統(tǒng)自動分配一個端口
            this.port = serverSocket.getLocalPort();
            IgnoreHostProxySelector.install(PROXY_HOST, port);
            //等待waitConnectionThread線程啟動
            CountDownLatch startSignal = new CountDownLatch(1);
            //開啟一個線程接收socket連接
            this.waitConnectionThread = new Thread(new WaitRequestsRunnable(startSignal));
            this.waitConnectionThread.start();
            //阻塞當(dāng)前線程,直到startSignal.countDown();
            startSignal.await(); // freeze thread, wait for server starts
            this.pinger = new Pinger(PROXY_HOST, port);
        } catch (IOException | InterruptedException e) {
            socketProcessor.shutdown();
            throw new IllegalStateException("Error starting local proxy server", e);
        }
    }

3. WaitRequestsRunnable:開啟一個線程,在線程中輪訓(xùn)

 private final class WaitRequestsRunnable implements Runnable {

        private final CountDownLatch startSignal;

        public WaitRequestsRunnable(CountDownLatch startSignal)   {
            this.startSignal = startSignal;
        }

        @Override
        public void run() {
            startSignal.countDown();
            //開啟一個線程,在線程中輪訓(xùn)
            waitForRequest();
        }
    }

4. waitForRequest

private final ExecutorService socketProcessor = Executors.newFixedThreadPool(8);

private void waitForRequest() {
        try {
            //如果線程沒有interrupt,不斷的輪詢,用于檢測是否有新的socket連接
            while (!Thread.currentThread().isInterrupted()) {
                //阻塞的方法 用于socket連接
                //socketServer通過監(jiān)聽本地host:port,如果有對應(yīng)的請求觸發(fā)就進(jìn)行一個socket連接
                Socket socket = serverSocket.accept();
                //線程池,同時最大可以有8個socket連接
               // 每個socket獨(dú)占一個線程,最大可以有8個并發(fā)連接
               // submit一個runnable進(jìn)行處理socket
                socketProcessor.submit(new SocketProcessorRunnable(socket));
            }
        } catch (IOException e) {
            onError(new ProxyCacheException("Error during waiting connection", e));
        }
    }

5. 等到有看下getProxyUrl調(diào)用,serverSocket的accept就會收到socket連接走到SocketProcessorRunnable,我們先看下getProxyUrl的實(shí)現(xiàn)。

    public String getProxyUrl(String url, boolean allowCachedFileUri) {
        if (allowCachedFileUri && isCached(url)) {
            File cacheFile = getCacheFile(url);
            touchFileSafely(cacheFile);
            return Uri.fromFile(cacheFile).toString();
        }
        return isAlive() ? appendToProxyUrl(url) : url;
    }

    private boolean isAlive() {
        return pinger.ping(3, 70);   // 70+140+280=max~500ms
    }


    private String appendToProxyUrl(String url) {
        return String.format(Locale.US, "http://%s:%d/%s", PROXY_HOST, port, ProxyCacheUtils.encode(url));
    }

6. 接著繼續(xù)看SocketProcessorRunnable:處理這個socket連接

    private final class SocketProcessorRunnable implements Runnable {

        private final Socket socket;

        public SocketProcessorRunnable(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            //處理這個socket連接
            processSocket(socket);
        }
    }

7. processSocket:獲取 HttpProxyCacheServerClients ,并進(jìn)行request處理

//HttpProxyCacheServer#processSocket

private void processSocket(Socket socket) {
        try {
            //通過輸入流(即請求轉(zhuǎn)換過的url等信息)生成GetRequest對象
            GetRequest request = GetRequest.read(socket.getInputStream());
            String url = ProxyCacheUtils.decode(request.uri);
            //url是"ping" 返回200,可以ping通
            if (pinger.isPingRequest(url)) {
                pinger.responseToPing(socket);
            } else {
                //獲取 HttpProxyCacheServerClients ,并進(jìn)行request處理
                HttpProxyCacheServerClients clients = getClients(url);
                clients.processRequest(request, socket);
            }
        } catch (SocketException e) {
            // There is no way to determine that client closed connection http://stackoverflow.com/a/10241044/999458
            // So just to prevent log flooding don't log stacktrace
        } catch (ProxyCacheException | IOException e) {
            onError(new ProxyCacheException("Error processing request", e));
        } finally {
            //socket處理完畢之后,在finally中,關(guān)閉socket連接釋放資源
            releaseSocket(socket);
        }
    }

8. HttpProxyCacheServerClients#processRequest: 構(gòu)造proxyCache,并進(jìn)行請求

//HttpProxyCacheServerClients#processRequest

public void processRequest(GetRequest request, Socket socket) throws ProxyCacheException, IOException {
        //proxyCache的初始化,如果沒有則重新newHttpProxyCache,否則復(fù)用即可
        startProcessRequest();
        try {
            //原子操作用于記錄當(dāng)前有多少個socketClient
            clientsCount.incrementAndGet();
            //緩存代理開始處理
            proxyCache.processRequest(request, socket);
        } finally {
            //結(jié)束
            finishProcessRequest();
        }
    }

    private synchronized void startProcessRequest() throws ProxyCacheException {
        proxyCache = proxyCache == null ? newHttpProxyCache() : proxyCache;
    }

    private synchronized void finishProcessRequest() {
        if (clientsCount.decrementAndGet() <= 0) {
            //sourceReaderThread中斷
            //FileChannel關(guān)閉
            //touch下文件
            proxyCache.shutdown();
            proxyCache = null;
        }
    }

9.1 HttpProxyCacheServerClients#newHttpProxyCache:進(jìn)行httpProxyCache的初始化

private HttpProxyCache newHttpProxyCache() throws ProxyCacheException {
        //HttpUrlSource 持有url,開啟HttpUrlConnetcion來獲取inputStream
        HttpUrlSource source = new HttpUrlSource(url, config.sourceInfoStorage, config.headerInjector);
        //緩存總以.download存在,緩存完后更名,并會進(jìn)行一次touch
        FileCache cache = new FileCache(config.generateCacheFile(url), config.diskUsage);
        HttpProxyCache httpProxyCache = new HttpProxyCache(source, cache);
        httpProxyCache.registerCacheListener(uiCacheListener);
        return httpProxyCache;
    }

9.2 HttpProxyCache#processRequest:這個方法是邊緩存邊播放的關(guān)鍵

把數(shù)據(jù)先以流的方式 寫入到緩存,在通過socket的outStream給到播放器

public void processRequest(GetRequest request, Socket socket) throws IOException, ProxyCacheException {
        //socket.getOutputStream()  就是clientSocket需要的stream(會以流的方式,先緩存到本地再給到播放器)
        OutputStream out = new BufferedOutputStream(socket.getOutputStream());
        //先添加 響應(yīng)頭
        //HTTP/1.1 200 OK
        //Accept-Ranges: bytes
        //Content-Length: 4585263
        //Content-Type: audio/mpeg
        String responseHeaders = newResponseHeaders(request);
        out.write(responseHeaders.getBytes("UTF-8"));

        long offset = request.rangeOffset;
        //判斷是否需要緩存,TODO 這里的可以進(jìn)行優(yōu)化,否則一旦seek后就可能不會在緩存了
        //要處理seek后繼續(xù)緩存就要考慮文件空洞的以及merge的事情
        if (isUseCache(request)) {
            //如果使用緩存,先把請求數(shù)據(jù)寫入緩存文件,再返回給播放器
            responseWithCache(out, offset);
        } else {
            responseWithoutCache(out, offset);
        }
    }

    private boolean isUseCache(GetRequest request) throws ProxyCacheException {
        //原始長度
        long sourceLength = source.length();
        boolean sourceLengthKnown = sourceLength > 0;
        //已經(jīng)緩存的長度
        long cacheAvailable = cache.available();
        // do not use cache for partial requests which too far from available cache. It seems user seek video.
        return !sourceLengthKnown || !request.partial || request.rangeOffset <= cacheAvailable + sourceLength * NO_CACHE_BARRIER;
    }

10. HttpProxyCache#responseWithCache: 每次從網(wǎng)絡(luò)六種讀取8192個字節(jié),先寫入到緩存文件,再從緩存文件中取出給到播放器

static final int DEFAULT_BUFFER_SIZE = 8 * 1024;

private void responseWithCache(OutputStream out, long offset) throws ProxyCacheException, IOException {
        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
        int readBytes;
        //這里的read方法,每次讀取8192個字節(jié),直到讀完為止
        while ((readBytes = read(buffer, offset, buffer.length)) != -1) {
            out.write(buffer, 0, readBytes);
            offset += readBytes;
        }
        out.flush();
    }

11. ProxyCache#read

/**
     * 這個是邊緩存邊播放的關(guān)鍵,先往文件中寫入數(shù)據(jù),直到寫完(整個文件寫完或者8192個寫完)或者中斷。
     * buffer:一次讀取的buffer
     * offset:當(dāng)前的已有緩存的偏移
     * lenght: 一次讀取buffer的大小
     */
    public int read(byte[] buffer, long offset, int length) throws ProxyCacheException {
        ProxyCacheUtils.assertBuffer(buffer, offset, length);

        //如果沒有緩存完,并且緩存的大小小于需要緩存的大小(一次8192個字節(jié)),并且sourceReaderThread線程沒有停止
        while (!cache.isCompleted() && cache.available() < (offset + length) && !stopped) {
            //異步的讀取數(shù)據(jù), 這里為什么要這樣設(shè)計(jì)吶??(本來已經(jīng)在子線程了,為什么還要在開啟線程進(jìn)行讀取網(wǎng)絡(luò)數(shù)據(jù)吶?sourceReaderThread)
            readSourceAsync();
            //等待,最大時長1s秒鐘,每過1s中檢查是否有錯誤發(fā)生
            waitForSourceData();
            checkReadSourceErrorsCount();
        }
        //從緩存中讀取最大的8192個字節(jié)數(shù)據(jù)給到播放器
        int read = cache.read(buffer, offset, length);
        if (cache.isCompleted() && percentsAvailable != 100) {
            percentsAvailable = 100;
            onCachePercentsAvailableChanged(100);
        }
        return read;
    }

    private void waitForSourceData() throws ProxyCacheException {
        synchronized (wc) {
            try {
                wc.wait(1000);
            } catch (InterruptedException e) {
                throw new ProxyCacheException("Waiting source data is interrupted!", e);
            }
        }
    }

12. ProxyCache#readSourceAsync: 如果已經(jīng)還沒有停止,并且 還沒有緩存完 并且 沒有在讀取中 則開啟新的數(shù)據(jù)讀取線程 線程

 private synchronized void readSourceAsync() throws ProxyCacheException {
        boolean readingInProgress = sourceReaderThread != null && sourceReaderThread.getState() != Thread.State.TERMINATED;
        //如果已經(jīng)還沒有停止,并且 還沒有緩存完 并且 沒有在讀取中 則開啟新的數(shù)據(jù)讀取線程 線程
        if (!stopped && !cache.isCompleted() && !readingInProgress) {
            //在這個SourceReaderRunnable中進(jìn)行
            sourceReaderThread = new Thread(new SourceReaderRunnable(), "Source reader for " + source);
            sourceReaderThread.start();
        }
    }

13. 下面再來看下SourceReaderRunnable的的run中的ProxyCache#readSource
從網(wǎng)絡(luò)連接的HttpUrlConnetion拿到inputStream,不斷的讀取數(shù)據(jù)(每次8192個字節(jié)),直到讀完。

 private void readSource() {
        long sourceAvailable = -1;
        long offset = 0;
        try {
            //已經(jīng)緩存的大小
            offset = cache.available();
            //開啟 HttpUrlConnetion,獲取一個inputStream
            source.open(offset);
            //文件的大小
            sourceAvailable = source.length();
            byte[] buffer = new byte[ProxyCacheUtils.DEFAULT_BUFFER_SIZE];
            int readBytes;
            //HttpUrlSource.read,不斷的讀取數(shù)據(jù)從inputstream
            while ((readBytes = source.read(buffer)) != -1) {
                synchronized (stopLock) {
                    if (isStopped()) {
                        return;
                    }
                    //往緩存文件中寫入數(shù)據(jù),一次寫入8192字節(jié)
                    cache.append(buffer, readBytes);
                }
                offset += readBytes;
                notifyNewCacheDataAvailable(offset, sourceAvailable);
            }
            tryComplete();
            onSourceRead();
        } catch (Throwable e) {
            //如果讀取過程中發(fā)生了錯誤,則進(jìn)行原子加操作,每過1s秒會檢查該標(biāo)記位
            readSourceErrorsCount.incrementAndGet();
            onError(e);
        } finally {
            closeSource();
            notifyNewCacheDataAvailable(offset, sourceAvailable);
        }
    }

14.HttpUrlSource#read

這里的inputStream就是HttpUrlconnection的輸入
//  
 @Override
    public int read(byte[] buffer) throws ProxyCacheException {
        if (inputStream == null) {
            throw new ProxyCacheException("Error reading data from " + sourceInfo.url + ": connection is absent!");
        }
        try {
            return inputStream.read(buffer, 0, buffer.length);
        } catch (InterruptedIOException e) {
            throw new InterruptedProxyCacheException("Reading source " + sourceInfo.url + " is interrupted", e);
        } catch (IOException e) {
            throw new ProxyCacheException("Error reading data from " + sourceInfo.url, e);
        }
    }

為什么使用HttpUrlconnection而不是OKHttp吶,這里完全可以使用OKHttp替換。可以結(jié)合自己業(yè)務(wù)的實(shí)際情況來進(jìn)行切換。


截圖來自:performance-okhttp-vs.-httpurlconnection

主要流程到這里基本上就分析完了
在請求遠(yuǎn)程url時將文件寫到本地緩存中,然后從這個本地緩存中讀數(shù)據(jù),寫入到客戶端socket里面。服務(wù)器Socket主要還是一個代理的作用,從中間攔截掉網(wǎng)絡(luò)請求,然后實(shí)現(xiàn)對socket的讀取和寫入。

五、AndroidVideoCache的不足

5.1 Seek的場景

Seek后有可能就不緩存了
我們在上一小節(jié)的4.9.2的HttpProxyCache#processRequest的isUseCache就是來判斷是否進(jìn)行緩存。

 private boolean isUseCache(GetRequest request) throws ProxyCacheException {
        //原始長度
        long sourceLength = source.length();
        boolean sourceLengthKnown = sourceLength > 0;
        //已經(jīng)緩存的長度
        long cacheAvailable = cache.available();
        // do not use cache for partial requests which too far from available cache. It seems user seek video.
        return !sourceLengthKnown || !request.partial || request.rangeOffset <= cacheAvailable + sourceLength * NO_CACHE_BARRIER;
    }

這個不符合我們的預(yù)期,seek后也應(yīng)該進(jìn)行緩存,這是緩存文件之間可能存在空洞,需要針對這種情況做些特殊處理。下面一篇我們來分析下另外一個開源項(xiàng)目是如何處理這種情況的。

5.2 預(yù)緩存(脫離播放器實(shí)現(xiàn)緩存)

提前下載,無論視頻是否下載完成,都可以將這提前下載好的部分作為視頻緩存使用
參考上一小節(jié)的4.7,進(jìn)行下擴(kuò)展。根據(jù)url創(chuàng)建GetRequest,然后調(diào)用HttpProxyCacheServerClients#processRequest即可

HttpProxyCacheServerClients clients = getClients(url);
clients.processRequest(request);

5.3 線程管理

開啟線程過多,過多線程的內(nèi)存消耗以及狀態(tài)同步是一個需要注意點(diǎn)??梢园丫€程改為線程池的方式實(shí)現(xiàn)。但是要特別并發(fā)和狀態(tài)同步。這個后面也會有單獨(dú)一篇再來分析

有哪些線程?

  1. HttpProxyCacheServer.WaitRequestsRunnable—》等待socket連接
  2. HttpProxyCacheServer.SocketProcessorRunnable—》處理單個socket連接
  3. ProxyCache.SourceReaderRunnable —>分塊(8192個字節(jié))讀取網(wǎng)絡(luò)數(shù)據(jù)流寫入到緩存文件并且返回給clientSocket 【這個線程要重點(diǎn)分析】

5.4 緩存是根據(jù)url來進(jìn)行區(qū)分,對于大的視頻,沒有進(jìn)行分片下載,節(jié)省流量

可以參考m3u8的方式,給一個視頻進(jìn)行分片。這個后面再分析另外一個開源項(xiàng)目是再來一些拆解。

5.5 AndroidVideoCache采用數(shù)據(jù)庫進(jìn)行存儲緩存的信息,可以不使用,減少IO操作

5.6 如果我們的有其他代理,那么這個socket方式拿url就會出問題,因?yàn)槲覀兡玫降囊彩且粋€代理url,所以在開發(fā)時需要考慮代理用戶提供兼容性處理。

六、資料

  1. AndroidVideoCache-視頻邊播放邊緩存的代理策略
  2. 網(wǎng)易云音樂-音視頻播放
  3. [QQ空間十億級視頻播放技術(shù)優(yōu)化揭秘王輝終稿2.key]
  4. Android MediaPlayer buffer大小
  5. Android主流視頻播放及緩存實(shí)現(xiàn)原理調(diào)研
  6. Qzone視頻下載如何做到多快好?。?/a>
  7. AndroidVideoCache優(yōu)化
  8. Android 平臺視頻邊下邊播技術(shù)

七、收獲

通過本篇的學(xué)習(xí)實(shí)踐,

  1. 理解邊下邊播的必要性以其實(shí)現(xiàn)原理
  2. 分析AndroidVideoCache源碼,從整體和重要流程上進(jìn)行拆解分析
  3. AndroidVideoCache存在的一些不足,以及對應(yīng)的方案。

感謝你的閱讀
下一篇我們對seek的場景如何實(shí)現(xiàn)邊緩存邊播放進(jìn)行分析和實(shí)現(xiàn),歡迎關(guān)注公眾號“音視頻開發(fā)之旅”,一起學(xué)習(xí)成長。
歡迎交流

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容