Java 文件下載限流算法

也可以看我的CSDN的博客
https://blog.csdn.net/u013332124/article/details/88043761

在做文件下載功能時,為了避免下載功能將服務(wù)器的帶寬打滿,從而影響服務(wù)器的其他服務(wù)。我們可以設(shè)計一個限流器來限制下載的速率,從而限制下載服務(wù)所占用的帶寬。

一、算法思路

定義一個數(shù)據(jù)塊chunk(單位 bytes)以及允許的最大速率 maxRate(單位 KB/s)。通過maxRate我們可以算出,在maxRate的速率下,通過一個數(shù)據(jù)塊大小的字節(jié)流所需要的時間 timeCostPerChunk。

之后,在讀取/寫入字節(jié)時,我們維護(hù)已經(jīng)讀取/寫入的字節(jié)量 bytesWillBeSentOrReceive。

當(dāng)bytesWillBeSentOrReceive達(dá)到一個數(shù)據(jù)塊的大小時,檢查期間消耗的時間(nowNanoTime-lastPieceSentOrReceiveTick)

如果期間消耗的時間小于timeCostPerChunk的值,說明當(dāng)前的速率已經(jīng)超過了 maxRate的速率,這時候就需要休眠一會來限制流量

如果速率沒超過或者休眠完后,將 bytesWillBeSentOrReceive=bytesWillBeSentOrReceive-chunkSize

之后在讀取/寫入數(shù)據(jù)時繼續(xù)檢查。

