分布式大型環(huán)境如何用AOP限流?

限流是保護(hù)高并發(fā)系統(tǒng)的利器,處理高并發(fā)還包括:

1.緩存 緩存的目的是提升系統(tǒng)訪問速度和增大系統(tǒng)處理容量
2.降級 降級是當(dāng)服務(wù)出現(xiàn)問題或者影響到核心流程時,需要暫時屏蔽掉,待高峰或者問題解決后再打開
3.限流 限流的目的是通過對并發(fā)訪問/請求進(jìn)行限速,或者對一個時間窗口內(nèi)的請求進(jìn)行限速來保護(hù)系統(tǒng),一旦達(dá)到限制速率則可以拒絕服務(wù)、排隊(duì)或等待、降級等處理

限流的目的是通過對并發(fā)訪問/請求進(jìn)行限速或者一個時間窗口內(nèi)的的請求進(jìn)行限速來保護(hù)系統(tǒng),一旦達(dá)到限制速率則可以拒絕服務(wù)或進(jìn)行流量整形。

漏桶算法:

漏桶一個固定容量的漏桶,按照固定常量速率流出請求,流入請求速率任意,當(dāng)流入的請求數(shù)累積到漏桶容量時,則新流入的請求被拒絕。漏桶可以看做是一個具有固定容量、固定流出速率的隊(duì)列,漏桶限制的是請求的流出速率。
漏桶算法的實(shí)現(xiàn)往往依賴于隊(duì)列,請求到達(dá)如果隊(duì)列未滿則直接放入隊(duì)列,然后有一個處理器按照固定頻率從隊(duì)列頭取出請求進(jìn)行處理。如果請求量大,則會導(dǎo)致隊(duì)列滿,那么新來的請求就會被拋棄。

令牌桶算法

令牌桶算法.png

1.令牌桶是按照固定速率往桶中添加令牌,請求是否被處理需要看桶中令牌是否足夠,當(dāng)令牌數(shù)減為零時則拒絕新的請求;漏桶則是按照常量固定速率流出請求,流入請求速率任意,當(dāng)流入的請求數(shù)累積到漏桶容量時,則新流入的請求被拒絕;
2.令牌桶限制的是平均流入速率,允許突發(fā)請求,只要有令牌就可以處理,支持一次拿3個令牌,4個令牌;漏桶限制的是常量流出速率,即流出速率是一個固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,從而平滑突發(fā)流入速率;
3.令牌桶允許一定程度的突發(fā),而漏桶主要目的是平滑流出速率;

Google開源工具包Guava提供了限流工具類RateLimiter,該類基于令牌桶算法(Token Bucket)來完成限流

