一、在需要使用流式查詢(xún)的mapper文件中,定義流式查詢(xún)方法
package com.unionpay.dao.db2;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.unionpay.entity.TblMallOrder;
import org.apache.ibatis.annotations.*;
import org.apache.ibatis.mapping.ResultSetType;
import org.apache.ibatis.session.ResultHandler;
/**
* (TblMallOrder)表數(shù)據(jù)庫(kù)訪問(wèn)層
*
* @author liudong
* @since 2020-09-15 17:07:13
*/
@Mapper
public interface TblMallOrderDao extends BaseMapper<TblMallOrder> {
@Select("${sql}")
@Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = 1000)
@ResultType(TblMallOrder.class)
void dynamicSelectLargeData1(@Param("sql") String sql, ResultHandler<TblMallOrder> handler);
@Select("${sql}")
@Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = 1000)
@ResultType(Map.class)
void dynamicSelectLargeData2(@Param("sql") String sql, ResultHandler<Map> handler);
}
二、使用示例
@RestController
public class TestSearchLargeData {
// 這是每批處理的大小
private final static int BATCH_SIZE = 1000;
private int size;
// 存儲(chǔ)每批數(shù)據(jù)的臨時(shí)容器
private List<TblMallOrder> mallOrders;
@Autowired
private TblMallOrderDao tblMallOrderDao;
@GetMapping("/getLargeData1")
public void getLargeData1() {
String sql = "select * from t_mall_order";
tblMallOrderDao.dynamicSelectLargeData1(sql, new ResultHandler<TblMallOrder>() {
@Override
public void handleResult(ResultContext<? extends TblMallOrder> resultContext) {
TblMallOrder tblMallOrder = resultContext.getResultObject();
System.out.println(tblMallOrder);
}
});
}
@GetMapping("/getLargeData2")
public void getLargeData2() {
String sql = "select * from t_mall_order";
tblMallOrderDao.dynamicSelectLargeData1(sql, new ResultHandler<TblMallOrder>() {
@Override
public void handleResult(ResultContext<? extends TblMallOrder> resultContext) {
TblMallOrder tblMallOrder = resultContext.getResultObject();
System.out.println(tblMallOrder);
// 你可以看自己的項(xiàng)目需要分批進(jìn)行處理或者單個(gè)處理,這里以分批處理為例
mallOrders.add(tblMallOrder);
size++;
if (size == BATCH_SIZE) {
handle();
}
}
});
//用來(lái)完成最后一批數(shù)據(jù)處理
handle();
}
/**
* 數(shù)據(jù)處理
*/
private void handle(){
try{
// 在這里可以對(duì)你獲取到的批量結(jié)果數(shù)據(jù)進(jìn)行需要的業(yè)務(wù)處理
}catch (Exception e){
e.printStackTrace();
}finally {
// 處理完每批數(shù)據(jù)后后將臨時(shí)清空
size = 0;
mallOrders.clear();
}
}
}
三、總結(jié)
- Oracle和DB2,當(dāng)我們執(zhí)行一個(gè)SQL查詢(xún)語(yǔ)句的時(shí)候,需要在客戶(hù)端和服務(wù)器端都打開(kāi)一個(gè)游標(biāo),并且分別申請(qǐng)一塊內(nèi)存空間,作為存放查詢(xún)的數(shù)據(jù)的一個(gè)緩沖區(qū)。這塊內(nèi)存區(qū),存放多少條數(shù)據(jù)就由fetchsize來(lái)決定,同時(shí)每次網(wǎng)絡(luò)包會(huì)傳送fetchsize條記錄到客戶(hù)端。應(yīng)該很容易理解,如果fetchsize設(shè)置為20,當(dāng)我們從服務(wù)器端查詢(xún)數(shù)據(jù)往客戶(hù)端傳送時(shí),每次可以傳送20條數(shù)據(jù),但是兩端分別需要20條數(shù)據(jù)的內(nèi)存空閑來(lái)保存這些數(shù)據(jù)。fetchsize決定了每批次可以傳輸?shù)挠涗洍l數(shù),但同時(shí),也決定了內(nèi)存的大小。這塊內(nèi)存,在oracle服務(wù)器端是動(dòng)態(tài)分配的。而在客戶(hù)端,PS對(duì)象會(huì)存在一個(gè)緩沖中(LRU鏈表),也就是說(shuō),這塊內(nèi)存是事先配好的,應(yīng)用端內(nèi)存的分配在conn.prepareStatement(sql)或都conn.CreateStatement(sql)的時(shí)候完成。
- MySQL本身并沒(méi)有FetchSize方法, 它是使用CS阻塞的方式,通過(guò)網(wǎng)絡(luò)流控制服務(wù)端,MySQL在執(zhí)行ResultSet.next()方法時(shí),會(huì)通過(guò)數(shù)據(jù)庫(kù)連接一條一條的返回。MySQL按照自己的節(jié)奏不斷的把buffer寫(xiě)回網(wǎng)絡(luò)中。flush buffer的過(guò)程是阻塞式的,也就是說(shuō)如果網(wǎng)絡(luò)中發(fā)生了擁塞,send buffer被填滿(mǎn),會(huì)導(dǎo)致buffer一直flush不出去,那MySQL的處理線程會(huì)阻塞,從而避免數(shù)據(jù)把客戶(hù)端內(nèi)存撐爆。這樣帶來(lái)的問(wèn)題:如果使用了流式查詢(xún),一個(gè)MySQL數(shù)據(jù)庫(kù)連接同一時(shí)間只能為一個(gè)ResultSet對(duì)象服務(wù),并且如果該ResultSet對(duì)象沒(méi)有關(guān)閉,勢(shì)必會(huì)影響其他查詢(xún)對(duì)數(shù)據(jù)庫(kù)連接的使用!此為大坑,所以sharding-sphere費(fèi)勁心思要提供兩種數(shù)據(jù)庫(kù)連接模式,如果應(yīng)用對(duì)數(shù)據(jù)庫(kù)連接的消耗要求嚴(yán)苛,那么流式查詢(xún)就不再適合。
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。