AndroidVideoCache-視頻邊播放邊緩存的代理策略

視頻現(xiàn)狀

現(xiàn)在視頻播放的需求越來越常見,就和16年上半年的直播一樣,似乎不加個視頻已經(jīng)不是個正常的APP了,連微信朋友圈都支持上傳小視頻,更別談以視頻為本命的一系列APP。
視頻方面主要是兩塊,一個是視頻錄制,這個已經(jīng)翻過一篇比較全的文章,再加上google開源的 grafika ,可以在踩坑時減少很多障礙,不過錄制這塊適配是大問題,需要不斷調(diào)整。
另一個方面就是視頻播放,這方面的輪子比上面錄制就多太多了,無論是google(開源良心)的 ExoPlayer,以及b站的 ijkplayer,還是一些其他的,基本上滿足了正常的需求。
當(dāng)然,今天這篇文章這兩個并不是主角,在視頻播放這種極其容易造成卡頓,跳幀等影響用戶體驗的需求上,如何優(yōu)化體驗是一件十分重要的事情。一般情況比較正常的就直接播放,一句設(shè)置數(shù)據(jù)源的代碼了事。但是要為了用戶體驗考慮??v觀這些有視頻功能的APP,主要分為兩類,一種是直接下載然后再播放,比如微信,微信的小視頻錄制壓縮比比較好,一個視頻大概幾百k,所以比較適合先全量下載,然后再播放的模式,另一種自然就是邊播放邊緩存,這是比較多的策略,大部分的視頻都是比較大的,等全部下完,黃花菜都涼了。

基本原理

既然是采取邊播放邊緩存的策略,比較逗的方式就是一邊正常的給videoview設(shè)置數(shù)據(jù)源,一邊開一個線程去下載文件,下完后就可以使用本地緩存了,這個方式是比較逗的,相當(dāng)于兩份網(wǎng)絡(luò)請求,大大的拖慢了用戶體驗。所以我們會想如何只有一份請求但是能夠操作這些數(shù)據(jù)邊讀邊寫呢,這個就是 AndroidVideoCache 所做的事情。
AndroidVideoCache 通過代理的策略實現(xiàn)一個中間層將我們的網(wǎng)絡(luò)請求轉(zhuǎn)移到本地實現(xiàn)的代理服務(wù)器上,這樣我們真正請求的數(shù)據(jù)就會被代理拿到,這樣代理一邊向本地寫入數(shù)據(jù),一邊根據(jù)我們需要的數(shù)據(jù)看是讀網(wǎng)絡(luò)數(shù)據(jù)還是讀本地緩存數(shù)據(jù)再提供給我們,真正做到了數(shù)據(jù)的復(fù)用。
這就和我們使用的抓包軟件性質(zhì)一樣,上個原理圖更清晰

代理服務(wù)器策略

從使用開始

這里在如何使用上直接搬運作者自己的readme。
首先AS用戶一行代碼在gradle中導(dǎo)包

dependencies {
    compile 'com.danikula:videocache:2.6.4'
}

然后在全局初始化一個本地代理服務(wù)器,這里選擇在Application的實現(xiàn)類中,至于這個類是干什么的,后面會詳細分析

public class App extends Application {

    private HttpProxyCacheServer proxy;

    public static HttpProxyCacheServer getProxy(Context context) {
        App app = (App) context.getApplicationContext();
        return app.proxy == null ? (app.proxy = app.newProxy()) : app.proxy;
    }

    private HttpProxyCacheServer newProxy() {
        return new HttpProxyCacheServer(this);
    }
}

有了代理服務(wù)器,我們就可以使用了,把自己的網(wǎng)絡(luò)視頻url用提供的方法替換成另一個URL

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);

    HttpProxyCacheServer proxy = getProxy();
    String proxyUrl = proxy.getProxyUrl(VIDEO_URL);
    videoView.setVideoPath(proxyUrl);
}

這樣就已經(jīng)可以正常使用了,當(dāng)然這個庫提供了更多的可以自定義的地方,比如緩存的文件最大大小,以及文件個數(shù),緩存采取的是LruCache的方法,對于老文件在達到上限后會自動清理。

private HttpProxyCacheServer newProxy() {
    return new HttpProxyCacheServer.Builder(this)
            .maxCacheSize(1024 * 1024 * 1024)       // 1 Gb for cache
            .build();
}

private HttpProxyCacheServer newProxy() {
    return new HttpProxyCacheServer.Builder(this)
            .maxCacheFilesCount(20)
            .build();
}

