限流是保護(hù)高并發(fā)系統(tǒng)的利器,處理高并發(fā)還包括:
1.緩存 緩存的目的是提升系統(tǒng)訪問速度和增大系統(tǒng)處理容量
2.降級 降級是當(dāng)服務(wù)出現(xiàn)問題或者影響到核心流程時,需要暫時屏蔽掉,待高峰或者問題解決后再打開
3.限流 限流的目的是通過對并發(fā)訪問/請求進(jìn)行限速,或者對一個時間窗口內(nèi)的請求進(jìn)行限速來保護(hù)系統(tǒng),一旦達(dá)到限制速率則可以拒絕服務(wù)、排隊(duì)或等待、降級等處理
限流的目的是通過對并發(fā)訪問/請求進(jìn)行限速或者一個時間窗口內(nèi)的的請求進(jìn)行限速來保護(hù)系統(tǒng),一旦達(dá)到限制速率則可以拒絕服務(wù)或進(jìn)行流量整形。
漏桶算法:
漏桶一個固定容量的漏桶,按照固定常量速率流出請求,流入請求速率任意,當(dāng)流入的請求數(shù)累積到漏桶容量時,則新流入的請求被拒絕。漏桶可以看做是一個具有固定容量、固定流出速率的隊(duì)列,漏桶限制的是請求的流出速率。
漏桶算法的實(shí)現(xiàn)往往依賴于隊(duì)列,請求到達(dá)如果隊(duì)列未滿則直接放入隊(duì)列,然后有一個處理器按照固定頻率從隊(duì)列頭取出請求進(jìn)行處理。如果請求量大,則會導(dǎo)致隊(duì)列滿,那么新來的請求就會被拋棄。
令牌桶算法

