Java實現(xiàn)本地任務(wù)聚合與分布式任務(wù)聚合

場景介紹:在高并發(fā)場景,如果調(diào)用鏈是A ->B,A不需要立馬獲取請求的最終結(jié)果(可以理解為異步),而A的請求邏輯是可以批量處理,這時候就可以使用聚合批量處理請求。使用聚合,可以極大的提高服務(wù)的處理能力,是高并發(fā)場景中效果最好的優(yōu)化之一。
下面介紹兩種任務(wù)聚合的實現(xiàn)方式,目前代碼只針對了處理聚合的時間間隔,沒有控制可聚合的最大數(shù)量,如有需要可以在代碼上加以處理。

聚合實現(xiàn)的基礎(chǔ)類

需要實現(xiàn)聚合,至少需要兩個方法,一個為插入一個聚合任務(wù)的子任務(wù)的方法,一個為處理聚合后的任務(wù)。此外還需要控制聚合時間,比如把5秒內(nèi)的請求數(shù)據(jù)聚合。
基礎(chǔ)類如下:

public abstract class Polymerization {
    protected int polTime = 5000;
    public Polymerization(int polTime){
        this.polTime = polTime;
    }

    /**
     * 插入一個聚合任務(wù)的子任務(wù)
     * @param data 任務(wù)數(shù)據(jù)
     * @param polymerizationId 任務(wù)類型的id
     */
    public abstract void pushPolymerization(String data,String polymerizationId);

    /**
     * 處理聚合后的任務(wù)
     * @param data 任務(wù)聚合后數(shù)據(jù)列表
     * @param polymerizationId 任務(wù)類型的id
     */
    public abstract void dealData(List<String> data, String polymerizationId);
}

本地任務(wù)聚合的實現(xiàn)方式

實現(xiàn)思路

因為只是本地任務(wù)聚合,只需要考慮多線程安全問題。從效率、性能考慮,直接只使用一個ConcurrentLinkedQueue類型的成員變量,配合內(nèi)部類封裝時間,任務(wù)id,任務(wù)數(shù)據(jù)。邏輯代碼可以無鎖執(zhí)行,而ConcurrentLinkedQueue本身是高性能多線程安全類。所以在性能方面是非常優(yōu)秀的。實現(xiàn)代碼如下,邏輯已注釋:

public class PolymerizationLocalDemo extends Polymerization{
    private ConcurrentLinkedQueue<PolData> dataQueue = new ConcurrentLinkedQueue<>();

    public PolymerizationLocalDemo(int polTime) {
        super(polTime);
    }

    /**
     * 聚合任務(wù)封裝類
     */
    @Data
    @AllArgsConstructor
    private static class PolData{
        /**
         * 任務(wù)數(shù)據(jù)
         */
        private String data;
        /**
         * 任務(wù)類型id
         */
        private String polymerizationId;
        /**
         * 任務(wù)的出生時間
         */
        private long time;
    }
    @Override
    public void pushPolymerization(String data, String polymerizationId){
        //直接插入到隊列尾部
        dataQueue.add(new PolData(data,polymerizationId,System.currentTimeMillis()));
    }
    @Override
    @Async
    public void dealData(List<String> data,String polymerizationId){
        //todo 這里加處理邏輯就好
    }
    @Scheduled(fixedDelay = 1000)
    public void timeJob() {
        PolData head = dataQueue.peek();
        long now = System.currentTimeMillis();
        //讀取頭部數(shù)據(jù),當(dāng)頭部數(shù)據(jù)距離當(dāng)前大于polTime,就處理頭部數(shù)據(jù)時間+polTime時間范圍內(nèi)的數(shù)據(jù)
        if(head == null|| head.getTime()+polTime>now){
            return;
        }
        List<PolData> data = new LinkedList<>();
        //如果當(dāng)前隊列不為空且頭部數(shù)據(jù)時間屬于聚合時間范圍內(nèi),則poll出頭部數(shù)據(jù)
        while (dataQueue.peek()!=null&&dataQueue.peek().getTime() < head.getTime()+polTime) {
            data.add(dataQueue.poll());
        }
        //根據(jù)任務(wù)類型id分類
        Map<String,List<PolData>> mapDate = data.stream().collect(Collectors.groupingBy(PolData::getPolymerizationId));
        //分類后調(diào)用dealData處理聚合
        for(Map.Entry<String,List<PolData>> entry:mapDate.entrySet()){
            dealData(entry.getValue().stream().map(PolData::getData).collect(Collectors.toList()), entry.getKey());
        }
    }
}

