首页 > 数据库 >Redis+Hbase+RocketMQ 实际使用问题案例分享

Redis+Hbase+RocketMQ 实际使用问题案例分享

时间:2023-01-19 16:47:28浏览次数:50  
标签:MQClientAPIImpl java Redis redis 重试 RocketMQ apache Hbase rocketmq

需求

  1. 将Hbase数据,解析后推送到RocketMQ。
  2. redis使用list数据类型,存储了需要推送的数据的RowKey及表名。

简单画个流程图就是:

分析及确定方案

Redis

  1. 明确list中元素结构{"rowkey":rowkey,"table":table}解析出rowkey;
  2. 一次取多个元素加快效率;
  3. 取了之后放入重试队列,并删除原来的元素;
  4. 处理数据永远是重试队列里的,成功之后删除,失败就加上重试次数并重新放回;
  5. 明确从list中取值所使用的redis命令;范围获取LRANGE;范围删除(留下指定范围的数据)LTRIM;判断list长度LLEN;加入listRPUSH;删除LREM等等;
  6. 从Hbase获取数据失败和发送到mq失败都令重试次数加一;
  7. 每次碰到重试次数不为0的数据都休眠1s;
  8. 设置最大重试次数,达到限制后丢弃;
  9. 考虑客户redis部署方式,单机、主从、集群、哨兵等;
  10. 选择合适的客户端,Jedis、Redisson、Lettuce等;
  11. 编写不同的操作代码,也可以利用配置文件、环境变量、工厂模式等适配各种部署模式;

Hbase

  1. 基本理论知识学习(原来没接触过),rowkey是没条数据的主键,限定符是字段名,列族是多个限定名的集合等;当时看这个觉得不错https://juejin.cn/post/6844903797655863309
  2. 因为是不停读取数据、链接、Table不用close,可以缓存起来,没必要每次都创建;
  3. 确定批量获取数据方式为批量Get,没用scan
  4. 了解解析方式,一些网上的解析试了之后会乱码,这边用的是它自带的CellUtil.clone相关方法;
  5. 考虑所有都没数据时休眠10s;

RocketMQ

  1. 有现成的发送代码,公司封装好的;
  2. 调整发送的速度、太快了服务端会吃不消(获取Hbase数据速度太快了,最开始没限制一会儿就入了百万数据),设置超时时间(默认3s);
  3. 调整服务端的内存、线程数等参数;

实现

配置

#server configuration
server.port=8896
#log config
logging.file.path=./logs
#redis-standalone
redis.standalone.host=
redis.standalone.port=6379
redis.standalone.password=
redis.standalone.enable=true
#redis-cluster
redis.cluster.nodes=
redis.cluster.password=
redis.cluster.timeout=30000
redis.cluster.enable=false
# Zookeeper 集群地址,逗号分隔
hbase.zookeeper.quorum=
# Zookeeper 端口
hbase.zookeeper.property.clientPort=2181
# 消息目的rocketmq地址
rocketmq.server.host=
# 发送消息间隔时间,防止发送过快mq受不了
rocketmq.send.interval.millisec=10
# 每次从redis读取数据量限制。
data.access.redisDataSize=100
# 失败数据重试次数,超过的直接丢弃
data.access.retryNum=10
# 需要接入的表,需要发送到rocketmq的topic和在redis中的key的映射。xxx.xxx.xxx[topic]=redisKey
data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back
data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back

部分代码

获取配置,其余的直接@Value("${}")

@Setter
@Getter
@Configuration
@ConfigurationProperties(prefix = "data.access")
public class AccessRedisMqConfig {

    /**
     * key:topic; value:redis的key
     */
    private Map<String, String> topicKeyMap = new HashMap<>();

    /**
     * 一次从redis中读取数据量限制
     */
    private long redisDataSize = 50;

    /**
     * 失败数据重试次数
     */
    private int retryNum = 10;

}

开启接入:

@Component
public class AdapterRunner implements ApplicationRunner {

    @Resource
    private DataAccessService dataAccessService;

    @Override
    public void run(ApplicationArguments args) {
        System.out.println("项目已启动,开始接入数据到RocketMQ……");
        dataAccessService.accessData2Mq();
    }
}

其他代码其实也在分析里了。

踩坑

  1. mq发送问题
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout
	at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572)
	at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

上面分析也说了,注意发送速度,有多少资源就接入多快。还有注意相关三个端口是否开放。

总结

程序很简单,主要涉及方案的是,获取redis的list数据时,是考虑效率,及加入重试策略,保证数据不丢失等。

标签:MQClientAPIImpl,java,Redis,redis,重试,RocketMQ,apache,Hbase,rocketmq
From: https://www.cnblogs.com/letscrazy/p/case.html

相关文章

  • redis事务
    Redis的事务仅保证了数据的一致性,不具有像DBMS一样的ACID特性。这组命令中的某些命令的执行失败不会影响其它命令的执行,不会引发回滚,即不具备原子性。这组命令通过......
  • redis数据类型常用命令
    String字符串若字符串中有空格注意用引号。set--创建key#若键不存在则创建,若存在则更新其值,在设置操作成功完成时返回OKsetkeyvalue#设置key时同时指定过......
  • redis数据类型应用场景
    Redis总共有8种数据类型,前5种为常用(基本)数据类型Redis五种基本数据类型String字符串概述Redis最基本的类型,默认最大能存储512MB数据。String类型的Value中可以存......
  • redis配置文件
    Redis的配置文件位于Redis安装目录下,文件名为redis.conf(Windows名为redis.windows.conf)。可以在登入Redis后通过CONFIG命令查看或设置配置项通过config命令查......
  • redis的key操作及切换数据库
    redis默认有16个库(0-15),进入后默认在第0个库#切换数据库使用select命令,index为数据库标号,切换数据库后除了0号库其余库会在端口后的[]会显示编号selectindex#示例......
  • redis安装
    1.到官网寻找自己需要的版本wgethttps://github.com/redis/redis/archive/7.0.7.tar.gz2.编译&编译安装#解压并改名tarzxvf7.0.7.tar.gz-C/usr/local/mv/us......
  • redis启停和连接
    启动#redis-server命令用于启动redis,安装后该命令已位于/usr/local/bin/目录下,可在任意路径执行#直接执行redis-server则是前台启动,使用redis-server指定配置文件启动......
  • HBASE列族不能太多的真相 (一个table有几个列族就有几个 Store)
    HRegionServer内部管理了一系列HRegion对象,每个HRegion对应了table中的一个region,HRegion中由多个HStore组成。每个HStore对应了Table中的一个columnfamily的存储,可以看......
  • Redis下载安装与配置(linux)
    一、Redis下载与安装1.下载安装包官网下载地址:Download|Redis点击"Download7.0.7",即可进行下载。2.将安装包上传至服务器2.1将安装包上传至/usr/local目录并解压......
  • redis面试题
    1.项目中是否使用过redis?为什么要使用redis?使用过之前使用的都是修改某个value值,如登录账号被锁定30分钟,查看还剩余的时间,或者想将账号由锁定状态更新为未锁定状态,删......