FlinkSql之TableAPI詳解( 二 )

二、TableAPI讀取文件使用TableAPI讀取文件時,我們首先需要知道去哪里讀取也就是文件路徑、讀取文件的格式、讀取出來的數據的結構也就是結果表的表結構及表名
 package net.cyan.FlinkSql; ? import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ? import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.table.types.DataType; ? import static org.apache.flink.table.api.Expressions.$; ? public class Demo2_readWriteText {     public static void main(String[] args) {         //創建執行環境 //      Configuration configuration = new Configuration(); //      configuration.setInteger("rest.port", 3333);         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1);         StreamTableEnvironment talEnv = StreamTableEnvironment.create(env);         //創建查詢的數據結果封裝類型         Schema schema = new Schema()               .field("id", DataTypes.STRING())               .field("ts", DataTypes.BIGINT())               .field("vc", DataTypes.INT());         talEnv               .connect(new FileSystem().path("input/sensor.txt"))  //讀取文件路徑               .withFormat(new Csv()) //讀取文件的數據格式               .withSchema(schema) //讀取出來的數據格式               .createTemporaryTable("sensor");//定義結果表名 ?         //進行查詢         Table select = talEnv.from("sensor")               .where($("id").isEqual("sensor_1"))               .select($("id"), $("ts"), $("vc")); ? ?         //將查詢結果寫入到新文件中         //同樣建立一個動態表連接         talEnv               .connect(new FileSystem().path("input/b.txt"))  //寫入路徑               .withFormat(new Csv()) //寫入文件的數據格式               .withSchema(schema) //寫入的數據格式               .createTemporaryTable("abc");//定義寫入表名         //進行寫入操作 ?         select.executeInsert("abc"); ? //      try { //          //啟動執行環境 //          env.execute(); //      } catch (Exception e) { //          e.printStackTrace(); //      } ?   } }三、TableAPI 讀取、寫入Kakfa基本流程
1>需要創建表的運行環境
 //創建表的運行環境 StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);2>創建查詢出的數據寫出結構
 //創建表結構 Schema schema=new Schema()       .field("id",DataTypes.STRING())       .field("ts",DataTypes.BIGINT())       .field("vc",DataTypes.INT());3> 創建kafka連接
 //創建kafka連接 tabEnv.connect(         new Kafka()       .version("universal")// 版本號       .property("bootstrap.servers","hadoop102:9092")//地址       .property("group.id","cy")//消費者組       .topic("first")//消費主題 ?  )       .withFormat(new Json())//寫入的格式       .withSchema(schema)       .createTemporaryTable("a");//臨時表4> 進行查詢
 //創建表 Table select = tabEnv.from("a").select("*");5> 創建寫入kafka連接
 //創建寫入主題 tabEnv.connect(         new Kafka()               .version("universal")// 版本號               .property("bootstrap.servers","hadoop102:9092")//地址               .topic("first1")//消費主題               .sinkPartitionerRoundRobin()//隨機分區 ? )       .withFormat(new Json())//寫入的格式       .withSchema(schema)       .createTemporaryTable("c");

推薦閱讀