分布式任務(wù)聚合的實現(xiàn)方式

實現(xiàn)思路

因為是分布式環(huán)境,所以要聚合的話需要使用redis作為中間件來存儲信息和過期信息。并用rua腳本來保證操作的原子性。需要定時任務(wù)定時插入空數(shù)據(jù)去判斷是否可以聚合了。這樣設(shè)計邏輯比較簡單,垃圾數(shù)據(jù)量可控制。如果需要進一步優(yōu)化,則需要用兩個lua腳本來實現(xiàn)。代碼如下,邏輯已注釋

public class PolymerizationDemo extends Polymerization{
    /**
     * lua腳本,大致邏輯:
     * 首先拿取當(dāng)前類型任務(wù)的到期時間
     * 然直接插入redis隊列
     * 如果當(dāng)前時間小于過期時間,返回0和隊列的長度
     * 如果當(dāng)前時間已經(jīng)大于過期時間,則把所有數(shù)據(jù)返回,并刪除隊列數(shù)據(jù),并重新設(shè)置聚合到期時間
     **/
    private static final String SCRIPT_LUA = "" +
            "  local ts = redis.call('GET', KEYS[2])" +
            "local size =redis.call('RPUSH', KEYS[1], ARGV[1])"+
            "  if tonumber( ARGV[2]) < tonumber(ts) then" +
            "    return {0,  size}" +
            "  else" +
            "    redis.call('set', KEYS[2], ARGV[3])" +
            "    local items = redis.call('LRANGE', KEYS[1], -1, size)" +
            "    redis.call('DEL', KEYS[1])" +
            "    return items" +
            "  end";
    @Autowired
    private StringRedisTemplate redis;

    private static final String EXPIRE_KEY = "expire_key_";
    private static final String DATA_KEY = "expire_key_";

    public PolymerizationDemo(int polTime) {
        super(polTime);
    }


    public List evalScript(String lua, List<String> keys, Object... values) {
        DefaultRedisScript<List> redisScript = new DefaultRedisScript<List>(lua, List.class);
        return redis.execute(redisScript,keys,values);
    }
    @Override
    public void pushPolymerization(String data, String polymerizationId){
        //當(dāng)前任務(wù)類型的一次聚合的最終時間的key
        String timeExpireKey = EXPIRE_KEY+polymerizationId;
        //當(dāng)前任務(wù)類型保存數(shù)據(jù)的隊列的key
        String listDateKey = DATA_KEY+polymerizationId;
        long timeNow = System.currentTimeMillis();
        //如果當(dāng)前聚合成功后,新的聚合到期時間
        long timePreExpire = timeNow+polTime;
        List<String> keys = Lists.newArrayList(timeExpireKey,listDateKey);
        //執(zhí)行rua腳本
        List<String> result = (List<String>) evalScript(SCRIPT_LUA,keys,data,timeNow,timePreExpire);
        // 如果第一個返回0,表示還沒有到聚合的時候
        if(!"0".equals(result.get(0))){
            dealData(result,polymerizationId);
        }
    }
    @Override
    @Async
    public void dealData(List<String> data,String polymerizationId){
        //移除無用的心跳數(shù)據(jù)
        while (data.remove("empty")){

        }
        //邏輯
    }
    @Scheduled(fixedDelay = 1000)
    public void timeJob(){
        List<String> idList =  Lists.newArrayList("id1","id2","id3");
        for(String id:idList){
            //嘗試插入特殊標記empty的數(shù)據(jù)來嘗試聚合
            pushPolymerization("empty",id);
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容