使用dstream.foreachRDD發(fā)送數(shù)據到外部系統(tǒng)

使用dstream.foreachRDD發(fā)送數(shù)據到外部系統(tǒng)
經過Spark Streaming處理后的數(shù)據經常需要推到外部系統(tǒng),比如緩存、數(shù)據庫、消息系統(tǒng)、文件系統(tǒng)、實時數(shù)據大屏等

放一張官網上的圖:

圖片來自https://spark.apache.org/docs/latest/streaming-programming-guide.html#overview

其中,最常用的就是使用方法dstream.foreachRDD
看下最佳用法:

dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    // ConnectionPool is a static, lazily initialized pool of connections
    Connection connection = ConnectionPool.getConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    ConnectionPool.returnConnection(connection); // return to the pool for future reuse
  });
});
  1. 循環(huán)每個分區(qū)
  2. 在每個分區(qū)中,從連接池獲取連接(數(shù)據庫/緩存等)
  3. 循環(huán)操作每條記錄,存儲或者發(fā)送數(shù)據
  4. 釋放連接

幾個常見的錯誤/低效用法

  1. dirver端創(chuàng)建連接, worker端使用連接(序列化/初始化錯誤等)
dstream.foreachRDD(rdd -> {
  Connection connection = createNewConnection(); // executed at the driver
  rdd.foreach(record -> {
    connection.send(record); // executed at the worker
  });
});
  1. 每條記錄創(chuàng)建一個連接(開銷太高)
dstream.foreachRDD(rdd -> {
  rdd.foreach(record -> {
    Connection connection = createNewConnection();
    connection.send(record);
    connection.close();
  });
});

上面的代碼會在worker端創(chuàng)建連接并使用,但是每條記錄都會創(chuàng)建新的連接
當然可以使用連接池進行優(yōu)化,但是還有更好的方法

  1. 每個分區(qū)創(chuàng)建一個連接(可進一步使用連接池優(yōu)化)
dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    Connection connection = createNewConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    connection.close();
  });
});

上面的沒啥大問題了,使用連接池后就是最上面的最佳用法了

參考
Design Patterns for using foreachRDD

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容