上傳文件多線程批量落盤

編寫一個抽象類

import com.alibaba.druid.util.StringUtils;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;

import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;

import com.iflytek.epdcloud.mhk.mark.common.entity.MachineEvaluate;

import com.iflytek.epdcloud.mhk.mark.common.enumeration.ResponseEnumeration;

import com.iflytek.epdcloud.mhk.mark.web.exception.BusinessException;

import lombok.extern.slf4j.Slf4j;

import org.springframework.web.multipart.MultipartFile;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.util.ArrayList;

import java.util.Arrays;

import java.util.HashMap;

import java.util.List;

import java.util.concurrent.*;

import java.util.concurrent.atomic.AtomicInteger;

import java.util.stream.Collectors;

@Slf4j

public abstract class? FileUploadService {

public T t;

? ? //默認一批是1000個

? ? public static final IntegerbatchSizeIndex =1000;

? ? public static final IntegerThreadNumIndex =15;

? ? /**

? ? * 上傳文件

? ? * @param file

? ? * @param fileRunnable

? ? * @param batchSize

? ? * @param threadNum

? ? * @param list

? ? * @param args

? ? */

? public void uploadFile(MultipartFile file,FileRunnable fileRunnable,Integer batchSize,Integer threadNum,ArrayList list,Object ... args){

analyse(file,fileRunnable,batchSize,threadNum,list,args);

? }

/**

? ? * 解析文件

? ? * @param file

? ? * @param fileRunnable

? ? * @param batchSize

? ? * @param threadNum

? ? * @param list

? ? * @param args

? ? */

? ? ? public? void analyse(MultipartFile file,FileRunnable fileRunnable,Integer batchSize,Integer threadNum,ArrayList list,Object... args){

AtomicInteger atomicInteger =new AtomicInteger(0);

? ? ? ? HashMap map =new HashMap<>();

? ? ? ? BufferedReader bufferedReader =null;

? ? ? ? InputStreamReader inputStreamReader =null;

? ? ? ? InputStream is=null;

? ? ? ? try {

is = file.getInputStream();

? ? ? ? ? ? inputStreamReader =new InputStreamReader(is);

? ? ? ? ? ? bufferedReader =new BufferedReader(inputStreamReader);

? ? ? ? ? ? String line =null;

? ? ? ? ? ? while((line= bufferedReader.readLine())!=null) {

String[] array = line.split("\t");

? ? ? ? ? ? ? ? List collect = Arrays.stream(array).filter(s12 ->

!StringUtils.isEmpty(s12)

).collect(Collectors.toList());

? ? ? ? ? ? ? ? if((!CollectionUtils.isEmpty(collect))){

atomicInteger.incrementAndGet();

? ? ? ? ? ? ? ? ? ? modify(collect,list,args);

? ? ? ? ? ? ? ? }else {

log.error("第{}行的數(shù)據(jù)格式有誤",atomicInteger.toString());

? ? ? ? ? ? ? ? }

}

bufferedReader.close();

? ? ? ? }catch (Exception e){

log.error("文件解析失敗:{}",e.getMessage());

? ? ? ? ? ? throw? new BusinessException(ResponseEnumeration.MACHINE_FILE_ANALYSE_ERROR);

? ? ? ? }finally {

try {

if (bufferedReader !=null) {

bufferedReader.close();

? ? ? ? ? ? ? ? }

if (inputStreamReader !=null) {

inputStreamReader.close();

? ? ? ? ? ? ? ? }

}catch (IOException e) {

throw? new BusinessException(ResponseEnumeration.MACHINE_FILE_ANALYSE_ERROR);

? ? ? ? ? ? }

}

batchSave(list, batchSize, fileRunnable,threadNum);

? ? }

/**

? ? * 批量添加數(shù)據(jù)到數(shù)據(jù)庫

? ? * @param modifyList

? ? * @param batchSize

? ? * @param runnable

? ? * @param threadNum

? ? */

? ? public void batchSave(List modifyList, Integer batchSize, FileRunnable runnable, Integer threadNum) {

if(CollectionUtils.isEmpty(modifyList)){

log.info("沒有數(shù)據(jù)要添加數(shù)據(jù)庫");

return;

? ? ? ? }

if(ObjectUtils.isEmpty(batchSize))

batchSize =batchSizeIndex;

? ? ? ? if(ObjectUtils.isEmpty(threadNum))

threadNum =ThreadNumIndex;

? ? ? ? int batchNum = modifyList.size() / batchSize;

? ? ? ? CountDownLatch countDownLatch =new CountDownLatch(batchNum+1);

? ? ? ? LinkedBlockingQueue runnables =new LinkedBlockingQueue<>(Integer.MAX_VALUE);

? ? ? ? ThreadPoolExecutor threadPoolExecutor =new ThreadPoolExecutor(threadNum,

? ? ? ? ? ? ? ? threadNum,

? ? ? ? ? ? ? ? 20,

? ? ? ? ? ? ? ? TimeUnit.SECONDS,

? ? ? ? ? ? ? ? runnables,new BatchSaveThreadFactory());

? ? ? ? List newList =null;

? ? ? ? for (int i=1;i<=batchNum+1;i++

) {

if(i!=batchNum+1){

newList = modifyList.subList((i-1)*batchSize,i*batchSize);

? ? ? ? ? ? ? ? FileRunnable fileRunnable = runnable.newInstance();

? ? ? ? ? ? ? ? fileRunnable.setCountDownLatch(countDownLatch);

? ? ? ? ? ? ? ? fileRunnable.setList(newList);

? ? ? ? ? ? ? ? threadPoolExecutor.execute(fileRunnable);

? ? ? ? ? ? }else {

newList = modifyList.subList((i-1)*batchSize,modifyList.size());

? ? ? ? ? ? ? ? FileRunnable fileRunnable = runnable.newInstance();

? ? ? ? ? ? ? ? fileRunnable.setCountDownLatch(countDownLatch);

? ? ? ? ? ? ? ? fileRunnable.setList(newList);

? ? ? ? ? ? ? ? threadPoolExecutor.execute(fileRunnable);

? ? ? ? ? ? }

}

try {

countDownLatch.await();

? ? ? ? }catch (InterruptedException e) {

log.error("文件落盤失敗:{}",e.getMessage());

? ? ? ? ? ? throw new BusinessException(ResponseEnumeration.MACHINE_FILE_SAVE_ERROR);

? ? ? ? }

threadPoolExecutor.shutdown();

? ? }

/**

? ? * 解析之后每行數(shù)據(jù)的處理

? ? * @param collect

? ? * @param list

? ? * @param args

? ? */

? ? public abstract? void modify(List collect,List list,Object... args);

}


