首页 > 数据库 >flinksql读写redis

flinksql读写redis

时间:2022-10-10 22:45:19浏览次数:89  
标签:string flinksql redis class sink id 读写 name

0、前言

  最近有个需求,需要使用flinksql读写redis,由于官网上并没有redis的connector,在网上找了很久,开源的几个connector又没法满足要求,所有这里就自己动手实现了一个。已经适配了各个版本的flink,从flink1.12到flink1.15。

  简单介绍一下功能吧:

  • 将redis作为流表时支持BLPOP、BRPOP、LPOP、RPOP、SPOP等命令;使用lua脚本封装的批量弹出提高消费性能
  • 将redis作为维表时支持GET、HGET等命令;支持lookup缓存
  • 将redis作为sink表时支持LPUSH、RPUSH、SADD、SET、HSET等命令;支持指定key的ttl时间
  • 支持flink常见的序列化反序列化方式,如json、csv等,具体参见flink官网:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/overview/

1、redis作为流表

1.1、数据准备

  

    @Before
    public void init() {
        /**
            设置当前属于测试模式,在这个测试模式下,当流表数据消费完成后程序会停止,方便测试,这个模式默认false
        */
        RedisOptions.IS_TEST = true;
        RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);
        List<String> lists = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            lists.add("{\n" +
                    "  \"number\": " + i + ",\n" +
                    "  \"name\": \"学生" + i + "\",\n" +
                    "  \"school\": \"学校" + ((i % 3) + 1) +"\",\n" +
                    "  \"class_id\": " + ((i % 10) + 1) +"\n" +
                    "}");
        }
        /**
         * 初始化学生数据
         */
        for (int i = 0; i < 1; i++) {
            redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
        }
        /**
         * 初始化班级数据
         */
        for(int i = 0;i < 10;i++) {
            redisOperator.set(String.valueOf(i + 1),"银河" + (i + 1) + "班");
        }

        /**
         * 初始化学校班级数据
         */
        for(int j = 1;j < 4;j++) {
            for (int i = 1; i < 11; i++) {
                redisOperator.hset("学校" + j, String.valueOf(i), "银河" + i + "班");
            }
        }
    }

1.2、使用BLPOP、BRPOP、LPOP、RPOP、SPOP消费指定的key的list或者set的数据

    @Test
    public void testBlpopSQL() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";



        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='print'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students select * from students";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
    }

2、redis作为维表(不带format)

2.1、数据准备

    @Before
    public void init() {
        /**
            设置当前属于测试模式,在这个测试模式下,当流表数据消费完成后程序会停止,方便测试,这个模式默认false
        */
        RedisOptions.IS_TEST = true;
        RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);
        List<String> lists = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            lists.add("{\n" +
                    "  \"number\": " + i + ",\n" +
                    "  \"name\": \"学生" + i + "\",\n" +
                    "  \"school\": \"学校" + ((i % 3) + 1) +"\",\n" +
                    "  \"class_id\": " + ((i % 10) + 1) +"\n" +
                    "}");
        }
        /**
         * 初始化学生数据
         */
        for (int i = 0; i < 1; i++) {
            redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
        }
        /**
         * 初始化班级数据
         */
        for(int i = 0;i < 10;i++) {
            redisOperator.set(String.valueOf(i + 1),"银河" + (i + 1) + "班");
        }

        /**
         * 初始化学校班级数据
         */
        for(int j = 1;j < 4;j++) {
            for (int i = 1; i < 11; i++) {
                redisOperator.hset("学校" + j, String.valueOf(i), "银河" + i + "班");
            }
        }
    }

2.2、使用GET作为维表查询命令

    @Test
    public void testGetSQL() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

        /**
            这里需要注意的是,由于使用get命令,而且没有加format属性,所以维表只能有两个字段,多了也识别不到,
            详细可以看源码里的注释
        */
        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='GET'\n" +
                        " )";


        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='print'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);
        /**
            这里join的字段必须是GET命令的key
        */
        String sql =
                " insert into sink_students "
                + " select s.number,s.name,s.school,s.class_id,d.class_name  from students s"
                + "  left join classes for system_time as of s.proctime as d  on d.class_id = s.class_id";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
    }

