flinksql讀寫redis

0、前言
最近有個需求,需要使用flinksql讀寫redis。由于官網上并沒有redis的connector,在網上找了很久,開源的幾個connector又沒法滿足要求,所以這里就自己動手實現了一個。已經適配了各個版本的flink,從flink1.12到flink1.15。
簡單介紹一下功能吧:

  • 將redis作為流表時支持BLPOP、BRPOP、LPOP、RPOP、SPOP等命令;使用lua腳本封裝的批量彈出提高消費性能
  • 將redis作為維表時支持GET、HGET等命令;支持lookup緩存
  • 將redis作為sink表時支持LPUSH、RPUSH、SADD、SET、HSET等命令;支持指定key的ttl時間
  • 支持flink常見的序列化反序列化方式,如json、csv等,具體參見flink官網:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/overview/
1、redis作為流表
1.1、數據準備
/**
 * Seeds Redis with the fixtures used by the stream-table example:
 * a "students" list of 1000 JSON documents, ten class-name string keys,
 * and three "學校N" hashes mapping class id to class name.
 */
@Before
public void init() {
    // Enable test mode: in this mode the job stops once the stream table's data
    // has been fully consumed, which is convenient for testing. Defaults to false.
    RedisOptions.IS_TEST = true;
    RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33", 6379, "123456", 0);

    List<String> lists = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
        lists.add("{\n" +
                "\"number\": " + i + ",\n" +
                "\"name\": \"學生" + i + "\",\n" +
                "\"school\": \"學校" + ((i % 3) + 1) + "\",\n" +
                "\"class_id\": " + ((i % 10) + 1) + "\n" +
                "}");
    }

    // Initialize the student data (pushed in batches of 1000; currently one batch).
    for (int i = 0; i < 1; i++) {
        redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
    }

    // Initialize the class data: key = class id, value = class name.
    for (int i = 0; i < 10; i++) {
        redisOperator.set(String.valueOf(i + 1), "銀河" + (i + 1) + "班");
    }

    // Initialize the per-school class data: hash "學校j", field = class id, value = class name.
    for (int j = 1; j < 4; j++) {
        for (int i = 1; i < 11; i++) {
            redisOperator.hset("學校" + j, String.valueOf(i), "銀河" + i + "班");
        }
    }
}

1.2、使用BLPOP、BRPOP、LPOP、RPOP、SPOP消費指定的key的list或者set的數據

/**
 * Reads the "students" list from Redis as a stream table using BLPOP and
 * writes every row to a print sink.
 *
 * @throws Exception if the job submission or execution fails
 */
@Test
public void testBlpopSQL() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

    // Column names and types must be separated by whitespace to be valid DDL.
    String source =
            "CREATE TABLE students\n" +
            "(\n" +
            "number BIGINT ,\n" +
            "name string,\n" +
            "school string, \n" +
            "class_id BIGINT \n" +
            ") \n" +
            "WITH (\n" +
            "'connector'='redis',\n" +
            "'host'='10.201.0.33', \n" +
            "'port'='6379',\n" +
            "'redis-mode'='single', \n" +
            "'password'='123456',\n" +
            "'database'='0',\n" +
            "'key'='students',\n" +
            "'format'='json',\n" +
            "'batch-fetch-rows'='1000',\n" +
            "'json.fail-on-missing-field' = 'false',\n" +
            "'json.ignore-parse-errors' = 'true',\n" +
            "'command'='BLPOP'\n" +
            " )";

    String sink =
            "CREATE TABLE sink_students\n" +
            "(\n" +
            "number BIGINT ,\n" +
            "name string,\n" +
            "school string, \n" +
            "class_id BIGINT \n" +
            ") \n" +
            "WITH (\n" +
            "'connector'='print'\n" +
            " )";

    tEnv.executeSql(source);
    tEnv.executeSql(sink);

    String sql = " insert into sink_students select * from students";
    TableResult tableResult = tEnv.executeSql(sql);
    // Block until the job finishes so the test does not exit early.
    tableResult.getJobClient().get().getJobExecutionResult().get();
}

2、redis作為維表(不帶format)
2.1、數據準備

/**
 * Seeds Redis with the fixtures used by the dimension-table example:
 * a "students" list of 1000 JSON documents, ten class-name string keys,
 * and three "學校N" hashes mapping class id to class name.
 */