下面是RateLimiter令牌桶實(shí)現(xiàn)

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LxRateLimit {

    double limitNum() default 20;  //默認(rèn)每秒放入桶中的token

    //獲取令牌的等待時間
    int timeOut() default 0;

    //等待時間單位
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseBody;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

@Aspect
@Component
@Slf4j
@Order(1)
public class LxRateLimitAspect {

    //用來存放不同接口的RateLimiter(key為接口名稱,value為RateLimiter)
    private ConcurrentHashMap<String, RateLimiter> map = new ConcurrentHashMap<>();
    private RateLimiter rateLimiter;

    /**
     * 定義切點(diǎn)
     * 1、通過掃包切入
     * 2、帶有指定注解切入
     */
    @Pointcut("@annotation(com.yang.aspect.LxRateLimit)")
    public void checkPointcut() { }

    @ResponseBody
    @Around(value = "checkPointcut()")
    public Object aroundNotice(ProceedingJoinPoint joinPoint) throws Throwable {
        log.info("攔截到了{(lán)}方法", joinPoint.getSignature().getName());
        Object obj = null;
        //獲取攔截的方法名
        Signature sig = joinPoint.getSignature();
        //獲取攔截的方法名
        MethodSignature msig = (MethodSignature) sig;
        //返回被織入增加處理目標(biāo)對象
        Object target = joinPoint.getTarget();
        //為了獲取注解信息
        Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
        //獲取注解信息
        LxRateLimit annotation = currentMethod.getAnnotation(LxRateLimit.class);
        double limitNum = annotation.limitNum(); //獲取注解每秒加入桶中的token
        TimeUnit timeUnit = annotation.timeUnit();//獲取時間單位
        String functionName = msig.getName(); // 注解所在方法名區(qū)分不同的限流策略
        int timeOut = annotation.timeOut();

        //獲取rateLimiter
        if(map.containsKey(functionName)){
            rateLimiter = map.get(functionName);
        }else {
            map.put(functionName, RateLimiter.create(limitNum));
            rateLimiter = map.get(functionName);
        }

       try {
            if (rateLimiter.tryAcquire(timeOut,timeUnit)) {
                //執(zhí)行方法
                obj = joinPoint.proceed();
            } else {
                log.info("{}方法限流超載", joinPoint.getSignature().getName());
                return "方法限流超載";
            }
        } catch (Throwable throwable) {
            log.error("{}方法限流異常", joinPoint.getSignature().getName());
            throwable.printStackTrace();
            return "方法限流超載";
        }
        log.info("{}方法限流正常", joinPoint.getSignature().getName());
        return obj;
    }
}

但是RateLimiter只作用于一個jvm,而往往實(shí)際項(xiàng)目中會部署在多臺機(jī)器,運(yùn)行于多個jvm,這樣的話我們的LxRateLimit 又要修改。
比如我們有20服務(wù)器,我們的需求是所有服務(wù)器總限流每秒在100個左右,這樣的話,單個應(yīng)用中LxRateLimit.limitNum就要改為100/20=5。這樣就能保證。所有的服務(wù)器流量在100個左右(前提是nginx分發(fā)請求是平均的)。

當(dāng)然分布式環(huán)境也可以用redis限流,且不用修改limitNum數(shù)量,不過這樣增加了redis壓力(當(dāng)然完全不要用擔(dān)心redis,單機(jī)redis最高可以每秒處理幾十萬請求,何況我們可以做集群)

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLimit {

    //存儲key的前綴
    String prefix() default "limit";

    //獲取等待時間,redis最高每秒處理幾十萬請求
    int expire() default 1;

    //等待時間單位,毫秒
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}

注意這里的策略有修改,不在是發(fā)放令牌,而是redis的過期時間,單位最好是毫秒。比如我們需求是1s保持500個pv,那么RedisLimit.expire=1000/500=2
以下是redisson實(shí)現(xiàn)


import com.yang.annotation.RedisLimit;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseBody;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

@Aspect
@Component
@Slf4j
@Order(1)
public class RedisLimitAspect {

    @Autowired
    private RedissonClient redisson;

    /**
     * 定義切點(diǎn)
     * 1、帶有指定注解切入
     */
    @Pointcut("@annotation(com.yang.annotation.RedisLimit)")
    public void checkPointcut() { }

    @ResponseBody
    @Around(value = "checkPointcut()")
    public Object aroundNotice(ProceedingJoinPoint joinPoint) throws Throwable {
        log.info("攔截到了{(lán)}方法", joinPoint.getSignature().getName());
        Object obj = null;
        // 得到類名
        String clazzName = joinPoint.getTarget().getClass().getName();
        //獲取攔截的方法
        MethodSignature msig = (MethodSignature) joinPoint.getSignature();
        //返回被織入增加處理目標(biāo)對象
        Object target = joinPoint.getTarget();
        //為了獲取注解信息
        Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
        //獲取參數(shù)列表
        Object[] args = joinPoint.getArgs();
        //獲取注解信息
        RedisLimit annotation = currentMethod.getAnnotation(RedisLimit.class);
        String prefix = annotation.prefix();
        int expire = annotation.expire();//獲取時間單位
        TimeUnit timeUnit = annotation.timeUnit();//獲取時間單位
        String key = getKey(prefix,clazzName,msig.getName()); //注解限制的前綴+所在類+方法名區(qū)分不同的限流策略
        RLock lock = redisson.getLock(key);
        try {
            boolean res = lock.tryLock(0,expire,timeUnit);
            if (res) {
                //執(zhí)行方法
                obj = joinPoint.proceed();
            } else {
                log.info("{}方法限流超載", msig.getName());
                return "方法限流超載";
            }
        } catch (Exception throwable) {
            log.error("{}方法限流異常", msig.getName());
            throwable.printStackTrace();
            return "方法限流超載";
        }
        log.info("{}方法限流正常", msig.getName());
        return obj;
    }

    /**
     *      * 根據(jù)類名、方法名和參數(shù)生成Key
     *      * @param clazzName
     *      * @param methodName
     *      * @return key格式:全類名|方法名|參數(shù)類型
     *
     */
    private String getKey( String prefix,String clazzName, String methodName) {
        StringBuilder key = new StringBuilder();
        key.append(prefix);
        key.append(clazzName);
        key.append(methodName);
        return key.toString();
    }

}

注意這里不要手動釋放鎖,等時間到了自動釋放。
也可以用原生的redisTemplate 實(shí)現(xiàn)

import com.google.common.util.concurrent.RateLimiter;
import com.yang.annotation.RedisLimit;
import com.yang.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseBody;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

@Aspect
@Component
@Slf4j
@Order(1)
public class RedisLimitAspect {

    @Autowired
    private RedisTemplate redisTemplate ;

    /**
     * 定義切點(diǎn)
     * 1、帶有指定注解切入
     */
    @Pointcut("@annotation(com.yang.annotation.RedisLimit)")
    public void checkPointcut() { }

    @ResponseBody
    @Around(value = "checkPointcut()")
    public Object aroundNotice(ProceedingJoinPoint joinPoint) throws Throwable {
        log.info("攔截到了{(lán)}方法", joinPoint.getSignature().getName());
        Object obj = null;
        // 得到類名
        String clazzName = joinPoint.getTarget().getClass().getName();
        //獲取攔截的方法
        MethodSignature msig = (MethodSignature) joinPoint.getSignature();
        //返回被織入增加處理目標(biāo)對象
        Object target = joinPoint.getTarget();
        //為了獲取注解信息
        Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
        //獲取參數(shù)列表
        Object[] args = joinPoint.getArgs();
        //獲取注解信息
        RedisLimit annotation = currentMethod.getAnnotation(RedisLimit.class);
        String prefix = annotation.prefix();
        int expire = annotation.expire();//獲取時間單位
        TimeUnit timeUnit = annotation.timeUnit();//獲取時間單位
        String key = getKey(prefix,clazzName,msig.getName()); // 注解限制的前綴+所在類+方法名區(qū)分不同的限流策略
        String value = getValue(key,args);
        try {
            if (!redisTemplate.hasKey(key)) {
                //執(zhí)行方法
                redisTemplate.opsForValue().set(key,value,expire,timeUnit);
                obj = joinPoint.proceed();
            } else {
                log.info("{}方法限流超載", msig.getName());
                return "方法限流超載";
            }
        } catch (Throwable throwable) {
            log.error("{}方法限流異常", msig.getName());
            throwable.printStackTrace();
        }
        log.info("{}方法限流正常", msig.getName());
        return obj;
    }

    /**
     *      * 根據(jù)類名、方法名和參數(shù)生成Key
     *      * @param clazzName
     *      * @param methodName
     *      * @return key格式:全類名|方法名|參數(shù)類型
     *
     */
    private String getKey( String prefix,String clazzName, String methodName) {
        StringBuilder key = new StringBuilder();
        key.append(prefix);
        key.append(clazzName);
        key.append(methodName);
        return key.toString();
    }

    /**
     *      * 根據(jù)類名、方法名和參數(shù)生成value
     *      * @param clazzName
     *      * @param methodName
     *      * @param args
     *      * @return key格式:全類名|方法名|參數(shù)類型
     *
     */
    private String getValue(String key, Object[] args) {
        StringBuilder value = new StringBuilder();
        value.append(key);
        value.append(Arrays.toString(args));
        return value.toString();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容