1.令牌桶是按照固定速率往桶中添加令牌,請求是否被處理需要看桶中令牌是否足夠,當(dāng)令牌數(shù)減為零時則拒絕新的請求;漏桶則是按照常量固定速率流出請求,流入請求速率任意,當(dāng)流入的請求數(shù)累積到漏桶容量時,則新流入的請求被拒絕;
2.令牌桶限制的是平均流入速率,允許突發(fā)請求,只要有令牌就可以處理,支持一次拿3個令牌,4個令牌;漏桶限制的是常量流出速率,即流出速率是一個固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,從而平滑突發(fā)流入速率;
3.令牌桶允許一定程度的突發(fā),而漏桶主要目的是平滑流出速率;
Google開源工具包Guava提供了限流工具類RateLimiter,該類基于令牌桶算法(Token Bucket)來完成限流
下面是RateLimiter令牌桶實(shí)現(xiàn)
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LxRateLimit {
double limitNum() default 20; //默認(rèn)每秒放入桶中的token
//獲取令牌的等待時間
int timeOut() default 0;
//等待時間單位
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseBody;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Aspect
@Component
@Slf4j
@Order(1)
public class LxRateLimitAspect {
//用來存放不同接口的RateLimiter(key為接口名稱,value為RateLimiter)
private ConcurrentHashMap<String, RateLimiter> map = new ConcurrentHashMap<>();
private RateLimiter rateLimiter;
/**
* 定義切點(diǎn)
* 1、通過掃包切入
* 2、帶有指定注解切入
*/
@Pointcut("@annotation(com.yang.aspect.LxRateLimit)")
public void checkPointcut() { }
@ResponseBody
@Around(value = "checkPointcut()")
public Object aroundNotice(ProceedingJoinPoint joinPoint) throws Throwable {
log.info("攔截到了{(lán)}方法", joinPoint.getSignature().getName());
Object obj = null;
//獲取攔截的方法名
Signature sig = joinPoint.getSignature();
//獲取攔截的方法名
MethodSignature msig = (MethodSignature) sig;
//返回被織入增加處理目標(biāo)對象
Object target = joinPoint.getTarget();
//為了獲取注解信息
Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
//獲取注解信息
LxRateLimit annotation = currentMethod.getAnnotation(LxRateLimit.class);
double limitNum = annotation.limitNum(); //獲取注解每秒加入桶中的token
TimeUnit timeUnit = annotation.timeUnit();//獲取時間單位
String functionName = msig.getName(); // 注解所在方法名區(qū)分不同的限流策略
int timeOut = annotation.timeOut();
//獲取rateLimiter
if(map.containsKey(functionName)){
rateLimiter = map.get(functionName);
}else {
map.put(functionName, RateLimiter.create(limitNum));
rateLimiter = map.get(functionName);
}
try {
if (rateLimiter.tryAcquire(timeOut,timeUnit)) {
//執(zhí)行方法
obj = joinPoint.proceed();
} else {
log.info("{}方法限流超載", joinPoint.getSignature().getName());
return "方法限流超載";
}
} catch (Throwable throwable) {
log.error("{}方法限流異常", joinPoint.getSignature().getName());
throwable.printStackTrace();
return "方法限流超載";
}
log.info("{}方法限流正常", joinPoint.getSignature().getName());
return obj;
}
}
但是RateLimiter只作用于一個jvm,而往往實(shí)際項(xiàng)目中會部署在多臺機(jī)器,運(yùn)行于多個jvm,這樣的話我們的LxRateLimit 又要修改。
比如我們有20服務(wù)器,我們的需求是所有服務(wù)器總限流每秒在100個左右,這樣的話,單個應(yīng)用中LxRateLimit.limitNum就要改為100/20=5。這樣就能保證。所有的服務(wù)器流量在100個左右(前提是nginx分發(fā)請求是平均的)。
當(dāng)然分布式環(huán)境也可以用redis限流,且不用修改limitNum數(shù)量,不過這樣增加了redis壓力(當(dāng)然完全不要用擔(dān)心redis,單機(jī)redis最高可以每秒處理幾十萬請求,何況我們可以做集群)
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLimit {
//存儲key的前綴
String prefix() default "limit";
//獲取等待時間,redis最高每秒處理幾十萬請求
int expire() default 1;
//等待時間單位,毫秒
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
注意這里的策略有修改,不在是發(fā)放令牌,而是redis的過期時間,單位最好是毫秒。比如我們需求是1s保持500個pv,那么RedisLimit.expire=1000/500=2
以下是redisson實(shí)現(xiàn)
import com.yang.annotation.RedisLimit;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseBody;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
@Aspect
@Component
@Slf4j
@Order(1)
public class RedisLimitAspect {
@Autowired
private RedissonClient redisson;
/**
* 定義切點(diǎn)
* 1、帶有指定注解切入
*/
@Pointcut("@annotation(com.yang.annotation.RedisLimit)")
public void checkPointcut() { }
@ResponseBody
@Around(value = "checkPointcut()")
public Object aroundNotice(ProceedingJoinPoint joinPoint) throws Throwable {
log.info("攔截到了{(lán)}方法", joinPoint.getSignature().getName());
Object obj = null;
// 得到類名
String clazzName = joinPoint.getTarget().getClass().getName();
//獲取攔截的方法
MethodSignature msig = (MethodSignature) joinPoint.getSignature();
//返回被織入增加處理目標(biāo)對象
Object target = joinPoint.getTarget();
//為了獲取注解信息
Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
//獲取參數(shù)列表
Object[] args = joinPoint.getArgs();
//獲取注解信息
RedisLimit annotation = currentMethod.getAnnotation(RedisLimit.class);
String prefix = annotation.prefix();
int expire = annotation.expire();//獲取時間單位
TimeUnit timeUnit = annotation.timeUnit();//獲取時間單位
String key = getKey(prefix,clazzName,msig.getName()); //注解限制的前綴+所在類+方法名區(qū)分不同的限流策略
RLock lock = redisson.getLock(key);
try {
boolean res = lock.tryLock(0,expire,timeUnit);
if (res) {
//執(zhí)行方法
obj = joinPoint.proceed();
} else {
log.info("{}方法限流超載", msig.getName());
return "方法限流超載";
}
} catch (Exception throwable) {
log.error("{}方法限流異常", msig.getName());
throwable.printStackTrace();
return "方法限流超載";
}
log.info("{}方法限流正常", msig.getName());
return obj;
}
/**
* * 根據(jù)類名、方法名和參數(shù)生成Key
* * @param clazzName
* * @param methodName
* * @return key格式:全類名|方法名|參數(shù)類型
*
*/
private String getKey( String prefix,String clazzName, String methodName) {
StringBuilder key = new StringBuilder();
key.append(prefix);
key.append(clazzName);
key.append(methodName);
return key.toString();
}
}
注意這里不要手動釋放鎖,等時間到了自動釋放。
也可以用原生的redisTemplate 實(shí)現(xiàn)
import com.google.common.util.concurrent.RateLimiter;
import com.yang.annotation.RedisLimit;
import com.yang.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseBody;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
@Aspect
@Component
@Slf4j
@Order(1)
public class RedisLimitAspect {
@Autowired
private RedisTemplate redisTemplate ;
/**
* 定義切點(diǎn)
* 1、帶有指定注解切入
*/
@Pointcut("@annotation(com.yang.annotation.RedisLimit)")
public void checkPointcut() { }
@ResponseBody
@Around(value = "checkPointcut()")
public Object aroundNotice(ProceedingJoinPoint joinPoint) throws Throwable {
log.info("攔截到了{(lán)}方法", joinPoint.getSignature().getName());
Object obj = null;
// 得到類名
String clazzName = joinPoint.getTarget().getClass().getName();
//獲取攔截的方法
MethodSignature msig = (MethodSignature) joinPoint.getSignature();
//返回被織入增加處理目標(biāo)對象
Object target = joinPoint.getTarget();
//為了獲取注解信息
Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
//獲取參數(shù)列表
Object[] args = joinPoint.getArgs();
//獲取注解信息
RedisLimit annotation = currentMethod.getAnnotation(RedisLimit.class);
String prefix = annotation.prefix();
int expire = annotation.expire();//獲取時間單位
TimeUnit timeUnit = annotation.timeUnit();//獲取時間單位
String key = getKey(prefix,clazzName,msig.getName()); // 注解限制的前綴+所在類+方法名區(qū)分不同的限流策略
String value = getValue(key,args);
try {
if (!redisTemplate.hasKey(key)) {
//執(zhí)行方法
redisTemplate.opsForValue().set(key,value,expire,timeUnit);
obj = joinPoint.proceed();
} else {
log.info("{}方法限流超載", msig.getName());
return "方法限流超載";
}
} catch (Throwable throwable) {
log.error("{}方法限流異常", msig.getName());
throwable.printStackTrace();
}
log.info("{}方法限流正常", msig.getName());
return obj;
}
/**
* * 根據(jù)類名、方法名和參數(shù)生成Key
* * @param clazzName
* * @param methodName
* * @return key格式:全類名|方法名|參數(shù)類型
*
*/
private String getKey( String prefix,String clazzName, String methodName) {
StringBuilder key = new StringBuilder();
key.append(prefix);
key.append(clazzName);
key.append(methodName);
return key.toString();
}
/**
* * 根據(jù)類名、方法名和參數(shù)生成value
* * @param clazzName
* * @param methodName
* * @param args
* * @return key格式:全類名|方法名|參數(shù)類型
*
*/
private String getValue(String key, Object[] args) {
StringBuilder value = new StringBuilder();
value.append(key);
value.append(Arrays.toString(args));
return value.toString();
}
}