起因
最近在工作做类似聊天室的一个功能,主要涉及多个管理员和多个用户交互,涉及几个功能点:
- 聊天列表
- 显示用户昵称
- 显示用户未读数
- 根据最新消息排序
- 单个用户未读消息状态记录
- 管理总消息数
初步方案
由于进入项目时间问题,设计是采用之前同事的设计,并没有过多想过并发的问题,设计之初的方案为:
- 聊天列表采用List结构进行缓存,单个用户的昵称和未读消息数及最后修改时间作为一个对象进行缓存。实时修改。
- 管理员总消息数作为一个缓存,每次先查后更新。
- 单个用户聊天记录单独缓存
开发过程中发现,多个用户同时发消息时,并发修改聊天列表List会产生并发问题,可能造成脏读脏写问题。由于方案已经定了,所以我这边的处理就是通过加锁解决对应的并发,搜索发现项目其实很少有锁机制处理,有些老业务用的synchronize的上锁,单个服务业务处理,由于我做的微服务属于多台部署,项目目前还没有锁机制,便根据Redis的SetNx写了个简单的分布式锁.避免引入过多依赖和锁机制。
简单锁如下:
public Boolean tryLock(Long roomId, String userId) {
try {
String key = InvigilationCacheKeyConstant.ROOM_SIMPLE_LOCK_KEY + roomId;
Long flag = CacheClient.setNX(key, userId, 5);
if (flag > 0) {
return true;
}
do {
Thread.sleep(100);
flag = CacheClient.setNX(key, userId, 5);
if (flag > 0) {
return true;
}
} while (true);
} catch (InterruptedException e) {
log.error("尝试加锁异常房间Id:{},用户Id:{}", roomId, userId, e);
}
return false;
}
public void releaseLock(Long roomId) {
try {
String key = InvigilationCacheKeyConstant.ROOM_SIMPLE_LOCK_KEY + roomId;
CacheClient.del(key);
} catch (Exception e) {
log.error("尝试解锁异常 房间Id:{}", roomId, e);
}
}
上锁逻辑处理
由于列表保存了用户的未读数,而且管理员也会在大厅新增新用户到列表,所以列表接口实际也会做列表成员新增操作,顾上锁的接口有以下几块地方处理:
- 聊天列表接口--上锁处理:新用户的加入聊天列表,防止用户端并发加入造成用户丢失
- 管理员发送消息接口--上锁处理:保存聊天记录时需要更新用户消息记录缓存,防止与用户并发修改丢失消息内容
- 管理员聊天详情接口--上锁处理:消减聊天列表单个用户未读消息数和管理员总消息未读数
- 用户发送消息接口--上锁处理:用户加入聊天列表,及未读消息数改变。防止其他用户并发修改造成数据丢失
由于结构采用不当,导致整体业务大部分地方业务都进行了上锁处理。再者没有限制单个用户的最大消息数,单接口压测时,数据由于都在缓存,导致性能测试整体性能极差。
优化前性能测试结果
优化方案
老大优化的思路是尽量去避免上锁处理业务,基于数据结构去优化问题。
初步构思:
- 聊天列表保存了太多有关用户维度的数据,是否可以剔除用户维度的数据就可以避免并发问题
- 限制单个用户消息数,我们的业务并不会有单个用户大量聊天记录的场景
- 优化系统其他查库业务逻辑使用缓存处理
- 优化现有逻辑,避免无效查询。比如业务截止再查业务截止相关的业务,不要每次都查都检查
优化方案:
- 聊天列表采用Redis的Zset保存单个用户的userId,用Zadd进行用户添加,避免并发修改问题,直接转变为SET集合,同时基于Zset的score,ZRANGEBYSCORE 可以实现分页和排序。
- socre计算方案:如果用户没有聊天记录,默认展示到列表后面,score设置时间戳最大值,如果用户聊天记录llen >0, 则将用户score以当前时间戳进行转换处理,zrangbyScore默认顺序是小到大,所以如果需要展示最新时间的列表需要将时间转换处理,这里我是采用将时间戳最大值减当前时间戳实现越大的时间,score越小。
- 用户单人的消息未读数和管理员未读消息总数采用Redis的incr,decr进行实时增加减少,原子性操作也避免了先查后改数据的问题。
- 单个用户的聊天记录采用Redis的List进行保存,通过rpush和lrange进行排序和分页。通过llen 进行比较消息最大长度,超过限制就丢弃消息并提醒(主要是应对压测,不然单个数据量太大,详情接口和分页扛不住。实际业务单个用户并不会有较大消息量,可通过Nacos配置实时刷新这个限制,便于业务控制。)
- 优化系统其他查库业务逻辑如果有缓存使用缓存处理(由于第一次接触这个业务,不懂之前有啥缓存。这也是坑)
- 优化现有逻辑,避免无效查询。如业务截止再查业务截止相关的业务,不要每次都查都检查
score算法
/**
* 分数算法--10位时间戳取倒叙,用于列表时间排序
*
* @param now now
* @return score
*/
private double getScoreByNow(long now) {
long max = 9999999999L;
return (double) (max - (now / 1000));
}
缓存结构优化
单个用户聊天记录从原来的String结构转为List结构
旧:
/**
* 获取 考生聊天记录
*
* @param examId examId
* @param userId userId
*/
public List<UserChatContentCacheDTO> getUserChatContentCache(Long examId, String userId) {
String key = InvigilationCacheKeyConstant.ROOM_USER_CHAT_INFOS + examId + SPLIT_STR + userId;
String arrayJson = CacheClient.get(key);
if (StringUtil.isEmpty(arrayJson)) {
return new ArrayList<>();
}
JSONArray jsonStrArray = JSON.parseArray(arrayJson);
return jsonStrArray.toJavaList(UserChatContentCacheDTO.class).stream().sorted(Comparator.comparing(UserChatContentCacheDTO::getTime)).collect(Collectors.toList());
}
新:
/**
* 获取 考生聊天记录
*
* @param examId examId
* @param userId userId
*/
public List<UserChatContentCacheDTO> getUserChatContentCache(Long examId, String userId, Integer start, Integer end) {
String key = InvigilationCacheKeyConstant.ROOM_USER_CHAT_INFOS + examId + SPLIT_STR + userId;
List<String> lrange = CacheClient.lrange(key, start, end);
if (CollectionUtil.isEmpty(lrange)) {
return new ArrayList<>();
}
List<UserChatContentCacheDTO> result = lrange.stream().map(k -> GsonUtil.toBean(k, UserChatContentCacheDTO.class)).collect(Collectors.toList());
return result;
}
/**
* 更新 考生监考员聊天记录到用户聊天记录
*
* @param examId examId
* @param userId userId
*/
public void updateUserChatContentCache(Long examId, String userId, UserChatContentCacheDTO chatInfos, Long timeOut) {
String key = InvigilationCacheKeyConstant.ROOM_USER_CHAT_INFOS + examId + SPLIT_STR + userId;
CacheClient.rpush(key, GsonUtil.toJson(chatInfos), timeOut);
}
列表结构从原来的String结构转Zset结构
同时分离用户维度专属的数据,只留用户id,查询时在查其他用户维度的缓存
旧:通过上锁先查加入后更新
/**
* 获取 私信列表
*
* @return ChatRoomUsersCacheDTO
*/
public List<ChatRoomUsersCacheDTO> getChatRoomUserChache(Long roomId) {
String key = InvigilationCacheKeyConstant.ROOM_USER_INFOS + roomId;
String arrayJson = CacheClient.get(key);
if (StringUtil.isEmpty(arrayJson)) {
return new ArrayList<>();
}
JSONArray jsonStrArray = JSON.parseArray(arrayJson);
List<ChatRoomUsersCacheDTO> chatRoomUsers = jsonStrArray.toJavaList(ChatRoomUsersCacheDTO.class);
return chatRoomUsers.stream().distinct().sorted(Comparator.comparing(ChatRoomUsersCacheDTO::getLastUpdateTime).reversed()).collect(Collectors.toList());
}
新:剔除用户维度数据,去除上锁逻辑,单独查询
/**
* 获取 私信列表
*
* @return ChatRoomUsersCacheDTO
*/
public List<ChatRoomUsersCacheDTO> getChatRoomUserChache(Long roomId, Integer start, Integer end) {
String key = InvigilationCacheKeyConstant.ROOM_USER_INFOS + roomId;
Set<String> userIds = CacheClient.zrangeByScoreAndLimit(key, start, end);
if (CollectionUtil.isEmpty(userIds)) {
return new ArrayList<>();
}
List<ChatRoomUsersCacheDTO> result = userIds.stream().map(uid -> {
String userOtherKey = InvigilationCacheKeyConstant.ROOM_USER_CHAT_OTHER_INFOS + roomId + SPLIT_STR + uid;
String name = CacheClient.get(userOtherKey, String.class);
String userCountKey = InvigilationCacheKeyConstant.USER_NOT_READ_COUNT + roomId + SPLIT_STR + uid;
String userCount = CacheClient.get(userCountKey);
ChatRoomUsersCacheDTO chatRoomUsersCacheDTO = new ChatRoomUsersCacheDTO();
chatRoomUsersCacheDTO.setUserId(uid);
chatRoomUsersCacheDTO.setName(name);
if (StringUtil.isNotEmpty(userCount)) {
chatRoomUsersCacheDTO.setUnReadCount(Integer.parseInt(userCount));
} else {
chatRoomUsersCacheDTO.setUnReadCount(0);
}
return chatRoomUsersCacheDTO;
}).collect(Collectors.toList());
return result;
}
/**
* 更新 私信列表
*
* @return ChatRoomUsersCacheDTO
*/
public void updateChatRoomUserChache(Long roomId, Long examId, String uid, Double score, Long timeOut) {
String key = InvigilationCacheKeyConstant.ROOM_USER_INFOS + roomId;
String userContentKey = InvigilationCacheKeyConstant.ROOM_USER_CHAT_INFOS + examId + SPLIT_STR + uid;
Long length = CacheClient.llen(userContentKey);
if (length != null) {
if (length <= 0) {
CacheClient.zadd(key, InvigilationCacheKeyConstant.MAX_TIME_SCORE, uid, timeOut);
return;
}
}
CacheClient.zadd(key, score, uid, timeOut);
}
优化后性能测试结果
再次优化
经过上面的优化,系统性能已经满足目前的业务需求,但还有提升空间,保存消息可以改成异步消息来提升性能
优化思路:
- 用户和管理员保存聊天的接口都改为异步,采用消息队列消费后发送IM消息
public void saveChatContent(ChatContentVo vo) {
Long examId = vo.getExamId();
String userId = vo.getUserId();
Long userChatContentLength = invigilationUserCacheBiz.getUserChatContentLength(examId, userId);
if (userChatContentLength > MAX_USER_CHAT_COUNT) {
log.info("单个用户超过最大消息数聊天限制 examId:{},userId:{}", examId, userId);
throw new BloomRpcException(InvigilationErrorCodeEnum.MESSAGE_COUNT_LIMIT.getCode(), InvigilationErrorCodeEnum.MESSAGE_COUNT_LIMIT.getMsg());
}
sendMessageToSaveUserChat(vo);
}
private void sendMessageToSaveUserChat(ChatContentVo chatContentVo) {
MessageVo imMessage = new MessageVo();
imMessage.setType(MessageTypeEnum.MQ_TOPIC_INVIGILATION_SAVE_USER_CHAT.getValue());
imMessage.setKey(IdGenerator.getIdStr());
imMessage.setMessage(GsonUtil.toJson(chatContentVo));
iMqSender.send(imMessage);
}
其他服务消费具体消息
public class InvigilationMqConsumerHandler implements IMqConsumerHandler {
@Resource
private InvigilationAdminBiz invigilationAdminBiz;
@Override
public boolean hander(MessageBean messageBean) {
int type = messageBean.getType();
if (MessageTypeEnum.MQ_TOPIC_INVIGILATION_SAVE_USER_CHAT.getValue() == type) {
String message = messageBean.getMessage();
log.info("invigilation save user chat messageBean:{}", message);
if (StringUtil.isNotEmpty(message)) {
try {
CommonChatVO chatVO = GsonUtil.toBean(message, CommonChatVO.class);
invigilationAdminBiz.saveUserChat(chatVO);
return true;
} catch (Exception e) {
log.error("用户考试私信保存消息消费异常:message:{}", message, e);
}
}
}
if (MessageTypeEnum.MQ_TOPIC_INVIGILATION_SAVE_ADMIN_CHAT.getValue() == type) {
String message = messageBean.getMessage();
log.info("invigilation save admin chat messageBean:{}", message);
if (StringUtil.isNotEmpty(message)) {
try {
CommonChatVO chatVO = GsonUtil.toBean(message, CommonChatVO.class);
invigilationAdminBiz.saveChat(chatVO);
return true;
} catch (Exception e) {
log.error("监考员保存监考私信消息消费异常:message:{}", message, e);
}
}
}
if (MessageTypeEnum.MQ_TOPIC_INVIGILATION_SAVE_CHAT.getValue() == messageBean.getType()) {
String message = messageBean.getMessage();
log.info("invigilation save chat to db messageBean:{}", message);
if (StringUtil.isNotEmpty(message)) {
try {
MonitorGroupChat groupChat = GsonUtil.toBean(message, MonitorGroupChat.class);
invigilationAdminBiz.saveChatContent(groupChat);
return true;
} catch (Exception e) {
log.error("保存私信消息到数据库消费异常:message:{}", message, e);
}
}
}
return false;
}
优化结果
单端接口提升三倍左右
标签:一次,return,String,examId,性能,userId,用户,message,优化 From: https://www.cnblogs.com/hnusthuyanhua/p/16895683.html