flinksql讀寫redis( 四 )

4.2、使用SET命令作為sink表寫入命令@Testpublic void testSet() throws Exception {long start = System.currentTimeMillis();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);String source ="CREATE TABLE students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT, \n" +"proctime as PROCTIME() \n" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'key'='students',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'command'='BLPOP'\n" +" )";String daeamon ="CREATE TABLE classes\n" +"(\n" +"schoolstring, \n" +"class_idBIGINT,\n" +"class_namestring" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'lookup.cache.max-rows'='1000',\n" +"'lookup.cache.ttl'='3600',\n" +"'lookup.cache.load-all'='true',\n" +"'database'='0',\n" +"'command'='HGET'\n" +" )";/***1、這里因為command是SET,所以需要一個key , 這里key就是使用主鍵,多個就用下劃線拼接起來,*2、并行度配置項sink.parallelism沒有配置,默認為核心數*/String sink ="CREATE TABLE sink_students\n" +"(\n" +"schoolstring, \n" +"numberBIGINT ,\n" +"namestring,\n" +"class_idBIGINT, \n" +"class_namestring, \n" +"primary key(school,number) not enforced" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'command'='SET'\n" +" )";tEnv.executeSql(source);tEnv.executeSql(daeamon);tEnv.executeSql(sink);String sql =" insert into sink_students "+ " select s.school,s.number,s.name,s.class_id,d.class_namefrom students s"+ " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";TableResult tableResult = tEnv.executeSql(sql);tableResult.getJobClient().get().getJobExecutionResult().get();long end = System.currentTimeMillis();System.out.println("耗時:" + (end - start) + "ms");}4.3、使用HSET命令作為sink表寫入命令(不指定key)@Testpublic void testHSet() throws Exception {long start = System.currentTimeMillis();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);String source ="CREATE TABLE students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT, \n" +"proctime as PROCTIME() \n" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'key'='students',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'command'='BLPOP'\n" +" )";String daeamon ="CREATE TABLE classes\n" +"(\n" +"schoolstring, \n" +"class_idBIGINT,\n" +"class_namestring" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'lookup.cache.max-rows'='1000',\n" +"'lookup.cache.ttl'='3600',\n" +"'lookup.cache.load-all'='true',\n" +"'database'='0',\n" +"'command'='HGET'\n" +" )";/***1、這里因為command是HSET,所以需要一個key和一個field,這里是按照表申明的順序,第一個作為key , *第二個作為field,由于需要更新,也需要一個主鍵,這里最好把前兩個字段一起作為主鍵*2、作為sink有一個sink.key.ttl參數可以設置key保存在redis的ttl生存時間 , 單位秒,默認為-1表示長期保存*/String sink ="CREATE TABLE sink_students\n" +"(\n" +"schoolstring, \n" +"numberBIGINT ,\n" +"namestring,\n" +"class_idBIGINT, \n" +"class_namestring, \n" +"primary key(school,number) not enforced" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'sink.parallelism' = '16',\n" +"'sink.key.ttl' = '300',\n" +"'command'='HSET'\n" +" )";tEnv.executeSql(source);tEnv.executeSql(daeamon);tEnv.executeSql(sink);String sql =" insert into sink_students "+ " select s.school,s.number,s.name,s.class_id,d.class_namefrom students s"+ " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";TableResult tableResult = tEnv.executeSql(sql);tableResult.getJobClient().get().getJobExecutionResult().get();long end = System.currentTimeMillis();System.out.println("耗時:" + (end - start) + "ms");}

推薦閱讀