Java百萬級別數(shù)據(jù)異步導出功能實現(xiàn)

前言

談到以excel格式導出數(shù)據(jù),很容易想到的實現(xiàn)思路就是:前端發(fā)送導出請求,后端使用org.apache.poi進行數(shù)據(jù)處理,然后使用reponse把流文件響應給前端,供客戶端下載。但是這種實現(xiàn)方式會帶來以下問題:
1.數(shù)據(jù)量比較大的情況下,會導致OOM
2.數(shù)據(jù)龐大,后端響應前端超時
3.采用同步方式導出數(shù)據(jù),響應時間長,用戶體驗感差
基于此,本文提供一種異步導出數(shù)據(jù)的方案,實現(xiàn)百萬級別數(shù)據(jù)的導出。

程序流程圖

程序流程圖.png

技術采用

Spring Boot、線程池、Hikari連接池數(shù)據(jù)庫連接池、七牛云、MyBatis-Plus、定時器Scheduled、POI,MySQL

技術細節(jié)

1.線程池

Q1:采用線程池有什么好處?
A1:
①降低資源消耗:通過池化技術重復利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗;
②提高響應速度:任務到達時,無需等待線程創(chuàng)建即可立即執(zhí)行;
③提高線程的可管理性:線程是稀缺資源,如果無限制創(chuàng)建,不僅會消耗系統(tǒng)資源,還會因為線程的不合理分布導致資源調度失衡,降低系統(tǒng)的穩(wěn)定性。使用線程池可以進行統(tǒng)一的分配、調優(yōu)和監(jiān)控。

Q2:ThreadPoolExecutor參數(shù)含義?
A2:
①corePoolSize:核心線程數(shù)
* 核心線程會一直存活,即使沒有任務需要執(zhí)行
* 當線程數(shù)小于核心線程數(shù)時,即使有線程空閑,線程池也會優(yōu)先創(chuàng)建新線程處理
* 設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉

②queueCapacity:任務隊列容量(阻塞隊列)
* 當核心線程數(shù)達到最大時,新任務會放在隊列中排隊等待執(zhí)行

③maxPoolSize:最大線程數(shù)
* 當線程數(shù)>=corePoolSize,且任務隊列已滿時。線程池會創(chuàng)建新線程來處理任務
* 當線程數(shù)=maxPoolSize,且任務隊列已滿時,線程池采取拒絕策略

④keepAliveTime:線程空閑時間
* 當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數(shù)量=corePoolSize
* 如果allowCoreThreadTimeout=true,則會直到線程數(shù)量=0

⑤allowCoreThreadTimeout:允許核心線程超時

⑥r(nóng)ejectedExecutionHandler:任務拒絕處理器
* 兩種情況會拒絕處理任務:
- 當線程數(shù)已經(jīng)達到maxPoolSize,切隊列已滿,會拒絕新任務
- 當線程池被調用shutdown()后,會等待線程池里的任務執(zhí)行完畢,再shutdown。如果在調用shutdown()和線程池真正shutdown之間提交任務,會拒絕新任務
* 線程池會調用rejectedExecutionHandler來處理這個任務。如果沒有設置默認是AbortPolicy,會拋出異常
* ThreadPoolExecutor類有幾個內(nèi)部實現(xiàn)類來處理這類情況:
- AbortPolicy 丟棄任務,拋運行時異常
- CallerRunsPolicy 交由提交線程任務的線程執(zhí)行,會影響程序的整體性能
- DiscardPolicy 不處理新任務,直接丟棄
- DiscardOldestPolicy 從隊列中踢出最先進入隊列(最后一個執(zhí)行)的任務
* 實現(xiàn)RejectedExecutionHandler接口,可自定義處理器

Q3:線程池任務調度流程?
A3:

任務調度流程.png