除了這個,還有一個就是生成的文件名,默認(rèn)是使用的MD5方式生成key,考慮到一些業(yè)務(wù)邏輯,我們也可以繼承一個 FileNameGenerator 來實現(xiàn)自己的策略

public class MyFileNameGenerator implements FileNameGenerator {

    // Urls contain mutable parts (parameter 'sessionToken') and stable video's id (parameter 'videoId').
    // e. g. http://example.com?videoId=abcqaz&sessionToken=xyz987
    public String generate(String url) {
        Uri uri = Uri.parse(url);
        String videoId = uri.getQueryParameter("videoId");
        return videoId + ".mp4";
    }
}

...
HttpProxyCacheServer proxy = HttpProxyCacheServer.Builder(context)
    .fileNameGenerator(new MyFileNameGenerator())
    .build()

到這里基本上整個AndroidVideoCache的使用就沒什么問題了。

具體分析

知道了怎么使用后,我們繼續(xù)往下走,看看是怎么實現(xiàn)的,這里就不分析后面的那些LruCache這些緩存策略,生成key之類的邏輯了,和一般的網(wǎng)絡(luò)請求里的都大同小異,我們直接看這個代碼最有含金量的地方。
前面在使用中,全局實例化過一個代理服務(wù)器,就先從這里開始

HttpProxyCacheServer.java
    private static final String PROXY_HOST = "127.0.0.1";
    private final Object clientsLock = new Object();
    private final ExecutorService socketProcessor = Executors.newFixedThreadPool(8);
    private final Map<String, HttpProxyCacheServerClients> clientsMap = new ConcurrentHashMap<>();
    private final ServerSocket serverSocket;
    private final int port;
    private final Thread waitConnectionThread;
    private final Config config;
    private final Pinger pinger;

    private HttpProxyCacheServer(Config config) {
        this.config = checkNotNull(config);
        try {
            InetAddress inetAddress = InetAddress.getByName(PROXY_HOST);
            this.serverSocket = new ServerSocket(0, 8, inetAddress);
            this.port = serverSocket.getLocalPort();
            CountDownLatch startSignal = new CountDownLatch(1);
            this.waitConnectionThread = new Thread(new WaitRequestsRunnable(startSignal));
            this.waitConnectionThread.start();
            startSignal.await(); // freeze thread, wait for server starts
            this.pinger = new Pinger(PROXY_HOST, port);
            LOG.info("Proxy cache server started. Is it alive? " + isAlive());
        } catch (IOException | InterruptedException e) {
            socketProcessor.shutdown();
            throw new IllegalStateException("Error starting local proxy server", e);
        }
    }

這個構(gòu)造函數(shù)一眼看過去就很清楚了,參數(shù)就是前面那些自定義配置,這里使用的是 127.0.0.1,這個就是localhost的ip也就是本地ip,創(chuàng)建了一個 ServerSocket ,隨機分配了一個端口,這里通過 getLocalPort 拿到了這個服務(wù)器端口,后面用來通信。
這里出現(xiàn)了一個線程 WaitRequestsRunnable 并且調(diào)用了 start 方法,繼續(xù)跟進去看這個線程

        @Override
        public void run() {
            startSignal.countDown();
            waitForRequest();
        }

信號量主要是為了保證這個run方法先執(zhí)行,繼續(xù)看這個 waitForRequest 方法

    private void waitForRequest() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Socket socket = serverSocket.accept();
                LOG.debug("Accept new socket " + socket);
                socketProcessor.submit(new SocketProcessorRunnable(socket));
            }
        } catch (IOException e) {
            onError(new ProxyCacheException("Error during waiting connection", e));
        }
    }

好了,到這里服務(wù)器socket一套比較清晰了,整理一下就是先構(gòu)建一個全局的一個本地代理服務(wù)器 ServerSocket,指定一個隨機端口,然后新開一個線程,在線程的 run 方法里,通過accept() 方法監(jiān)聽這個服務(wù)器socket的入站連接,accept() 方法會一直阻塞,直到有一個客戶端嘗試建立連接。
現(xiàn)在有了服務(wù)器,然后就是客戶端的socket,先從使用時代理替換url地方開始看

    HttpProxyCacheServer proxy = getProxy();
    String proxyUrl = proxy.getProxyUrl(VIDEO_URL);

這里使用的是HttpProxyCacheServer 中的 getProxyUrl 方法

    public String getProxyUrl(String url, boolean allowCachedFileUri) {
        if (allowCachedFileUri && isCached(url)) {
            File cacheFile = getCacheFile(url);
            touchFileSafely(cacheFile);
            return Uri.fromFile(cacheFile).toString();
        }
        return isAlive() ? appendToProxyUrl(url) : url;
    }

