<meta charset="utf-8">
基于structuredStreaming的流任務(wù)中需要將hive小表(一般是維度表)參與到流計(jì)算的處理中,以下研究是證明spark讀取hive數(shù)據(jù)的及時(shí)性,即hive數(shù)據(jù)變動(dòng)后在sparksql中能正確讀取到最新的hive數(shù)據(jù),同時(shí)考慮性能,嘗試將hive表數(shù)據(jù)做緩存,并且維護(hù)緩存的更新。
1、不做緩存
代碼:
while (true) {
sc.catalog().refreshTable("beian.analyze_ip1");
Dataset<Row> dataset1 = sc.sql("select * from beian.analyze_ip1");
// dataset1.cache();
LOG.error("總數(shù):" + dataset1.toJavaRDD().collect().size());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
操作步驟:
1、清空表beian.analyze_ip1
2、啟動(dòng)saprk程序;
3、當(dāng)“總數(shù):0”出現(xiàn)后在hue執(zhí)行寫(xiě)入表的sql,使表中有數(shù)據(jù);
結(jié)果:

注意:必須使用sc.catalog().refreshTable()方法刷新元數(shù)據(jù),因?yàn)閟park默認(rèn)在啟動(dòng)時(shí)緩存了hive的元數(shù)據(jù),如果在運(yùn)行時(shí)hive表的數(shù)據(jù)有了變動(dòng),比如增加了分區(qū),那么必須代碼顯示刷新獲取最新的元數(shù)據(jù)。
2、加緩存
在不加緩存情況下spark每次都要去讀取hive文件,流任務(wù)的速度遠(yuǎn)遠(yuǎn)大于hive文件讀取速度,因此需要加緩存,加了緩存后還需要定時(shí)維護(hù)緩存中的內(nèi)容。
代碼:
while (true) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
dataset1.unpersist(true);
sc.catalog().refreshTable("beian.analyze_ip1");
Dataset<Row> dataset2 = sc.sql("select * from beian.analyze_ip1");
dataset1 = dataset2;
dataset1.persist(StorageLevel.MEMORY_ONLY());
LOG.error("總數(shù):" + dataset1.toJavaRDD().collect().size());
}
注意:創(chuàng)建dataset2再賦值給dataset1是必須的; 上述while中的內(nèi)容可以寫(xiě)到定時(shí)器中定時(shí)執(zhí)行。
操作步驟同上
結(jié)果:
