流式查询 2:MyBatis 通过 ResultHandler 进行流式查询,每次按 fetchSize 设定的 500 条从数据库取回结果,再分批处理(可异步并发):
package com.aswatson.cdc.redis.schedule;
import com.aswatson.cdc.redis.common.lock.DistLock;
import com.aswatson.cdc.redis.common.lock.DistLockRegistry;
import com.aswatson.cdc.redis.conf.Constant;
import com.aswatson.cdc.redis.domain.repository.RedisTaskLogRepository;
import com.aswatson.cdc.redis.domain.repository.StaffProfileRepository;
import com.aswatson.cdc.redis.infrastructure.dao.redisDao.mapper.RedisLogMapper;
import com.aswatson.cdc.redis.infrastructure.dao.redisDao.model.RedisLogPO;
import com.aswatson.cdc.redis.utils.LocalCache;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.session.ResultContext;
import org.apache.ibatis.session.ResultHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Refreshes staff-profile cache entries from the rows returned by
 * {@link RedisLogMapper#selectLargeDataByTableNameAndCount}.
 *
 * <p>Rows are delivered one at a time through a MyBatis {@link ResultHandler}, buffered, and
 * handed to {@link #staffProfileRefreshByKey(List)} in batches of {@link #BATCH_SIZE}, so the
 * full result set is never accumulated in this class.
 */
@Slf4j
@Component
public class StaffProfileRefreshCache {

    /** Value passed as the {@code tableName} argument of the log query. */
    public static String tableName = "STAFF_PROFILE";

    /** Number of buffered rows that triggers a refresh of the batch. */
    private static final int BATCH_SIZE = 1000;

    /**
     * Number of leading characters dropped from the filtered Redis key before it is passed to
     * the repository. NOTE(review): presumably the length of a fixed key prefix - confirm.
     */
    private static final int KEY_PREFIX_LENGTH = 25;

    @Value("${buId}")
    private String buId;

    /** Configured default for the number of log rows to load ({@code staffProfile.cacheCount}). */
    @Value("${staffProfile.cacheCount}")
    private Long staffProfileCashCount;

    @Autowired
    private DistLockRegistry distLockRegistry;

    @Autowired
    private RedisLogMapper redisLogMapper;

    @Autowired
    private RedisTaskLogRepository redisTaskLogRepository;

    @Autowired
    private StaffProfileRepository staffProfileRepository;

    /**
     * Runs one refresh pass if the lock identified by {@code lockId} can be acquired.
     *
     * <p>When the lock cannot be acquired, a "lock exists" task-log entry is saved and the method
     * returns without querying. Otherwise the log rows are read, refreshed in batches, and a
     * task-log entry with the start time and local host address is saved.
     *
     * @param lockId id of the distributed lock guarding this task; also used in the task log
     * @param count  optional override for the number of rows to load; {@code null} or blank
     *               means "use the configured default". The override applies to this call only.
     * @throws NumberFormatException if {@code count} is non-blank but not a valid long
     * @throws Exception             if the local host address cannot be resolved or a
     *                               collaborator fails
     */
    public void refreshStaffProfile(String lockId, String count) throws Exception {
        log.info("refreshStaffProfile log: {}, {}, {}", lockId, LocalDate.now(), count);
        LocalDateTime startTime = LocalDateTime.now();

        // NOTE(review): the lock is never released in this class; presumably DistLock expires
        // on its own - confirm against the DistLock implementation.
        DistLock lock = distLockRegistry.getLock(lockId);
        if (!lock.tryLock()) {
            upsertRedisLockLog(lockId);
            return;
        }

        // Keep the override local: this bean is shared, so writing it back to the field would
        // silently replace the configured default for every later invocation.
        Long effectiveCount = StringUtils.isNotBlank(count)
                ? Long.valueOf(count.trim())
                : staffProfileCashCount;

        List<RedisLogPO> pendingBatch = new ArrayList<>(BATCH_SIZE);
        ResultHandler<RedisLogPO> batchingHandler =
                (ResultContext<? extends RedisLogPO> resultContext) -> {
                    pendingBatch.add(resultContext.getResultObject());
                    if (pendingBatch.size() >= BATCH_SIZE) {
                        // staffProfileRefreshByKey empties the buffer when it is done.
                        staffProfileRefreshByKey(pendingBatch);
                    }
                };
        redisLogMapper.selectLargeDataByTableNameAndCount(
                buId, tableName, effectiveCount, batchingHandler);

        // Flush the final, partially filled batch.
        if (!pendingBatch.isEmpty()) {
            staffProfileRefreshByKey(pendingBatch);
        }

        redisTaskLogRepository.saveRedisTaskLog(
                lockId, startTime, InetAddress.getLocalHost().getHostAddress());
    }

    /**
     * Refreshes the staff profile for every row in {@code redisLogPOs}, in parallel.
     *
     * <p>Rows whose filtered key is blank are skipped, as are rows whose key is shorter than
     * {@link #KEY_PREFIX_LENGTH} (logged as a warning) so that one malformed key does not abort
     * the whole batch.
     *
     * <p><b>The supplied list is cleared before this method returns</b>, whether or not the
     * refresh succeeded, so it must be mutable.
     *
     * @param redisLogPOs rows to refresh; emptied by this call
     * @throws RuntimeException wrapping any exception raised while refreshing
     */
    public void staffProfileRefreshByKey(List<RedisLogPO> redisLogPOs) {
        try {
            redisLogPOs.parallelStream().forEach(this::refreshSingleEntry);
        } catch (Exception e) {
            log.error("e", e);
            throw new RuntimeException(e);
        } finally {
            redisLogPOs.clear();
        }
    }

    /** Refreshes the staff profile addressed by a single log row, skipping unusable keys. */
    private void refreshSingleEntry(RedisLogPO redisLog) {
        String key = LocalCache.filterRedisKey(redisLog.getRedisName());
        if (StringUtils.isBlank(key)) {
            return;
        }
        if (key.length() < KEY_PREFIX_LENGTH) {
            log.warn("Skipping redis key shorter than {} characters: {}", KEY_PREFIX_LENGTH, key);
            return;
        }
        staffProfileRepository.refreshStaffProfileByKey(key.substring(KEY_PREFIX_LENGTH));
    }

    /**
     * Saves a task-log entry recording that the lock for {@code lockId} was already held.
     *
     * @param lockId id of the lock that could not be acquired
     * @throws UnknownHostException if the local host address cannot be resolved
     */
    private void upsertRedisLockLog(String lockId) throws UnknownHostException {
        String serverIp = InetAddress.getLocalHost().getHostAddress();
        LocalDateTime startTime = LocalDateTime.now();
        redisTaskLogRepository.saveRedisTaskLogByLock(
                lockId, startTime, serverIp, Constant.LOCK_EXIST);
    }
}
/**
 * Runs the {@code selectLargeDataByTableNameAndCount} mapped statement and passes each
 * resulting {@link RedisLogPO} to {@code handler} instead of returning a list.
 *
 * @param buId       bound as {@code buId} in the mapped statement
 * @param tableName  bound as {@code tableName} in the mapped statement
 * @param cacheCount bound as {@code cacheCount}; used as the row limit of the query
 * @param handler    callback invoked once per result row
 */
void selectLargeDataByTableNameAndCount(@Param("buId")String buId, @Param("tableName")String tableName,
@Param("cacheCount")Long cacheCount,
ResultHandler<RedisLogPO> handler);
<!--
  Selects the REDIS_ACCESS_LOG rows for one table name and name space, ordered by COUNT
  descending and limited to cacheCount rows. Rows are consumed through a ResultHandler.
  All three values are bound parameters (#{...}); the limit previously used raw string
  substitution (${...}).
  NOTE(review): whether fetchSize="500" actually streams the result set depends on the JDBC
  driver and connection settings - confirm for the database in use.
-->
<select id="selectLargeDataByTableNameAndCount" resultType="com.aswatson.cdc.redis.infrastructure.dao.redisDao.model.RedisLogPO" resultSetType="FORWARD_ONLY" fetchSize="500">
    SELECT * from REDIS_ACCESS_LOG where table_name = #{tableName} and NAME_SPACE = #{buId} ORDER BY COUNT DESC LIMIT #{cacheCount}
</select>
标签:String,cdc,ResultHandler,aswatson,redis,查询,mybatis,import,com
From: https://www.cnblogs.com/lgg20/p/18311132