
Deduplication in Flink with Redis

Posted: 2023-01-11 11:31:07



For each record to be processed, compute its unique key with an algorithm of your choice:

    key = getMessageKey(record);
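The article leaves getMessageKey to the subclass. One possible approach (an assumption, not the author's code) is to hash the fields that define a duplicate with SHA-256. The Order type, its fields, the '|' separator and the "dedup:" prefix below are all hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class MessageKeyDemo {
    // Hypothetical record type: the article does not say what its records contain
    public static final class Order {
        final String orderId;
        final String userId;
        final long amountCents;

        public Order(String orderId, String userId, long amountCents) {
            this.orderId = orderId;
            this.userId = userId;
            this.amountCents = amountCents;
        }
    }

    // Same contract as getMessageKey in the article: equal keys mean duplicate records,
    // and null means "drop this record"
    public static String getMessageKey(Order o) {
        if (o == null || o.orderId == null) {
            return null;
        }
        // Canonical string of the fields that define a duplicate; '|' is assumed not to occur in them
        String canonical = o.orderId + "|" + o.userId + "|" + o.amountCents;
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(canonical.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder("dedup:");
            for (byte b : digest) {
                sb.append(String.format("%02x", b)); // two lowercase hex digits per byte
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // every Java platform is required to provide SHA-256
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(getMessageKey(new Order("o-1", "u-9", 1999)));
    }
}
```

Hashing keeps Redis keys short and of fixed length regardless of record size. The separator only has to be a character that cannot appear inside the fields; otherwise two different records could map to the same canonical string.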

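If the key does not exist, set its value to 0 together with a short expiry. Redis guarantees that SET with the NX option is atomic (it behaves like SETNX, but also sets the expiry in the same command):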

    result = cmds.set(key, "0", nxShortTimeArgs);

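If the key already exists, result is nil (Lettuce returns null). If it is OK, the current instance has acquired the key and goes on to process the record: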

    onCollectKey(key, value, out);
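To see why one atomic SET ... NX is enough to pick a single winner, here is a minimal in-memory sketch. It is not Redis: ConcurrentHashMap.putIfAbsent stands in for SET key value NX, and the key name is made up. Several threads race for the same key and exactly one of them gets OK:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SetNxClaimDemo {
    // In-memory stand-in for Redis: putIfAbsent is atomic, like SET key value NX
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    // Returns "OK" if this caller created the key, or null if it already existed (mirrors SET ... NX)
    public String setNx(String key, String value) {
        return store.putIfAbsent(key, value) == null ? "OK" : null;
    }

    // Runs `workers` concurrent attempts to claim the same key; returns how many got "OK"
    public static int countWinners(int workers) {
        SetNxClaimDemo redis = new SetNxClaimDemo();
        AtomicInteger winners = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                if ("OK".equals(redis.setNx("dedup:order-42", "0"))) {
                    winners.incrementAndGet(); // corresponds to onCollectKey(...)
                }
                // otherwise: corresponds to onDiscardKey(...)
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println(countWinners(8)); // prints 1
    }
}
```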

Wrapping the steps above into a class gives the code below. To use it, extend the class and override a few of its methods:

/**
* RedisStatefulMapFunction.java
*
* https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichFlatMapFunction.html
* https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/TypeHint.html

*
*
* create: 2020-11-19
* update: 2020-12-15
*/
package com.cheungmine.flink.common;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pepstack.guru.LogUtil;
import com.pepstack.guru.RedisClusterIO;

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.SetArgs;


public class RedisStatefulMapFunction<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
    private static final Logger logger = LoggerFactory.getLogger(RedisStatefulMapFunction.class);

    // Redis client objects are not serializable: they are created in open(), not in the constructor
    private transient RedisClusterClient redisClient;
    private transient StatefulRedisClusterConnection<String, String> connection;

    // Lettuce's SetArgs is not Serializable either, so it is also built in open()
    private transient SetArgs nxShortTimeArgs;
    private transient SetArgs xxExpireTimeArgs;

    private static final String[] propKeys = new String[] {
        "redis.cluster.uris"
        ,"redis.timeout.millis"
        ,"redis.expire.seconds"
    };

    private final String[] redisClusterUris;

    private final int redisTimeoutMillis;
    private final long redisExpireSeconds;


    protected RedisStatefulMapFunction(Properties kvprops) throws Exception {
        // hard code here (kvprops is not used yet); <password> and <host> are placeholders:
        redisClusterUris = new String[] {
            "redis://<password>@<host>:7001?timeout=30s",
            "redis://<password>@<host>:7002?timeout=30s",
            "redis://<password>@<host>:7003?timeout=30s",
            "redis://<password>@<host>:7004?timeout=30s",
            "redis://<password>@<host>:7005?timeout=30s",
            "redis://<password>@<host>:7006?timeout=30s",
            "redis://<password>@<host>:7007?timeout=30s",
            "redis://<password>@<host>:7008?timeout=30s",
            "redis://<password>@<host>:7009?timeout=30s"
        };

        redisTimeoutMillis = 30 * 1000;    // short lease while a record is being processed
        redisExpireSeconds = 86400 * 30;   // keep processed keys for 30 days
    }

    // Subclasses must override this method
    public String getMessageKey(IN value)
    {
        // Compute a signature for the IN record; records with the same signature are treated as duplicates.
        // Returning null means the message is dropped.
        return null;
    }


    // Subclasses must override this method
    public void onCollectKey (String key, IN value, Collector<OUT> out)
    {
        LogUtil.debug(logger, "{%s}", key);
    }


    // Subclasses can usually leave this method as is
    public void onDiscardKey (String key, IN value, Collector<OUT> out)
    {
        // Another parallel instance has already processed (or is processing) this message
        LogUtil.warn(logger, "{%s}", key);
    }


    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // SET key value NX PX <lease>: claim the key only if it does not exist yet
        nxShortTimeArgs = SetArgs.Builder.nx().px(redisTimeoutMillis);
        // SET key value XX EX <retention>: extend an existing key after successful processing
        xxExpireTimeArgs = SetArgs.Builder.xx().ex(redisExpireSeconds);

        redisClient = RedisClusterIO.createClient(redisClusterUris, Duration.ofMillis(redisTimeoutMillis));
        connection = redisClient.connect();
    }


    @Override
    public void close() throws Exception {
        super.close();

        // open() may have failed part-way, so guard against nulls
        if (connection != null) {
            connection.close();
        }
        if (redisClient != null) {
            RedisClusterIO.shutdownClient(redisClient);
        }
    }


    @Override
    public void flatMap(IN value, Collector<OUT> out) throws Exception {
        RedisAdvancedClusterCommands<String, String> cmds = connection.sync();

        // Compute the signature key of the IN record
        final String key = getMessageKey(value);

        if (key != null) {
            // Redis runs SET ... NX atomically: value and expiry are set only if the key does not exist
            final String result = cmds.set(key, "0", nxShortTimeArgs);

            if ("OK".equals(result)) {
                // This instance won the key: process the record
                onCollectKey(key, value, out);

                // Processing succeeded, so replace the short lease with a long expiry.
                // SET ... XX is atomic too: value and expiry are updated only if the key exists
                cmds.set(key, "1", xxExpireTimeArgs);
            } else {
                // Skip the record
                onDiscardKey(key, value, out);
            }
        }
    }
}
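The class relies on two expiry times: a 30-second lease set with NX while a record is in flight, then a 30-day expiry set with XX once onCollectKey returns. The single-threaded simulation below walks one key through that lifecycle. It is only a sketch: a HashMap and a manually advanced fake clock stand in for Redis, and the method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class LeaseDedupDemo {
    // Stand-in for Redis: key -> absolute expiry time (ms) on a fake clock.
    // Single-threaded, for illustration only; real atomicity comes from Redis itself.
    private final Map<String, Long> expiresAt = new HashMap<>();
    private long now = 0;

    public static final long LEASE_MS = 30_000;       // redisTimeoutMillis in the article
    public static final long KEEP_S = 86_400L * 30;   // redisExpireSeconds in the article

    public void advance(long millis) {
        now += millis;
    }

    private boolean exists(String key) {
        Long exp = expiresAt.get(key);
        if (exp != null && exp <= now) {
            expiresAt.remove(key); // lazily drop expired keys
            return false;
        }
        return exp != null;
    }

    // Like SET key v NX PX ttl: succeeds only if the key is absent
    public boolean setNxPx(String key, long ttlMillis) {
        if (exists(key)) {
            return false;
        }
        expiresAt.put(key, now + ttlMillis);
        return true;
    }

    // Like SET key v XX EX ttl: succeeds only if the key still exists
    public boolean setXxEx(String key, long ttlSeconds) {
        if (!exists(key)) {
            return false;
        }
        expiresAt.put(key, now + ttlSeconds * 1000);
        return true;
    }

    // Same control flow as flatMap(); crash = onCollectKey throws before the XX step
    public String process(String key, boolean crash) {
        if (!setNxPx(key, LEASE_MS)) {
            return "DISCARDED";   // onDiscardKey
        }
        if (crash) {
            return "CRASHED";     // exception escapes; the key keeps only its short lease
        }
        setXxEx(key, KEEP_S);     // success: keep the key for 30 days
        return "DONE";
    }

    public static void main(String[] args) {
        LeaseDedupDemo redis = new LeaseDedupDemo();
        System.out.println(redis.process("k", true));   // CRASHED
        System.out.println(redis.process("k", false));  // DISCARDED: lease still held
        redis.advance(LEASE_MS);
        System.out.println(redis.process("k", false));  // DONE: retried after the lease expired
        redis.advance(29L * 86_400_000);
        System.out.println(redis.process("k", false));  // DISCARDED: duplicate within 30 days
    }
}
```

If onCollectKey throws, the XX step never runs and the short lease expires, so a replay of the record can claim the key again after 30 seconds. The flip side is that processing must finish within the lease: if it takes longer, the XX set finds no key and does nothing (flatMap ignores its return value), so the key is not protected for the long period.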

The Redis cluster helper class RedisClusterIO:

/**
* RedisClusterIO.java
* Helper for working with a Redis cluster
*
*/
package com.pepstack.guru;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import java.time.Duration;

import io.lettuce.core.RedisURI;

// https://github.com/lettuce-io/lettuce-core/blob/6.0.1.RELEASE/src/test/java/io/lettuce/examples/ConnectToRedisCluster.java
import io.lettuce.core.cluster.RedisClusterClient;


public class RedisClusterIO {
    private static final Logger logger = LoggerFactory.getLogger(RedisClusterIO.class);

    private RedisClusterIO() {
    }


    public static RedisClusterClient createClient(String[] uris, Duration defaultTimeout) throws Exception {
        List<RedisURI> list = new ArrayList<RedisURI>();
        for (int i = 0; i < uris.length; i++) {
            // uri Syntax:
            // redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
            RedisURI uri = RedisURI.create(uris[i]);

            // Log host:port only, so the password in the URI does not end up in the logs
            LogUtil.info(logger, String.format("redis-server.%d=%s:%d", i + 1, uri.getHost(), uri.getPort()));

            list.add(uri);
        }

        RedisClusterClient redisClient = RedisClusterClient.create(list);

        if (defaultTimeout != null) {
            redisClient.setDefaultTimeout(defaultTimeout);
        }

        return redisClient;
    }


    public static RedisClusterClient createClient(Properties props) throws Exception {
        // Count consecutive keys redis-server.1, redis-server.2, ... (stops at the first gap)
        int maxid = 0;
        while (props.getProperty(String.format("redis-server.%d", maxid + 1), null) != null) {
            maxid++;
        }

        if (maxid == 0) {
            throw new IllegalArgumentException("no redis-server.N property found");
        }

        final String[] uris = new String[maxid];
        for (int i = 0; i < maxid; i++) {
            uris[i] = props.getProperty(String.format("redis-server.%d", i + 1));
        }
        return createClient(uris, null);
    }


    public static void shutdownClient(RedisClusterClient client) throws Exception {
        if (client != null) {
            client.shutdown();
        }
    }
}
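The Properties overload of createClient expects consecutively numbered keys redis-server.1, redis-server.2, and so on, and stops scanning at the first missing number. The standalone sketch below copies that scan so it can run without Lettuce; the class name and the URIs (hosts and password) are placeholders for illustration:

```java
import java.util.Properties;

public class RedisServerPropsDemo {
    // Same scan as RedisClusterIO.createClient(Properties): read redis-server.1, redis-server.2, ...
    // and stop at the first missing index
    public static String[] readUris(Properties props) {
        int maxid = 0;
        while (props.getProperty(String.format("redis-server.%d", maxid + 1)) != null) {
            maxid++;
        }
        String[] uris = new String[maxid];
        for (int i = 0; i < maxid; i++) {
            uris[i] = props.getProperty(String.format("redis-server.%d", i + 1));
        }
        return uris;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("redis-server.1", "redis://secret@10.0.0.1:7001?timeout=30s");
        props.setProperty("redis-server.2", "redis://secret@10.0.0.2:7002?timeout=30s");
        // ignored: there is no redis-server.3, so the scan stops before reaching this key
        props.setProperty("redis-server.4", "redis://secret@10.0.0.4:7004?timeout=30s");
        System.out.println(readUris(props).length); // prints 2
    }
}
```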

Noted here for future reference.

From: https://blog.51cto.com/mapaware/6001817
