使用dstream.foreachRDD發(fā)送數(shù)據到外部系統(tǒng)
經過Spark Streaming處理后的數(shù)據經常需要推到外部系統(tǒng),比如緩存、數(shù)據庫、消息系統(tǒng)、文件系統(tǒng)、實時數(shù)據大屏等
放一張官網上的圖:

圖片來自https://spark.apache.org/docs/latest/streaming-programming-guide.html#overview
其中,最常用的就是使用方法dstream.foreachRDD
看下最佳用法:
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
// ConnectionPool is a static, lazily initialized pool of connections
Connection connection = ConnectionPool.getConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
ConnectionPool.returnConnection(connection); // return to the pool for future reuse
});
});
- 循環(huán)每個分區(qū)
- 在每個分區(qū)中,從連接池獲取連接(數(shù)據庫/緩存等)
- 循環(huán)操作每條記錄,存儲或者發(fā)送數(shù)據
- 釋放連接
幾個常見的錯誤/低效用法
- dirver端創(chuàng)建連接, worker端使用連接(序列化/初始化錯誤等)
dstream.foreachRDD(rdd -> {
Connection connection = createNewConnection(); // executed at the driver
rdd.foreach(record -> {
connection.send(record); // executed at the worker
});
});
- 每條記錄創(chuàng)建一個連接(開銷太高)
dstream.foreachRDD(rdd -> {
rdd.foreach(record -> {
Connection connection = createNewConnection();
connection.send(record);
connection.close();
});
});
上面的代碼會在worker端創(chuàng)建連接并使用,但是每條記錄都會創(chuàng)建新的連接
當然可以使用連接池進行優(yōu)化,但是還有更好的方法
- 每個分區(qū)創(chuàng)建一個連接(可進一步使用連接池優(yōu)化)
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
Connection connection = createNewConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
connection.close();
});
});
上面的沒啥大問題了,使用連接池后就是最上面的最佳用法了