2.3、使用HGET作为维表查询命令

    @Test
    public void testHGetSQL() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";
        /**
            这里需要注意的是,由于使用hget命令,而且没有加format属性,所以维表只能有三个字段,多了也识别不到,
            详细可以看源码里的注释
        */
        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='HGET'\n" +
                        " )";


        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='print'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);
        /**
            这里需要注意的是,由于使用hget命令,这里join的参数两个参数顺序没有关系,真正执行hget命令哪个字段作为key,
            哪个字段作为field只与维表定义的时候的字段顺序有关系
        */
        String sql =
                " insert into sink_students "
                        + " select s.number,s.name,s.school,s.class_id,d.class_name  from students s"
                        + "  left join classes for system_time as of s.proctime as d  on d.class_id = s.class_id and d.school = s.school";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
    }

3、redis作为维表(带format)

3.1、数据准备

    @Before
    public void init() {
        RedisOptions.IS_TEST = true;
        RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);
        List<String> lists = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            lists.add("{\n" +
                    "  \"number\": " + i + ",\n" +
                    "  \"name\": \"学生" + i + "\",\n" +
                    "  \"school\": \"学校" + ((i % 3) + 1) +"\",\n" +
                    "  \"class_id\": " + ((i % 10) + 1) +"\n" +
                    "}");
        }
        /**
         * 初始化学生数据
         */
        for (int i = 0; i < 1; i++) {
            redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
        }
        /**
         * 初始化班级数据
         */
        for(int i = 0;i < 10;i++) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("class_id",String.valueOf(i + 1));
            jsonObject.put("class_name","银河" + (i + 1) + "班");
            jsonObject.put("remark","remark" + i);
            redisOperator.set(String.valueOf(i + 1),jsonObject.toString());
        }

        /**
         * 初始化学校班级数据
         */
        for(int j = 1;j < 4;j++) {
            for (int i = 1; i < 11; i++) {
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("class_id",String.valueOf(i));
                jsonObject.put("class_name","银河" + i + "班");
                jsonObject.put("remark","remark" + i);
                jsonObject.put("school","学校" + j);
                redisOperator.hset("学校" + j, String.valueOf(i), jsonObject.toString());
            }
        }
    }

3.2、使用GET作为维表查询命令

    @Test
    public void testGetSQL() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

        /**
         * 这里测试的核心是维表有format=json配置项,有了format配置项后,字段个数不受限制,但是需要注意的是,作为get命令的key的字段
         * 一定要放在表申明的第一位,并且get命令的value的值使用format格式化后,比如是json格式,则json里一定要包含作为维表查询的
         *  join on后面带的作为key的查询列,不然会报空指针异常
         */ 
        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string ,\n   " +
                        "    remark  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'format'='json', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='GET'\n" +
                        " )";


        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string, \n" +
                        "    remark  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='print'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students "
                        + " select s.number,s.name,s.school,s.class_id,d.class_name,d.remark  from students s"
                        + "  left join classes for system_time as of s.proctime as d  on d.class_id = s.class_id";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
    }

3.3、使用HGET作为维表查询命令

    @Test
    public void testHGetSQL() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

        /**
         * 这里测试的核心是维表有format=json配置项,有了format配置项后,字段个数不受限制,但是需要注意的是,作为hget命令的key的字段
         * 一定要放在表申明的第一位,field的字段一定要放在申明的第二位,并且hget命令的value的值使用format格式化后,比如是json格式,          * 则json里一定要包含作为维表查询的 join on后面带的作为key和field的查询列,不然会报空指针异常
         */ 
        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string,   " +
                        "    remark  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'format'='json', \n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='HGET'\n" +
                        " )";


        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string, \n" +
                        "    remark  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='print'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students "
                        + " select s.number,s.name,s.school,s.class_id,d.class_name,d.remark  from students s"
                        + "  left join classes for system_time as of s.proctime as d  on d.class_id = s.class_id and d.school = s.school";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
    }

4、redis作为sink表

4.1、数据准备

    @Before
    public void init() {
        RedisOptions.IS_TEST = true;
        RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);
        List<String> lists = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            lists.add("{\n" +
                    "  \"number\": " + i + ",\n" +
                    "  \"name\": \"学生" + i + "\",\n" +
                    "  \"school\": \"学校" + ((i % 3) + 1) +"\",\n" +
                    "  \"class_id\": " + ((i % 10) + 1) +"\n" +
                    "}");
        }
        /**
         * 初始化学生数据
         */
        for (int i = 0; i < 1; i++) {
            redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
        }
        /**
         * 初始化班级数据
         */
        for(int i = 0;i < 10;i++) {
            redisOperator.set(String.valueOf(i + 1),"银河" + (i + 1) + "班");
        }

        /**
         * 初始化学校班级数据
         */
        for(int j = 1;j < 4;j++) {
            for (int i = 1; i < 11; i++) {
                redisOperator.hset("学校" + j, String.valueOf(i), "银河" + i + "班");
            }
        }
    }

