This post demonstrates how to implement the demo5 functionality from the previous post using Flink SQL. Previous post: Apache Flink Learning Notes (3).
Flink SQL is the abstraction layer that comes closest to relational-database SQL, and SQL and Table API queries can be mixed seamlessly: a SQL query is specified on a TableEnvironment via the sqlQuery() method, which returns the query result as a Table.
Straight to the code, demo6:
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import java.util.Date;
import java.util.Properties;
/**
* Flink SQL
*/
public class Demo6 {

    private static final String APP_NAME = "app_name";

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableSysoutLogging();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // windows use processing time
        env.setParallelism(1); // global parallelism

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "kafka bootstrap.servers");
        // set the topic and the app name
        // see notes (2) for the FlinkKafkaManager source
        FlinkKafkaManager manager = new FlinkKafkaManager("kafka.topic", APP_NAME, properties);
        FlinkKafkaConsumer09<JSONObject> consumer = manager.build(JSONObject.class);
        consumer.setStartFromLatest();

        // get the DataStream and convert each record to a Bean3
        DataStream<Bean3> stream = env.addSource(consumer).map(new FlatMap());

        final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
        // register the stream as table "common"; tt.proctime declares a processing-time attribute
        tableEnvironment.registerDataStream("common", stream, "timestamp,appId,module,tt.proctime");
        // `module` must be backquoted, because it is a reserved keyword; omitting the backquotes causes an error
        String sql = "SELECT appId, COUNT(`module`) AS totals FROM common " +
                "WHERE appId = '100007336' OR appId = '100013668' " +
                "GROUP BY TUMBLE(tt, INTERVAL '10' SECOND), appId";
        Table query = tableEnvironment.sqlQuery(sql);
        DataStream<Row> result = tableEnvironment.toAppendStream(query, Row.class);

        result.process(new ProcessFunction<Row, Object>() {
            @Override
            public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
                System.out.println(String.format("AppId:%s, Module Count:%s",
                        value.getField(0).toString(), value.getField(1).toString()));
            }
        });

        try {
            env.execute(APP_NAME);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class FlatMap implements MapFunction<JSONObject, Bean3> {
        @Override
        public Bean3 map(JSONObject jsonObject) throws Exception {
            return new Bean3(new Date().getTime(), jsonObject.getString("appId"), jsonObject.getString("module"));
        }
    }
}
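Bean3 itself is defined in the previous post (demo5) and is not shown here. For reference, a minimal sketch of what it plausibly looks like, with the field names inferred from the `"timestamp,appId,module,tt.proctime"` registration expression (an assumption, not the original class):

```java
// Hypothetical sketch of Bean3, inferred from the field expression used in
// registerDataStream(). The real class lives in the previous post (demo5).
// A Flink POJO needs a public no-argument constructor and public fields
// (or getters/setters) so the Table API can derive the schema.
public class Bean3 {
    public long timestamp; // epoch millis, assigned in FlatMap via new Date().getTime()
    public String appId;
    public String module;

    public Bean3() {}

    public Bean3(long timestamp, String appId, String module) {
        this.timestamp = timestamp;
        this.appId = appId;
        this.module = module;
    }
}
```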
Apart from the StreamTableEnvironment handling, the code is almost unchanged. The thing to note is that the Table API's window function is expressed in SQL as TUMBLE(tt, INTERVAL '10' SECOND) (for tumbling windows).
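To make the TUMBLE semantics concrete: a tumbling window simply buckets each record by truncating its timestamp down to the start of its window-sized slot, so with INTERVAL '10' SECOND all records whose processing time falls in the same 10-second slot are aggregated together. A stand-alone sketch of that bucketing arithmetic (plain Java, no Flink dependency; `windowStart` is an illustrative helper, not a Flink API):

```java
// Illustration of tumbling-window bucketing: the same arithmetic that
// TUMBLE(tt, INTERVAL '10' SECOND) applies when assigning records to windows.
public class TumbleDemo {
    static final long WINDOW_MS = 10_000L; // INTERVAL '10' SECOND

    // Start of the 10-second window containing the given epoch-millis timestamp.
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    public static void main(String[] args) {
        System.out.println(windowStart(1_000L));  // 0      -> same window
        System.out.println(windowStart(9_999L));  // 0      -> same window
        System.out.println(windowStart(10_001L)); // 10000  -> next window
    }
}
```

Records with the same window start (and, per the GROUP BY, the same appId) end up in one aggregate row, which is why the query emits one count per appId every 10 seconds.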
更多細(xì)節(jié)部分,參考官網(wǎng)文檔即可(我自己也沒怎么看,特殊需求特殊對待,哈哈)。