也可以看我的CSDN的博客
https://blog.csdn.net/u013332124/article/details/88043761
在做文件下載功能時,為了避免下載功能將服務(wù)器的帶寬打滿,從而影響服務(wù)器的其他服務(wù)。我們可以設(shè)計一個限流器來限制下載的速率,從而限制下載服務(wù)所占用的帶寬。
一、算法思路
定義一個數(shù)據(jù)塊chunk(單位 bytes)以及允許的最大速率 maxRate(單位 KB/s)。通過maxRate我們可以算出,在maxRate的速率下,通過一個數(shù)據(jù)塊大小的字節(jié)流所需要的時間 timeCostPerChunk。
之后,在讀取/寫入字節(jié)時,我們維護(hù)已經(jīng)讀取/寫入的字節(jié)量 bytesWillBeSentOrReceive。
當(dāng)bytesWillBeSentOrReceive達(dá)到一個數(shù)據(jù)塊的大小時,檢查期間消耗的時間(nowNanoTime-lastPieceSentOrReceiveTick)
如果期間消耗的時間小于timeCostPerChunk的值,說明當(dāng)前的速率已經(jīng)超過了 maxRate的速率,這時候就需要休眠一會來限制流量
如果速率沒超過或者休眠完后,將 bytesWillBeSentOrReceive=bytesWillBeSentOrReceive-chunkSize
之后在讀取/寫入數(shù)據(jù)時繼續(xù)檢查。
下面該算法的Java代碼實現(xiàn):
public synchronized void limitNextBytes(int len) {
//累計bytesWillBeSentOrReceive
this.bytesWillBeSentOrReceive += len;
//如果積累的bytesWillBeSentOrReceive達(dá)到一個chunk的大小,就進(jìn)入語句塊操作
while (this.bytesWillBeSentOrReceive > CHUNK_LENGTH) {
long nowTick = System.nanoTime();
//計算積累數(shù)據(jù)期間消耗的時間
long passTime = nowTick - this.lastPieceSentOrReceiveTick;
//timeCostPerChunk表示單個塊最多需要多少納秒
//如果missedTime大于0,說明此時流量進(jìn)出的速率已經(jīng)超過maxRate了,需要休眠來限制流量
long missedTime = this.timeCostPerChunk - passTime;
if (missedTime > 0) {
try {
Thread.sleep(missedTime / 1000000, (int) (missedTime % 1000000));
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
}
this.bytesWillBeSentOrReceive -= CHUNK_LENGTH;
//重置最后一次檢查時間
this.lastPieceSentOrReceiveTick = nowTick + (missedTime > 0 ? missedTime : 0);
}
}
二、限流的完整java代碼實現(xiàn)
限流器的實現(xiàn)
public class BandwidthLimiter {
private static final Logger LOGGER = LoggerFactory.getLogger(BandwidthLimiter.class);
//KB代表的字節(jié)數(shù)
private static final Long KB = 1024L;
//一個chunk的大小,單位byte。設(shè)置一個塊的大小為1M
private static final Long CHUNK_LENGTH = 1024 * 1024L;
//已經(jīng)發(fā)送/讀取的字節(jié)數(shù)
private int bytesWillBeSentOrReceive = 0;
//上一次接收到字節(jié)流的時間戳——單位納秒
private long lastPieceSentOrReceiveTick = System.nanoTime();
//允許的最大速率,默認(rèn)為 1024KB/s
private int maxRate = 1024;
//在maxRate的速率下,通過chunk大小的字節(jié)流要多少時間(納秒)
private long timeCostPerChunk = (1000000000L * CHUNK_LENGTH) / (this.maxRate * KB);
public BandwidthLimiter(int maxRate) {
this.setMaxRate(maxRate);
}
//動態(tài)調(diào)整最大速率
public void setMaxRate(int maxRate) {
if (maxRate < 0) {
throw new IllegalArgumentException("maxRate can not less than 0");
}
this.maxRate = maxRate;
if (maxRate == 0) {
this.timeCostPerChunk = 0;
} else {
this.timeCostPerChunk = (1000000000L * CHUNK_LENGTH) / (this.maxRate * KB);
}
}
public synchronized void limitNextBytes() {
this.limitNextBytes(1);
}
public synchronized void limitNextBytes(int len) {
this.bytesWillBeSentOrReceive += len;
while (this.bytesWillBeSentOrReceive > CHUNK_LENGTH) {
long nowTick = System.nanoTime();
long passTime = nowTick - this.lastPieceSentOrReceiveTick;
long missedTime = this.timeCostPerChunk - passTime;
if (missedTime > 0) {
try {
Thread.sleep(missedTime / 1000000, (int) (missedTime % 1000000));
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
}
this.bytesWillBeSentOrReceive -= CHUNK_LENGTH;
this.lastPieceSentOrReceiveTick = nowTick + (missedTime > 0 ? missedTime : 0);
}
}
}
有了限流器后,現(xiàn)在我們要對下載功能做限流。因為java的io流的設(shè)計是裝飾器模式,因此我們可以方便的封裝一個我們自己的InputStream
public class LimitInputStream extends InputStream {
private InputStream inputStream;
private BandwidthLimiter bandwidthLimiter;
public LimitInputStream(InputStream inputStream, BandwidthLimiter bandwidthLimiter) {
this.inputStream = inputStream;
this.bandwidthLimiter = bandwidthLimiter;
}
@Override
public int read() throws IOException {
if (bandwidthLimiter != null) {
bandwidthLimiter.limitNextBytes();
}
return inputStream.read();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (bandwidthLimiter != null) {
bandwidthLimiter.limitNextBytes(len);
}
return inputStream.read(b, off, len);
}
@Override
public int read(byte[] b) throws IOException {
if (bandwidthLimiter != null && b.length > 0) {
bandwidthLimiter.limitNextBytes(b.length);
}
return inputStream.read(b);
}
}
后面我們使用這個LimitInputStream來讀取文件,每次讀取一塊數(shù)據(jù),限流器都會檢查當(dāng)前的速率是否超過指定的最大速率。這樣就能間接的達(dá)到限制下載速率的目的了。
附上SpringMVC的一個下載限流的demo:
@GetMapping("/limit")
public void limitDownloadFile(String file, HttpServletResponse response) throws IOException {
LOGGER.info("download file");
if (file == null) {
file = "/tmp/test.txt";
}
File downloadFile = new File(file);
FileInputStream fileInputStream = new FileInputStream(downloadFile);
response.setContentType("application/x-msdownload;");
response.setHeader("Content-disposition", "attachment; filename=" + new String(downloadFile.getName()
.getBytes("utf-8"), "ISO8859-1"));
response.setHeader("Content-Length", String.valueOf(downloadFile.length()));
ServletOutputStream outputStream = null;
try {
LimitInputStream limitInputStream = new LimitInputStream(fileInputStream, new BandwidthLimiter(1024));
long beginTime = System.currentTimeMillis();
outputStream = response.getOutputStream();
byte[] bytes = new byte[1024];
int read = limitInputStream.read(bytes, 0, 1024);
while (read != -1) {
outputStream.write(bytes);
read = limitInputStream.read(bytes, 0, 1024);
}
LOGGER.info("download use {} ms", System.currentTimeMillis() - beginTime);
} finally {
fileInputStream.close();
if (outputStream != null) {
outputStream.close();
}
LOGGER.info("download success!");
}
}
三、注意點
使用這個算法要注意一個問題,就是chunk的塊大小不能設(shè)置的太小,即CHUNK_LENGTH不能設(shè)置的太小。否則容易造成明明maxRate設(shè)置的很大,但是實際下載速率卻很小的問題。
假設(shè)CHUNK_LENGTH就設(shè)置為1024 bytes,每次讀取的塊大小也是1024 bytes,maxRate 為 64M/s。那么我們可以計算出timeCostPerChunk約等于15258納秒。
再如果真正的速率是100M/s,也就是每秒差不多會調(diào)用limitNextBytes方法100000次,由于每次讀取消耗的時間極短,因此每次進(jìn)入該方法都要sleep 15258納秒之后再讀取下一個塊的數(shù)據(jù)。如果沒有算上線程調(diào)度的時間,就算1秒內(nèi)休眠100000次也完全沒什么問題。但是線程的休眠和喚醒都需要內(nèi)核來進(jìn)行,線程上下文切換的時間應(yīng)該遠(yuǎn)大于15258納秒,這時候頻繁的休眠就會導(dǎo)致線程暫停運行的時間和我們預(yù)期的不符。由于休眠時間過長,最終導(dǎo)致實際的下載速率大大的低于maxRate。
因此,我們需要調(diào)大CHUNK_LENGTH,盡量讓timeCostPerChunk的值遠(yuǎn)大于線程調(diào)度的時間,減少線程調(diào)度對限流造成的影響。