以寡治眾各個擊破,超大文件分片上傳之構建基于Vue.js3.0+Ant-desgin+Tornado6純異步IO高效寫入服務

原文轉載自「劉悅的技術博客」https://v3u.cn/a_id_218

分治算法是一種很古老但很務實的方法。本意即使將一個較大的整體打碎分成小的局部,這樣每個小的局部都不足以對抗大的整體。戰(zhàn)國時期,秦國破壞合縱的連橫即是一種分而治之的手段;十九世紀,比利時殖民者占領盧旺達, 將盧旺達的種族分為胡圖族與圖西族,以圖進行分裂控制,莫不如是。

21世紀,人們往往會在Leetcode平臺上刷分治算法題,但事實上,從工業(yè)角度上來看,算法如果不和實際業(yè)務場景相結合,算法就永遠是虛無縹緲的存在,它只會出現(xiàn)在開發(fā)者的某一次不經(jīng)意的面試中,而真實的算法,并不是虛空的,它應該能幫助我們解決實際問題,是的,它應該落地成為實體。

大文件分片上傳就是這樣一個契合分治算法的場景,現(xiàn)而今,視頻文件的體積越來越大,高清視頻體積大概2-4g不等,但4K視頻的分辨率是標準高清的四倍,需要四倍的存儲空間——只需兩到三分鐘的未壓縮4K 電影,或者電影預告片的長度,就可以達到500GB。 8K視頻文件更是大得難以想象,而現(xiàn)在12K正在出現(xiàn),如此巨大的文件,該怎樣設計一套合理的數(shù)據(jù)傳輸方案?這里我們以前后端分離項目為例,前端使用Vue.js3.0配合ui庫Ant-desgin,后端采用并發(fā)異步框架Tornado實現(xiàn)大文件的分片無阻塞傳輸與異步IO寫入服務。

前端分片

首先,安裝Vue3.0以上版本:

npm install -g @vue/cli

安裝異步請求庫axios:

npm install axios --save

隨后,安裝Ant-desgin:

npm i --save ant-design-vue@next -S

Ant-desgin雖然因為曾經(jīng)的圣誕節(jié)“彩蛋門”事件而聲名狼藉,但客觀地說,它依然是業(yè)界不可多得的優(yōu)秀UI框架之一。

接著在項目程序入口文件引入使用:

import { createApp } from 'vue'  
import App from './App.vue'  
import { router } from './router/index'  
  
  
import axios from 'axios'  
import qs from 'qs'  
  
import Antd from 'ant-design-vue';  
import 'ant-design-vue/dist/antd.css';  
  
  
const app = createApp(App)  
  
  
app.config.globalProperties.axios = axios;  
app.config.globalProperties.upload_dir = "https://localhost/static/";  
  
app.config.globalProperties.weburl = "http://localhost:8000";  
  
app.use(router);  
app.use(Antd);  
  
app.mount('#app')

隨后,參照Ant-desgin官方文檔:https://antdv.com/components/overview-cn 構建上傳控件:

<a-upload  
  
    @change="fileupload"  
    :before-upload="beforeUpload"  
  >  
    <a-button>  
      <upload-outlined></upload-outlined>  
      上傳文件  
    </a-button>  
  </a-upload>

注意這里需要將綁定的before-upload強制返回false,設置為手動上傳:

beforeUpload:function(file){  
  
  
      return false;  
  
}

接著聲明分片方法:

fileupload:function(file){  
  
  
      var size = file.file.size;//總大小  
   
  
      var shardSize = 200 * 1024; //分片大小  
   
      this.shardCount = Math.ceil(size / shardSize); //總片數(shù)  
  
      console.log(this.shardCount);  
   
   
      for (var i = 0; i < this.shardCount; ++i) {  
   
        //計算每一片的起始與結束位置  
   
        var start = i * shardSize;  
   
        var end = Math.min(size, start + shardSize);  
  
        var tinyfile = file.file.slice(start, end);  
  
        let data = new FormData();  
        data.append('file', tinyfile);  
        data.append('count',i);  
        data.append('filename',file.file.name);  
  
        const axiosInstance = this.axios.create({withCredentials: false});  
  
        axiosInstance({  
            method: 'POST',  
            url:'http://localhost:8000/upload/',  //上傳地址  
            data:data  
        }).then(data =>{  
  
           this.finished += 1;  
  
           console.log(this.finished);  
  
           if(this.finished == this.shardCount){  
            this.mergeupload(file.file.name);  
           }  
  
        }).catch(function(err) {  
            //上傳失敗  
        });  
  
  
  
      }  
  
  
  
    }

具體分片邏輯是,大文件總體積按照單片體積的大小做除法并向上取整,獲取到文件的分片個數(shù),這里為了測試方便,將單片體積設置為200kb,可以隨時做修改。

隨后,分片過程中使用Math.min方法計算每一片的起始和結束位置,再通過slice方法進行切片操作,最后將分片的下標、文件名、以及分片本體異步發(fā)送到后臺。

當所有的分片請求都發(fā)送完畢后,封裝分片合并方法,請求后端發(fā)起合并分片操作:

mergeupload:function(filename){  
  
  
      this.myaxios(this.weburl+"/upload/","put",{"filename":filename}).then(data =>{  
  
              console.log(data);  
  
        });  
  
}

至此,前端分片邏輯就完成了。

后端異步IO寫入

為了避免同步寫入引起的阻塞,安裝aiofiles庫:

pip3 install aiofiles

aiofiles用于處理asyncio應用程序中的本地磁盤文件,配合Tornado的異步非阻塞機制,可以有效的提升文件寫入效率:

import aiofiles  
  
