前言
談到以excel格式導出數(shù)據(jù),很容易想到的實現(xiàn)思路就是:前端發(fā)送導出請求,后端使用org.apache.poi進行數(shù)據(jù)處理,然后使用reponse把流文件響應給前端,供客戶端下載。但是這種實現(xiàn)方式會帶來以下問題:
1.數(shù)據(jù)量比較大的情況下,會導致OOM
2.數(shù)據(jù)龐大,后端響應前端超時
3.采用同步方式導出數(shù)據(jù),響應時間長,用戶體驗感差
基于此,本文提供一種異步導出數(shù)據(jù)的方案,實現(xiàn)百萬級別數(shù)據(jù)的導出。
程序流程圖

技術采用
Spring Boot、線程池、Hikari連接池數(shù)據(jù)庫連接池、七牛云、MyBatis-Plus、定時器Scheduled、POI,MySQL
技術細節(jié)
1.線程池
Q1:采用線程池有什么好處?
A1:
①降低資源消耗:通過池化技術重復利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗;
②提高響應速度:任務到達時,無需等待線程創(chuàng)建即可立即執(zhí)行;
③提高線程的可管理性:線程是稀缺資源,如果無限制創(chuàng)建,不僅會消耗系統(tǒng)資源,還會因為線程的不合理分布導致資源調度失衡,降低系統(tǒng)的穩(wěn)定性。使用線程池可以進行統(tǒng)一的分配、調優(yōu)和監(jiān)控。
Q2:ThreadPoolExecutor參數(shù)含義?
A2:
①corePoolSize:核心線程數(shù)
* 核心線程會一直存活,即使沒有任務需要執(zhí)行
* 當線程數(shù)小于核心線程數(shù)時,即使有線程空閑,線程池也會優(yōu)先創(chuàng)建新線程處理
* 設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉
②queueCapacity:任務隊列容量(阻塞隊列)
* 當核心線程數(shù)達到最大時,新任務會放在隊列中排隊等待執(zhí)行
③maxPoolSize:最大線程數(shù)
* 當線程數(shù)>=corePoolSize,且任務隊列已滿時。線程池會創(chuàng)建新線程來處理任務
* 當線程數(shù)=maxPoolSize,且任務隊列已滿時,線程池采取拒絕策略
④keepAliveTime:線程空閑時間
* 當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數(shù)量=corePoolSize
* 如果allowCoreThreadTimeout=true,則會直到線程數(shù)量=0
⑤allowCoreThreadTimeout:允許核心線程超時
⑥r(nóng)ejectedExecutionHandler:任務拒絕處理器
* 兩種情況會拒絕處理任務:
- 當線程數(shù)已經(jīng)達到maxPoolSize,切隊列已滿,會拒絕新任務
- 當線程池被調用shutdown()后,會等待線程池里的任務執(zhí)行完畢,再shutdown。如果在調用shutdown()和線程池真正shutdown之間提交任務,會拒絕新任務
* 線程池會調用rejectedExecutionHandler來處理這個任務。如果沒有設置默認是AbortPolicy,會拋出異常
* ThreadPoolExecutor類有幾個內(nèi)部實現(xiàn)類來處理這類情況:
- AbortPolicy 丟棄任務,拋運行時異常
- CallerRunsPolicy 交由提交線程任務的線程執(zhí)行,會影響程序的整體性能
- DiscardPolicy 不處理新任務,直接丟棄
- DiscardOldestPolicy 從隊列中踢出最先進入隊列(最后一個執(zhí)行)的任務
* 實現(xiàn)RejectedExecutionHandler接口,可自定義處理器
Q3:線程池任務調度流程?
A3:

