1.异步IO
在与外部系统交互(用数据库中的数据扩充流数据)的时候,需要考虑与外部系统的通信延迟对整个流处理应用的影响。
简单地访问外部数据库的数据,比如使用 MapFunction,通常意味着同步交互:MapFunction 向数据库发送一个请求然后一直等待,直到收到响应。在许多情况下,等待占据了函数运行的大部分时间。
与数据库异步交互是指一个并行函数实例可以并发地处理多个请求和接收多个响应。这样,函数在等待的时间可以发送其他请求和接收其他响应。至少等待的时间可以被多个请求摊分。大多数情况下,异步交互可以大幅度提高流处理的吞吐量。
仅仅提高 MapFunction 的并行度(parallelism)在有些情况下也可以提升吞吐量,但是这样做通常会导致非常高的资源消耗:更多的并行 MapFunction 实例意味着更多的 Task、更多的线程、更多的 Flink 内部网络连接、更多的与数据库的网络连接、更多的缓冲和更多程序内部协调的开销。
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/stream/operators/asyncio.html

image.png
/**
 * A log record: a user id with GPS coordinates, optionally enriched with the province and city
 * resolved from those coordinates.
 *
 * <p>Fields are public and a no-arg constructor is kept so a JSON mapper can populate instances
 * reflectively.
 */
public class LogBean {
    /** User id. */
    public String uid;
    /** Longitude of the reported position. */
    public Double longitude;
    /** Latitude of the reported position. */
    public Double latitude;
    /** Province resolved from the coordinates; {@code null} until a lookup fills it in. */
    public String province;
    /** City resolved from the coordinates; {@code null} until a lookup fills it in. */
    public String city;

    /** Creates an empty bean (needed for reflective deserialization). */
    public LogBean() {
    }

    /** Creates a bean with id and coordinates; province and city stay {@code null}. */
    public LogBean(String uid, Double longitude, Double latitude) {
        this.uid = uid;
        this.longitude = longitude;
        this.latitude = latitude;
    }

    /** Static factory equivalent to {@link #LogBean(String, Double, Double)}. */
    public static LogBean of(String uid, Double longitude, Double latitude) {
        return new LogBean(uid, longitude, latitude);
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("LogBean{");
        sb.append("uid='").append(uid).append('\'');
        sb.append(", longitude=").append(longitude);
        sb.append(", latitude=").append(latitude);
        sb.append(", province='").append(province).append('\'');
        sb.append(", city='").append(city).append('\'');
        return sb.append('}').toString();
    }
}
public class AsyncHttpGeoQueryFunction extends RichAsyncFunction<String, LogBean> {
private transient CloseableHttpAsyncClient httpclient; //異步請求的HttpClient
private String url; //請求高德地圖URL地址
private String key; //請求高德地圖的秘鑰,注冊高德地圖開發(fā)者后獲得
private int maxConnTotal; //異步HTTPClient支持的最大連接
public AsyncHttpGeoQueryFunction(String url, String key, int maxConnTotal) {
this.url = url;
this.key = key;
this.maxConnTotal = maxConnTotal;
}
@Override
public void open(Configuration parameters) throws Exception {
RequestConfig requestConfig = RequestConfig.custom().build();
httpclient = HttpAsyncClients.custom() //創(chuàng)建HttpAsyncClients請求連接池
.setMaxConnTotal(maxConnTotal) //設(shè)置最大連接數(shù)
.setDefaultRequestConfig(requestConfig).build();
httpclient.start(); //啟動異步請求httpClient
}
@Override
public void asyncInvoke(String line, ResultFuture<LogBean> resultFuture) throws Exception {
//使用fastjson將json字符串解析成json對象
LogBean bean = JSON.parseObject(line, LogBean.class);
double longitude = bean.longitude; //獲取經(jīng)度
double latitude = bean.latitude; //獲取維度
//將經(jīng)緯度和高德地圖的key與請求的url進行拼接
HttpGet httpGet = new HttpGet(url + "?location=" + longitude + "," + latitude + "&key=" + key);
//發(fā)送異步請求,返回Future
Future<HttpResponse> future = httpclient.execute(httpGet, null);
CompletableFuture.supplyAsync(new Supplier<LogBean>() {
@Override
public LogBean get() {
try {
HttpResponse response = future.get();
String province = null;
String city = null;
if (response.getStatusLine().getStatusCode() == 200) {
//解析返回的結(jié)果,獲取省份、城市等信息
String result = EntityUtils.toString(response.getEntity());
JSONObject jsonObj = JSON.parseObject(result);
JSONObject regeocode = jsonObj.getJSONObject("regeocode");
if (regeocode != null && !regeocode.isEmpty()) {
JSONObject address = regeocode.getJSONObject("addressComponent");
province = address.getString("province");
city = address.getString("city");
}
}
bean.province = province; //將返回的結(jié)果給省份賦值
bean.city = city; //將返回的結(jié)果給城市賦值
return bean;
} catch (Exception e) {
return null;
}
}
}).thenAccept((LogBean result) -> {
//將結(jié)果添加到resultFuture中輸出(complete方法的參數(shù)只能為集合,如果只有一個元素,就返回一個單例集合)
resultFuture.complete(Collections.singleton(result));
});
}
@Override
public void close() throws Exception {
httpclient.close(); //關(guān)閉HttpAsyncClients請求連接池
}
}
/**
 * Demo job that enriches uids from a local REST service.
 *
 * <p>It reads uids from a text socket on localhost:8888 and queries the endpoint asynchronously
 * through {@link HttpAsyncFunction}. It prints the resulting (uid, responseBody) pairs in
 * unordered mode, so results are emitted as soon as each request completes.
 */
