附件分片下載、斷點續(xù)傳
場景引入
在最近接觸的一個項目中碰到這樣的場景一個加密過的文件存儲在FastDFS中,在業(yè)務(wù)處理過程中需要將對應(yīng)的附件下載下來并解密之后做相應(yīng)的業(yè)務(wù)處理。
原有的處理方式是單線程模式下載并做相應(yīng)的業(yè)務(wù)處理,當(dāng)文件比較大時例如10G+的文件此時就會出現(xiàn)性能瓶頸。故此引入分片下載處理方案,開啟多個線程同步下載附件,提高下載效率。
以下示例是是剝離業(yè)務(wù)功能后寫的一個附件分片下載、斷點續(xù)傳demo 涵蓋了核心實現(xiàn)原理
附件分片下載、斷點續(xù)傳 實現(xiàn)原理
核心實現(xiàn)原理
-
獲取附件信息:附件大小、是否需要分片處理、詳細分片信息、
附件MD5更具附件ID獲取附件信息
通過分片閾值判斷是否需要進行分片處理,需要分片處理則獲取詳細的分片信息
計算
附件MD5或每個分片的MD5,并緩存起來(相同附件ID直接從緩存中獲取MD5)
-
根據(jù)步驟
1獲取的信息請求附件下載接口下載附件判斷磁盤空間是否充裕
根據(jù)分片信息調(diào)用下載接口下載附件,使用線程池執(zhí)行下載合理高效使用計算機處理能力
比對每個分片的
MD5,所有分片下載成功后合并附件(可每個分片下載成功直接進入合并附件的執(zhí)行步驟)。
分片下載完成后合并附件,并通過
附件MD5校驗附件下載的一致性相應(yīng)的業(yè)務(wù)處理
工業(yè)級代碼考慮點
duild once ,Run everywhere為適應(yīng)不同服務(wù)器公共的性能參數(shù)做到可配置:下載緩沖區(qū)大小、分片閾值、每個分片大小、下載存儲臨時目錄、線程池大小、線程池隊列大小下載前做磁盤空間預(yù)判
保證每個分片下載的正確性(
校驗分片MD5) 并保證最終合并附件的正確性利用
HTTP 206 Partial Content實現(xiàn)斷點續(xù)傳
代碼實現(xiàn)
獲取附件信息
/**
* 獲取附件信息
*
* @return 附件詳細信息
*/
@RequestMapping("/getFileDetailInfo")
public FileDetailInfo getFileDetailInfo(String fileId) {
FileDetailInfo fileDetailInfo = new FileDetailInfo();
//附件id 獲取附件信息
String path = getFilePath(fileId);
File file = new File(path);
if (file.exists() && file.isFile()) {
fileDetailInfo.setSize(file.length());
fileDetailInfo.setFilePath(path);
fileDetailInfo.setFileName(file.getName());
fileDetailInfo.setFileMd5(FileUtils.getFileMd5(path));
if (file.length() > fileOperationProperties.getMultipartSizeLimit()) {
fileDetailInfo.setMultipart(true);
//超過分塊閾值限制 進行附件分塊
long alreadyPartLength = 0;
int part = 0;
while (alreadyPartLength<file.length()) {
//起始分塊位置
long off = alreadyPartLength;
//已分塊的長度
alreadyPartLength = (alreadyPartLength + fileOperationProperties.getMultipartSize()) > file.length() ?
file.length() : alreadyPartLength + fileOperationProperties.getMultipartSize();
MultiPartFileInfo multiPartFileInfo = new MultiPartFileInfo();
multiPartFileInfo.setFileName(file.getName()+part);
multiPartFileInfo.setLen(alreadyPartLength - off);
//分塊的MD5
multiPartFileInfo.setFileMd5(FileUtils.getFileMd5(file,multiPartFileInfo.getFileName() ,
off ,multiPartFileInfo.getLen() ));
multiPartFileInfo.setFilePath(fileDetailInfo.getFilePath());
part += 1;
multiPartFileInfo.setOff(off);
fileDetailInfo.addMultiPart(multiPartFileInfo);
}
}else{
fileDetailInfo.setMultipart(false);
}
}else {
throw new RuntimeException("操作異常!非文件或文件不存在");
}
return fileDetailInfo;
}
下載功能 包含斷點續(xù)傳
/**
* 附件下載
*/
@RequestMapping("/downloadFile")
public void downloadFile(@RequestBody MultiPartFileInfo multiPartFileInfo , HttpServletResponse response , HttpServletRequest request) {
//FIXME 優(yōu)化代碼
File file = new File(multiPartFileInfo.getFilePath());
if (file.exists() && file.isFile()) {
OutputStream out = null;
RandomAccessFile in = null;
//下載起始位置
long off = 0;
int downloadSize = 0;
try {
response.setContentType("application/octet-stream");
response.setHeader("Content-disposition", String.format("attachment; filename=\"%s\"",
new String(multiPartFileInfo.getFileName().getBytes("UTF-8"), "ISO8859-1")));
response.setHeader("Accept-Ranges", "bytes");
if (request.getHeader("Range") == null) {
response.setHeader("Content-Length", String.valueOf(multiPartFileInfo.getLen()));
}else {
//解析斷點續(xù)傳
String range = request.getHeader("Range");
String[] bytes = range.replaceAll("bytes", "").split("-");
off = Long.parseLong(bytes[0]);
long end = 0;
if (bytes.length == 2) {
end = Long.parseLong(bytes[1]);
}
int length = 0;
if (end != 0 && end>off) {
length = Math.toIntExact(end - off);
}else{
length = Math.toIntExact(multiPartFileInfo.getLen() - off);
}
response.setHeader("Content-Length", String.valueOf(length));
downloadSize = length;
}
in = new RandomAccessFile(file,"rw");
out = response.getOutputStream();
if (off == 0) {
off = multiPartFileInfo.getOff();
}
if (downloadSize == 0) {
downloadSize = Math.toIntExact(multiPartFileInfo.getLen());
}
byte[] bytes = new byte[fileOperationProperties.getReadBufLenSize()];
int length = 0;
//設(shè)置下載起始位置
if (multiPartFileInfo.getOff() > 0) {
in.seek(off);
}
//預(yù)防讀取超出分塊范圍大小
long readContentLen = 0;
if ((readContentLen + fileOperationProperties.getReadBufLenSize()) > downloadSize) {
bytes = new byte[Math.toIntExact(multiPartFileInfo.getLen() - readContentLen)];
}
while ((length = in.read(bytes)) !=-1 ) {
out.write(bytes,0,length);
readContentLen += length;
}
out.flush();
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (in != null) {
in.close();
}
if (out != null) {
out.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Junit下載測試用例
@Test
public void testFileUploadNotPart() {
try {
//Step1 查詢分片信息
MvcResult mvcResult = mockMvc.perform(MockMvcRequestBuilders.post("/fileOperation/getFileDetailInfo").param("fileId", "1"))
.andExpect(MockMvcResultMatchers.status().isOk())
.andDo(MockMvcResultHandlers.print())
.andReturn();
String body = mvcResult.getResponse().getContentAsString();
FileDetailInfo fileDetailInfo = new JSONObject().parseObject(body, FileDetailInfo.class);
//創(chuàng)建臨時存儲文件夾
File directFile = new File(fileOperationProperties.getTempDirect());
if (!directFile.exists()) {
directFile.mkdirs();
}
if (FileUtils.enoughFreeSpace(fileOperationProperties.getTempDirect(), (long) (fileDetailInfo.getSize() * 1.3))) {
//Step2 下載附件
if (fileDetailInfo.isMultipart()) {
//Step2.1 分片下載
int nThreads = fileOperationProperties.getNThreads();
int threadPoolQueueCapacity = fileOperationProperties.getThreadPoolQueueCapacity();
ExecutorService executorService = new ThreadPoolExecutor(nThreads, nThreads, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(threadPoolQueueCapacity));
//Step2.1.1 線程池中下載
//FIXME 優(yōu)化:1、獲取的分片信息是個有序集合,按照一定的順序下載。-->及時處理已下載完成的分片,盡可能合理利用存儲空間
//FIXME 優(yōu)化:2、下載完成一個分片 合并處理一個分片 不適用阻塞
Queue<Future<FileUploadResultModel>> futureQueue = new ConcurrentLinkedQueue<Future<FileUploadResultModel>>();
for (MultiPartFileInfo multiPartFileInfo : fileDetailInfo.getMultiPartFileInfos()) {
Future<FileUploadResultModel> futreTask = executorService.submit(new FileJob(restTemplate,
multiPartFileInfo, fileOperationProperties));
futureQueue.add(futreTask);
}
int sizeOfSuccessPartFile = 0;
for (Future<FileUploadResultModel> resultModelFuture : futureQueue) {
FileUploadResultModel resultModel = resultModelFuture.get();
if (resultModel.isSuccess()) {
sizeOfSuccessPartFile += 1;
} else {
//下載失敗處理--記錄下載失敗原因/重試下載
}
}
if (sizeOfSuccessPartFile == fileDetailInfo.getMultiPartFileInfos().size()) {
//Step2.1.2 下載成功 合并附件
String filePath = fileOperationProperties.getTempDirect() + File.separator + fileDetailInfo.getFileName();
//預(yù)創(chuàng)建與源文件相同大小的文件
File file = new File(filePath);
if (file.exists() && file.isFile()) {
file.delete();
file.createNewFile();
} else {
file.createNewFile();
}
//FIXME 此處使用多線程合并文件,提高合并處理效率
RandomAccessFile rFile = new RandomAccessFile(file, "rw");
rFile.setLength(fileDetailInfo.getSize());
for (Future<FileUploadResultModel> resultModelFuture : futureQueue) {
FileUploadResultModel fileUploadResultModel = resultModelFuture.get();
MultiPartFileInfo multiPartFileInfo = fileUploadResultModel.getMultiPartFileInfo();
//設(shè)置寫入起始位置
rFile.seek(multiPartFileInfo.getOff());
byte[] bytes = new byte[fileOperationProperties.getReadBufLenSize()];
int length = 0;
File tempFile = new File(fileUploadResultModel.getLocalFilePath());
InputStream TempFileInputStream = new FileInputStream(tempFile);
while ((length = TempFileInputStream.read(bytes)) != -1) {
rFile.write(bytes, 0, length);
}
TempFileInputStream.close();
tempFile.delete();
}
//Step2.1.3 校驗附件
if (FileUtils.checkFile(filePath, fileDetailInfo.getFileMd5(), fileDetailInfo.getSize())) {
log.info("附件下載成功!附件本地目錄 {}", filePath);
}
}
} else {
//step2.2 整個附件下載
MultiPartFileInfo multiPartFileInfo = new MultiPartFileInfo();
multiPartFileInfo.setFilePath(fileDetailInfo.getFilePath());
multiPartFileInfo.setOff(0);
multiPartFileInfo.setFileName(fileDetailInfo.getFileName());
multiPartFileInfo.setFileMd5(fileDetailInfo.getFileMd5());
multiPartFileInfo.setLen(fileDetailInfo.getSize());
//下載附件
FileJob fileJob = new FileJob(restTemplate,
multiPartFileInfo, fileOperationProperties);
FileUploadResultModel resultModel = fileJob.uploadFile();
//step3 校驗附件MD5
if (FileUtils.checkFile(resultModel.getLocalFilePath(), fileDetailInfo.getFileMd5(), fileDetailInfo.getSize())) {
log.info("附件下載成功!附件本地目錄 {}", resultModel.getLocalFilePath());
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
結(jié)尾
以上是整個分片下載、斷點續(xù)傳的實現(xiàn)原理及其實現(xiàn)。
在上訴實現(xiàn)中還有可優(yōu)化的點
獲取的分片信息時使用有序集合,按照一定的順序下載。-->及時處理已下載完成的分片,盡可能合理利用存儲空間
下載完成一個分片 合并處理一個分片 不使用阻塞,減少磁盤空間的占用
可使用多線程處理附件合并