FlinkSql之TableAPI詳解( 三 )

6> 寫入
 //寫入 select.executeInsert("c");完整代碼如下
 package net.cyan.FlinkSql; ? import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; ? public class Demo5_readWriteKafka {     public static void main(String[] args) {        //創建執行環境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         //創建表的運行環境         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);         //創建表結構         Schema schema=new Schema()               .field("id",DataTypes.STRING())               .field("ts",DataTypes.BIGINT())               .field("vc",DataTypes.INT());         //創建kafka連接         tabEnv.connect(                 new Kafka()               .version("universal")// 版本號               .property("bootstrap.servers","hadoop102:9092")//地址               .property("group.id","cy")//消費者組               .topic("first")//消費主題 ?          )               .withFormat(new Json())//寫入的格式               .withSchema(schema)               .createTemporaryTable("a");         //創建表         Table select = tabEnv.from("a").select("*");         //創建寫入主題         tabEnv.connect(                 new Kafka()                       .version("universal")// 版本號                       .property("bootstrap.servers","hadoop102:9092")//地址                       .topic("first1")//消費主題                       .sinkPartitionerRoundRobin()//隨即分區 ?       )               .withFormat(new Json())//寫入的格式               .withSchema(schema)               .createTemporaryTable("c"); ?         //寫入         select.executeInsert("c"); ? ?   } } 【FlinkSql之TableAPI詳解】

推薦閱讀