線程池的配置:
@Configuration
public class TaskExecutorConfig {
    /**
     * 文件導出線程池
     */
    @Bean
    public ThreadPoolTaskExecutor fileExportExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(6);
        executor.setQueueCapacity(2);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("fileExportExecutor-");
        // 設置拒絕策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任務結束后再關閉線程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(3);
        return executor;
    }
}
任務執(zhí)行
//將導出任務交給線程池執(zhí)行
Future<Boolean> future = fileExportExecutor.submit(exportDataTask);

執(zhí)行線程任務的方式有兩種:
①excute(),用于提交不需要返回值的任務,無法判斷是否任務已經(jīng)執(zhí)行完成
②submit(),用于提交需要返回值的任務,會返回一個Future對象,通過調用Future類中的get()可以獲取任務的執(zhí)行結果

線程任務偽代碼
@Component
@Data
/**
 * spring容器管理的對象默認是單例,需貼上@Scope("prototype")設置成多例
 * 將對象注入到ObjectFactory類中,通過getObject()方法獲取
 *  @Autowired
 *   private ObjectFactory<ExportDataTask> exportDataTasks;
 */
@Scope("prototype")
public class ExportDataTask implements Callable<Boolean> {
    @Autowired
    private FileService fileService;
    @Override
    public Boolean call() throws Exception {
        try {
            //TODO:將狀態(tài)改成進行中
            //導出數(shù)據(jù)到excel,并且將文件上傳到七牛云,返回文件在七牛云的下載路徑以及導出的記錄數(shù)
            map = fileService.exportExcel(template.getSqlStr(), params, headers, containBean, recordId);
            if (map == null || Thread.currentThread().isInterrupted()) {
                return false;
            }
            //TODO:將文件在七牛云下載路徑寫入數(shù)據(jù)庫,并將導出狀態(tài)改成已完成
            return true;
        }catch (Exception e) {
            //TODO:把導出狀態(tài)改成導出異常
            log.error(e.getMessage(),e);
            return false;
        }
    }

創(chuàng)建線程任務的方式有三種:①繼承Thread類;②實現(xiàn)Runnable接口;③實現(xiàn)Callable接口
引用鏈接:美團技術團隊->https://tech.meituan.com/archives

2.數(shù)據(jù)庫連接池

Q1:數(shù)據(jù)庫連接池的主要參數(shù)?
A1:
①driver-class-name:驅動類名
②maxLifetime:一個連接的生命時長(毫秒),超時而且沒被使用則被釋放
③maximumPoolSize:連接池中允許的最大連接數(shù)
④url:數(shù)據(jù)庫url
⑤username:數(shù)據(jù)庫用戶名
⑥password:數(shù)據(jù)庫密碼

數(shù)據(jù)庫連接池執(zhí)行sql語句
        //從數(shù)據(jù)庫連接池中獲取連接對象
        Connection conn = dataSource.getConnection();
        //獲取sql預編譯語句,使用預編譯語句可以防止sql注入
        stmt = conn.prepareStatement(sqlStr);
        //設置sql參數(shù)
        if (StringUtils.hasLength(params)) {
            String[] paramGroup = params.split(",");
            for (int i = 0; i < paramGroup.length; i++) {
                stmt.setObject(i + 1, paramGroup[i]);
            }
        }
        //執(zhí)行sql語句
        rs = stmt.executeQuery();
3.使用poi將數(shù)據(jù)轉換成工作簿
        //創(chuàng)建一個excel工作簿
        SXSSFWorkbook workbook = new SXSSFWorkbook();
        SXSSFSheet sheet = null;
        //設置每一行的數(shù)據(jù)
        while (rs.next()) {
            //一個表只能存1048575行,多了就換表
            if (count % 1048575 == 0){
                sheet = workbook.createSheet();
                SXSSFRow row0 = sheet.createRow(0);
                String[] titles = headers.split(",");
                //初始化行號
                line = 0;
                //設置表頭
                for (int i = 0; i < titles.length; i++) {
                    SXSSFCell cell = row0.createCell(i);
                    cell.setCellValue(titles[i]);
                }
            }
            count++;
            line++;
            //新建一行
            SXSSFRow row = sheet.createRow(line);
            //設置每一項的數(shù)據(jù)
            for (int j = 0; j < beans.length; j++) {
                if (Thread.currentThread().isInterrupted()) {
                    return null;
                }
                SXSSFCell cell = row.createCell(j);
                cell.setCellValue(rs.getString(beans[j]));
            }
        }

workbook的類型有三種:
①HSSFWorkbook:針對EXCEL 2003版本,擴展名為.xls
②XSSFWorkbook:其對應的是EXCEL2007+ ,擴展名為.xlsx ,最多可以導出104萬行,會出現(xiàn)OOM
③SXSSFWorkbook:可以其對應的是EXCEL2007+,根據(jù)行數(shù)將內(nèi)存中的數(shù)據(jù)持久化寫到文件中,避免OOM

4.七牛云

Q1:七牛云的主要參數(shù)?
A1:
①accessKey:-
②secretKey:-
③bucket:-
④domain:域名

