附件分片下載、斷點續(xù)傳

附件分片下載、斷點續(xù)傳

場景引入

在最近接觸的一個項目中碰到這樣的場景一個加密過的文件存儲在FastDFS中,在業(yè)務(wù)處理過程中需要將對應(yīng)的附件下載下來并解密之后做相應(yīng)的業(yè)務(wù)處理。

原有的處理方式是單線程模式下載并做相應(yīng)的業(yè)務(wù)處理,當(dāng)文件比較大時例如10G+的文件此時就會出現(xiàn)性能瓶頸。故此引入分片下載處理方案,開啟多個線程同步下載附件,提高下載效率。

以下示例是是剝離業(yè)務(wù)功能后寫的一個附件分片下載、斷點續(xù)傳demo 涵蓋了核心實現(xiàn)原理

附件分片下載、斷點續(xù)傳 實現(xiàn)原理

核心實現(xiàn)原理

  1. 獲取附件信息:附件大小、是否需要分片處理、詳細分片信息、附件MD5

    • 更具附件ID獲取附件信息

    • 通過分片閾值判斷是否需要進行分片處理,需要分片處理則獲取詳細的分片信息

    • 計算附件MD5或每個分片的MD5,并緩存起來(相同附件ID直接從緩存中獲取MD5)

  2. 根據(jù)步驟1獲取的信息請求附件下載接口下載附件

    • 判斷磁盤空間是否充裕

    • 根據(jù)分片信息調(diào)用下載接口下載附件,使用線程池執(zhí)行下載合理高效使用計算機處理能力

    • 比對每個分片的MD5,所有分片下載成功后合并附件(可每個分片下載成功直接進入合并附件的執(zhí)行步驟)。

  3. 分片下載完成后合并附件,并通過附件MD5校驗附件下載的一致性

  4. 相應(yīng)的業(yè)務(wù)處理

工業(yè)級代碼考慮點

  1. duild once ,Run everywhere 為適應(yīng)不同服務(wù)器公共的性能參數(shù)做到可配置:下載緩沖區(qū)大小、分片閾值、每個分片大小、下載存儲臨時目錄、線程池大小、線程池隊列大小

  2. 下載前做磁盤空間預(yù)判

  3. 保證每個分片下載的正確性(校驗分片MD5) 并保證最終合并附件的正確性

  4. 利用HTTP 206 Partial Content實現(xiàn)斷點續(xù)傳

代碼實現(xiàn)

獲取附件信息

/**
     * 獲取附件信息
     *
     * @return 附件詳細信息
     */
    @RequestMapping("/getFileDetailInfo")
    public FileDetailInfo getFileDetailInfo(String fileId) {
        FileDetailInfo fileDetailInfo = new FileDetailInfo();
        //附件id 獲取附件信息
        String path = getFilePath(fileId);
        File file = new File(path);
        if (file.exists() && file.isFile()) {
            fileDetailInfo.setSize(file.length());
            fileDetailInfo.setFilePath(path);
            fileDetailInfo.setFileName(file.getName());
            fileDetailInfo.setFileMd5(FileUtils.getFileMd5(path));
            if (file.length() > fileOperationProperties.getMultipartSizeLimit()) {
                fileDetailInfo.setMultipart(true);
                //超過分塊閾值限制 進行附件分塊
                long alreadyPartLength = 0;
                int part = 0;
                while (alreadyPartLength<file.length()) {
                    //起始分塊位置
                    long off = alreadyPartLength;
                    //已分塊的長度
                    alreadyPartLength = (alreadyPartLength + fileOperationProperties.getMultipartSize()) > file.length() ?
                            file.length() : alreadyPartLength + fileOperationProperties.getMultipartSize();
                    MultiPartFileInfo multiPartFileInfo = new MultiPartFileInfo();
                    multiPartFileInfo.setFileName(file.getName()+part);
                    multiPartFileInfo.setLen(alreadyPartLength - off);
                    //分塊的MD5
                    multiPartFileInfo.setFileMd5(FileUtils.getFileMd5(file,multiPartFileInfo.getFileName() ,
                                off ,multiPartFileInfo.getLen() ));
                    multiPartFileInfo.setFilePath(fileDetailInfo.getFilePath());
                    part += 1;
                    multiPartFileInfo.setOff(off);

                    fileDetailInfo.addMultiPart(multiPartFileInfo);
                }
            }else{
                fileDetailInfo.setMultipart(false);
            }
        }else {
            throw new RuntimeException("操作異常!非文件或文件不存在");
        }
        return fileDetailInfo;
    }

