2021-01-24-Flink-29 (Flink Async I/O)

1. Async I/O

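When a streaming job interacts with an external system, for example to enrich stream records with data from a database, you have to consider how the communication latency with that system affects the whole streaming application.
Accessing an external database naively, for example from a MapFunction, usually means synchronous interaction: the MapFunction sends a request to the database and then waits until the response arrives. In many cases this waiting takes up most of the function's running time.
Asynchronous interaction with the database means that a single parallel function instance can handle many requests and receive many responses concurrently. While it waits, the function can send other requests and receive other responses, so the waiting time is at least amortized across multiple requests. In most cases, asynchronous interaction greatly improves the throughput of the streaming application.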
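To make the amortization argument concrete, here is a minimal JDK-only sketch (not Flink code). A hypothetical `lookup` method simulates a remote call with an assumed fixed latency of 50 ms using `Thread.sleep`. Issuing eight lookups one after another takes at least 8 × 50 ms, while issuing them concurrently with `CompletableFuture.supplyAsync` on a small thread pool lets the waits overlap, so the total comes out close to a single latency. The latency and pool size are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncVsSyncDemo {
    static final long LATENCY_MS = 50; // assumed round-trip time of the external system

    // Hypothetical remote lookup: sleeps to simulate network/database latency.
    static String lookup(int key) {
        try {
            Thread.sleep(LATENCY_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-" + key;
    }

    // Synchronous style: each request waits for the previous one to finish.
    static long syncMillis(int n) {
        long start = System.nanoTime();
        for (int i = 0; i < n; i++) {
            lookup(i);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Asynchronous style: all n requests are in flight at the same time.
    static long asyncMillis(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int key = i;
                futures.add(CompletableFuture.supplyAsync(() -> lookup(key), pool));
            }
            // Wait until every response has arrived.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + syncMillis(8) + " ms");
        System.out.println("async: " + asyncMillis(8) + " ms");
    }
}
```

The exact timings depend on the machine; the point is that the synchronous total grows with the number of requests while the asynchronous total stays near one round trip.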

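Simply raising the parallelism of the MapFunction can also increase throughput in some cases, but it usually comes at a much higher resource cost: more parallel MapFunction instances mean more tasks, more threads, more Flink-internal network connections, more network connections to the database, more buffers, and more internal coordination overhead.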
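One rough way to compare the two options is Little's law: the number of requests that must be in flight equals throughput × latency. The sketch below applies it to assumed numbers (a target of 1,000 records/s and 50 ms per lookup, neither taken from the article). A synchronous MapFunction keeps one request in flight per instance, so it needs about 50 parallel instances; a single async instance can keep the same 50 requests in flight on its own.

```java
final class InFlightSizing {
    // Little's law: in-flight requests = arrival rate (per second) * latency (seconds).
    static int requiredInFlight(double recordsPerSecond, double latencyMillis) {
        return (int) Math.ceil(recordsPerSecond * latencyMillis / 1000.0);
    }

    // A synchronous instance handles one request at a time, so it completes
    // at most 1000 / latency requests per second.
    static int syncParallelism(double recordsPerSecond, double latencyMillis) {
        double perInstance = 1000.0 / latencyMillis;
        return (int) Math.ceil(recordsPerSecond / perInstance);
    }

    public static void main(String[] args) {
        double rate = 1000;  // assumed target throughput, records per second
        double latency = 50; // assumed lookup latency, milliseconds
        System.out.println("sync MapFunction parallelism: " + syncParallelism(rate, latency));
        System.out.println("in-flight requests for one async instance: " + requiredInFlight(rate, latency));
    }
}
```

Both numbers come out as 50: the difference lies in whether those 50 concurrent requests cost 50 tasks and threads or one operator instance with a request queue.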

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/stream/operators/asyncio.html

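// Record type: uid and coordinates come from the input JSON line;
// province and city are filled in by the reverse-geocoding lookup.
public class LogBean {

    public String uid; // user id

    public Double longitude; // longitude

    public Double latitude; // latitude

    public String province; // set by the geo lookup

    public String city; // set by the geo lookup

    public LogBean(){} // public no-arg constructor, needed for Flink's POJO type and for fastjson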

    public LogBean(String uid, Double longitude, Double latitude) {
        this.uid = uid;
        this.longitude = longitude;
        this.latitude = latitude;
    }

    public static LogBean of(String uid, Double longitude, Double latitude) {
        return new LogBean(uid, longitude, latitude);
    }

    @Override
    public String toString() {
        return "LogBean{" +
                "uid='" + uid + '\'' +
                ", longitude=" + longitude +
                ", latitude=" + latitude +
                ", province='" + province + '\'' +
                ", city='" + city + '\'' +
                '}';
    }
}
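import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class AsyncHttpGeoQueryFunction extends RichAsyncFunction<String, LogBean> {
    private transient CloseableHttpAsyncClient httpclient; // asynchronous HttpClient
    private String url; // AMap (Gaode Maps) request URL
    private String key; // AMap API key, obtained after registering as an AMap developer
    private int maxConnTotal; // maximum number of connections of the async HttpClient
    public AsyncHttpGeoQueryFunction(String url, String key, int maxConnTotal) {
        this.url = url;
        this.key = key;
        this.maxConnTotal = maxConnTotal;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        RequestConfig requestConfig = RequestConfig.custom().build();
        httpclient = HttpAsyncClients.custom() // build the HttpAsyncClient connection pool
                .setMaxConnTotal(maxConnTotal) // set the maximum number of connections
                .setDefaultRequestConfig(requestConfig).build();
        httpclient.start(); // start the async HttpClient
    }

    @Override
    public void asyncInvoke(String line, ResultFuture<LogBean> resultFuture) throws Exception {
        // parse the JSON line into a LogBean with fastjson
        LogBean bean = JSON.parseObject(line, LogBean.class);
        double longitude = bean.longitude; // longitude
        double latitude = bean.latitude; // latitude
        // append the coordinates (longitude first) and the AMap key to the request URL
        HttpGet httpGet = new HttpGet(url + "?location=" + longitude + "," + latitude + "&key=" + key);
        // send the request asynchronously; execute returns a Future
        Future<HttpResponse> future = httpclient.execute(httpGet, null);
        CompletableFuture.supplyAsync(new Supplier<LogBean>() {
            @Override
            public LogBean get() {
                try {
                    HttpResponse response = future.get();
                    String province = null;
                    String city = null;
                    if (response.getStatusLine().getStatusCode() == 200) {
                        // parse the response and extract the province, city, etc.
                        String result = EntityUtils.toString(response.getEntity());
                        JSONObject jsonObj = JSON.parseObject(result);
                        JSONObject regeocode = jsonObj.getJSONObject("regeocode");
                        if (regeocode != null && !regeocode.isEmpty()) {
                            JSONObject address = regeocode.getJSONObject("addressComponent");
                            province = address.getString("province");
                            city = address.getString("city");
                        }
                    }
                    bean.province = province; // set the province from the response
                    bean.city = city; // set the city from the response
                    return bean;
                } catch (Exception e) {
                    return null; // the request or the response parsing failed
                }
            }
        }).thenAccept((LogBean result) -> {
            // complete() only accepts a collection: wrap a single result in a singleton collection,
            // and complete with an empty collection (emit nothing) instead of a null record on failure
            if (result == null) {
                resultFuture.complete(Collections.emptyList());
            } else {
                resultFuture.complete(Collections.singleton(result));
            }
        });
    }
    @Override
    public void close() throws Exception {
        httpclient.close(); // close the HttpAsyncClient connection pool
    }
}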

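import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class AsyncQueryFromHttpDemo1 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // restart strategy: up to 3 restarts, 5000 ms apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        String url = "http://localhost:8080/api"; // address the async I/O REST requests are sent to
        int capacity = 20; // maximum number of concurrent async requests
        // call AsyncDataStream.unorderedWait and pass in the async request function
        DataStream<Tuple2<String, String>> result = AsyncDataStream.unorderedWait(
                lines, // input stream
                new HttpAsyncFunction(url, capacity), // async query function instance
                5000, // timeout
                TimeUnit.MILLISECONDS, // time unit of the timeout
                capacity); // max number of in-flight async requests (queue capacity); defaults to 100 if omitted
        result.print();
        env.execute();
    }
}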
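The `capacity` argument bounds how many requests one operator instance keeps in flight; once it is exhausted, the async operator triggers backpressure instead of accumulating an ever-growing backlog. The JDK-only sketch below imitates that bound with a `Semaphore`: a hypothetical `submit` method takes a permit before starting a request and returns it when the request completes, and the class records the peak number of concurrent requests. It illustrates the idea only and is not how Flink's async operator is implemented; the 20 ms call and the capacity of 5 are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedInFlight {
    private final Semaphore permits; // one permit per allowed in-flight request
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();
    private final ExecutorService pool = Executors.newCachedThreadPool();

    BoundedInFlight(int capacity) {
        this.permits = new Semaphore(capacity);
    }

    // Blocks the caller while `capacity` requests are already running (the backpressure point).
    CompletableFuture<Integer> submit(int input) {
        permits.acquireUninterruptibly();
        int now = inFlight.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
        return CompletableFuture.supplyAsync(() -> slowSquare(input), pool)
                .whenComplete((r, e) -> {
                    inFlight.decrementAndGet();
                    permits.release(); // free the slot once the request finishes
                });
    }

    // Hypothetical slow external call.
    private static int slowSquare(int x) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return x * x;
    }

    int peak() { return peak.get(); }

    void shutdown() { pool.shutdown(); }

    public static void main(String[] args) {
        BoundedInFlight runner = new BoundedInFlight(5);
        List<CompletableFuture<Integer>> results = new ArrayList<>();
        for (int i = 0; i < 30; i++) {
            results.add(runner.submit(i));
        }
        CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();
        System.out.println("peak in-flight: " + runner.peak());
        runner.shutdown();
    }
}
```

Because the counter is decremented before the permit is released, the recorded peak can never exceed the capacity passed to the constructor.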

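import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class AsyncQueryFromHttpDemo2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // job restart strategy: up to 3 restarts, 5000 ms apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        String url = "https://restapi.amap.com/v3/geocode/regeo"; // AMap reverse-geocoding endpoint
        String key = "<your-amap-key>"; // placeholder: the AMap key obtained after registering as an AMap developer
        int capacity = 50; // maximum number of concurrent async requests
        // call AsyncDataStream.unorderedWait and pass in the async request function
        SingleOutputStreamOperator<LogBean> result = AsyncDataStream.unorderedWait(
                lines, // input stream
                new AsyncHttpGeoQueryFunction(url, key, capacity), // async query function instance
                3000, // timeout
                TimeUnit.MILLISECONDS, // time unit of the timeout
                capacity); // max number of in-flight async requests (queue capacity); defaults to 100 if omitted
        result.print();
        env.execute();
    }
}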
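Both HTTP demos pass a timeout (5000 ms and 3000 ms). When a request exceeds it, Flink calls `AsyncFunction.timeout(...)`, whose default implementation completes the result exceptionally; that fails the task and hands control to the restart strategy configured in `main`. Overriding `timeout` to emit a default value instead is a common alternative. The JDK-only sketch below shows the same idea with `CompletableFuture.completeOnTimeout` (Java 9+): a hypothetical `geoLookup` that answers too slowly is replaced by a fallback value rather than failing. The delays, the `":resolved"` answer and the `":unknown"` fallback are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class TimeoutFallback {
    // Hypothetical lookup whose response time is controlled by the caller.
    static CompletableFuture<String> geoLookup(String uid, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return uid + ":resolved";
        });
    }

    // Use the real answer if it arrives within timeoutMillis, otherwise a fallback value.
    static String lookupWithTimeout(String uid, long delayMillis, long timeoutMillis) {
        return geoLookup(uid, delayMillis)
                .completeOnTimeout(uid + ":unknown", timeoutMillis, TimeUnit.MILLISECONDS)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(lookupWithTimeout("u1", 10, 1000)); // answers in time
        System.out.println(lookupWithTimeout("u2", 1000, 100)); // too slow: fallback
    }
}
```

Whether a fallback or a job failure is preferable depends on the use case: a fallback keeps the job running but emits records that were never enriched.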
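import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class AsyncQueryFromMySQL {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000)); // job restart strategy
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        int capacity = 50;
        // orderedWait: results are emitted in the same order as the input records
        DataStream<Tuple2<String, String>> result = AsyncDataStream.orderedWait(
                lines, // input stream
                new MySQLAsyncFunction(capacity), // async query function instance
                3000, // timeout
                TimeUnit.MILLISECONDS, // time unit of the timeout
                capacity); // max number of in-flight async requests (queue capacity); defaults to 100 if omitted
        result.print();
        env.execute();

    }
}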
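`AsyncQueryFromMySQL` uses `orderedWait`, while the two HTTP demos use `unorderedWait`. With `orderedWait`, results leave the operator in the same order as the input records, so a fast response may have to wait for a slower earlier one; with `unorderedWait`, each result is emitted as soon as its request completes (in processing time; with event time, watermarks still act as ordering boundaries). The JDK-only sketch below models only that emission order for three requests with assumed delays; it does not reproduce Flink's operator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class OrderedVsUnordered {
    // Start one simulated request per input; the request for inputs[i] finishes after delays[i] ms.
    static List<CompletableFuture<String>> start(String[] inputs, long[] delays) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < inputs.length; i++) {
            String input = inputs[i];
            futures.add(CompletableFuture.supplyAsync(() -> input,
                    CompletableFuture.delayedExecutor(delays[i], TimeUnit.MILLISECONDS)));
        }
        return futures;
    }

    // orderedWait-like: emit results strictly in input order.
    static List<String> ordered(String[] inputs, long[] delays) {
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> f : start(inputs, delays)) {
            out.add(f.join()); // a fast result still waits for every earlier one
        }
        return out;
    }

    // unorderedWait-like: emit each result as soon as its request completes.
    static List<String> unordered(String[] inputs, long[] delays) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> emitted = new ArrayList<>();
        for (CompletableFuture<String> f : start(inputs, delays)) {
            emitted.add(f.thenAccept(out::add));
        }
        CompletableFuture.allOf(emitted.toArray(new CompletableFuture[0])).join();
        return out;
    }

    public static void main(String[] args) {
        String[] inputs = {"a", "b", "c"};
        long[] delays = {300, 20, 150}; // assumed response times in ms
        System.out.println("ordered:   " + ordered(inputs, delays));
        System.out.println("unordered: " + unordered(inputs, delays));
    }
}
```

With these delays, `ordered` returns [a, b, c] and `unordered` normally returns [b, c, a]. Unordered mode usually gives lower latency and overhead, which is why it is a reasonable default when downstream logic does not depend on input order.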
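import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class HttpAsyncFunction extends RichAsyncFunction<String, Tuple2<String, String>> {
    private transient CloseableHttpAsyncClient httpclient; // asynchronous HttpClient
    private String url; // request URL
    private int maxConnTotal; // maximum number of connections of the async HttpClient
    public HttpAsyncFunction(String url, int maxConnTotal) {
        this.url = url;
        this.maxConnTotal = maxConnTotal;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        RequestConfig requestConfig = RequestConfig.custom().build();
        httpclient = HttpAsyncClients.custom() // build the HttpAsyncClient connection pool
                .setMaxConnTotal(maxConnTotal) // set the maximum number of connections
                .setDefaultRequestConfig(requestConfig).build();
        httpclient.start(); // start the async HttpClient
    }
    @Override
    public void asyncInvoke(String uid, final ResultFuture<Tuple2<String, String>> resultFuture)
            throws Exception {
        HttpGet httpGet = new HttpGet(url + "/?uid=" + uid); // request URL with the uid parameter
        Future<HttpResponse> future = httpclient.execute(httpGet, null); // send the request; returns a Future
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    HttpResponse response = future.get(); // wait for the response via Future.get()
                    String res = null;
                    if (response.getStatusLine().getStatusCode() == 200) {
                        res = EntityUtils.toString(response.getEntity());
                    }
                    return res;
                } catch (Exception e) {
                    return null; // request failed: the uid is emitted with a null result
                }
            }
        }).thenAccept((String result) -> {
            // complete() only accepts a collection, so wrap the single result in a singleton collection
            resultFuture.complete(Collections.singleton(Tuple2.of(uid, result)));
        });
    }
    @Override
    public void close() throws Exception {
        httpclient.close(); // close the HttpAsyncClient connection pool
    }
}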
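`HttpAsyncFunction` (like `AsyncHttpGeoQueryFunction`) turns the client's `Future` into a `CompletableFuture` by calling the blocking `future.get()` inside `CompletableFuture.supplyAsync`, which ties up a thread of the default async pool (normally `ForkJoinPool.commonPool()`) for every pending request. The second argument of `httpclient.execute(...)`, passed as `null` above, is a `FutureCallback` in Apache HttpAsyncClient; supplying one lets the client's I/O thread complete the result without parking any thread. The JDK-only sketch below shows that bridging pattern against a hypothetical `CallbackClient` interface standing in for the real client; in the Flink function, the success branch would call `resultFuture.complete(...)` directly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class CallbackBridge {
    // Hypothetical stand-in for a callback-based async client.
    interface CallbackClient {
        void get(String uid, Consumer<String> onSuccess, Consumer<Exception> onFailure);
    }

    // Convert callbacks into a CompletableFuture; no thread blocks while waiting.
    static CompletableFuture<String> fetch(CallbackClient client, String uid) {
        CompletableFuture<String> result = new CompletableFuture<>();
        client.get(uid, result::complete, result::completeExceptionally);
        return result;
    }

    // Hypothetical fake client that answers on a background thread after a short delay.
    static final CallbackClient FAKE = (uid, ok, fail) -> {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (uid.isEmpty()) {
                fail.accept(new IllegalArgumentException("empty uid"));
            } else {
                ok.accept("info-" + uid);
            }
        });
        t.start();
    };

    public static void main(String[] args) {
        fetch(FAKE, "u1")
                .exceptionally(e -> null) // map a failed lookup to null, as the code above does
                .thenAccept(r -> System.out.println("u1 -> " + r))
                .join();
    }
}
```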
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class MySQLAsyncFunction extends RichAsyncFunction<String, Tuple2<String, String>> {
    private transient DruidDataSource dataSource; // Alibaba Druid database connection pool
    private transient ExecutorService executorService; // thread pool that runs the queries asynchronously
    private int maxConnTotal; // maximum number of pool threads (and database connections)
    public MySQLAsyncFunction(int maxConnTotal) {
        this.maxConnTotal = maxConnTotal;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        executorService = Executors.newFixedThreadPool(maxConnTotal); // fixed-size thread pool
        dataSource = new DruidDataSource(); // create the connection pool and set its parameters
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        dataSource.setUrl("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8");
        dataSource.setMaxActive(maxConnTotal);
    }
    @Override
    public void close() throws Exception {
        executorService.shutdown(); // stop accepting new queries
        dataSource.close(); // close the database connection pool
    }
    @Override
    public void asyncInvoke(String id, ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        // submit the query to the thread pool for asynchronous execution; submit returns a Future
        Future<String> future = executorService.submit(() -> {
            return queryFromMySql(id); // query the database
        });
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    return future.get(); // wait for the query result
                } catch (Exception e) {
                    return null;
                }
            }
        }).thenAccept((String result) -> {
            // wrap the id and the query result in a Tuple2 and emit it through the ResultFuture
            resultFuture.complete(Collections.singleton(Tuple2.of(id, result)));
        });
    }

    private String queryFromMySql(String param) throws SQLException {
        String sql = "SELECT id, info FROM t_data WHERE id = ?";
        String result = null;
        Connection connection = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;
        try {
            connection = dataSource.getConnection();
            stmt = connection.prepareStatement(sql);
            stmt.setString(1, param); // set the query parameter
            rs = stmt.executeQuery(); // run the query
            while (rs.next()) {
                result = rs.getString("info"); // keep the info column of the matching row
            }
        } finally {
            if (rs != null) {
                rs.close();
            }
            if (stmt != null) {
                stmt.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
        return result;
    }
}
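Because JDBC is a blocking API, `MySQLAsyncFunction` runs each query on its own thread pool and then uses a second `supplyAsync` (on the default async pool) only to wait on the returned `Future`. A leaner variant, sketched below with plain JDK classes, passes the dedicated executor straight to `CompletableFuture.supplyAsync(supplier, executor)`, so the query thread itself completes the future. `blockingQuery` and the `TABLE` contents are hypothetical stand-ins for `queryFromMySql` and `t_data`, and exceptions are mapped to `null` as in the original; in the Flink function, a `thenAccept` on each future would call `resultFuture.complete(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ExecutorBackedLookup {
    // Hypothetical table contents standing in for t_data.
    private static final Map<String, String> TABLE = Map.of("1", "alpha", "2", "beta");

    // Hypothetical blocking query standing in for queryFromMySql(id).
    static String blockingQuery(String id) throws Exception {
        Thread.sleep(30); // simulated database round trip
        return TABLE.get(id); // null when the id is not found
    }

    static List<String> lookupAll(List<String> ids, int poolSize) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String id : ids) {
                // The query runs on the dedicated pool and completes the future directly.
                futures.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        return blockingQuery(id);
                    } catch (Exception e) {
                        return null; // same failure handling as the original code
                    }
                }, executor));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> f : futures) {
                results.add(f.join());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupAll(List.of("1", "2", "3"), 3)); // prints [alpha, beta, null]
    }
}
```

Sizing the executor to the connection pool (as `maxConnTotal` does for both in the original) keeps threads from waiting on connections that are not available.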