線程池的配置:
@Configuration
public class TaskExecutorConfig {
/**
* 文件導出線程池
*/
@Bean
public ThreadPoolTaskExecutor fileExportExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(6);
executor.setQueueCapacity(2);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("fileExportExecutor-");
// 設置拒絕策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任務結束后再關閉線程池
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(3);
return executor;
}
}
任務執(zhí)行
//將導出任務交給線程池執(zhí)行
Future<Boolean> future = fileExportExecutor.submit(exportDataTask);
執(zhí)行線程任務的方式有兩種:
①excute(),用于提交不需要返回值的任務,無法判斷是否任務已經(jīng)執(zhí)行完成
②submit(),用于提交需要返回值的任務,會返回一個Future對象,通過調用Future類中的get()可以獲取任務的執(zhí)行結果
線程任務偽代碼
@Component
@Data
/**
* spring容器管理的對象默認是單例,需貼上@Scope("prototype")設置成多例
* 將對象注入到ObjectFactory類中,通過getObject()方法獲取
* @Autowired
* private ObjectFactory<ExportDataTask> exportDataTasks;
*/
@Scope("prototype")
public class ExportDataTask implements Callable<Boolean> {
@Autowired
private FileService fileService;
@Override
public Boolean call() throws Exception {
try {
//TODO:將狀態(tài)改成進行中
//導出數(shù)據(jù)到excel,并且將文件上傳到七牛云,返回文件在七牛云的下載路徑以及導出的記錄數(shù)
map = fileService.exportExcel(template.getSqlStr(), params, headers, containBean, recordId);
if (map == null || Thread.currentThread().isInterrupted()) {
return false;
}
//TODO:將文件在七牛云下載路徑寫入數(shù)據(jù)庫,并將導出狀態(tài)改成已完成
return true;
}catch (Exception e) {
//TODO:把導出狀態(tài)改成導出異常
log.error(e.getMessage(),e);
return false;
}
}
創(chuàng)建線程任務的方式有三種:①繼承Thread類;②實現(xiàn)Runnable接口;③實現(xiàn)Callable接口
引用鏈接:美團技術團隊->https://tech.meituan.com/archives
2.數(shù)據(jù)庫連接池
Q1:數(shù)據(jù)庫連接池的主要參數(shù)?
A1:
①driver-class-name:驅動類名
②maxLifetime:一個連接的生命時長(毫秒),超時而且沒被使用則被釋放
③maximumPoolSize:連接池中允許的最大連接數(shù)
④url:數(shù)據(jù)庫url
⑤username:數(shù)據(jù)庫用戶名
⑥password:數(shù)據(jù)庫密碼
數(shù)據(jù)庫連接池執(zhí)行sql語句
//從數(shù)據(jù)庫連接池中獲取連接對象
Connection conn = dataSource.getConnection();
//獲取sql預編譯語句,使用預編譯語句可以防止sql注入
stmt = conn.prepareStatement(sqlStr);
//設置sql參數(shù)
if (StringUtils.hasLength(params)) {
String[] paramGroup = params.split(",");
for (int i = 0; i < paramGroup.length; i++) {
stmt.setObject(i + 1, paramGroup[i]);
}
}
//執(zhí)行sql語句
rs = stmt.executeQuery();
3.使用poi將數(shù)據(jù)轉換成工作簿
//創(chuàng)建一個excel工作簿
SXSSFWorkbook workbook = new SXSSFWorkbook();
SXSSFSheet sheet = null;
//設置每一行的數(shù)據(jù)
while (rs.next()) {
//一個表只能存1048575行,多了就換表
if (count % 1048575 == 0){
sheet = workbook.createSheet();
SXSSFRow row0 = sheet.createRow(0);
String[] titles = headers.split(",");
//初始化行號
line = 0;
//設置表頭
for (int i = 0; i < titles.length; i++) {
SXSSFCell cell = row0.createCell(i);
cell.setCellValue(titles[i]);
}
}
count++;
line++;
//新建一行
SXSSFRow row = sheet.createRow(line);
//設置每一項的數(shù)據(jù)
for (int j = 0; j < beans.length; j++) {
if (Thread.currentThread().isInterrupted()) {
return null;
}
SXSSFCell cell = row.createCell(j);
cell.setCellValue(rs.getString(beans[j]));
}
}
workbook的類型有三種:
①HSSFWorkbook:針對EXCEL 2003版本,擴展名為.xls
②XSSFWorkbook:其對應的是EXCEL2007+ ,擴展名為.xlsx ,最多可以導出104萬行,會出現(xiàn)OOM
③SXSSFWorkbook:可以其對應的是EXCEL2007+,根據(jù)行數(shù)將內(nèi)存中的數(shù)據(jù)持久化寫到文件中,避免OOM
4.七牛云
Q1:七牛云的主要參數(shù)?
A1:
①accessKey:-
②secretKey:-
③bucket:-
④domain:域名
ByteArrayOutputStream bos = new ByteArrayOutputStream();
//將數(shù)據(jù)寫到數(shù)據(jù)流中,并存放在字節(jié)數(shù)組中
workbook.write(bos);
byte[] bytes = bos.toByteArray();
rs.close();
bos.close();
Calendar calendar = Calendar.getInstance();
int year = calendar.get(Calendar.YEAR);
int month = calendar.get(Calendar.MONTH) + 1;
//文件名
String key = year + "/" + month + "/" +
UUID.randomUUID().toString().replaceAll("-", "").substring(0, 6) + ".xlsx";
//創(chuàng)建七牛云token
String token = Auth.create(accessKey, secretKey).uploadToken(bucket);
UploadManager uploadManager = new UploadManager(new Configuration(Region.region2()));
//上傳至七牛云
Response response = uploadManager.put(bytes, key, token);
//處理結果
DefaultPutRet putRet = new Gson().fromJson(response.bodyString(), DefaultPutRet.class);
//文件在七牛云上面的下載鏈接
String qiniuUrl = domain + "/" + putRet.key;
5.任務中斷
實現(xiàn)思路:任務提交給線程池執(zhí)行后,返回Future對象,按照:導出記錄id->future對象的格式存入ExpiringMap中,中斷時使用future對象調用cancel(true)方法,并且捕捉future對象調用get()方法拋出的的CancellationException
private final static ExpiringMap<Integer, Future> threadMap = ExpiringMap.builder()
.expiration(2, TimeUnit.HOURS)
.expirationPolicy(ExpirationPolicy.CREATED)
.build();
future.cancel(true);
try {
future.get();
} catch (CancellationException e) {
exportRecordDAO.update(null,updateWrapper);
log.info("記錄id為{}的任務已經(jīng)被取消",recordId);
6.定時器
作用:定時將因宕機或者長時間導出未響應的導出記錄切換成異常狀態(tài)
在啟動類上貼上@EnableScheduling注解:
@SpringBootApplication
@EnableScheduling
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
在定時任務上貼上@Scheduled注解:
@Scheduled(cron = " 0 0 0/2 * * ?")
public void executeScheduledTask(){
//TODO:將超過兩個小時還處于未開始,進行中狀態(tài)中的導出記錄切換成導出異常
}
7.系統(tǒng)限流
考慮到并發(fā)情況下導出大批量數(shù)據(jù),會造成服務器CPU負載過高,所以針對單用戶,同時導出的任務不能超多2個。實現(xiàn)方式有兩種:①到數(shù)據(jù)庫中統(tǒng)計當前用戶導出記錄處于進行中的數(shù)量;②將用戶的導出記錄處于進行中的數(shù)量存入redis中,從redis中獲取數(shù)據(jù)作判斷
實現(xiàn)功能
實測可以導出860萬的數(shù)據(jù),加上sql運行時間,總耗時2分鐘