下載功能 包含斷點續(xù)傳

 /**
     * 附件下載
     */
    @RequestMapping("/downloadFile")
    public void downloadFile(@RequestBody MultiPartFileInfo multiPartFileInfo , HttpServletResponse response , HttpServletRequest request) {
        //FIXME 優(yōu)化代碼
        File file = new File(multiPartFileInfo.getFilePath());
        if (file.exists() && file.isFile()) {
            OutputStream out = null;
            RandomAccessFile in = null;
            //下載起始位置
            long off = 0;
            int downloadSize = 0;
            try {
                response.setContentType("application/octet-stream");
                response.setHeader("Content-disposition", String.format("attachment; filename=\"%s\"",
                        new String(multiPartFileInfo.getFileName().getBytes("UTF-8"), "ISO8859-1")));

                response.setHeader("Accept-Ranges", "bytes");
                if (request.getHeader("Range") == null) {
                    response.setHeader("Content-Length", String.valueOf(multiPartFileInfo.getLen()));
                }else {
                    //解析斷點續(xù)傳
                    String range = request.getHeader("Range");
                    String[] bytes = range.replaceAll("bytes", "").split("-");
                    off = Long.parseLong(bytes[0]);
                    long end = 0;
                    if (bytes.length == 2) {
                        end = Long.parseLong(bytes[1]);
                    }
                    int length = 0;
                    if (end != 0 && end>off) {
                        length = Math.toIntExact(end - off);
                    }else{
                        length = Math.toIntExact(multiPartFileInfo.getLen() - off);
                    }
                    response.setHeader("Content-Length", String.valueOf(length));
                    downloadSize = length;
                }

                in = new RandomAccessFile(file,"rw");
                 out = response.getOutputStream();
                if (off == 0) {
                    off = multiPartFileInfo.getOff();
                }
                if (downloadSize == 0) {
                    downloadSize = Math.toIntExact(multiPartFileInfo.getLen());
                }
                byte[] bytes = new byte[fileOperationProperties.getReadBufLenSize()];
                int length = 0;
                //設(shè)置下載起始位置
                if (multiPartFileInfo.getOff() > 0) {
                    in.seek(off);
                }

                //預(yù)防讀取超出分塊范圍大小
                long readContentLen = 0;
                if ((readContentLen + fileOperationProperties.getReadBufLenSize()) > downloadSize) {
                    bytes = new byte[Math.toIntExact(multiPartFileInfo.getLen() - readContentLen)];
                }
                while ((length = in.read(bytes)) !=-1 ) {
                    out.write(bytes,0,length);
                    readContentLen += length;
                }
                out.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                    if (out != null) {
                        out.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }

    }

Junit下載測試用例

@Test
    public void testFileUploadNotPart() {

        try {
            //Step1 查詢分片信息
            MvcResult mvcResult = mockMvc.perform(MockMvcRequestBuilders.post("/fileOperation/getFileDetailInfo").param("fileId", "1"))
                    .andExpect(MockMvcResultMatchers.status().isOk())
                    .andDo(MockMvcResultHandlers.print())
                    .andReturn();
            String body = mvcResult.getResponse().getContentAsString();
            FileDetailInfo fileDetailInfo = new JSONObject().parseObject(body, FileDetailInfo.class);

            //創(chuàng)建臨時存儲文件夾
            File directFile = new File(fileOperationProperties.getTempDirect());
            if (!directFile.exists()) {
                directFile.mkdirs();
            }

            if (FileUtils.enoughFreeSpace(fileOperationProperties.getTempDirect(), (long) (fileDetailInfo.getSize() * 1.3))) {
                //Step2 下載附件
                if (fileDetailInfo.isMultipart()) {
                    //Step2.1 分片下載
                    int nThreads = fileOperationProperties.getNThreads();
                    int threadPoolQueueCapacity = fileOperationProperties.getThreadPoolQueueCapacity();
                    ExecutorService executorService = new ThreadPoolExecutor(nThreads, nThreads, 0L,
                            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(threadPoolQueueCapacity));
                    //Step2.1.1 線程池中下載
                    //FIXME 優(yōu)化:1、獲取的分片信息是個有序集合,按照一定的順序下載。-->及時處理已下載完成的分片,盡可能合理利用存儲空間
                    //FIXME 優(yōu)化:2、下載完成一個分片 合并處理一個分片  不適用阻塞
                    Queue<Future<FileUploadResultModel>> futureQueue = new ConcurrentLinkedQueue<Future<FileUploadResultModel>>();
                    for (MultiPartFileInfo multiPartFileInfo : fileDetailInfo.getMultiPartFileInfos()) {
                        Future<FileUploadResultModel> futreTask = executorService.submit(new FileJob(restTemplate,
                                multiPartFileInfo, fileOperationProperties));
                        futureQueue.add(futreTask);
                    }

                    int sizeOfSuccessPartFile = 0;
                    for (Future<FileUploadResultModel> resultModelFuture : futureQueue) {
                        FileUploadResultModel resultModel = resultModelFuture.get();
                        if (resultModel.isSuccess()) {
                            sizeOfSuccessPartFile += 1;
                        } else {
                            //下載失敗處理--記錄下載失敗原因/重試下載
                        }
                    }

                    if (sizeOfSuccessPartFile == fileDetailInfo.getMultiPartFileInfos().size()) {
                        //Step2.1.2 下載成功 合并附件
                        String filePath = fileOperationProperties.getTempDirect() + File.separator + fileDetailInfo.getFileName();
                        //預(yù)創(chuàng)建與源文件相同大小的文件
                        File file = new File(filePath);
                        if (file.exists() && file.isFile()) {
                            file.delete();
                            file.createNewFile();
                        } else {
                            file.createNewFile();
                        }
                        //FIXME 此處使用多線程合并文件,提高合并處理效率
                        RandomAccessFile rFile = new RandomAccessFile(file, "rw");
                        rFile.setLength(fileDetailInfo.getSize());
                        for (Future<FileUploadResultModel> resultModelFuture : futureQueue) {
                            FileUploadResultModel fileUploadResultModel = resultModelFuture.get();
                            MultiPartFileInfo multiPartFileInfo = fileUploadResultModel.getMultiPartFileInfo();
                            //設(shè)置寫入起始位置
                            rFile.seek(multiPartFileInfo.getOff());
                            byte[] bytes = new byte[fileOperationProperties.getReadBufLenSize()];
                            int length = 0;
                            File tempFile = new File(fileUploadResultModel.getLocalFilePath());
                            InputStream TempFileInputStream = new FileInputStream(tempFile);
                            while ((length = TempFileInputStream.read(bytes)) != -1) {
                                rFile.write(bytes, 0, length);
                            }
                            TempFileInputStream.close();
                            tempFile.delete();
                        }
                        //Step2.1.3 校驗附件
                        if (FileUtils.checkFile(filePath, fileDetailInfo.getFileMd5(), fileDetailInfo.getSize())) {
                            log.info("附件下載成功!附件本地目錄 {}", filePath);
                        }

                    }
                } else {
                    //step2.2 整個附件下載
                    MultiPartFileInfo multiPartFileInfo = new MultiPartFileInfo();
                    multiPartFileInfo.setFilePath(fileDetailInfo.getFilePath());
                    multiPartFileInfo.setOff(0);
                    multiPartFileInfo.setFileName(fileDetailInfo.getFileName());
                    multiPartFileInfo.setFileMd5(fileDetailInfo.getFileMd5());
                    multiPartFileInfo.setLen(fileDetailInfo.getSize());
                    //下載附件
                    FileJob fileJob = new FileJob(restTemplate,
                            multiPartFileInfo, fileOperationProperties);
                    FileUploadResultModel resultModel = fileJob.uploadFile();
                    //step3 校驗附件MD5
                    if (FileUtils.checkFile(resultModel.getLocalFilePath(), fileDetailInfo.getFileMd5(), fileDetailInfo.getSize())) {
                        log.info("附件下載成功!附件本地目錄 {}", resultModel.getLocalFilePath());
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

結(jié)尾

以上是整個分片下載、斷點續(xù)傳的實現(xiàn)原理及其實現(xiàn)。

在上訴實現(xiàn)中還有可優(yōu)化的點

  • 獲取的分片信息時使用有序集合,按照一定的順序下載。-->及時處理已下載完成的分片,盡可能合理利用存儲空間

  • 下載完成一個分片 合并處理一個分片 不使用阻塞,減少磁盤空間的占用

  • 可使用多線程處理附件合并

源碼地址

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容