flinksql讀寫redis( 三 )

3.3、使用HGET作為維表查詢命令@Testpublic void testHGetSQL() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);String source ="CREATE TABLE students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT, \n" +"proctime as PROCTIME() \n" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'key'='students',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'command'='BLPOP'\n" +" )";/*** 這里測試的核心是維表有format=json配置項,有了format配置項后,字段個數不受限制,但是需要注意的是,作為hget命令的key的字段* 一定要放在表申明的第一位,field的字段一定要放在申明的第二位,并且hget命令的value的值使用format格式化后,比如是json格式,* 則json里一定要包含作為維表查詢的 join on后面帶的作為key和field的查詢列,不然會報空指針異常*/String daeamon ="CREATE TABLE classes\n" +"(\n" +"schoolstring, \n" +"class_idBIGINT,\n" +"class_namestring," +"remarkstring" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'format'='json', \n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'lookup.cache.max-rows'='1000',\n" +"'lookup.cache.ttl'='3600',\n" +"'lookup.cache.load-all'='true',\n" +"'database'='0',\n" +"'command'='HGET'\n" +" )";String sink ="CREATE TABLE sink_students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT, \n" +"class_namestring, \n" +"remarkstring" +") \n" +"WITH (\n" +"'connector'='print'\n" +" )";tEnv.executeSql(source);tEnv.executeSql(daeamon);tEnv.executeSql(sink);String sql =" insert into sink_students "+ " select s.number,s.name,s.school,s.class_id,d.class_name,d.remarkfrom students s"+ "left join classes for system_time as of s.proctime as don d.class_id = s.class_id and d.school = s.school";TableResult tableResult = tEnv.executeSql(sql);tableResult.getJobClient().get().getJobExecutionResult().get();}4、redis作為sink表4.1、數據準備@Beforepublic void init() {RedisOptions.IS_TEST = true;RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);List<String> lists = new ArrayList<>();for (int i = 0; i < 1000; i++) {lists.add("{\n" +"\"number\": " + i + ",\n" +"\"name\": \"學生" + i + "\",\n" +"\"school\": \"學校" + ((i % 3) + 1) +"\",\n" +"\"class_id\": " + ((i % 10) + 1) +"\n" +"}");}/*** 初始化學生數據*/for (int i = 0; i < 1; i++) {redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));}/*** 初始化班級數據*/for(int i = 0;i < 10;i++) {redisOperator.set(String.valueOf(i + 1),"銀河" + (i + 1) + "班");}/*** 初始化學校班級數據*/for(int j = 1;j < 4;j++) {for (int i = 1; i < 11; i++) {redisOperator.hset("學校" + j, String.valueOf(i), "銀河" + i + "班");}}}4.2、使用LPush、RPUSH、SADD命令作為sink表寫入命令@Testpublic void testLPushSQL() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);String source ="CREATE TABLE students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT, \n" +"proctime as PROCTIME() \n" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'key'='students',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'command'='BLPOP'\n" +" )";String daeamon ="CREATE TABLE classes\n" +"(\n" +"schoolstring, \n" +"class_idBIGINT,\n" +"class_namestring" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'lookup.cache.max-rows'='1000',\n" +"'lookup.cache.ttl'='3600',\n" +"'lookup.cache.load-all'='true',\n" +"'database'='0',\n" +"'command'='HGET'\n" +" )";/***1、這里因為command是LPUSH , 所以不需要primary key(number) not enforced,因為這種命令只支持INSERT語義*2、并行度配置項sink.parallelism沒有配置,默認為核心數*/String sink ="CREATE TABLE sink_students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT, \n" +"class_namestring \n" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'key'='sink_students_list',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'command'='LPUSH'\n" +" )";tEnv.executeSql(source);tEnv.executeSql(daeamon);tEnv.executeSql(sink);String sql =" insert into sink_students "+ " select s.number,s.name,s.school,s.class_id,d.class_namefrom students s"+ " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";TableResult tableResult = tEnv.executeSql(sql);tableResult.getJobClient().get().getJobExecutionResult().get();}

推薦閱讀