整個策略就是如果本地已經(jīng)緩存了,就直接那本地地址的Uri,并且touch一下文件,把時間更新后最新,因為后面LruCache是根據(jù)文件被訪問的時間進行排序的,如果文件沒有被緩存那么就會先走一下 isAlive() 方法, 這里會ping一下目標(biāo)url,確保url是一個有效的,如果用戶是通過代理訪問的話,就會ping不通,這樣就還是原生url,正常情況都會進入這個 appendToProxyUrl 方法里面。

    private String appendToProxyUrl(String url) {
        return String.format(Locale.US, "http://%s:%d/%s", PROXY_HOST, port, ProxyCacheUtils.encode(url));
    }

比較直接,這里拼接出來一個帶有127.0.0.1目標(biāo)地址及端口并攜帶原url的新地址,這個請求的話就會被我們的服務(wù)器socket監(jiān)聽到,也就是前面的accept() 會繼續(xù)往下走,這里接收到的socket就是我們所請求的客戶端socket

 socketProcessor.submit(new SocketProcessorRunnable(socket));

整個socket會被包裹成一個runnable,發(fā)配給線程池。這個 runnable 的 run 方法中所做的事情就是調(diào)用了一個方法

    private void processSocket(Socket socket) {
        try {
            GetRequest request = GetRequest.read(socket.getInputStream());
            LOG.debug("Request to cache proxy:" + request);
            String url = ProxyCacheUtils.decode(request.uri);
            if (pinger.isPingRequest(url)) {
                pinger.responseToPing(socket);
            } else {
                HttpProxyCacheServerClients clients = getClients(url);
                clients.processRequest(request, socket);
            }
        } catch (SocketException e) {
            // There is no way to determine that client closed connection http://stackoverflow.com/a/10241044/999458
            // So just to prevent log flooding don't log stacktrace
            LOG.debug("Closing socket… Socket is closed by client.");
        } catch (ProxyCacheException | IOException e) {
            onError(new ProxyCacheException("Error processing request", e));
        } finally {
            releaseSocket(socket);
            LOG.debug("Opened connections: " + getClientsCount());
        }
    }

前面ping的過程其實也被會這個socket監(jiān)聽并且走進來這一段,不過這個比較簡單,就不分析了,我們直接看里面的 else 框內(nèi)的代碼,這里一個 getClients 就是一個ConcurrentHashMap,重復(fù)url返回的是同一個HttpProxyCacheServerClients ,

 HttpProxyCacheServerClients clients = clientsMap.get(url);
            if (clients == null) {
                clients = new HttpProxyCacheServerClients(url, config);
                clientsMap.put(url, clients);
            }
            return clients;

如果是第一次就會根據(jù)url構(gòu)建出一個HttpProxyCacheServerClients并被put到ConcurrentHashMap中,真正的操作都在這個客戶端的 processRequest 操作中,并且傳遞過去一個是request,這是一個GetRequest 對象,是一個url和rangeoffset以及partial的包裝類,另一個就是客戶端socket。

    public void processRequest(GetRequest request, Socket socket) throws ProxyCacheException, IOException {
        startProcessRequest();
        try {
            clientsCount.incrementAndGet();
            proxyCache.processRequest(request, socket);
        } finally {
            finishProcessRequest();
        }
    }

這里 startProcessRequest 方法會得到一個HttpProxyCache 類

    private synchronized void startProcessRequest() throws ProxyCacheException {
        proxyCache = proxyCache == null ? newHttpProxyCache() : proxyCache;
    }
    private HttpProxyCache newHttpProxyCache() throws ProxyCacheException {
        HttpUrlSource source = new HttpUrlSource(url, config.sourceInfoStorage);
        FileCache cache = new FileCache(config.generateCacheFile(url), config.diskUsage);
        HttpProxyCache httpProxyCache = new HttpProxyCache(source, cache);
        httpProxyCache.registerCacheListener(uiCacheListener);
        return httpProxyCache;
    }