public class AsyncQueryFromHttpDemo1 {
    public static void main(String[] args) throws Exception {
        final String url = "http://localhost:8080/api"; // REST endpoint queried by the async operator
        final int capacity = 20; // max in-flight async requests; also the HTTP client's connection cap
        final long timeoutMs = 5000; // per-request timeout of the async operator

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Restart the job at most 3 times, waiting 5000 ms between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // The last argument bounds the async request queue (100 when omitted).
        DataStream<Tuple2<String, String>> result = AsyncDataStream.unorderedWait(
                lines,
                new HttpAsyncFunction(url, capacity),
                timeoutMs,
                TimeUnit.MILLISECONDS,
                capacity);

        result.print();
        env.execute();
    }
}
/**
 * Demo job that enriches JSON log lines with province/city.
 *
 * <p>It reads JSON log lines from a text socket on localhost:8888. Each line's coordinates are
 * resolved through the AMap reverse-geocoding API using {@link AsyncHttpGeoQueryFunction}. The
 * enriched {@link LogBean}s are printed in unordered mode.
 */
public class AsyncQueryFromHttpDemo2 {
    public static void main(String[] args) throws Exception {
        final String url = "https://restapi.amap.com/v3/geocode/regeo"; // AMap reverse-geocoding endpoint
        // AMap developer key.
        // NOTE(review): the key is committed in source; consider supplying it via job configuration
        // or an environment variable instead.
        final String key = "4924f7ef5c86a278f5500851541cdcff";
        final int capacity = 50; // max in-flight async requests; also the HTTP client's connection cap
        final long timeoutMs = 3000; // per-request timeout of the async operator

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Restart the job at most 3 times, waiting 5000 ms between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // The last argument bounds the async request queue (100 when omitted).
        SingleOutputStreamOperator<LogBean> result = AsyncDataStream.unorderedWait(
                lines,
                new AsyncHttpGeoQueryFunction(url, key, capacity),
                timeoutMs,
                TimeUnit.MILLISECONDS,
                capacity);

        result.print();
        env.execute();
    }
}
/**
 * Demo job that enriches ids from MySQL.
 *
 * <p>It reads ids from a text socket on localhost:8888 and looks each one up in MySQL through
 * {@link MySQLAsyncFunction}. It prints (id, info) pairs in ordered mode, so output keeps the
 * input order.
 */