下面該算法的Java代碼實現(xiàn):

    public synchronized void limitNextBytes(int len) {
        //累計bytesWillBeSentOrReceive
        this.bytesWillBeSentOrReceive += len;
        //如果積累的bytesWillBeSentOrReceive達(dá)到一個chunk的大小,就進(jìn)入語句塊操作
        while (this.bytesWillBeSentOrReceive > CHUNK_LENGTH) {
            long nowTick = System.nanoTime();
            //計算積累數(shù)據(jù)期間消耗的時間
            long passTime = nowTick - this.lastPieceSentOrReceiveTick;
            //timeCostPerChunk表示單個塊最多需要多少納秒
            //如果missedTime大于0,說明此時流量進(jìn)出的速率已經(jīng)超過maxRate了,需要休眠來限制流量
            long missedTime = this.timeCostPerChunk - passTime;
            if (missedTime > 0) {
                try {
                    Thread.sleep(missedTime / 1000000, (int) (missedTime % 1000000));
                } catch (InterruptedException e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            this.bytesWillBeSentOrReceive -= CHUNK_LENGTH;
            //重置最后一次檢查時間
            this.lastPieceSentOrReceiveTick = nowTick + (missedTime > 0 ? missedTime : 0);
        }
    }

二、限流的完整java代碼實現(xiàn)

限流器的實現(xiàn)

public class BandwidthLimiter {

    private static final Logger LOGGER = LoggerFactory.getLogger(BandwidthLimiter.class);
    //KB代表的字節(jié)數(shù)
    private static final Long KB = 1024L;
    //一個chunk的大小,單位byte。設(shè)置一個塊的大小為1M
    private static final Long CHUNK_LENGTH = 1024 * 1024L;

    //已經(jīng)發(fā)送/讀取的字節(jié)數(shù)
    private int bytesWillBeSentOrReceive = 0;
    //上一次接收到字節(jié)流的時間戳——單位納秒
    private long lastPieceSentOrReceiveTick = System.nanoTime();
    //允許的最大速率,默認(rèn)為 1024KB/s
    private int maxRate = 1024;
    //在maxRate的速率下,通過chunk大小的字節(jié)流要多少時間(納秒)
    private long timeCostPerChunk = (1000000000L * CHUNK_LENGTH) / (this.maxRate * KB);

    public BandwidthLimiter(int maxRate) {
        this.setMaxRate(maxRate);
    }

    //動態(tài)調(diào)整最大速率
    public void setMaxRate(int maxRate) {
        if (maxRate < 0) {
            throw new IllegalArgumentException("maxRate can not less than 0");
        }
        this.maxRate = maxRate;
        if (maxRate == 0) {
            this.timeCostPerChunk = 0;
        } else {
            this.timeCostPerChunk = (1000000000L * CHUNK_LENGTH) / (this.maxRate * KB);
        }
    }

    public synchronized void limitNextBytes() {
        this.limitNextBytes(1);
    }

    public synchronized void limitNextBytes(int len) {
        this.bytesWillBeSentOrReceive += len;

        while (this.bytesWillBeSentOrReceive > CHUNK_LENGTH) {
            long nowTick = System.nanoTime();
            long passTime = nowTick - this.lastPieceSentOrReceiveTick;
            long missedTime = this.timeCostPerChunk - passTime;
            if (missedTime > 0) {
                try {
                    Thread.sleep(missedTime / 1000000, (int) (missedTime % 1000000));
                } catch (InterruptedException e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            this.bytesWillBeSentOrReceive -= CHUNK_LENGTH;
            this.lastPieceSentOrReceiveTick = nowTick + (missedTime > 0 ? missedTime : 0);
        }
    }
}

有了限流器后,現(xiàn)在我們要對下載功能做限流。因為java的io流的設(shè)計是裝飾器模式,因此我們可以方便的封裝一個我們自己的InputStream

public class LimitInputStream extends InputStream {

    private InputStream inputStream;
    private BandwidthLimiter bandwidthLimiter;

    public LimitInputStream(InputStream inputStream, BandwidthLimiter bandwidthLimiter) {
        this.inputStream = inputStream;
        this.bandwidthLimiter = bandwidthLimiter;
    }

    @Override
    public int read() throws IOException {
        if (bandwidthLimiter != null) {
            bandwidthLimiter.limitNextBytes();
        }
        return inputStream.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (bandwidthLimiter != null) {
            bandwidthLimiter.limitNextBytes(len);
        }
        return inputStream.read(b, off, len);
    }

    @Override
    public int read(byte[] b) throws IOException {
        if (bandwidthLimiter != null && b.length > 0) {
            bandwidthLimiter.limitNextBytes(b.length);
        }
        return inputStream.read(b);
    }
}

后面我們使用這個LimitInputStream來讀取文件,每次讀取一塊數(shù)據(jù),限流器都會檢查當(dāng)前的速率是否超過指定的最大速率。這樣就能間接的達(dá)到限制下載速率的目的了。

附上SpringMVC的一個下載限流的demo:

    @GetMapping("/limit")
    public void limitDownloadFile(String file, HttpServletResponse response) throws IOException {
        LOGGER.info("download file");
        if (file == null) {
            file = "/tmp/test.txt";
        }
        File downloadFile = new File(file);
        FileInputStream fileInputStream = new FileInputStream(downloadFile);

        response.setContentType("application/x-msdownload;");
        response.setHeader("Content-disposition", "attachment; filename=" + new String(downloadFile.getName()
                .getBytes("utf-8"), "ISO8859-1"));
        response.setHeader("Content-Length", String.valueOf(downloadFile.length()));
        ServletOutputStream outputStream = null;
        try {
            LimitInputStream limitInputStream = new LimitInputStream(fileInputStream, new BandwidthLimiter(1024));

            long beginTime = System.currentTimeMillis();
            outputStream = response.getOutputStream();
            byte[] bytes = new byte[1024];
            int read = limitInputStream.read(bytes, 0, 1024);
            while (read != -1) {
                outputStream.write(bytes);
                read = limitInputStream.read(bytes, 0, 1024);
            }
            LOGGER.info("download use {} ms", System.currentTimeMillis() - beginTime);
        } finally {
            fileInputStream.close();
            if (outputStream != null) {
                outputStream.close();
            }
            LOGGER.info("download success!");
        }
    }

三、注意點

使用這個算法要注意一個問題,就是chunk的塊大小不能設(shè)置的太小,即CHUNK_LENGTH不能設(shè)置的太小。否則容易造成明明maxRate設(shè)置的很大,但是實際下載速率卻很小的問題

假設(shè)CHUNK_LENGTH就設(shè)置為1024 bytes,每次讀取的塊大小也是1024 bytes,maxRate 為 64M/s。那么我們可以計算出timeCostPerChunk約等于15258納秒。

再如果真正的速率是100M/s,也就是每秒差不多會調(diào)用limitNextBytes方法100000次,由于每次讀取消耗的時間極短,因此每次進(jìn)入該方法都要sleep 15258納秒之后再讀取下一個塊的數(shù)據(jù)。如果沒有算上線程調(diào)度的時間,就算1秒內(nèi)休眠100000次也完全沒什么問題。但是線程的休眠和喚醒都需要內(nèi)核來進(jìn)行,線程上下文切換的時間應(yīng)該遠(yuǎn)大于15258納秒,這時候頻繁的休眠就會導(dǎo)致線程暫停運行的時間和我們預(yù)期的不符。由于休眠時間過長,最終導(dǎo)致實際的下載速率大大的低于maxRate。

因此,我們需要調(diào)大CHUNK_LENGTH,盡量讓timeCostPerChunk的值遠(yuǎn)大于線程調(diào)度的時間,減少線程調(diào)度對限流造成的影響。

四、具體demo的github地址

https://github.com/kongtrio/download-limit

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 轉(zhuǎn)載來自開濤的聊聊高并發(fā)系統(tǒng)限流特技-2 上一篇《聊聊高并發(fā)系統(tǒng)限流特技-1》講了限流算法、應(yīng)用級限流、分布式限流...
    meng_philip123閱讀 4,231評論 0 10
  • 摘要:上一篇《聊聊高并發(fā)系統(tǒng)限流特技-1》講了限流算法、應(yīng)用級限流、分布式限流;本篇將介紹接入層限流實現(xiàn)。 接入層...
    落羽成霜丶閱讀 988評論 0 5
  • 摘要:GFS在設(shè)計上有很多值得學(xué)習(xí)的地方,最近重讀了一下GFS的設(shè)計論文,試圖從架構(gòu)設(shè)計的角度對GFS進(jìn)行剖析,希...
    架構(gòu)禪話閱讀 4,734評論 0 2
  • 感官遲鈍,以為香港的熱情會連綿不絕。 猝不及防,冷風(fēng)掃落葉。 不覺此季值得傷懷,反而甚喜涼風(fēng)拂面。 爬上眉眼,撫過...
    東流水酌月閱讀 268評論 6 9
  • 當(dāng)你有一肚子的心里話,想要找個人傾訴的時候 , 卻突然發(fā)現(xiàn) 、 翻遍了通訊錄,盡然找不到一個可以任你隨時打擾的人 ...
    玉墨老師閱讀 155評論 0 0

友情鏈接更多精彩內(nèi)容