@Before
public void init() {
    // Enable test mode: in this mode the job stops once the stream table's data
    // has been fully consumed, which is convenient for testing. Defaults to false.
    RedisOptions.IS_TEST = true;
    RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33", 6379, "123456", 0);

    List<String> lists = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
        lists.add("{\n" +
                "\"number\": " + i + ",\n" +
                "\"name\": \"學生" + i + "\",\n" +
                "\"school\": \"學校" + ((i % 3) + 1) + "\",\n" +
                "\"class_id\": " + ((i % 10) + 1) + "\n" +
                "}");
    }

    // Initialize the student data (pushed in batches of 1000; currently one batch).
    for (int i = 0; i < 1; i++) {
        redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
    }

    // Initialize the class data: key = class id, value = class name.
    for (int i = 0; i < 10; i++) {
        redisOperator.set(String.valueOf(i + 1), "銀河" + (i + 1) + "班");
    }

    // Initialize the per-school class data: hash "學校j", field = class id, value = class name.
    for (int j = 1; j < 4; j++) {
        for (int i = 1; i < 11; i++) {
            redisOperator.hset("學校" + j, String.valueOf(i), "銀河" + i + "班");
        }
    }
}

2.2、使用GET作為維表查詢命令

/**
 * Joins the "students" stream table against a Redis dimension table that is
 * looked up with the GET command, and writes the enriched rows to a print sink.
 *
 * @throws Exception if the job submission or execution fails
 */
@Test
public void testGetSQL() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

    // The processing-time attribute is required by the lookup join below.
    String source =
            "CREATE TABLE students\n" +
            "(\n" +
            "number BIGINT ,\n" +
            "name string,\n" +
            "school string, \n" +
            "class_id BIGINT, \n" +
            "proctime as PROCTIME() \n" +
            ") \n" +
            "WITH (\n" +
            "'connector'='redis',\n" +
            "'host'='10.201.0.33', \n" +
            "'port'='6379',\n" +
            "'redis-mode'='single', \n" +
            "'password'='123456',\n" +
            "'database'='0',\n" +
            "'key'='students',\n" +
            "'format'='json',\n" +
            "'batch-fetch-rows'='1000',\n" +
            "'json.fail-on-missing-field' = 'false',\n" +
            "'json.ignore-parse-errors' = 'true',\n" +
            "'command'='BLPOP'\n" +
            " )";

    // Note: because the GET command is used and no 'format' option is set, the
    // dimension table can only have two columns; any extra columns are not
    // recognized. See the comments in the connector source code for details.
    String dimension =
            "CREATE TABLE classes\n" +
            "(\n" +
            "class_id BIGINT,\n" +
            "class_name string" +
            ") \n" +
            "WITH (\n" +
            "'connector'='redis',\n" +
            "'host'='10.201.0.33', \n" +
            "'port'='6379',\n" +
            "'redis-mode'='single', \n" +
            "'password'='123456',\n" +
            "'lookup.cache.max-rows'='1000',\n" +
            "'lookup.cache.ttl'='3600',\n" +
            "'lookup.cache.load-all'='true',\n" +
            "'database'='0',\n" +
            "'command'='GET'\n" +
            " )";

    String sink =
            "CREATE TABLE sink_students\n" +
            "(\n" +
            "number BIGINT ,\n" +
            "name string,\n" +
            "school string, \n" +
            "class_id BIGINT, \n" +
            "class_name string \n" +
            ") \n" +
            "WITH (\n" +
            "'connector'='print'\n" +
            " )";

    tEnv.executeSql(source);
    tEnv.executeSql(dimension);
    tEnv.executeSql(sink);

    // The join column must be the key used by the GET command.
    // Each fragment starts with a space so the concatenated statement keeps
    // its keywords separated ("... students s left join ... as d on ...").
    String sql =
            " insert into sink_students "
            + " select s.number,s.name,s.school,s.class_id,d.class_name from students s"
            + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id";
    TableResult tableResult = tEnv.executeSql(sql);
    // Block until the job finishes so the test does not exit early.
    tableResult.getJobClient().get().getJobExecutionResult().get();
}

推薦閱讀