并行處理框架parseq調(diào)研

引入目的:為解決項(xiàng)目中存在的并且不好優(yōu)化的n+1查詢慢問題

使用時(shí)異步調(diào)用僅需一行 engine.run(e -> e.printStackTrace(), suppliers);

示例效果:

查詢一組20個(gè)數(shù)據(jù),每個(gè)子查詢需要耗時(shí)1秒,返回結(jié)果并行組件需要1061ms,串行需要20057ms。并行處理的速度取決于任務(wù)數(shù)、線程數(shù)和子查詢中最慢的那個(gè)。

package com.github.chenmingang.parseq;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class OnlineSimulation {
    //    static EngineAgent engine = EngineFactory.defaultEngine();
    static EngineAgent engine = EngineFactory.getEngine(20, 1, 20);

    public static void main(String[] args) {
        long l1 = System.currentTimeMillis();
        List<Model> result1 = new OnlineSimulation().queryAsync();
        long l2 = System.currentTimeMillis();
        System.out.println("time1:" + (l2 - l1));
        //time1:1063

        long l3 = System.currentTimeMillis();
        List<Model> result2 = new OnlineSimulation().query();
        long l4 = System.currentTimeMillis();
        System.out.println("time2:" + (l4 - l3));
        //time2:20060

        engine.shutdown();
    }

    //n+1 查詢
    public List<Model> query() {
        // 第一步查詢
        List<Model> modelList = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            modelList.add(new Model(i));
        }
        // 第二步查詢
        for (Model model : modelList) {
            setName(model);
        }
        return modelList;
    }

    //n+1 查詢
    public List<Model> queryAsync() {

        // 第一步查詢
        List<Model> modelList = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            modelList.add(new Model(i));
        }
        // 第二步查詢
        List<Supplier<Model>> suppliers = new ArrayList<>();
        for (Model model : modelList) {
            suppliers.add(() -> setName(model));
        }
        engine.run(e -> e.printStackTrace(), suppliers);

        return modelList;
    }

    /**
     * 執(zhí)行子查詢并合并數(shù)據(jù)的模擬
     *
     * @param model
     * @return
     */
    private Model setName(Model model) {
        model.setName("name-" + model.getId());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
//        throw new RuntimeException("");
        return model;
    }

    class Model {
        private Integer id;
        private String name;

        public Model(Integer id) {
            this.id = id;
        }

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }
}

根據(jù)這種列表式的查詢場景對parseq做的封裝

package com.github.chenmingang.parseq;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.batching.BatchingSupport;

import java.util.concurrent.*;

public class EngineFactory {

    private static EngineFactory INSTANCE = new EngineFactory();
    private final EngineAgent defaultEngine;

    private EngineFactory() {
        int numCores = Runtime.getRuntime().availableProcessors();
        defaultEngine = getEngine(numCores + 1, 1, numCores + 1);
    }

    public static EngineAgent defaultEngine() {
        return INSTANCE.defaultEngine;
    }

    public static EngineAgent getEngine(int poolSize, int scheduleSize, int queueNum) {
        ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(scheduleSize);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
        ThreadPoolExecutor executors = new ThreadPoolExecutor(poolSize, poolSize,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueNum), threadFactory,
                (r, executor) -> {
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });

        final EngineBuilder builder = new EngineBuilder().setTaskExecutor(executors).setTimerScheduler(scheduler);
        final BatchingSupport batchingSupport = new BatchingSupport();

        builder.setPlanDeactivationListener(batchingSupport);
        return new EngineAgent(builder.build(), executors, scheduler);
    }

}
package com.github.chenmingang.parseq;

import com.linkedin.parseq.Engine;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Supplier;

public class EngineAgent {

    private Engine engine;
    private ThreadPoolExecutor executors;
    private ScheduledExecutorService scheduler;

    public EngineAgent(Engine engine, ThreadPoolExecutor executors,ScheduledExecutorService scheduler) {
        this.engine = engine;
        this.executors = executors;
        this.scheduler = scheduler;
    }
/**
     * 并行運(yùn)行任務(wù)
     *
     * @param exceptionHandle 異常處理
     * @param suppliers       待執(zhí)行的子任務(wù)
     * @param <T>             每個(gè)子任務(wù)的返回結(jié)果
     * @return
     */
    public <T> List<T> run(Consumer<Throwable> exceptionHandle, Supplier<T>... suppliers) {
        List<Supplier<T>> list = new ArrayList<>();
        Collections.addAll(list, suppliers);
        return run(exceptionHandle, list);
    }

    /**
     * 并行運(yùn)行任務(wù)
     *
     * @param exceptionHandle 異常處理
     * @param suppliers       待執(zhí)行的子任務(wù)
     * @param <T>             每個(gè)子任務(wù)的返回結(jié)果
     * @return
     */
    public <T> List<T> run(Consumer<Throwable> exceptionHandle, List<Supplier<T>> suppliers) {
        if (suppliers == null || suppliers.size() == 0) {
            return null;
        }
        List<Task<T>> tasks = new ArrayList<>();
        for (Supplier<T> supplier : suppliers) {
            Task<T> task = this.task(supplier);
            tasks.add(task);
        }
        ParTask<T> parTask = Tasks.par(tasks);
        engine.run(parTask);
        try {
            parTask.await();
            List<T> successful = parTask.getSuccessful();
            Throwable error = parTask.getError();
            if (error != null && exceptionHandle != null) {
                exceptionHandle.accept(error);
            }
            return successful;
        } catch (InterruptedException e) {
            logger.error("中斷異常", e);
        }
        return null;
    }

    public <T> SettablePromise<T> async(Supplier<T> supplier) {
        final SettablePromise<T> promise = Promises.settable();
        getExecutors().execute(() -> {
            try {
                promise.done(supplier.get());
            } catch (Exception e) {
                promise.fail(e);
            }
        });
        return promise;
    }

    public <T> Task<T> task(Supplier<T> supplier) {
        return Task.async(() -> async(supplier));
    }

    public void run(final Task<?> task) {
        engine.run(task);
    }

    public void shutdown() {
        engine.shutdown();
        executors.shutdown();
    }

    public ThreadPoolExecutor getExecutors() {
        return executors;
    }

    public ScheduledExecutorService getScheduler() {
        return scheduler;
    }
}

依賴版本

        <dependency>
            <groupId>com.linkedin.parseq</groupId>
            <artifactId>parseq</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>com.linkedin.parseq</groupId>
            <artifactId>parseq-batching</artifactId>
            <version>2.6.5</version>
        </dependency>
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容