在這里,我們構(gòu)建一個基于原生url的HttpUrlSource ,這個類負責(zé)持有url,并開啟HttpURLConnection來獲取一個InputStream,這樣才能通過這個輸入流讀數(shù)據(jù),同時也創(chuàng)建了一個本地的臨時文件,一個以.download結(jié)尾的臨時文件,這個文件在成功下載完后的 FileCache 類中的 complete 方法中被更名。
我們構(gòu)建了一個HttpProxyCache 類,也注冊了一個CacheListener,這個listener可以用來回調(diào)進度。
做完這一切之后,然后這個HttpProxyCache 對象就開始 processRequest ,

    public void processRequest(GetRequest request, Socket socket) throws IOException, ProxyCacheException {
        OutputStream out = new BufferedOutputStream(socket.getOutputStream());
        String responseHeaders = newResponseHeaders(request);
        out.write(responseHeaders.getBytes("UTF-8"));

        long offset = request.rangeOffset;
        if (isUseCache(request)) {
            responseWithCache(out, offset);
        } else {
            responseWithoutCache(out, offset);
        }
    }

這里我們用傳過來的那個客戶端socket,拿到一個OutputStream輸出流,這樣我們就能往里面寫數(shù)據(jù)了,如果不用緩存就走常規(guī)邏輯,這里我們只看走緩存的行為。

    private void responseWithCache(OutputStream out, long offset) throws ProxyCacheException, IOException {
        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
        int readBytes;
        while ((readBytes = read(buffer, offset, buffer.length)) != -1) {
            out.write(buffer, 0, readBytes);
            offset += readBytes;
        }
        out.flush();
    }

構(gòu)造一個8 * 1024字節(jié)的buffer,這里的read方法,實際上是調(diào)用的父類ProxyCache的實現(xiàn)

    public int read(byte[] buffer, long offset, int length) throws ProxyCacheException {
        ProxyCacheUtils.assertBuffer(buffer, offset, length);

        while (!cache.isCompleted() && cache.available() < (offset + length) && !stopped) {
            readSourceAsync();
            waitForSourceData();
            checkReadSourceErrorsCount();
        }
        int read = cache.read(buffer, offset, length);
        if (cache.isCompleted() && percentsAvailable != 100) {
            percentsAvailable = 100;
            onCachePercentsAvailableChanged(100);
        }
        return read;
    }

在while循環(huán)里面,開啟了一個新的線程sourceReaderThread,其中封裝了一個SourceReaderRunnable的Runnable,這個異步線程用來給cache,也就是本地文件寫數(shù)據(jù),同時還更新一下當(dāng)前的緩存進度

        int sourceAvailable = -1;
        int offset = 0;
        try {
            offset = cache.available();
            source.open(offset);
            sourceAvailable = source.length();
            byte[] buffer = new byte[ProxyCacheUtils.DEFAULT_BUFFER_SIZE];
            int readBytes;
            while ((readBytes = source.read(buffer)) != -1) {
                synchronized (stopLock) {
                    if (isStopped()) {
                        return;
                    }
                    cache.append(buffer, readBytes);
                }
                offset += readBytes;
                notifyNewCacheDataAvailable(offset, sourceAvailable);
            }
            tryComplete();
            onSourceRead();

同時我們的另一個線程也會從cache中去讀數(shù)據(jù),在緩存結(jié)束后同樣也會發(fā)送一個通知通知自己已經(jīng)緩存完了,回調(diào)由外界控制。
以上差不多就是總體代碼,這里我們在請求遠程URL時將文件寫到本地fileCache中,然后讀數(shù)據(jù)從本地讀取,寫入到客戶端socket里面,服務(wù)器Socket主要還是一個代理的作用,從中間攔截掉網(wǎng)絡(luò)請求,然后實現(xiàn)對socket的讀取和寫入。

后記

整個分析為了節(jié)約篇幅,盡量的是描述一些其中比較重要的片段,源碼文件還是比較多的,這里不能詳述,對這種代理方式感興趣的可以在自己詳細閱讀一下源碼,畢竟源碼面前,了無秘密。
這個項目用起來有一點問題,是因為如果我們的APP設(shè)置了代理,那么這個socket方式拿url就會出問題,因為我們拿到的也是一個代理url,所以在開發(fā)時需要考慮代理用戶提供兼容性處理。
另外這種本地代理服務(wù)器的策略也能為我們提供一些不一樣的思路,既然視頻可行那么音頻文件呢,進而推導(dǎo)到普通的網(wǎng)絡(luò)請求,json文件?;谶@樣一套思路,在其基礎(chǔ)上甚至能夠?qū)崿F(xiàn)一套離線緩存加載的策略,當(dāng)然這取決于我們自身的服務(wù)器架構(gòu),服務(wù)端URL策略。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,688評論 19 139
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,319評論 25 708
  • 最近在把一個重構(gòu)完的項目放到beta環(huán)境測試時,順帶實踐了一下Java Mission Control(簡稱JMC...
    zcliu閱讀 21,473評論 0 5

友情鏈接更多精彩內(nèi)容