4.2、使用LPush、RPUSH、SADD命令作为sink表写入命令

@Test
    public void testLPushSQL() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='HGET'\n" +
                        " )";

        /**
         *  1、这里因为command是LPUSH,所以不需要primary key(number) not enforced, 因为这种命令只支持INSERT语义
         *  2、并行度配置项sink.parallelism没有配置,默认为核心数
         */
        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='sink_students_list',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='LPUSH'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students "
                + " select s.number,s.name,s.school,s.class_id,d.class_name  from students s"
                + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
    }

4.2、使用SET命令作为sink表写入命令

    @Test
    public void testSet() throws Exception {
        long start = System.currentTimeMillis();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='HGET'\n" +
                        " )";

        /**
         *  1、这里因为command是SET,所以需要一个key,这里key就是使用主键,多个就用下划线拼接起来,
         *  2、并行度配置项sink.parallelism没有配置,默认为核心数
         */
        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string, \n" +
                        "    primary key(school,number) not enforced" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='SET'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students "
                        + " select s.school,s.number,s.name,s.class_id,d.class_name  from students s"
                        + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
        long end = System.currentTimeMillis();
        System.out.println("耗时:" + (end - start) + "ms");
    }

4.3、使用HSET命令作为sink表写入命令(不指定key)

    @Test
    public void testHSet() throws Exception {
        long start = System.currentTimeMillis();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='HGET'\n" +
                        " )";

        /**
         *  1、这里因为command是HSET,所以需要一个key和一个field,这里是按照表申明的顺序,第一个作为key,
         *  第二个作为field,由于需要更新,也需要一个主键,这里最好把前两个字段一起作为主键
         *  2、作为sink有一个sink.key.ttl参数可以设置key保存在redis的ttl生存时间,单位秒,默认为-1表示长期保存
         */
        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string, \n" +
                        "    primary key(school,number) not enforced" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'sink.parallelism' = '16',\n" +
                        "  'sink.key.ttl' = '300',\n" +
                        "  'command'='HSET'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students "
                        + " select s.school,s.number,s.name,s.class_id,d.class_name  from students s"
                        + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
        long end = System.currentTimeMillis();
        System.out.println("耗时:" + (end - start) + "ms");
    }

4.4、使用HSET命令作为sink表写入命令(指定key)

    @Test
    public void testHSetWithKey() throws Exception {
        long start = System.currentTimeMillis();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='HGET'\n" +
                        " )";

        /**
         *  1、这里因为command是HSET,所以需要一个key和一个field,这里配置项指定了key,那么主键拼接就作为field,
         *  使用hset保存到redis
         *  2、作为sink有一个sink.key.ttl参数可以设置key保存在redis的ttl生存时间,单位秒,默认-1表示长期保存
         */
        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string, \n" +
                        "    primary key(number) not enforced" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'format'='json',\n" +
                        "  'key'='sink_students_hset',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'sink.parallelism' = '16',\n" +
                        "  'sink.key.ttl' = '300',\n" +
                        "  'command'='HSET'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students "
                        + " select s.school,s.number,s.name,s.class_id,d.class_name  from students s"
                        + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
        long end = System.currentTimeMillis();
        System.out.println("耗时:" + (end - start) + "ms");
    }

5、配置说明