具體每個線程執行的業務邏輯

在落盤時要根據業務場景選擇是否去重

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.CopyOnWriteArrayList;

import java.util.concurrent.CountDownLatch;

/**
 * Base class for a task that saves one batch of records on a worker thread.
 *
 * <p>The submitter supplies the batch through {@link #setList(List)} and, optionally, a
 * {@link CountDownLatch} through {@link #setCountDownLatch(CountDownLatch)}; the latch is
 * counted down exactly once when {@link #run()} finishes, whether or not
 * {@link #doSave(List)} succeeded.
 *
 * @param <T> type of the records in the batch
 */
public abstract class FileRunnable<T> implements Runnable {

    private CountDownLatch countDownLatch;

    private List<T> list;

    /**
     * Creates a new, independent task of the same kind as this one.
     *
     * @return a fresh task instance
     */
    public abstract FileRunnable<T> newInstance();

    /**
     * Saves the assigned batch, then signals completion on the latch and prints the
     * elapsed time in milliseconds. Any exception from {@link #doSave(List)} is rethrown
     * after the latch has been counted down.
     */
    @Override
    public void run() {
        long start = System.currentTimeMillis();
        try {
            doSave(list);
        } finally {
            // Count down even when doSave throws; otherwise a thread blocked in
            // CountDownLatch.await() would wait forever.
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
            System.out.println("耗時" + (System.currentTimeMillis() - start));
        }
    }

    /**
     * Persists one batch of records.
     *
     * @param list the batch previously supplied through {@link #setList(List)}
     */
    public abstract void doSave(List<T> list);

    /**
     * @param countDownLatch latch to count down once when this task finishes
     */
    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    /**
     * @param list the batch this task should save
     */
    public void setList(List<T> list) {
        this.list = list;
    }

}


繼承抽象類進行操作

import com.alibaba.druid.util.StringUtils;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;

import com.iflytek.epdcloud.mhk.mark.common.entity.MachineEvaluate;

import com.iflytek.epdcloud.mhk.mark.common.enumeration.MachineEvaluateStatusEnumeration;

import com.iflytek.epdcloud.mhk.mark.common.enumeration.ResponseEnumeration;

import com.iflytek.epdcloud.mhk.mark.common.service.IMachineEvaluateService;

import com.iflytek.epdcloud.mhk.mark.common.utils.SnowflakeIdWorker;

import com.iflytek.epdcloud.mhk.mark.web.buiness.MachineEvaluateBusiness;

import com.iflytek.epdcloud.mhk.mark.web.exception.BusinessException;

import com.iflytek.epdcloud.mhk.mark.web.executor.BatchSaveThreadFactory;

import com.iflytek.epdcloud.mhk.mark.web.executor.FileUploadService;

import com.iflytek.epdcloud.mhk.mark.web.executor.MachineEvalutteTask;

import com.iflytek.epdcloud.mhk.mark.web.model.param.MachineEvaluateDto;

import com.iflytek.epdcloud.mhk.mark.web.model.vo.MachineEvaluateVo;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import org.springframework.transaction.annotation.Transactional;

import org.springframework.web.multipart.MultipartFile;

import java.util.*;

/**

*?

*/

@Slf4j

@Service

public class MachineEvaluateBusinessImplextends FileUploadServiceimplements MachineEvaluateBusiness? {

@Autowired

? ? private IMachineEvaluateServiceiMachineEvaluateService;

? ? @Transactional

@Override

? ? public void upload(MachineEvaluateDto dto)? {

HashMap map =new HashMap<>();

? ? ? ? ArrayList machineEvaluateArrayList =new ArrayList<>();

? ? ? ? MachineEvalutteTask machineEvalutteTask =new MachineEvalutteTask(iMachineEvaluateService, machineEvaluateArrayList);

? ? ? ? uploadFile(dto.getFile(),machineEvalutteTask,null ,null,machineEvaluateArrayList,dto.getQuestionTypeId(),map);

? ? }

@Override

? ? public MachineEvaluateVogetUploadResult(Long questionTypeId) {

MachineEvaluateVo machineEvaluateVo =new MachineEvaluateVo();

? ? ? ? Long total =iMachineEvaluateService.getTotal(questionTypeId,null);

? ? ? ? Long validTotal =iMachineEvaluateService.getTotal(questionTypeId,MachineEvaluateStatusEnumeration.EVALUATE_ENTERED.getCode());

? ? ? ? machineEvaluateVo.setUploadTotal(total);

? ? ? ? machineEvaluateVo.setValidTotal(validTotal);

? ? return machineEvaluateVo;

? ? }

@Override

? ? public void modify(List collect,List list, Object... args) {

{

while((collect.size()>=2)&&(collect.size()%2==0))

{

List strings = collect.subList(0, 2);

? ? ? ? ? ? ? ? String s1 = strings.get(0);

? ? ? ? ? ? ? ? String s2 = strings.get(1);

? ? ? ? ? ? ? ? Long questionTypeId = (Long) args[0];

? ? ? ? ? ? ? ? Map map = (Map) args[1];

? ? ? ? ? ? ? ? strings.clear();

? ? ? ? ? ? ? ? String s3 = map.get(s1+questionTypeId);

? ? ? ? ? ? ? ? if(!StringUtils.isEmpty(s3))

break;

? ? ? ? ? ? ? ? map.put(s1+questionTypeId,s2);

? ? ? ? ? ? ? ? MachineEvaluate machineEvaluate =new MachineEvaluate();

? ? ? ? ? ? ? ? machineEvaluate.setExamId(s1);

? ? ? ? ? ? ? ? machineEvaluate.setQuestionTypeId(questionTypeId);

? ? ? ? ? ? ? ? machineEvaluate.setScore(s2);

? ? ? ? ? ? ? ? machineEvaluate.setStatus(MachineEvaluateStatusEnumeration.EVALUATE_NOT_ENTER.getCode());

? ? ? ? ? ? ? ? machineEvaluate.setId(SnowflakeIdWorker.generateId());

? ? ? ? ? ? ? ? list.add(machineEvaluate);

? ? ? ? ? ? }

}

}

}

最后編輯于
©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳並發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容