相信不少開發(fā)者在遇到項(xiàng)目對(duì)數(shù)據(jù)進(jìn)行批量操作的時(shí)候,都會(huì)有不少的煩惱,尤其是針對(duì)數(shù)據(jù)量極大的情況下,效率問題就直接提上了菜板。
因此,開多線程來執(zhí)行批量任務(wù)是十分重要的一種批量操作思路,其實(shí)這種思路實(shí)現(xiàn)起來也十分簡(jiǎn)單,就拿批量更新的操作舉例。
整體流程如下:

image.png
步驟如下:
- 獲取需要進(jìn)行批量更新的大集合 A,對(duì)大集合進(jìn)行拆分操作,分成 N 個(gè)小集合 A-1 ~ A-N 。
- 開啟線程池,針對(duì)集合的大小進(jìn)行調(diào)參,對(duì)小集合進(jìn)行批量更新操作。
- 對(duì)流程進(jìn)行控制,控制線程執(zhí)行順序。
按照指定大小拆分集合的工具類:
public static <T> List<List<T>> split(List<T> resList, int subListLength) {
if (resList.isEmpty() || subListLength <= 0) {
return new ArrayList<>();
}
List<List<T>> ret = new ArrayList<>();
int size = resList.size();
if (size <= subListLength) {
// 數(shù)據(jù)量不足 subListLength 指定的大小
ret.add(resList);
} else {
int pre = size / subListLength;
int last = size % subListLength;
// 前面pre個(gè)集合,每個(gè)大小都是 subListLength 個(gè)元素
for (int i = 0; i < pre; i++) {
List<T> itemList = new ArrayList<>();
for (int j = 0; j < subListLength; j++) {
itemList.add(resList.get(i * subListLength + j));
}
ret.add(itemList);
}
//last的處理
if (last > 0) {
List<T> itemList = new ArrayList<>();
for (int i = 0; i < last; i++) {
itemList.add(resList.get(pre * subListLength + i));
}
ret.add(itemList);
}
}
return ret;
}
開啟異步執(zhí)行任務(wù)的線程池:
//開啟多線程
public void threadMethod() {
List<T> updateList = new ArrayList();
//初始化線程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 50, 4,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy());
// 大集合拆分成N個(gè)小集合, 這里集合的size可以稍微小一些(這里我用100剛剛好), 以保證多線程異步執(zhí)行, 過大容易回到單線程
List<T> splitNList = SplitListUtils.split(totalList, 100);
// 記錄單個(gè)任務(wù)的執(zhí)行次數(shù)
CountDownLatch countDownLatch = new CountDownLatch(splitNList.size());
// 對(duì)拆分的集合進(jìn)行批量處理, 先拆分的集合, 再多線程執(zhí)行
splitNList.stream().forEach(o -> {
//線程池執(zhí)行
threadPool.execute(new Thread(new Runnable() {
@Override
public void run() {
for (Entity yangshiwen : singleList) {
// 將每一個(gè)對(duì)象進(jìn)行數(shù)據(jù)封裝, 并添加到一個(gè)用于存儲(chǔ)更新數(shù)據(jù)的list
// ......
}
}
}));
// 任務(wù)個(gè)數(shù) - 1, 直至為0時(shí)喚醒a(bǔ)wait()
countDownLatch.countDown();
});
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
// 通過mybatis的批量插入的方式來進(jìn)行數(shù)據(jù)的插入, 這一步還是要做判空
if (!updateList.isEmpty()) {
batchUpdateEntity(updateList);
}
}