FlinkSql之TableAPI詳解

一、FlinkSql的概念核心概念Flink 的 Table API 和 SQL 是流批統一的 API 。這意味著 Table API & SQL 在無論有限的批式輸入還是無限的流式輸入下,都具有相同的語義 。因為傳統的關系代數以及 SQL 最開始都是為了批式處理而設計的 ,  關系型查詢在流式場景下不如在批式場景下容易理解.
動態表和連續查詢動態表(Dynamic Tables) 是 Flink 的支持流數據的 Table API 和 SQL 的核心概念 。
與表示批處理數據的靜態表不同,動態表是隨時間變化的 ??梢韵癫樵冹o態批處理表一樣查詢它們 。查詢動態表將生成一個連續查詢(Continuous Query) 。一個連續查詢永遠不會終止,結果會生成一個動態表 。查詢不斷更新其(動態)結果表,以反映其(動態)輸入表上的更改 。
TableAPI
首先需要導入依賴
 <dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>     <version>${flink.version}</version>     <scope>provided</scope> </dependency> <dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>     <version>${flink.version}</version>     <scope>provided</scope> </dependency> <dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-csv</artifactId>     <version>${flink.version}</version> </dependency> <dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-json</artifactId>     <version>${flink.version}</version> </dependency> ? <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --> <dependency>     <groupId>org.apache.commons</groupId>     <artifactId>commons-compress</artifactId>     <version>1.21</version> </dependency> /**  * 使用TableAPI的基本套路:  * 1.創建表的執行環境  * 2.創建表,將流轉換為動態表,表的字段名從bean的屬性名自動抽取  * 3.對動態表進行查詢  * 4.把動態表轉換為流  */這里需要注意的問題:
1.TableAPI 中將動態表轉換為流時有兩種方法
 DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class);toAppendStream方法只能在查詢時使用,不能使用包含聚合函數等更新語句
 DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(select, Row.class);toRetractStream則可以使用
2.上述兩種方法內傳入的參數Row.class,表示將表中查詢出的數據封裝為行類型,也就是對每行進行封裝,解決查詢出的數據列少于或者多于原表 。如何能夠確保所查詢的數據與之前封裝的Bean有完全一致的結構則也可以封裝為原Bean.class
代碼實現:
 package net.cyan.FlinkSql; ? import net.cyan.POJO.WaterSensor; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; ? import static org.apache.flink.table.api.Expressions.$; ? /**  * 使用TableAPI的基本套路:  * 1.創建表的執行環境  * 2.創建表,將流轉換為動態表,表的字段名從bean的屬性名自動抽取  * 3.對動態表進行查詢  * 4.把動態表轉換為流  */ public class Demo1 {     public static void main(String[] args) {         Configuration configuration=new Configuration();         configuration.setInteger("rest.port",3333);         //創建執行環境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);         env.setParallelism(1);         //模擬數據         DataStreamSource<WaterSensor> WaterSensorSource = env.fromElements(                 new WaterSensor("S1", 1000L, 10),                 new WaterSensor("S1", 1000L, 10),                 new WaterSensor("S2", 2000L, 20),                 new WaterSensor("S3", 3000L, 30),                 new WaterSensor("S4", 4000L, 40),                 new WaterSensor("S5", 5000L, 50)       );         //創建表的執行環境         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);         //創建表 , 將流轉換為動態表,表的字段名從bean的屬性名自動抽取         Table table = tableEnvironment.fromDataStream(WaterSensorSource);         //對表進行查詢         //1、過時的查詢書寫         Table result = table               .where("id='S1'")               .select("*");         //2、不過時的書寫         Table result1 = table //              .where($("id").isEqual("S1"))               .select($("id"), $("ts"), $("vc"));         //3.聚合函數         Table select = table               .groupBy($("id"))               .aggregate($("vc").sum().as("sum_vc"))               .select($("id"), $("sum_vc"));         //把動態表轉換為流 , 使用到了之前創建的表運行環境 ?         SingleOutputStreamOperator<Row> tuple2DataStream = tableEnvironment               .toRetractStream(select, Row.class)               .filter(t -> t.f0)               .map(t -> t.f1); //      DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class); //      DataStream<Row> rowDataStream1 = tableEnvironment.toAppendStream(result1, Row.class); //      rowDataStream.print(); //      rowDataStream1.print();         tuple2DataStream.print(); ? ?         try {             //啟動執行環境             env.execute();       } catch (Exception e) {             e.printStackTrace();       } ? ? ?   } }

推薦閱讀