配置项描述
host redis的host
port redis的port
password redis的password
cluster-nodes redis的集群节点,ip和端口之间用英文冒号分隔,多个ip端口用英文逗号分割
master.name redis的sentinel模式的master节点的名称
sentinels.info redis的sentinel模式的info信息
sentinels.password redis的sentinel模式的密码
database redis的database,一般是0~15
command redis的命令,作为流表时支持BLPOP、BRPOP、LPOP、RPOP、SPOP;作为维表时支持GET、HGET;作为sink表时支持LPUSH、RPUSH、SADD、SET、HSET
redis-mode redis的部署模式,single、cluster、sentinel
key redis需要访问的key,比如数据是以某个固定的key存放在redis里,值是一个list;redis执行lpush、rpush、sadd、hset等sink使用的命令时的key;
timeout 连接redis的超时时间,单位毫秒
max-total 连接redis的连接池的最大连接数
max-idle 连接redis的连接池的最大空闲数
min-idle 连接redis的连接池的最小空闲数
format 格式化数据格式,如json、csv
batch-fetch-rows 像LPOP、BLPOP、RPOP、BRPOP这种命令每次从redis拿到数据的条数
lookup.cache.max-rows 作为维表lookup模式,缓存在内存中的数据的最大条数
lookup.cache.ttl 作为维表lookup模式,缓存在内存中的数据的ttl超时时间,单位秒
lookup.max-retries 作为维表lookup模式,查找数据的失败重试次数
lookup.cache.load-all 作为维表lookup模式,查找数据是否加载所有,主要是针对hget命令,如:HGET KEY_NAME FIELD_NAME;是否根据key查出所有field的值,这里可以根据实际hash表的大小决定是否要查询所有出来缓存起来
sink.max-retries redis作为sink源时,最大重试次数
sink.parallelism redis作为sink源时,sink的并行数,默认并行度为核心数
sink.key.ttl redis作为sink源时,sink的数据保存在redis的ttl超时时间,单位秒,默认为-1表示长期保存
lookup.max-retries 作为维表lookup模式,查找数据的失败重试次数

源码地址:https://gitee.com/rongdi/flinksql-connector-redis/

标签:string,flinksql,redis,class,sink,id,读写,name
From: https://www.cnblogs.com/rongdi/p/16777702.html

相关文章

  • redis新数据类型Bitmaps基础操作
    1、首先连上你的redis以下一些命令很简单,或者执行命令后效果也无法体现的就没有进行截图展示2、Bitmaps相关操作说明:现在的计算机用二进制(位)作为信息的基础单位,Bitmaps......
  • Linux 下安装Redis
    下载地址:http://redis.io/download,下载最新稳定版本。本教程使用的最新文档版本为2.8.17,下载并安装:1#wgethttp://download.redis.io/releases/redis-6.0.8.tar.gz2......
  • Redis-2
    Redis配置文件在redis根目录提供redis.conf配置文件,如果不使用配置文件,redis会按照默认参数运行网络配置port:指定redis服务使用的端口,默认使用6379bind:配置客户端......
  • docker redis 修改密码
    进入redis[root@iZwz98nzsodcbigjqrrmxmZ~]#dockerexec-itredisbash进入bin目录root@1473acb2f8e7:/data#cd/usr/local/bin/运行命令root@1473acb2f8e7:/us......
  • 阿里云服务器上安装Redis
    阿里云服务器上安装Redis"多学习,多思考"目录阿里云服务器上安装Redis"多学习,多思考"1、下载安装1.1、解压缩1.2、安装C依赖1.3、编译2、修改配置文件3、启动Redis服务4、......
  • Redis和Lombok的下载安装
    下载地址:下载界面:下载好之后进行安装安装界面下一步安装路径的改动默认下一步继续下一步安装点击完成这是我们的安装目录最简单的启动方式是直接双......
  • 2022-10-10 (≥▽≤) Redis数据库
    1.RedisNoSQL:NotOnlySQL非关系型数据库NoSQL的四大类:键值(Key-Value)存储数据库,使用到一个哈希表,这个表中有一个指针指向特定的数据:Redis,Memcache。列存储数据库......
  • SpringBoot启动报错:Parameter 0 of method hmset in com.qcby.rbac.util.RedisUtils r
    SpringBoot启动报错,报错信息如下:  报错是由于A类中定义了含参数的构造函数,Spring自动构造和注入时未为该Bean传入参数,引起报错。查了很多资料,最后发现,我是因为注释......
  • redis本地缓存
    我不想带给你负面情绪,但又想让你知道我的不开心。为了系统性能的提升,一般会将部分数据放入缓存中,加快访问速度。而db承担数据罗盘工作。哪些数据适合放在缓存及时性......
  • StoneDB读写分离实践方案
    在StoneDB1.0版本中,InnoDB引擎处理OLTP的事务型业务,Tianmu引擎处理OLAP的分析型业务。因此,需要在主从复制环境的基础上做读写分离,所有的写操作和部分读操作走Inno......