FlinkSQL之Windowing TVF( 二 )

 package net.cyan.FlinkSql.TVF; ? import net.cyan.POJO.WaterSensor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; ? import java.time.Duration; ? import static org.apache.flink.table.api.Expressions.$; ? public class Demo2_Window_TVF_Hop {     public static void main(String[] args) {         //創建執行環境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         //創建表的運行環境         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);         env.setParallelism(1);         DataStream<WaterSensor> waterSensorStream =                 env.fromElements(                         new WaterSensor("sensor_1", 1000L, 10),                         new WaterSensor("sensor_1", 2000L, 20),                         new WaterSensor("sensor_2", 3000L, 30),                         new WaterSensor("sensor_1", 4000L, 40),                         new WaterSensor("sensor_1", 5000L, 50),                         new WaterSensor("sensor_2", 6000L, 60))                       .assignTimestampsAndWatermarks(                                 WatermarkStrategy                                       .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))                                       .withTimestampAssigner((ws, ts) -> ws.getTs()) ?                       );         //創建table         Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());         //創建表         tabEnv.createTemporaryView("sensor",table);         //執行sql         //TVF 中的hop滾動窗口         //hop(table sensor,descriptor(et),interval '2' second,interval '5' second ):作為一張表存在         //first interval :滑動步長,second interval :窗口長度         //特別注意?。。。?nbsp;        // 1.TVF 中滑動窗口的滑動步長與窗口長度必須是整數倍的關系,不然會報錯         // 例如:滑動步長為2,窗口長度就不能為5,可以為6         // 2.如果在sql中使用了hop窗口,則一定需要group by,而且group by后一定有window_start,window_end兩個字段         tabEnv.sqlQuery("select" +                 "window_start,window_end,id," +                 "sum(vc) sum_vc" +                 " from table (hop(table sensor,descriptor(et),interval '2' second,interval '6' second ))" +                 " group by window_start,window_end,id ")               .execute()               .print(); ? ? ?   } }sql實現TVF的累計窗口
累計窗口的應用:
需求:每天每隔一個小時統計一次當天的pv(瀏覽量)
流的方式如何解決:
1、用滾動窗口 ,  窗口長度設為1h
2、每天的第一個窗口清除狀態 , 后面的不清 , 進行狀態的累加
或者
用滾動窗口,長度設置為2day
自定義觸發器,每隔1小時對窗內的元素計算一次 , 不關閉窗口
sql的方式如何解決?
直接使用累計窗口cumulate
 //TVF 中的cumulate累計窗口 //cumulate(table tableName,descriptor(timecol),step,size):作為一張表存在 //tableName:表名 //timecol:時間屬性字段 //step:累計步長 , 跟滑動步長類似 //size:窗口長度 //特別注意?。。。?nbsp;//1.累計窗口的步長與窗口長度同樣是需要整數倍關系 // 2.如果在sql中使用了cumulate窗口,則一定需要group by,而且group by后一定有window_start,window_end兩個字段

推薦閱讀