flink 利用 redis 去重
对于每一条待处理的 record,根据算法计算其唯一key:
key = getMessageKey(record);
如果 key 不存在,原子地设置 key 的值为 "0" 并附带短超时(等价于 SET key 0 NX PX millis,redis 保证该指令是原子的):
result = cmds.set(key, "0", nxShortTimeArgs);
如果 key 已存在,set NX 返回 nil(Java 客户端中即 null);如果返回 "OK" 说明当前线程抢占到了该 key,继续处理该消息:
onCollectKey(key, value, out);
上面的过程记录下来,就是下面这个类,使用时需要继承并实现几个重载函数:
/**
* RedisStatefulMapFunction.java
*
* https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichFlatMapFunction.html
* https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/TypeHint.html
*
*
* create: 2020-11-19
* update: 2020-12-15
*/
package com.cheungmine.flink.common;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.pepstack.guru.LogUtil;
import com.pepstack.guru.RedisClusterIO;
import java.time.Duration;
import java.util.Properties;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.SetArgs;
/**
 * A Flink flat-map function that deduplicates records via an atomic Redis
 * {@code SET key value NX PX} claim. Subclasses override {@link #getMessageKey}
 * (and usually {@link #onCollectKey}) to define the dedup key and the work
 * done for each unique record.
 */
public class RedisStatefulMapFunction<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
    private static final Logger logger = LoggerFactory.getLogger(RedisStatefulMapFunction.class);

    // Lettuce objects (client, connection, SetArgs) are NOT Serializable.
    // They must be transient and created in open() on each parallel task;
    // keeping SetArgs as non-transient final fields would make Flink fail to
    // serialize this function when submitting the job.
    private transient RedisClusterClient redisClient;
    private transient StatefulRedisClusterConnection<String, String> connection;
    private transient SetArgs nxShortTimeArgs;   // SET ... NX PX <timeout>: claim key with a short TTL
    private transient SetArgs xxExpireTimeArgs;  // SET ... XX EX <expire>: extend TTL after success

    // Property keys honored by the constructor (typo "cluser" fixed).
    private static final String[] propKeys = new String[] {
        "redis.cluster.uris"        // comma-separated redis URIs
        ,"redis.timeout.millis"     // command timeout in milliseconds
        ,"redis.expire.seconds"     // long-term expiration for processed keys
    };

    private final String[] redisClusterUris;
    private final int redisTimeoutMillis;
    private final long redisExpireSeconds;

    /**
     * Creates the function. Any key from {@link #propKeys} present in
     * {@code kvprops} overrides the default; missing keys fall back to the
     * previous hard-coded configuration, so existing callers are unaffected.
     *
     * @param kvprops optional configuration; may be {@code null}
     * @throws Exception if a numeric property cannot be parsed
     */
    protected RedisStatefulMapFunction(Properties kvprops) throws Exception {
        final Properties props = (kvprops != null) ? kvprops : new Properties();

        final String uris = props.getProperty(propKeys[0], "").trim();
        if (uris.isEmpty()) {
            // Historical hard-coded cluster layout kept as the default.
            redisClusterUris = new String[] {
                "redis://[email protected]:7001?timeout=30s",
                "redis://[email protected]:7002?timeout=30s",
                "redis://[email protected]:7003?timeout=30s",
                "redis://[email protected]:7004?timeout=30s",
                "redis://[email protected]:7005?timeout=30s",
                "redis://[email protected]:7006?timeout=30s",
                "redis://[email protected]:7007?timeout=30s",
                "redis://[email protected]:7008?timeout=30s",
                "redis://[email protected]:7009?timeout=30s"
            };
        } else {
            redisClusterUris = uris.split("\\s*,\\s*");
        }
        redisTimeoutMillis = Integer.parseInt(props.getProperty(propKeys[1], "30000"));
        redisExpireSeconds = Long.parseLong(props.getProperty(propKeys[2], String.valueOf(86400L * 30)));
    }

    /**
     * Computes the dedup key (signature) of a record; records with equal keys
     * are treated as duplicates. Subclasses MUST override this method.
     *
     * @return the key, or {@code null} to drop the message entirely
     */
    public String getMessageKey(IN value)
    {
        return null;
    }

    /**
     * Called exactly once per unique key, when this task wins the NX claim.
     * Subclasses MUST override this method to emit output.
     */
    public void onCollectKey (String key, IN value, Collector<OUT> out)
    {
        LogUtil.debug(logger, "{%s}", key);
    }

    /**
     * Called when another task has already claimed the key (duplicate).
     * Subclasses normally leave this as-is.
     */
    public void onDiscardKey (String key, IN value, Collector<OUT> out)
    {
        LogUtil.warn(logger, "{%s}", key);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Short TTL on the initial claim so a task that crashes mid-record
        // does not lock its key out forever; the TTL is extended only after
        // the record was processed successfully (see flatMap).
        nxShortTimeArgs = SetArgs.Builder.nx().px(redisTimeoutMillis);
        xxExpireTimeArgs = SetArgs.Builder.xx().ex(redisExpireSeconds);
        redisClient = RedisClusterIO.createClient(redisClusterUris, Duration.ofMillis(redisTimeoutMillis));
        connection = redisClient.connect();
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Guard against open() having failed before these were assigned.
        if (connection != null) {
            connection.close();
            connection = null;
        }
        if (redisClient != null) {
            RedisClusterIO.shutdownClient(redisClient);
            redisClient = null;
        }
    }

    @Override
    public void flatMap(IN value, Collector<OUT> out) throws Exception {
        final RedisAdvancedClusterCommands<String, String> cmds = connection.sync();
        // Compute the record's signature key; null means "drop this message".
        final String key = getMessageKey(value);
        if (key != null) {
            // Atomic SET NX PX: only succeeds if the key does not exist yet.
            final String result = cmds.set(key, "0", nxShortTimeArgs);
            if ("OK".equals(result)) {
                // We won the claim: process the record.
                onCollectKey(key, value, out);
                // Mark the key as fully processed and extend its TTL.
                // SET XX EX only touches the key if it still exists.
                cmds.set(key, "1", xxExpireTimeArgs);
            } else {
                // Another task already handled this key: discard.
                onDiscardKey(key, value, out);
            }
        }
    }
}
redis 集群操作类 RedisClusterIO:
/**
* RedisClusterIO.java
* 操作 Redis 集群
*
*/
package com.pepstack.guru;
import com.pepstack.guru.LogUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.time.Duration;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStringCommands;
import io.lettuce.core.api.async.RedisStringAsyncCommands;
// https://github.com/lettuce-io/lettuce-core/blob/6.0.1.RELEASE/src/test/java/io/lettuce/examples/ConnectToRedisCluster.java
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
/**
 * Static helpers for creating and shutting down Lettuce Redis cluster clients.
 */