public class AsyncQueryFromMySQL {
    public static void main(String[] args) throws Exception {
        // Max in-flight lookups; also sizes the function's thread pool and connection pool.
        final int capacity = 50;
        final long timeoutMs = 3000; // per-request timeout of the async operator

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Restart the job at most 3 times, waiting 5000 ms between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        // The last argument bounds the async request queue (100 when omitted).
        DataStream<Tuple2<String, String>> result = AsyncDataStream.orderedWait(
                env.socketTextStream("localhost", 8888),
                new MySQLAsyncFunction(capacity),
                timeoutMs,
                TimeUnit.MILLISECONDS,
                capacity);

        result.print();
        env.execute();
    }
}
public class HttpAsyncFunction extends RichAsyncFunction<String, Tuple2<String, String>> {
private transient CloseableHttpAsyncClient httpclient; //異步請求的HttpClient
private String url; //請求的URL地址
private int maxConnTotal; //異步HTTPClient支持的最大連接
public HttpAsyncFunction(String url, int maxConnTotal) {
this.url = url;
this.maxConnTotal = maxConnTotal;
}
@Override
public void open(Configuration parameters) throws Exception {
RequestConfig requestConfig = RequestConfig.custom().build();
httpclient = HttpAsyncClients.custom() //創(chuàng)建HttpAsyncClients請求連接池
.setMaxConnTotal(maxConnTotal) //設(shè)置最大連接數(shù)
.setDefaultRequestConfig(requestConfig).build();
httpclient.start(); //啟動異步請求httpClient
}
@Override
public void asyncInvoke(String uid, final ResultFuture<Tuple2<String, String>> resultFuture)
throws Exception {
HttpGet httpGet = new HttpGet(url + "/?uid=" + uid); //請求的地址和參數(shù)
Future<HttpResponse> future = httpclient.execute(httpGet, null); //執(zhí)行請求返回future
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
HttpResponse response = future.get(); //調(diào)用Future的get方法獲取請求的結(jié)果
String res = null;
if(response.getStatusLine().getStatusCode() == 200) {
res = EntityUtils.toString(response.getEntity());
}
return res;
} catch (Exception e) {
return null;
}
}
}).thenAccept((String result) -> {
//將結(jié)果添加到resultFuture中輸出(complete方法的參數(shù)只能為集合,如果只有一個元素,就返回一個單例集合)
resultFuture.complete(Collections.singleton(Tuple2.of(uid, result)));
});
}
@Override
public void close() throws Exception {
httpclient.close(); //關(guān)閉HttpAsyncClients請求連接池
}
}
/**
 * Flink {@code RichAsyncFunction} that looks up the {@code info} column of table {@code t_data} for
 * each incoming id and emits {@code (id, info)}.
 *
 * <p>JDBC is blocking, so each lookup runs on a dedicated fixed-size thread pool. That pool is
 * backed by a Druid connection pool of the same size, and the worker thread completes the
 * {@code ResultFuture} itself.
 *
 * <p>{@code info} is {@code null} when no row matches or the query fails. If several rows match,
 * the last one read wins.
 */
public class MySQLAsyncFunction extends RichAsyncFunction<String, Tuple2<String, String>> {
    private static final String QUERY_SQL = "SELECT id, info FROM t_data WHERE id = ?";

    private transient DruidDataSource dataSource; // Alibaba Druid JDBC connection pool
    private transient ExecutorService executorService; // runs the blocking JDBC lookups
    private int maxConnTotal; // size of both the thread pool and the connection pool

    /** @param maxConnTotal number of worker threads and maximum active DB connections */
    public MySQLAsyncFunction(int maxConnTotal) {
        this.maxConnTotal = maxConnTotal;
    }

    /** Creates the worker thread pool and the Druid connection pool. */
    @Override
    public void open(Configuration parameters) throws Exception {
        executorService = Executors.newFixedThreadPool(maxConnTotal);
        dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        // NOTE(review): credentials and URL are hard-coded; consider reading them from job config.
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        dataSource.setUrl("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8");
        dataSource.setMaxActive(maxConnTotal);
    }

    /**
     * Stops accepting new lookups, then closes the connection pool they use. The order matters
     * because queued lookups depend on the pool. Null checks cover a failed {@link #open}.
     */
    @Override
    public void close() throws Exception {
        if (executorService != null) {
            executorService.shutdown();
        }
        if (dataSource != null) {
            dataSource.close();
        }
    }

    /**
     * Schedules the lookup for {@code id} on the worker pool. The worker completes
     * {@code resultFuture} directly, so no extra thread blocks on a Future.
     */
    @Override
    public void asyncInvoke(String id, ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        executorService.execute(() ->
                resultFuture.complete(Collections.singleton(Tuple2.of(id, queryOrNull(id)))));
    }

    /** Runs the lookup, mapping any failure to {@code null} so the record is still emitted. */
    private String queryOrNull(String id) {
        try {
            return queryFromMySql(id);
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * Queries {@code info} for {@code param}.
     *
     * @return the {@code info} of the last matching row, or {@code null} if none matches
     * @throws SQLException if obtaining a connection or running the query fails
     */
    private String queryFromMySql(String param) throws SQLException {
        // try-with-resources closes every resource even if closing an earlier one throws.
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement(QUERY_SQL)) {
            stmt.setString(1, param);
            try (ResultSet rs = stmt.executeQuery()) {
                String result = null;
                while (rs.next()) {
                    result = rs.getString("info");
                }
                return result;
            }
        }
    }
}