原文轉載自「劉悅的技術博客」https://v3u.cn/a_id_218
分治算法是一種很古老但很務實的方法。本意即使將一個較大的整體打碎分成小的局部,這樣每個小的局部都不足以對抗大的整體。戰(zhàn)國時期,秦國破壞合縱的連橫即是一種分而治之的手段;十九世紀,比利時殖民者占領盧旺達, 將盧旺達的種族分為胡圖族與圖西族,以圖進行分裂控制,莫不如是。
21世紀,人們往往會在Leetcode平臺上刷分治算法題,但事實上,從工業(yè)角度上來看,算法如果不和實際業(yè)務場景相結合,算法就永遠是虛無縹緲的存在,它只會出現(xiàn)在開發(fā)者的某一次不經(jīng)意的面試中,而真實的算法,并不是虛空的,它應該能幫助我們解決實際問題,是的,它應該落地成為實體。
大文件分片上傳就是這樣一個契合分治算法的場景,現(xiàn)而今,視頻文件的體積越來越大,高清視頻體積大概2-4g不等,但4K視頻的分辨率是標準高清的四倍,需要四倍的存儲空間——只需兩到三分鐘的未壓縮4K 電影,或者電影預告片的長度,就可以達到500GB。 8K視頻文件更是大得難以想象,而現(xiàn)在12K正在出現(xiàn),如此巨大的文件,該怎樣設計一套合理的數(shù)據(jù)傳輸方案?這里我們以前后端分離項目為例,前端使用Vue.js3.0配合ui庫Ant-desgin,后端采用并發(fā)異步框架Tornado實現(xiàn)大文件的分片無阻塞傳輸與異步IO寫入服務。
前端分片
首先,安裝Vue3.0以上版本:
npm install -g @vue/cli
安裝異步請求庫axios:
npm install axios --save
隨后,安裝Ant-desgin:
npm i --save ant-design-vue@next -S
Ant-desgin雖然因為曾經(jīng)的圣誕節(jié)“彩蛋門”事件而聲名狼藉,但客觀地說,它依然是業(yè)界不可多得的優(yōu)秀UI框架之一。
接著在項目程序入口文件引入使用:
import { createApp } from 'vue'
import App from './App.vue'
import { router } from './router/index'
import axios from 'axios'
import qs from 'qs'
import Antd from 'ant-design-vue';
import 'ant-design-vue/dist/antd.css';
const app = createApp(App)
app.config.globalProperties.axios = axios;
app.config.globalProperties.upload_dir = "https://localhost/static/";
app.config.globalProperties.weburl = "http://localhost:8000";
app.use(router);
app.use(Antd);
app.mount('#app')
隨后,參照Ant-desgin官方文檔:https://antdv.com/components/overview-cn 構建上傳控件:
<a-upload
@change="fileupload"
:before-upload="beforeUpload"
>
<a-button>
<upload-outlined></upload-outlined>
上傳文件
</a-button>
</a-upload>
注意這里需要將綁定的before-upload強制返回false,設置為手動上傳:
beforeUpload:function(file){
return false;
}
接著聲明分片方法:
fileupload:function(file){
var size = file.file.size;//總大小
var shardSize = 200 * 1024; //分片大小
this.shardCount = Math.ceil(size / shardSize); //總片數(shù)
console.log(this.shardCount);
for (var i = 0; i < this.shardCount; ++i) {
//計算每一片的起始與結束位置
var start = i * shardSize;
var end = Math.min(size, start + shardSize);
var tinyfile = file.file.slice(start, end);
let data = new FormData();
data.append('file', tinyfile);
data.append('count',i);
data.append('filename',file.file.name);
const axiosInstance = this.axios.create({withCredentials: false});
axiosInstance({
method: 'POST',
url:'http://localhost:8000/upload/', //上傳地址
data:data
}).then(data =>{
this.finished += 1;
console.log(this.finished);
if(this.finished == this.shardCount){
this.mergeupload(file.file.name);
}
}).catch(function(err) {
//上傳失敗
});
}
}
具體分片邏輯是,大文件總體積按照單片體積的大小做除法并向上取整,獲取到文件的分片個數(shù),這里為了測試方便,將單片體積設置為200kb,可以隨時做修改。
隨后,分片過程中使用Math.min方法計算每一片的起始和結束位置,再通過slice方法進行切片操作,最后將分片的下標、文件名、以及分片本體異步發(fā)送到后臺。
當所有的分片請求都發(fā)送完畢后,封裝分片合并方法,請求后端發(fā)起合并分片操作:
mergeupload:function(filename){
this.myaxios(this.weburl+"/upload/","put",{"filename":filename}).then(data =>{
console.log(data);
});
}
至此,前端分片邏輯就完成了。
后端異步IO寫入
為了避免同步寫入引起的阻塞,安裝aiofiles庫:
pip3 install aiofiles
aiofiles用于處理asyncio應用程序中的本地磁盤文件,配合Tornado的異步非阻塞機制,可以有效的提升文件寫入效率:
import aiofiles
# 分片上傳
class SliceUploadHandler(BaseHandler):
async def post(self):
file = self.request.files["file"][0]
filename = self.get_argument("filename")
count = self.get_argument("count")
filename = '%s_%s' % (filename,count) # 構成該分片唯一標識符
contents = file['body'] #異步讀取文件
async with aiofiles.open('./static/uploads/%s' % filename, "wb") as f:
await f.write(contents)
return {"filename": file.filename,"errcode":0}
這里后端獲取到分片實體、文件名、以及分片標識后,將分片文件以文件名_分片標識的格式異步寫入到系統(tǒng)目錄中,以一張378kb大小的png圖片為例,分片文件應該順序為200kb和178kb,如圖所示:
[圖片上傳失敗...(image-1affef-1658759510726)]
當分片文件都寫入成功后,觸發(fā)分片合并接口:
import aiofiles
# 分片上傳
class SliceUploadHandler(BaseHandler):
async def post(self):
file = self.request.files["file"][0]
filename = self.get_argument("filename")
count = self.get_argument("count")
filename = '%s_%s' % (filename,count) # 構成該分片唯一標識符
contents = file['body'] #異步讀取文件
async with aiofiles.open('./static/uploads/%s' % filename, "wb") as f:
await f.write(contents)
return {"filename": file.filename,"errcode":0}
async def put(self):
filename = self.get_argument("filename")
chunk = 0
async with aiofiles.open('./static/uploads/%s' % filename,'ab') as target_file:
while True:
try:
source_file = open('./static/uploads/%s_%s' % (filename,chunk), 'rb')
await target_file.write(source_file.read())
source_file.close()
except Exception as e:
print(str(e))
break
chunk = chunk + 1
self.finish({"msg":"ok","errcode":0})
這里通過文件名進行尋址,隨后遍歷合并,注意句柄寫入模式為增量字節(jié)碼寫入,否則會逐層將分片文件覆蓋,同時也兼具了斷點續(xù)寫的功能。有些邏輯會將分片個數(shù)傳入后端,讓后端判斷分片合并個數(shù),其實并不需要,因為如果尋址失敗,會自動拋出異常并且跳出循環(huán),從而節(jié)約了一個參數(shù)的帶寬占用。
輪詢服務
在真實的超大文件傳輸場景中,由于網(wǎng)絡或者其他因素,很可能導致分片任務中斷,此時就需要通過降級快速響應,返回托底數(shù)據(jù),避免用戶的長時間等待,這里我們使用基于Tornado的Apscheduler庫來調度分片任務:
pip install apscheduler
隨后編寫job.py輪詢服務文件:
from datetime import datetime
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import RequestHandler, Application
from apscheduler.schedulers.tornado import TornadoScheduler
scheduler = None
job_ids = []
# 初始化
def init_scheduler():
global scheduler
scheduler = TornadoScheduler()
scheduler.start()
print('[Scheduler Init]APScheduler has been started')
# 要執(zhí)行的定時任務在這里
def task1(options):
print('{} [APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), options))
class MainHandler(RequestHandler):
def get(self):
self.write('<a href="/scheduler?job_id=1&action=add">add job</a><br><a href="/scheduler?job_id=1&action=remove">remove job</a>')
class SchedulerHandler(RequestHandler):
def get(self):
global job_ids
job_id = self.get_query_argument('job_id', None)
action = self.get_query_argument('action', None)
if job_id:
# add
if 'add' == action:
if job_id not in job_ids:
job_ids.append(job_id)
scheduler.add_job(task1, 'interval', seconds=3, id=job_id, args=(job_id,))
self.write('[TASK ADDED] - {}'.format(job_id))
else:
self.write('[TASK EXISTS] - {}'.format(job_id))
# remove
elif 'remove' == action:
if job_id in job_ids:
scheduler.remove_job(job_id)
job_ids.remove(job_id)
self.write('[TASK REMOVED] - {}'.format(job_id))
else:
self.write('[TASK NOT FOUND] - {}'.format(job_id))
else:
self.write('[INVALID PARAMS] INVALID job_id or action')
if __name__ == "__main__":
routes = [
(r"/", MainHandler),
(r"/scheduler/?", SchedulerHandler),
]
init_scheduler()
app = Application(routes, debug=True)
app.listen(8888)
IOLoop.current().start()
每一次分片接口被調用后,就建立定時任務對分片文件進行監(jiān)測,如果分片成功就刪除分片文件,同時刪除任務,否則就啟用降級預案。
結語
分治法對超大文件進行分片切割,同時并發(fā)異步發(fā)送,可以提高傳輸效率,降低傳輸時間,和之前的一篇:聚是一團火散作滿天星,前端Vue.js+elementUI結合后端FastAPI實現(xiàn)大文件分片上傳,邏輯上有異曲同工之妙,但手法上卻略有不同,確是頗有相互借鏡之處,最后代碼開源于Github:https://github.com/zcxey2911/Tornado6_Vuejs3_Edu,與眾親同饗。
原文轉載自「劉悅的技術博客」 https://v3u.cn/a_id_218