public class RedisClusterIO {
    private static final Logger logger = LoggerFactory.getLogger(RedisClusterIO.class);

    // Utility class: not instantiable.
    private RedisClusterIO() {
    }

    /**
     * Creates a cluster client from an array of Redis URIs.
     *
     * URI syntax:
     *   redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
     *
     * @param uris           one URI per cluster node; must be non-empty
     * @param defaultTimeout default command timeout, or {@code null} to keep
     *                       the Lettuce default
     * @throws IllegalArgumentException if {@code uris} is null or empty
     */
    public static RedisClusterClient createClient(String[] uris, Duration defaultTimeout) throws Exception {
        if (uris == null || uris.length == 0) {
            throw new IllegalArgumentException("uris must contain at least one redis URI");
        }
        final List<RedisURI> list = new ArrayList<RedisURI>(uris.length);
        for (int i = 0; i < uris.length; i++) {
            LogUtil.info(logger, String.format("redis-server.%d=%s", i + 1, uris[i]));
            list.add(RedisURI.create(uris[i]));
        }
        final RedisClusterClient redisClient = RedisClusterClient.create(list);
        if (defaultTimeout != null) {
            redisClient.setDefaultTimeout(defaultTimeout);
        }
        return redisClient;
    }

    /**
     * Creates a cluster client from properties of the form
     * {@code redis-server.1}, {@code redis-server.2}, ... (1-based,
     * consecutive). Uses the Lettuce default timeout.
     *
     * @throws IllegalArgumentException if no {@code redis-server.N} key exists
     */
    public static RedisClusterClient createClient(Properties props) throws Exception {
        // Count consecutive redis-server.N keys starting at N=1.
        int maxid = 0;
        while (props.getProperty(String.format("redis-server.%d", maxid + 1), null) != null) {
            maxid++;
        }
        final String[] uris = new String[maxid];
        for (int i = 0; i < maxid; i++) {
            uris[i] = props.getProperty(String.format("redis-server.%d", i + 1));
        }
        return createClient(uris, null);
    }

    /**
     * Shuts down the given client; a {@code null} client is ignored so this
     * is safe to call from cleanup paths where creation may have failed.
     */
    public static void shutdownClient(RedisClusterClient client) throws Exception {
        if (client != null) {
            client.shutdown();
        }
    }
}
特此记录。
标签:java,String,lettuce,flink,redis,利用,key,import From: https://blog.51cto.com/mapaware/6001817