在spark structuredStreaming中使用hive表并支持緩存

<meta charset="utf-8">

基于structuredStreaming的流任務(wù)中需要將hive小表(一般是維度表)參與到流計(jì)算的處理中,以下研究是證明spark讀取hive數(shù)據(jù)的及時(shí)性,即hive數(shù)據(jù)變動(dòng)后在sparksql中能正確讀取到最新的hive數(shù)據(jù),同時(shí)考慮性能,嘗試將hive表數(shù)據(jù)做緩存,并且維護(hù)緩存的更新。

1、不做緩存

代碼:

while (true) {
          sc.catalog().refreshTable("beian.analyze_ip1");
          Dataset<Row> dataset1 = sc.sql("select * from beian.analyze_ip1");
          //  dataset1.cache();
          LOG.error("總數(shù):" +  dataset1.toJavaRDD().collect().size());
          try {
              Thread.sleep(10000);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }

操作步驟:

1、清空表beian.analyze_ip1

2、啟動(dòng)saprk程序;

3、當(dāng)“總數(shù):0”出現(xiàn)后在hue執(zhí)行寫(xiě)入表的sql,使表中有數(shù)據(jù);

結(jié)果:

1.png

注意:必須使用sc.catalog().refreshTable()方法刷新元數(shù)據(jù),因?yàn)閟park默認(rèn)在啟動(dòng)時(shí)緩存了hive的元數(shù)據(jù),如果在運(yùn)行時(shí)hive表的數(shù)據(jù)有了變動(dòng),比如增加了分區(qū),那么必須代碼顯示刷新獲取最新的元數(shù)據(jù)。

2、加緩存

在不加緩存情況下spark每次都要去讀取hive文件,流任務(wù)的速度遠(yuǎn)遠(yuǎn)大于hive文件讀取速度,因此需要加緩存,加了緩存后還需要定時(shí)維護(hù)緩存中的內(nèi)容。

代碼:

while (true) {
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    dataset1.unpersist(true);
    sc.catalog().refreshTable("beian.analyze_ip1");
    Dataset<Row> dataset2 = sc.sql("select * from beian.analyze_ip1");
    dataset1 = dataset2;
    dataset1.persist(StorageLevel.MEMORY_ONLY());
    LOG.error("總數(shù):" +  dataset1.toJavaRDD().collect().size());
}

注意:創(chuàng)建dataset2再賦值給dataset1是必須的; 上述while中的內(nèi)容可以寫(xiě)到定時(shí)器中定時(shí)執(zhí)行。

操作步驟同上

結(jié)果:

2.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容