       ByteArrayOutputStream bos = new ByteArrayOutputStream();
        //將數(shù)據(jù)寫到數(shù)據(jù)流中,并存放在字節(jié)數(shù)組中
        workbook.write(bos);
        byte[] bytes = bos.toByteArray();
        rs.close();
        bos.close();
        Calendar calendar = Calendar.getInstance();
        int year = calendar.get(Calendar.YEAR);
        int month = calendar.get(Calendar.MONTH) + 1;
        //文件名
        String key = year + "/" + month + "/" +
                UUID.randomUUID().toString().replaceAll("-", "").substring(0, 6) + ".xlsx";
        //創(chuàng)建七牛云token
        String token = Auth.create(accessKey, secretKey).uploadToken(bucket);
        UploadManager uploadManager = new UploadManager(new Configuration(Region.region2()));
        //上傳至七牛云
        Response response = uploadManager.put(bytes, key, token);
        //處理結果
        DefaultPutRet putRet = new Gson().fromJson(response.bodyString(), DefaultPutRet.class);
        //文件在七牛云上面的下載鏈接
        String qiniuUrl = domain + "/" + putRet.key;
5.任務中斷

實現(xiàn)思路:任務提交給線程池執(zhí)行后,返回Future對象,按照:導出記錄id->future對象的格式存入ExpiringMap中,中斷時使用future對象調用cancel(true)方法,并且捕捉future對象調用get()方法拋出的的CancellationException

private final static ExpiringMap<Integer, Future> threadMap = ExpiringMap.builder()
                                                                              .expiration(2, TimeUnit.HOURS)
                                                                              .expirationPolicy(ExpirationPolicy.CREATED)
                                                                              .build();
        future.cancel(true);
        try {
            future.get();
        } catch (CancellationException e) {
            exportRecordDAO.update(null,updateWrapper);
            log.info("記錄id為{}的任務已經(jīng)被取消",recordId);
6.定時器

作用:定時將因宕機或者長時間導出未響應的導出記錄切換成異常狀態(tài)
在啟動類上貼上@EnableScheduling注解:

@SpringBootApplication
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

在定時任務上貼上@Scheduled注解:

    @Scheduled(cron = " 0 0 0/2 * * ?")
    public void executeScheduledTask(){
        //TODO:將超過兩個小時還處于未開始,進行中狀態(tài)中的導出記錄切換成導出異常
    }
7.系統(tǒng)限流

考慮到并發(fā)情況下導出大批量數(shù)據(jù),會造成服務器CPU負載過高,所以針對單用戶,同時導出的任務不能超多2個。實現(xiàn)方式有兩種:①到數(shù)據(jù)庫中統(tǒng)計當前用戶導出記錄處于進行中的數(shù)量;②將用戶的導出記錄處于進行中的數(shù)量存入redis中,從redis中獲取數(shù)據(jù)作判斷

實現(xiàn)功能

實測可以導出860萬的數(shù)據(jù),加上sql運行時間,總耗時2分鐘

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容