# 分片上傳  
class SliceUploadHandler(BaseHandler):  
      
    async def post(self):  
  
  
        file = self.request.files["file"][0]  
        filename = self.get_argument("filename")  
        count = self.get_argument("count")  
  
        filename = '%s_%s' % (filename,count) # 構成該分片唯一標識符  
  
        contents = file['body'] #異步讀取文件  
        async with aiofiles.open('./static/uploads/%s' % filename, "wb") as f:  
            await f.write(contents)  
  
        return {"filename": file.filename,"errcode":0}

這里后端獲取到分片實體、文件名、以及分片標識后,將分片文件以文件名_分片標識的格式異步寫入到系統(tǒng)目錄中,以一張378kb大小的png圖片為例,分片文件應該順序為200kb和178kb,如圖所示:

[圖片上傳失敗...(image-1affef-1658759510726)]

當分片文件都寫入成功后,觸發(fā)分片合并接口:

import aiofiles  
  
# 分片上傳  
class SliceUploadHandler(BaseHandler):  
      
    async def post(self):  
  
  
        file = self.request.files["file"][0]  
        filename = self.get_argument("filename")  
        count = self.get_argument("count")  
  
        filename = '%s_%s' % (filename,count) # 構成該分片唯一標識符  
  
        contents = file['body'] #異步讀取文件  
        async with aiofiles.open('./static/uploads/%s' % filename, "wb") as f:  
            await f.write(contents)  
  
        return {"filename": file.filename,"errcode":0}  
  
  
    async def put(self):  
  
        filename = self.get_argument("filename")  
        chunk = 0  
  
        async with aiofiles.open('./static/uploads/%s' % filename,'ab') as target_file:  
  
            while True:  
                try:  
                    source_file = open('./static/uploads/%s_%s' % (filename,chunk), 'rb')  
                    await target_file.write(source_file.read())  
                    source_file.close()  
                except Exception as e:  
                    print(str(e))  
                    break  
  
                chunk = chunk + 1  
        self.finish({"msg":"ok","errcode":0})

這里通過文件名進行尋址,隨后遍歷合并,注意句柄寫入模式為增量字節(jié)碼寫入,否則會逐層將分片文件覆蓋,同時也兼具了斷點續(xù)寫的功能。有些邏輯會將分片個數(shù)傳入后端,讓后端判斷分片合并個數(shù),其實并不需要,因為如果尋址失敗,會自動拋出異常并且跳出循環(huán),從而節(jié)約了一個參數(shù)的帶寬占用。

輪詢服務

在真實的超大文件傳輸場景中,由于網(wǎng)絡或者其他因素,很可能導致分片任務中斷,此時就需要通過降級快速響應,返回托底數(shù)據(jù),避免用戶的長時間等待,這里我們使用基于Tornado的Apscheduler庫來調度分片任務:

pip install apscheduler

隨后編寫job.py輪詢服務文件:

from datetime import datetime  
from tornado.ioloop import IOLoop, PeriodicCallback  
from tornado.web import RequestHandler, Application  
from apscheduler.schedulers.tornado import TornadoScheduler  
  
  
scheduler = None  
job_ids   = []  
  
# 初始化  
def init_scheduler():  
    global scheduler  
    scheduler = TornadoScheduler()  
    scheduler.start()  
    print('[Scheduler Init]APScheduler has been started')  
  
# 要執(zhí)行的定時任務在這里  
def task1(options):  
    print('{} [APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), options))  
  
  
class MainHandler(RequestHandler):  
    def get(self):  
        self.write('<a href="/scheduler?job_id=1&action=add">add job</a><br><a href="/scheduler?job_id=1&action=remove">remove job</a>')  
  
  
class SchedulerHandler(RequestHandler):  
    def get(self):  
        global job_ids  
        job_id = self.get_query_argument('job_id', None)  
        action = self.get_query_argument('action', None)  
        if job_id:  
            # add  
            if 'add' == action:  
                if job_id not in job_ids:  
                    job_ids.append(job_id)  
                    scheduler.add_job(task1, 'interval', seconds=3, id=job_id, args=(job_id,))  
                    self.write('[TASK ADDED] - {}'.format(job_id))  
                else:  
                    self.write('[TASK EXISTS] - {}'.format(job_id))  
            # remove  
            elif 'remove' == action:  
                if job_id in job_ids:  
                    scheduler.remove_job(job_id)  
                    job_ids.remove(job_id)  
                    self.write('[TASK REMOVED] - {}'.format(job_id))  
                else:  
                    self.write('[TASK NOT FOUND] - {}'.format(job_id))  
        else:  
            self.write('[INVALID PARAMS] INVALID job_id or action')  
  
  
if __name__ == "__main__":  
    routes    = [  
        (r"/", MainHandler),  
        (r"/scheduler/?", SchedulerHandler),  
    ]  
    init_scheduler()  
    app       = Application(routes, debug=True)  
    app.listen(8888)  
    IOLoop.current().start()

每一次分片接口被調用后,就建立定時任務對分片文件進行監(jiān)測,如果分片成功就刪除分片文件,同時刪除任務,否則就啟用降級預案。

結語

分治法對超大文件進行分片切割,同時并發(fā)異步發(fā)送,可以提高傳輸效率,降低傳輸時間,和之前的一篇:聚是一團火散作滿天星,前端Vue.js+elementUI結合后端FastAPI實現(xiàn)大文件分片上傳,邏輯上有異曲同工之妙,但手法上卻略有不同,確是頗有相互借鏡之處,最后代碼開源于Github:https://github.com/zcxey2911/Tornado6_Vuejs3_Edu,與眾親同饗。

原文轉載自「劉悅的技術博客」 https://v3u.cn/a_id_218

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容