分布式缓存
缓存常见的问题
缓存穿透
缓存和数据库中都没有的数据,而用户不断发起请求,导致数据压力过大,甚至击垮数据库
比如黑客会对你的系统进行攻击,拿一个不存在的id 去查询数据,会产生大量的请求到数据库去查询。可能会导致你的数据库由于压力过大而宕掉
解决办法:
1、在接口增加校验
2、在网关Nginx增加配置顶,对单个ip每秒访问次数超出阈值的ip都给拉黑
3、如果一个查询返回的数据为空(不管数据是否存在,还是系统故障),仍然将这个空结果缓存
4、布隆过滤器:利用高效的数据结构和算法快速判断key是否存在数据库,不存在直接return,存在则去查询DB刷新KV再return
图解:
缓存雪崩
原有缓存失效,新缓存未到期间,大量数据打垮DB。
比如程序首页大面积的缓存设置了相同的过期时间,同一时刻缓存失效,流量激增,DB撑不住,挂掉。
解决办法:
1、在批量往Redis存数据的时候,把每个Key的失效时间都加个随机值就好了,这样可以保证数据不会在同一时间大面积失效
2、如果Redis是集群部署,将热点数据均匀分布在不同的Redis库中也能避免全部失效的问题
3、设置热点数据永远不过期,有更新操作就更新缓存就好了(比如运维更新了首页商品,那你刷下缓存就完事了,不要设置过期时间),电商首页的数据也可以用这个操作,保险。
防止雪崩方案可以参考下图:
图解:
缓存击穿
缓存击穿是指一个Key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个Key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个完好无损的桶上凿开了一个洞
解决办法:
1、设置热点数据永不过期
2、增加互斥锁
图解:
缓存预热
就是系统上线后,将缓存数据直接加载到缓存系统。这样用户请求,不用先查询数据库,再将数据缓存。直接查询事先被预热的数据
解决办法:
1、直接写个缓存页面,上线时手工操作下
2、数据量不大的话,在项目启动的时候自动进行加载
3、定时刷新缓存
缓存更新
除了缓存服务器自带的缓存失效策略之外(Redis默认的有6中策略可供选择),我们还可以根据具体的业务需求进行自定义的缓存淘汰,常见的策略有两种:
1、定时去清理过期的缓存;
2、当有用户请求过来时,再判断这个请求所用到的缓存是否过期,过期的话就去底层系统得到新数据并更新缓存。
两者各有优劣,第一种的缺点是维护大量缓存的key是比较麻烦的,第二种的缺点就是每次用户请求过来都要判断缓存失效,逻辑相对比较复杂!具体用哪种方案,没有最好,只有最合适!
缓存降级
当访问量剧增、服务出现问题(如响应时间慢或不响应)或非核心服务影响到核心流程的性能时,仍然需要保证服务还是可用的,即使是有损服务。系统可以根据一些关键数据进行自动降级,也可以配置开关实现人工降级。
降级的最终目的是保证核心服务可用,即使是有损的。而且有些服务是无法降级的(如加入购物车、结算)。
在进行降级之前要对系统进行梳理,看看系统是不是可以丢卒保帅;从而梳理出哪些必须誓死保护,哪些可降级;比如可以参考日志级别设置预案:
(1)一般:比如有些服务偶尔因为网络抖动或者服务正在上线而超时,可以自动降级;
(2)警告:有些服务在一段时间内成功率有波动(如在95~100%之间),可以自动降级或人工降级,并发送告警;
(3)错误:比如可用率低于90%,或者数据库连接池被打爆了,或者访问量突然猛增到系统能承受的最大阀值,此时可以根据情况自动降级或者人工降级;
发送告警;
(3)错误:比如可用率低于90%,或者数据库连接池被打爆了,或者访问量突然猛增到系统能承受的最大阀值,此时可以根据情况自动降级或者人工降级;
(4)严重错误:比如因为特殊原因数据错误了,此时需要紧急人工降级
本地锁
本地锁的问题
在多个线程同时访问的时候,由于时序问题可能会出现查询好几次数据库的问题
举个例子:
场景:先查缓存,缓存中没有去查数据库:
1号查询没有,启动本地锁,去数据库查询了,然后查询完释放本地锁,1号线程去存储redis;
由于redis存储需要一定的时间;
2号去查询,因为1号还没有放到redis缓存,所以查不到数据,于是又去查数据库了。
这样就发生了2次查数据库的情况,实际业务可能会发生更多查数据库的情况
解决方法:
在缓存中没有,1号查询数据库这个方法结束之前,已经存入数据库,那么就解决了,实现查询一次数据库,即查询+放入缓存为一个原子操作
项目中用法
缓存调用查询数据的方法
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson(){
//给缓存中放json字符串,拿出的json字符串,还能逆转为能用的对象类型;【序列化与反序列化】
/**
* 1、空结果缓存:解决缓存穿透
* 2、设置过去时间(加随机值):解决缓存雪崩
* 3、加锁:解决缓存击穿
*/
//1、加入缓存逻辑,缓存中存的数据是json字符串
//JSON跨语言,跨平台兼容。
String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
if (StringUtils.isEmpty(catalogJSON)){
//2、缓存中没有,查询数据库
System.out.println("缓存不命中...将要查询数据库...");
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
return catalogJsonFromDb;
}
System.out.println("缓存命中...直接返回...");
//转为我们制定的对象
//fastJSON,把字符串JSON按照我们要转的类型(对象),进行转换; json--->Map<String, List<Catelog2Vo>>
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJSON,new TypeReference<Map<String, List<Catelog2Vo>>>(){});
return result;
}
查询数据库的方法
//从数据库查询并封装分类数据
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDb() {
//只要是同一把锁,就能锁住需要这个锁的所有线程
//1、synchronized (this):SpringBoot所有的组件在容器中都是单例的。
//TODO 本地锁:synchronized,JUC(Lock),在分布式情况下,响应锁住所有,必须使用分布式锁
synchronized (this){
//得到锁以后,我们应该再去缓存中确认一次,如果没有才需要继续查询
String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
if (!StringUtils.isEmpty(catalogJSON)){
//缓存不为null直接返回
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJSON,new TypeReference<Map<String, List<Catelog2Vo>>>(){});
return result;
}
System.out.println("查询了数据库");
/**
* 1、将数据库的多次查询变为一次
*
*/
List<CategoryEntity> selectList = baseMapper.selectList(null);
//1、查出所有1级分类
List<CategoryEntity> level1Categorys = getParent_cid(selectList, 0L);
//2、封装数据
Map<String, List<Catelog2Vo>> parent_cid = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
//1、每一个的一级分类,查到这个一级分类的二级分类
List<CategoryEntity> categoryEntities = getParent_cid(selectList, v.getCatId());
//2、封装上面的结果
List<Catelog2Vo> catelog2Vos = null;
if (categoryEntities != null) {
catelog2Vos = categoryEntities.stream().map(l2 -> {
Catelog2Vo catelog2Vo = new Catelog2Vo("v.getCatId()", null, l2.getCatId().toString(), l2.getName());
//1、找当前二级分类的三级分类封装成vo
List<CategoryEntity> level3Catelog = getParent_cid(selectList, l2.getCatId());
if (level3Catelog != null) {
List<Catelog2Vo.Catelog3Vo> collect = level3Catelog.stream().map(l3 -> {
//2、封装成指定格式
Catelog2Vo.Catelog3Vo catelog3Vo = new Catelog2Vo.Catelog3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName());
return catelog3Vo;
}).collect(Collectors.toList());
catelog2Vo.setCatalog3List(collect);
}
return catelog2Vo;
}).collect(Collectors.toList());
}
return catelog2Vos;
}));
//3、查到的数据再放入缓存,将对象转为json放在缓存中
String s = JSON.toJSONString(parent_cid); //利用fastJSON把对象转为json字符串
redisTemplate.opsForValue().set("catalogJSON",s,1, TimeUnit.DAYS); //设置1天这个key过期
return parent_cid;
}
}
private List<CategoryEntity> getParent_cid(List<CategoryEntity> selectList,Long parent_cid) {
List<CategoryEntity> collect = selectList.stream().filter(item -> item.getParentCid() == parent_cid).collect(Collectors.toList());
return collect;
//return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", v.getCatId()));
}
这样就可以实现缓存中没有,只查询一次数据库了
本地锁总结
-
本地锁可以在单个实例的情况下,完成成功加锁。
-
但是在分布式多个实例下,本地锁只能锁住自己的单例,在分布式下就会出现查询多次的情况出现。
-
为了锁住所有的进程,我们就需要分布式锁了。
分布式缓存-Redis-Jedis-分布式锁
缓存使用
为了系统性能的提升,我们一般都会将部分数据放入缓存中,加速访问。而db承担数据落盘工作。
哪些数据适合放入缓存?
-
即时性、数据一致性要求不高的
-
访问量大且更新频率不高的数据(读多,写少)
举例:电商类应用,商品分类,商品列表等适合缓存并加一个失效时间(根据数据更新频率来定),后台如果发布一个商品,买家需要5分钟才能看到新的商品一般还是可以接受的。
本地缓存可以使用map来做缓存,分布式下则不行,则需要加入缓存中间件来统一缓存
这里我们使用redis,Java使用redis有很多方法。
SpringBoot使用redis异常
springBoot可以直接使用redis,但是可能会有异常
@Override
public Map<String, List<Catalog2Vo>> getCatalogJson() {
String catalogJSON = stringRedisTemplate.opsForValue().get("catalogJSON");
if(StringUtils.isBlank("catalogJSON") || catalogJSON == null){
Map<String, List<Catalog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
String s = JSON.toJSONString(catalogJsonFromDb);
stringRedisTemplate.opsForValue().set("catalogJSON",s,1,TimeUnit.SECONDS);
return catalogJsonFromDb;
}
Map<String, List<Catalog2Vo>> map = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catalog2Vo>>>() {
});
return map;
}
堆外内存溢出:
原因:
springboot在2.0后默认使用lettuce操作redis客户端,它使用netty进行网络通信,lettuce的bug导致堆外内存溢出,底层实现是只要有操作,就会统计内存使用量,操作完会decrement减内存,可能是lettuce客户端在减内存的过程出错
解决:
对这个问题调节-Xmx大小不起作用,而且如果netty(netty如没有指定,默认是-Xmx300m)没指定堆外内存,默认使用-Xmx,所以一旦出现问题,就算调大-Xmx也总会到堆外内存溢出的地步。所以我们不能用io.netty.maxDirectMemory只修改堆外内存大小。
还有2种方案:
- 1、升级lettuce客户端
- 2、排除lettuce依赖,使用jedis
Jedis
jedis是什么
Jedis是Redis的Java实现的客户端,其API提供了比较全面的Redis命令的支持;
使用jedis
Jedis实例不是线程安全的,所以不可以多个线程共用一个Jedis实例,但是创建太多的实现也不好因为这意味着会建立很多sokcet连接。
JedisPool是一个线程安全的网络连接池。可以用JedisPool创建一些可靠Jedis实例,可以从池中获取Jedis实例,使用完后再把Jedis实例还回JedisPool。这种方式可以避免创建大量socket连接并且会实现高效的性能.
依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.3</version>
</dependency>
RedisUtil
public class RedisUtil {
private JedisPool jedisPool;
public void initPool(String host,int port ,int database){
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(200);
poolConfig.setMaxIdle(30);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setMaxWaitMillis(10*1000);
poolConfig.setTestOnBorrow(true);
jedisPool=new JedisPool(poolConfig,host,port,20*1000);
}
public Jedis getJedis(){
Jedis jedis = jedisPool.getResource();
return jedis;
}
}
RedisConfig
@Configuration
public class RedisConfig {
//读取配置文件中的redis的ip地址
@Value("${spring.redis.host:disabled}")
private String host;
@Value("${spring.redis.port:0}")
private int port;
@Value("${spring.redis.database:0}")
private int database;
@Bean
public RedisUtil getRedisUtil(){
if(host.equals("disabled")){
return null;
}
RedisUtil redisUtil=new RedisUtil();
redisUtil.initPool(host,port,database);
return redisUtil;
}
}
application.properties
spring.redis.host=redis服务地址
spring.redis.port=6379
spring.redis.database=0
使用测试
@Autowired
RedisUtil redisUtil;
@Test
public void testRedis() {
Jedis jedis = redisUtil.getJedis();
//jedis.set("test","test");
String s = jedis.get("test");
System.out.println(s);
}
业务使用
/**
* 查出所有分类 返回首页json
*/
@Override
public Map<String, List<Catalog2Vo>> getCatalogJson() {
//加入缓存逻辑
Jedis jedis = redisUtil.getJedis();
String catalogJson = jedis.get("catalogJson");
Map<String, List<Catalog2Vo>> json = null;
//缓存存在 转换返回
if (!StringUtils.isEmpty(catalogJson)) {
json = JSONObject.parseObject(catalogJson, new TypeReference<Map<String, List<Catalog2Vo>>>() {
});
return json;
}
//缓存没有从数据查询
json = getCatalogJsonFromDB();
//转成str 加入缓存
String jsonString = JSON.toJSONString(json);
jedis.set("catalogJson", jsonString);
return json;
}
/**
* 从数据库获取数据并封装返回
*
* @return
*/
public Map<String, List<Catalog2Vo>> getCatalogJsonFromDB() {
//查询出所有分类
List<CategoryEntity> selectList = baseMapper.selectList(null);
//先查出所有一级分类
List<CategoryEntity> level1Categorys = getCategorys(selectList, 0L);
//封装数据 map k,v 结构
Map<String, List<Catalog2Vo>> map = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
//每一个的一级分类,查到这个一级分类的二级分类
List<CategoryEntity> category2Entities = getCategorys(selectList, v.getCatId());
List<Catalog2Vo> catelog2Vos = null;
if (category2Entities != null) {
catelog2Vos = category2Entities.stream().map(level2 -> {
//封装catalog2Vo
Catalog2Vo catalog2Vo = new Catalog2Vo(v.getCatId().toString(), null, level2.getCatId().toString(), level2.getName());
//每一个二级分类,查到三级分类
List<CategoryEntity> category3Entities = getCategorys(selectList, level2.getCatId());
if (category3Entities != null) {
List<Object> catalog3List = category3Entities.stream().map(level3 -> {
//封装catalog3Vo
Catalog2Vo.Catalog3Vo catalog3Vo = new Catalog2Vo.Catalog3Vo(level2.getCatId().toString(), level3.getCatId().toString(), level3.getName());
return catalog3Vo;
}).collect(Collectors.toList());
//封装catalog3Vo到catalog2Vo
catalog2Vo.setCatalog3List(catalog3List);
}
return catalog2Vo;
}).collect(Collectors.toList());
}
//返回v=catalog2Vo
return catelog2Vos;
}));
return map;
}
测试ok
但是这样在高并发下缓存是会存在很多问题的,详情参见以上分布式-Redis-缓存问题
lua脚本:
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
就是加锁设置过期时间保证加锁操作是原子性的,解锁也是同样保持原子性。(原子性简单理解就是1和2两个操作都是要同时ok这整个任务才能算ok)
加锁获取数据
public Map<String, List<Catalog2Vo>> getCatalogJsonFromDBWithRedisLock() {
Jedis jedis = redisUtil.getJedis();
//加锁,设置过期时间,必须和加锁是同步的,原子的
String token = UUID.randomUUID().toString();
String lock = jedis.set("lock", token, "NX", "EX", 20);
System.out.println(lock);
Map<String, List<Catalog2Vo>> map = null;
//加锁成功
if ("ok".equals(lock)) {
map = getCatalogJsonFromDB();
//删除锁 lua脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
jedis.eval(script, Collections.singletonList("lock"), Collections.singletonList(token));
return map;
} else {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
//自旋
return getCatalogJsonFromDBWithRedisLock();
}
}
Redisson-分布式锁-缓存数据一致性
Redisson
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上
项目整合
依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.5</version>
</dependency>
配置类
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://"+host+":"+port);
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
application.properties
spring.redis.host=redis服务地址
spring.redis.port=6379
spring.redis.database=0
测试
@Autowired
RedissonClient redissonClient;
@Test
public void testRedisson() {
RLock lock = redissonClient.getLock("anyLock");
//最常见的使用方法
lock.lock();
System.out.println(lock);
}
分布式锁
可重入锁(Reentrant Lock)
RLock lock = redisson.getLock("anyLock");
//阻塞式等待。默认加的锁都是30s时间。
lock.lock();
-
锁的自动续期,如果业务超长,运行期间自动给锁线上新的30s,不用担心业务时间长,锁自动过期被删除掉
-
加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了
// 加锁以后10秒钟自动解锁,无需调用unlock方法手动解锁
//自动解锁时间一定要大于业务的执行时间。
lock.lock(10, TimeUnit.SECONDS);
或者
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
Lock. Lock(10, TimeUnit.SECONDS);在锁时间到了以后,会不会自动续期。
-
如果我们传递了锁的超时时间,就执行lua脚本,进行占锁,默认超时就是我们指定的时间。
-
如果我们未指定锁的超时时间,就使用30 * 1000 【LockWatchdogTimeout看门狗的默认时间】
只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】
lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)
- 默认值30000
- 监控锁的看门狗超时时间单位为毫秒。该参数只适用于分布式锁的加锁请求中未明确使用leaseTimeout参数的情况。如果该看门口未使用lockWatchdogTimeout去重新调整一个分布式锁的lockWatchdogTimeout超时,那么这个锁将变为失效状态。这个参数可以用来避免由Redisson客户端节点宕机或其他原因造成死锁的情况
可以异步执行
RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
RLock对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。
读写锁(ReadWriteLock)
基于Redis的Redisson分布式可重入读写锁ReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock(); //读锁
// 或
rwlock.writeLock().lock(); //写锁
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
lock.unlock();
写锁:
@Autowired
RedissonClient redisson;
@Autowired
StringRedisTemplate redisTemplate;
//保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁)。读锁是一个共享锁
//写锁没释放读就必须等待
//读 + 读: 相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁
//写 + 读: 等待写锁释放
//写 + 写; 阻塞方式
//读 + 写; 有读锁。写也需要等待
//只要有写的存在,都必须等待
@GetMapping("/write")
@ResponseBody
public String writeValue(){
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
String s = "";
RLock rLock = lock.writeLock();
try {
//1、改数据加写锁,读数据加读锁
s = UUID.randomUUID().toString();
rLock.lock();
Thread.sleep(30000);
redisTemplate.opsForValue().set("writeValue",s);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
rLock.unlock();
}
return s;
}
读锁:
@GetMapping("/read")
@ResponseBody
public String readValue(){
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
String s = "";
//加读锁
RLock rLock = lock.readLock();
rLock.lock();
try {
s = redisTemplate.opsForValue().get("writeValue");
} catch (Exception e) {
e.printStackTrace();
}finally {
rLock.unlock();
}
return s;
}
//保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁) 。读锁是一个共享锁
//写锁没释放读就必须等待
读锁和写锁顺序
写+读 读等写锁释放
写+写 阻塞方式
读+写 写等读锁释放
读+读 相当无锁,只会在redis中记录好,记录当前的读锁
只要有写的存在,都必须等待
读+读
公平锁(Fair Lock)
它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();
异步执行的相关方法
RLock fairLock = redisson.getFairLock("anyLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
闭锁(CountDownLatch)
基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
模拟放假锁门:
放假了,门卫等待锁门:
/**
* 放假,锁门
* 1班没人了,2
* 5个班全部走完,我们可以锁大门
*/
@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
door.await(); //等待闭锁都完成
return "放假了...";
}
班级走出校门
@GetMapping("/gogogo/{id}")
public String gogogo(@PathVariable("id") Long id){
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown(); //计数减一;
return id+"班的人都走了...";
}
浏览器访问:http://localhost:10001/lockDoor 处于等待状态
浏览器访问:http://localhost:10001/gogogo/1 2 3 4 5 ; 五个班全部走完,上面才能完成锁门
信号量(Semaphore)
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync()
模拟停车:
停车:
/**
* 车库停车
* 3车位
* 信号量也可以用作分布式限流;限制流量
*/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.acquire(); //获取一个信号,获取一个值,占一个车位
return "ok=>" + b;
}
开走车:
@GetMapping("/go")
@ResponseBody
public String go() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.release(); //释放一个车位
return "ok";
}
Value模拟停车位
访问:http://localhost:10001/park park的Value就 -1
访问:http://localhost:10001/go park的Value就 +1
做分布式限流:
/**
* 车库停车
* 3车位
* 信号量也可以用作分布式限流;限制流量
*/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
// park.acquire(); //获取一个信号,获取一个值,占一个车位
boolean b = park.tryAcquire(); //返回true或者false,有空位就返回true,没有则返回false
if(b){
//执行业务
}else {
return "error";
}
return "ok=>" + b;
}
更多参见官网:
项目使用
缓存中没有,查询数据库:
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbDbWithRedissonLock() {
//1、锁的名字。锁的粒度,越细越快。
//锁的粒度: 具体缓存的是某个数据, 11-号商品; product-11-lock product-12-lock
RLock lock = redisson.getLock("CatalogJson - lock");
lock.lock();
Map<String, List<Catelog2Vo>> dataFromDb;
try {
dataFromDb = getDataFromDb(); //从数据库获取数据的方法
} finally {
lock.unlock();
}
return dataFromDb;
}
缓存数据一致性
缓存数据一致性-解决方案
-
无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。怎么办?
-
1、如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加上过期时间,每隔一段时间触发读的主动更新即可
-
2、如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式。
-
3、缓存数据+过期时间也足够解决大部分业务对于缓存的要求。
-
4、通过加锁保证并发读写,写写的时候按顺序排好队。读读无所谓。所以适合使用读写锁。(业务不关心脏数据,允许临时脏数据可忽略);
总结:
- 我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保证每天拿到当前最新数据即可。
- 我们不应该过度设计,增加系统的复杂性
- 遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )canal 解析 binary log 对象(原始为 byte 流)
canal Home
分布式之数据库和缓存双写一致性方案解析
缓存-SpringCache
SpringCache
概述
Spring 3.1 引入了激动人心的基于注释(annotation)的缓存(cache)技术,它本质上不是一个具体的缓存实现方案(例如 EHCache 或者 OSCache),而是一个对缓存使用的抽象,通过在既有代码中添加少量它定义的各种 annotation,即能够达到缓存方法的返回对象的效果。
Spring 的缓存技术还具备相当的灵活性,不仅能够使用 SpEL(Spring Expression Language)来定义缓存的 key 和各种 condition,还提供开箱即用的缓存临时存储方案,也支持和主流的专业缓存例如 EHCache 集成。
其特点总结如下:
- 通过少量的配置 annotation 注释即可使得既有代码支持缓存
- 支持开箱即用 Out-Of-The-Box,即不用安装和部署额外第三方组件即可使用缓存
- 支持 Spring Express Language,能使用对象的任何属性或者方法来定义缓存的 key 和 condition
- 支持 AspectJ,并通过其实现任何方法的缓存支持
- 支持自定义 key 和自定义缓存管理者,具有相当的灵活性和扩展性
一句话解释Spring Cache: 通过注解的方式,利用AOP的思想来解放缓存的管理。
基本概念
CacheManager源码 :
public interface CacheManager {
/**
* Return the cache associated with the given name.
* @param name the cache identifier (must not be {@code null})
* @return the associated cache, or {@code null} if none found
*/
@Nullable
Cache getCache(String name); //按照名字得到一个缓存
/**
* Return a collection of the cache names known by this manager.
* @return the names of all caches known by the cache manager
*/
Collection<String> getCacheNames(); //获取缓存管理器的所有缓存的名字
}
简单示例
创建Spring 项目
pom.xml
引入依赖: spring-boot-starter-cache、spring-boot-starter-data-redis
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.atguigu.gulimall</groupId>
<artifactId>gulimall-product</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>gulimall-product</name>
<description>谷粒商城-商品服务</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR6</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>com.atguigu.gulimall</groupId>
<artifactId>gulimall-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 页面修改不重启服务器实时更新 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<!-- 模板引擎:thymeleaf -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<!-- 引入redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<!-- 以后使用redisson作为分布式锁,分布式对象等功能框架 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.aliyun.oss</groupId>-->
<!-- <artifactId>aliyun-sdk-oss</artifactId>-->
<!-- <version>3.10.2</version>-->
<!-- </dependency>-->
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
创建书籍模型
public class Book {
private String isbn;
private String title;
public Book(String isbn, String title) {
this.isbn = isbn;
this.title = title;
}
public String getIsbn() {
return isbn;
}
public void setIsbn(String isbn) {
this.isbn = isbn;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
@Override
public String toString() {
return "Book{" + "isbn='" + isbn + '\'' + ", title='" + title + '\'' + '}';
}
}
创建书库
public interface BookRepository {
Book getByIsbn(String isbn);
}
模拟延迟存储库
@Component
public class SimpleBookRepository implements BookRepository {
@Override
public Book getByIsbn(String isbn) {
simulateSlowService();
return new Book(isbn, "Some book");
}
//simulateSlowService故意在每个getByIsbn中插入三秒钟的延迟。稍后,将通过缓存来加速该示例。
private void simulateSlowService() {
try {
long time = 3000L;
Thread.sleep(time);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
使用书库
CommandLineRunner注入BookRepository并使用不同参数多次调用
@Component
public class AppRunner implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(AppRunner.class);
private final BookRepository bookRepository;
public AppRunner(BookRepository bookRepository) {
this.bookRepository = bookRepository;
}
@Override
public void run(String... args) throws Exception {
logger.info(".... Fetching books");
logger.info("isbn-1234 -->" + bookRepository.getByIsbn("isbn-1234"));
logger.info("isbn-4567 -->" + bookRepository.getByIsbn("isbn-4567"));
logger.info("isbn-1234 -->" + bookRepository.getByIsbn("isbn-1234"));
logger.info("isbn-4567 -->" + bookRepository.getByIsbn("isbn-4567"));
logger.info("isbn-1234 -->" + bookRepository.getByIsbn("isbn-1234"));
logger.info("isbn-1234 -->" + bookRepository.getByIsbn("isbn-1234"));
}
}
此时尝试运行该应用程序,则即使您多次检索完全相同的书,也是非常的慢。以下示例输出显示了我们的(故意延迟)代码创建的三秒延迟:
接下来给获取书方法加上缓存@Cacheable
@Component
public class SimpleBookRepository implements BookRepository {
@Override
@Cacheable("books")
public Book getByIsbn(String isbn) {
simulateSlowService();
return new Book(isbn, "Some book");
}
// simulateSlowService故意在每个getByIsbn中插入三秒钟的延迟。稍后,您将通过缓存来加速该示例。
private void simulateSlowService() {
try {
long time = 3000L;
Thread.sleep(time);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
第二步给启动类加上注解@EnableCaching开启缓存
@SpringBootApplication
//@EnableCaching注释触发后处理器检查每个的Spring bean的公共方法缓存注释的存在。如果找到了这样的注释,则会自动创建一个代理来拦截方法调用并相应地处理缓存行为。
@EnableCaching
public class CacheApplication {
public static void main(String[] args) {
SpringApplication.run(CacheApplication.class, args);
}
}
启动服务发现第一次检索书仍然需要三秒钟。但是,同一本书的第二次和后续时间要快得多,这表明缓存正在完成其工作。
恭喜你!您刚刚在Spring托管的bean上启用了缓存。
注解
@Cacheable 的作用
- 主要针对方法配置,能够根据方法的请求参数对其结果进行缓存
@Cacheable 主要的参数
- value 缓存的名称,在 spring 配置文件中定义,必须指定至少一个 例如:
@Cacheable(value=”mycache”) 或者@Cacheable(value= - key 缓存的 key,可以为空,如果指定要按照 SpEL 表达式编写,如果不指定,则缺省按照方法的所有参数进行组合 例如:@Cacheable(value=”testcache”,key=”#userName”)
- condition 缓存的条件,可以为空,使用 SpEL 编写,返回 true 或者 false,只有为 true 才进行缓存 例如:@Cacheable(value=”testcache”,condition=”#userName.length()>2”)
@CachePut 的作用
- 主要针对方法配置,能够根据方法的请求参数对其结果进行缓存,和 @Cacheable 不同的是,它每次都会触发真实方法的调用
@CachePut 主要的参数
- value 缓存的名称,在 spring 配置文件中定义,必须指定至少一个 例如:
@Cacheable(value=”mycache”) 或者@Cacheable(value= - key 缓存的 key,可以为空,如果指定要按照 SpEL 表达式编写,如果不指定,则缺省按照方法的所有参数进行组合 例如:@Cacheable(value=”testcache”,key=”#userName”)
- condition 缓存的条件,可以为空,使用 SpEL 编写,返回 true 或者 false,只有为 true 才进行缓存 例如:@Cacheable(value=”testcache”,condition=”#userName.length()>2”)
@CachEvict 的作用
- 主要针对方法配置,能够根据一定的条件对缓存进行清空(修改数据后对缓存删除)
@CacheEvict 主要的参数
- value 缓存的名称,在 spring 配置文件中定义,必须指定至少一个 例如:
@CachEvict(value=”mycache”) 或者@CachEvict(value= - key 缓存的 key,可以为空,如果指定要按照 SpEL 表达式编写,如果不指定,则缺省按照方法的所有参数进行组合 例如:@CachEvict(value=”testcache”,key=”#userName”)
- condition 缓存的条件,可以为空,使用 SpEL 编写,返回 true 或者 false,只有为 true 才清空缓存 例如:@CachEvict(value=”testcache”,condition=”#userName.length()>2”)
- allEntries 是否清空所有缓存内容,缺省为 false,如果指定为 true,则方法调用后将立即清空所有缓存 例如:@CachEvict(value=”testcache”,allEntries=true)
- beforeInvocation 是否在方法执行前就清空,缺省为 false,如果指定为 true,则在方法还没有执行的时候就清空缓存,缺省情况下,如果方法执行抛出异常,则不会清空缓存
@Caching
- @Caching注解可以让我们在一个方法或者类上同时指定多个Spring Cache相关的注解。其拥有三个属性:cacheable、put和evict,分别用于指定@Cacheable、@CachePut和@CacheEvict。
基本原理
和 spring 的事务管理类似,spring cache 的关键原理就是 spring AOP,通过 spring AOP,其实现了在方法调用前、调用后获取方法的入参和返回值,进而实现了缓存的逻辑。我们来看一下下面这个图:
上图显示,当客户端“Calling code”调用一个普通类 Plain Object 的 foo() 方法的时候,是直接作用在 pojo 类自身对象上的,客户端拥有的是被调用者的直接的引用。
而 Spring cache 利用了 Spring AOP 的动态代理技术,即当客户端尝试调用 pojo 的 foo()方法的时候,给他的不是 pojo 自身的引用,而是一个动态生成的代理类
如上图所示,这个时候,实际客户端拥有的是一个代理的引用,那么在调用 foo() 方法的时候,会首先调用 proxy 的 foo() 方法,这个时候 proxy 可以整体控制实际的 pojo.foo() 方法的入参和返回值,比如缓存结果,比如直接略过执行实际的 foo() 方法等,都是可以轻松做到的。
整合SpringCache简化缓存开发
1)、引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId> spring-boot-starter-data-redis</artifactId>
</dependency>
2)、写配置
- 自动配置了哪些
CacheAuroConfiguration会导入RedisCacheConfiguration;自动配好了缓存管理器RedisCacheManager
- 配置使用redis作为缓存
spring.cache.type=redis
3)、启动类加注解打开缓存
@EnableCaching
4)、方法加注解 @Cacheable(“categorys”)
/**
* 查询所有一级分类
*
* @return
*/
@Override
@Cacheable("categorys")
public List<CategoryEntity> getLevel1Categorys() {
System.out.println("查询所有一级分类");
List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));
return categoryEntities;
}
- 每一个需要缓存的数据我们都来指定要放到那个名字的缓存.【缓存的分区(按照业务类型分)】
- @Cacheable(“categorys”)
- 代表当前方法的结果需要缓存,如果缓存中有,方法不用调用.
- 如果缓存中没有,会调用方法,最后务方法的结果放入缓存
默认行为
-
如果缓存中有,方法不用调用
-
key默认自动生成,缓存的名字:SimpleKey[] (自主生成的key值)
-
缓存的value值,默认使用jdk序列化机制,将序列化后的数据存到redis
-
默认ttL时间 -1
5)、生成的缓存
自定义
- 指定生成的缓存使用的key
@Cacheable(value = “categorys”, key = “#root.method.name”)
- 指定缓存的数据的存活时间
配置文件中修改ttl,单位为毫秒
spring.cache.redis.time-to-live=3600000
- 将数据保存为json格式
需要修改缓存管理器,spring cache缓存管理器原理:
CacheAutoConfiguration缓存配置类会导入RedisCacheConfiguration;
RedisCacheConfiguration自动配置了RedisdCacheManager;
RedisCacheConfiguration初始化所有的缓存;
每个缓存决定使用什么配置;
如果redisCacheConfiguration有就用已有的,没有就用默认配置;
想改缓存的配置,只需要给容器中放一个RedisCacheConfiguration即可,就会应用到当前RedisCacheManager管理的所有缓存中;
编写MyCacheConfig 注入容器
package com.atguigu.gulimall.product.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.cache.CacheProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@EnableConfigurationProperties(CacheProperties.class)
@Configuration
@EnableCaching
public class MyCacheConfig {
// @Autowired
// CacheProperties cacheProperties;
/**
* 配置文件中的东西没有用上
*
* 1、原来和配置文件绑定的配置类是这样子的
* @ConfigurationProperties(prefix = "spring.cache")
* public class CacheProperties
*
* 2、要让他生效
* @EnableConfigurationProperties(CacheProperties.class)
* @return
*/
@Bean
RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties){
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
// config = config.entryTtl();
config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
CacheProperties.Redis redisProperties = cacheProperties.getRedis();
//将配置文件中的所有配置都生效
if (redisProperties.getTimeToLive() != null) {
config = config.entryTtl(redisProperties.getTimeToLive());
}
if (redisProperties.getKeyPrefix() != null) {
config = config.prefixKeysWith(redisProperties.getKeyPrefix());
}
if (!redisProperties.isCacheNullValues()) {
config = config.disableCachingNullValues();
}
if (!redisProperties.isUseKeyPrefix()) {
config = config.disableKeyPrefix();
}
return config;
}
}
application.properties
spring.cache.type=redis
#spring.cache.cache-names=
#毫秒为单位
spring.cache.redis.time-to-live=3600000
#如果指定了前缀就用我们指定的前缀,如果没有就默认使用缓存的名字作为前缀
#spring.cache.redis.key-prefix=CACHE_
#默认使用前缀
spring.cache.redis.use-key-prefix=true
#是否缓存空值。防止缓存穿透
spring.cache.redis.cache-null-values=true
ok
这是使用spring.cache.redis.key-prefix=CACHE_的结果
项目中实例:
- @Cacheable---开启缓存
开启category区下的 key=这个注解方法名 的缓存
@Cacheable(value = {"category"},key = "#root.method.name")
- @CacheEvict----失效模式
失效category区下的getLevel1Categorys这个键
@CacheEvict(value = "category",key = "'getLevel1Categorys'")
失效整个category大区
@CacheEvict(value = "category",allEntries = true)
- @Caching---组合使用缓存注解
调用注解下的这个方法,同时失效category大区下的key = 'getLevel1Categorys',key = 'getCatalogJson'
@Caching(evict = {
@CacheEvict(value = "category", key = "'getLevel1Categorys'"),
@CacheEvict(value = "category", key = "'getCatalogJson'")
})
- @CachePut //双写模式
/**
* 级联更新所有关联的数据
*
* @CacheEvict:失效模式
* 1、同时进行多种缓存操作 @Caching
* 2、指定删除某个分区下的所有数据 @CacheEvict(value = "category",allEntries = true)
* 3、存储同意类型的数据,都可以指定成同一个分区。分区名默认就是缓存的前缀
* @param category
*/
//@CacheEvict(value = "category",key = "'getLevel1Categorys'") //清除缓存 这里的key里面为SpEl表达式,所以定义的key加''
// @Caching(evict = {
// @CacheEvict(value = "category", key = "'getLevel1Categorys'"),
// @CacheEvict(value = "category", key = "'getCatalogJson'")
// })
@CacheEvict(value = "category",allEntries = true) //失效模式
// @CachePut //双写模式
@Transactional //事务
@Override
public void updateCascade(CategoryEntity category) {
this.updateById(category);
categoryBrandRelationService.updateCategory(category.getCatId(), category.getName());
//同时修改缓存中的数据
//redis.del("catalogJSON");等待下次主动查询进行更新
}
/**
* 1、查出所有的1级分类
* 2、@Cacheable({"category"})
* 代表当前结果的方法需要缓存,如果缓存中有,方法不用调用。
* 如果缓存中没有,会调用方法,最好将方法的结果放入缓存
* 3、默认行为
* 1)、如果缓存中有,方法不用调用。
* 2)、key默认自动生成:缓存的名字::SimpleKey [](自主生成的key值)
* 3)、缓存的value的值。默认使用jdk序列化机制,将序列化后的数据存到redis
* 4)、默认ttl时间 -1;
*
* 自定义:
* 1)、指定生成的缓存使用的key: key属性指定。接受一个SpEl @Cacheable(value = {"category"},key = "'level1Categorys'")
* SpEl的详细语法:https://docs.spring.io/spring/docs/5.1.17.RELEASE/spring-framework-reference/integration.html#cache-spel-context
* 2)、指定缓存的存活时间: 配置文件中修改ttl spring.cache.redis.time-to-live=3600000 //毫秒为单位
* 3)、将数据保存为json格式
*
*
*
* @return
*/
//每一个需要缓存的数据我们都来指定要放到那个名字的缓存。【缓存的分区(按照业务类型分)】
@Cacheable(value = {"category"},key = "#root.method.name")
@Override
public List<CategoryEntity> getLevel1Categorys() { //查出所有一级分类
long l = System.currentTimeMillis();
List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));
System.out.println("消耗时间:" + (System.currentTimeMillis() - l));
return categoryEntities;
}
总结
Spirng-Cache的不足
1)读模式:
-
缓存穿透:查询一个null数据。解决:缓存空数据:cache-null-values=true
-
缓存击穿:大量并发进来同时查询一个正好过期的数据。解决:加锁,默认无锁,sync=true
-
缓存雪崩:大量的key同时过期。解决:加随机时间。加上过期时间 spring.cache.redis.time-to-live=3600000
2)写模式:(缓存与数据库一致)
- 1:写入加锁
- 2:引入canal,感受到mysql的变化
- 3:读多写多,直接去数据库查询就行
小结:
- 常规数据(读多写少,即时性,一致性要求不高的数据,完全可以使用spring-cache;写模式(只要缓存的数据有过期时间就足够了))
- 特殊数据:特殊设计
附录1-StringRedisTemplate操作Redis
一、StringRedisTemplate和RedisTemplate的区别
区别如下:
1.两者关系是StringRedisTemplate继承RedisTemplate。
2.两者的数据是不共通的,也就是说StringRedisTemplate只能管理StringRedisTemplate里面的数据,RedisTemplate只能管理RedisTemplate中的数据。
3.使用的序列化类不同。
使用的序列化哪里不同?如下所示:
(1)RedisTemplate使用的是JdkSerializationRedisSerializer 存入数据会将数据先序列化成字节组然后再存入Redis数据库。
(2)StringRedisTemplate使用的是StringRedisSerializer。
使用时注意事项:
(1)当你的Redis数据库里面本来存的是字符串数据或者是你要存取的数据就是字符串类型数据的时候,那么你就使用StringRedisTemplate即可;
(2)但是如果你的数据是复杂的对象类型,而取出的时候又不想做任何数据转换,直接从Redis里面取出一个对象,那么使用RedisTemplate是更好的选择;
(3)RedisTemplate中存取数据都是字节数组。当Redis职工存入的数据是可读形式而非字节数组时,使用RedisTemplate取值的时候会无法获取导出数据,获得的值为null。可以使用StringRedisTemplate试试;
二、配置和RedisTemplate数据操作
lettuce、jedis等操作redis的底层客户端。spring再次对其封装成redisTemplate,所有不论使用其中的哪个客户端,都可以使用redisTemplate进行操作;
配置
pom引入依赖
<!-- 引入redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
application.yml
spring:
redis:
host: 192.168.109.128
port: 6379
RedisTemplate定义了5种数据结构操作
redisTemplate.opsForValue();//操作字符串
redisTemplate.opsForHash();//操作hash
redisTemplate.opsForList();//操作list
redisTemplate.opsForSet();//操作set
redisTemplate.opsForZSet();//操作有序set
三、StringRedisTemplate常用操作
stringRedisTemplate.opsForValue().set("test", "",*,TimeUnit.SECONDS);//向redis里存入数据和设置缓存时间
stringRedisTemplate.boundValueOps("test").increment(-);//val做-1操作
stringRedisTemplate.opsForValue().get("test")//根据key获取缓存中的val
stringRedisTemplate.boundValueOps("test").increment();//val +1
stringRedisTemplate.getExpire("test")//根据key获取过期时间
stringRedisTemplate.getExpire("test",TimeUnit.SECONDS)//根据key获取过期时间并换算成指定单位
stringRedisTemplate.delete("test");//根据key删除缓存
stringRedisTemplate.hasKey("");//检查key是否存在,返回boolean值
stringRedisTemplate.opsForSet().add("red_123", "","","");//向指定key中存放set集合
stringRedisTemplate.expire("red_123", , TimeUnit.MILLISECONDS);//设置过期时间
stringRedisTemplate.opsForSet().isMember("red_123", "")//根据key查看集合中是否存在指定数据
stringRedisTemplate.opsForSet().members("red_123");//根据key获取set集合
- 单测示例:
@SpringBootTest
class GulimallProductApplicationTests {
@Autowired
StringRedisTemplate stringRedisTemplate;
@Test
public void teststringRedisTemplate(){
//hello world
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
//保存
ops.set("hello","world_" + UUID.randomUUID().toString());
//查询
String hello = ops.get("hello");
System.out.println("之前保存的数据是: " + hello);
}
项目中使用:
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson(){
//给缓存中放json字符串,拿出的json字符串,还能逆转为能用的对象类型;【序列化与反序列化】
//1、加入缓存逻辑,缓存中存的数据是json字符串
//JSON跨语言,跨平台兼容。
String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
if (StringUtils.isEmpty(catalogJSON)){
//2、缓存中没有,查询数据库
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
//3、查到的数据再放入缓存,将对象转为json放在缓存中
String s = JSON.toJSONString(catalogJsonFromDb); //利用fastJSON把对象转为json字符串
redisTemplate.opsForValue().set("catalogJSON",s);
return catalogJsonFromDb;
}
//转为我们制定的对象
//fastJSON,把字符串JSON按照我们要转的类型(对象),进行转换; json--->Map<String, List<Catelog2Vo>>
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJSON,new TypeReference<Map<String, List<Catelog2Vo>>>(){});
return result;
}
上面调用的类:
//从数据库查询并封装分类数据
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDb() {
//1、如果缓存中有就用缓存的
// Map<String, List<Catelog2Vo>> catalogJson = (Map<String, List<Catelog2Vo>>) cache.get("catalogJson");
// if (cache.get("catalogJson") == null) {
// //调用业务
// //返回数据又放入缓存
// cache.put("catalogJson",parent_cid);
// }
// return catalogJson;
/**
* 1、将数据库的多次查询变为一次
*
*/
List<CategoryEntity> selectList = baseMapper.selectList(null);
//1、查出所有1级分类
List<CategoryEntity> level1Categorys = getParent_cid(selectList, 0L);
//2、封装数据
Map<String, List<Catelog2Vo>> parent_cid = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
//1、每一个的一级分类,查到这个一级分类的二级分类
List<CategoryEntity> categoryEntities = getParent_cid(selectList, v.getCatId());
//2、封装上面的结果
List<Catelog2Vo> catelog2Vos = null;
if (categoryEntities != null) {
catelog2Vos = categoryEntities.stream().map(l2 -> {
Catelog2Vo catelog2Vo = new Catelog2Vo("v.getCatId()", null, l2.getCatId().toString(), l2.getName());
//1、找当前二级分类的三级分类封装成vo
List<CategoryEntity> level3Catelog = getParent_cid(selectList, l2.getCatId());
if (level3Catelog != null) {
List<Catelog2Vo.Catelog3Vo> collect = level3Catelog.stream().map(l3 -> {
//2、封装成指定格式
Catelog2Vo.Catelog3Vo catelog3Vo = new Catelog2Vo.Catelog3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName());
return catelog3Vo;
}).collect(Collectors.toList());
catelog2Vo.setCatalog3List(collect);
}
return catelog2Vo;
}).collect(Collectors.toList());
}
return catelog2Vos;
}));
return parent_cid;
}
private List<CategoryEntity> getParent_cid(List<CategoryEntity> selectList,Long parent_cid) {
List<CategoryEntity> collect = selectList.stream().filter(item -> item.getParentCid() == parent_cid).collect(Collectors.toList());
return collect;
//return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", v.getCatId()));
}
测试结果ok
附录二-Jedis操作各种redis中的数据结构
redis基础
1. 概念: redis是一款高性能的NOSQL系列的非关系型数据库
1.1.什么是NOSQL
NoSQL(NoSQL = Not Only SQL),意即“不仅仅是SQL”,是一项全新的数据库理念,泛指非关系型的数据库。
随着互联网web2.0网站的兴起,传统的关系数据库在应付web2.0网站,特别是超大规模和高并发的SNS类型的web2.0纯动态网站已经显得力不从心,暴露了很多难以克服的问题,而非关系型的数据库则由于其本身的特点得到了非常迅速的发展。NoSQL数据库的产生就是为了解决大规模数据集合多重数据种类带来的挑战,尤其是大数据应用难题。
1.1.1. NOSQL和关系型数据库比较
优点:
1)成本:nosql数据库简单易部署,基本都是开源软件,不需要像使用oracle那样花费大量成本购买使用,相比关系型数据库价格便宜。
2)查询速度:nosql数据库将数据存储于缓存之中,关系型数据库将数据存储在硬盘中,自然查询速度远不及nosql数据库。
3)存储数据的格式:nosql的存储格式是key,value形式、文档形式、图片形式等等,所以可以存储基础类型以及对象或者是集合等各种格式,而数据库则只支持基础类型。
4)扩展性:关系型数据库有类似join这样的多表查询机制的限制导致扩展很艰难。
缺点:
1)维护的工具和资料有限,因为nosql是属于新的技术,不能和关系型数据库10几年的技术同日而语。
2)不提供对sql的支持,如果不支持sql这样的工业标准,将产生一定用户的学习和使用成本。
3)不提供关系型数据库对事务的处理。
1.1.2. 非关系型数据库的优势:
1)性能NOSQL是基于键值对的,可以想象成表中的主键和值的对应关系,而且不需要经过SQL层的解析,所以性能非常高。
2)可扩展性同样也是因为基于键值对,数据之间没有耦合性,所以非常容易水平扩展。
1.1.3. 关系型数据库的优势:
1)复杂查询可以用SQL语句方便的在一个表以及多个表之间做非常复杂的数据查询。
2)事务支持使得对于安全性能很高的数据访问要求得以实现。对于这两类数据库,对方的优势就是自己的弱势,反之亦然。
1.1.4. 总结
关系型数据库与NoSQL数据库并非对立而是互补的关系,即通常情况下使用关系型数据库,在适合使用NoSQL的时候使用NoSQL数据库,
让NoSQL数据库对关系型数据库的不足进行弥补。
一般会将数据存储在关系型数据库中,在nosql数据库中备份存储关系型数据库的数据
1.2.主流的NOSQL产品
• 键值(Key-Value)存储数据库
相关产品: Tokyo Cabinet/Tyrant、Redis、Voldemort、Berkeley DB
典型应用: 内容缓存,主要用于处理大量数据的高访问负载。
数据模型: 一系列键值对
优势: 快速查询
劣势: 存储的数据缺少结构化
• 列存储数据库
相关产品:Cassandra, HBase, Riak
典型应用:分布式的文件系统
数据模型:以列簇式存储,将同一列数据存在一起
优势:查找速度快,可扩展性强,更容易进行分布式扩展
劣势:功能相对局限
• 文档型数据库
相关产品:CouchDB、MongoDB
典型应用:Web应用(与Key-Value类似,Value是结构化的)
数据模型: 一系列键值对
优势:数据结构要求不严格
劣势: 查询性能不高,而且缺乏统一的查询语法
• 图形(Graph)数据库
相关数据库:Neo4J、InfoGrid、Infinite Graph
典型应用:社交网络
数据模型:图结构
优势:利用图结构相关算法。
劣势:需要对整个图做计算才能得出结果,不容易做分布式的集群方案。
1.3 什么是Redis
Redis是用C语言开发的一个开源的高性能键值对(key-value)数据库,官方提供测试数据,50个并发执行100000个请求,读的速度是110000次/s,写的速度是81000次/s ,且Redis通过提供多种键值数据类型来适应不同场景下的存储需求,目前为止Redis支持的键值数据类型如下:
1) 字符串类型 string
2) 哈希类型 hash
3) 列表类型 list
4) 集合类型 set
5) 有序集合类型 sortedset
1.3.1 redis的应用场景
• 缓存(数据查询、短连接、新闻内容、商品内容等等)
• 聊天室的在线好友列表
• 任务队列。(秒杀、抢购、12306等等)
• 应用排行榜
• 网站访问统计
• 数据过期处理(可以精确到毫秒
• 分布式集群架构中的session分离
2. 下载安装
1. 官网:https://redis.io
2. 中文网:http://www.redis.net.cn/
3. 解压直接可以使用:
* redis.windows.conf:配置文件
* redis-cli.exe:redis的客户端
* redis-server.exe:redis服务器端
3. 命令操作
1. redis的数据结构:
* redis存储的是:key,value格式的数据,其中key都是字符串,value有5种不同的数据结构
* value的数据结构:
1) 字符串类型 string
2) 哈希类型 hash : map格式
3) 列表类型 list : linkedlist格式。支持重复元素
4) 集合类型 set : 不允许重复元素
5) 有序集合类型 sortedset:不允许重复元素,且元素有顺序
2. 字符串类型 string
1. 存储: set key value
127.0.0.1:6379> set username zhangsan
OK
2. 获取: get key
127.0.0.1:6379> get username
"zhangsan"
3. 删除: del key
127.0.0.1:6379> del age
(integer) 1
3. 哈希类型 hash
1. 存储: hset key field value
127.0.0.1:6379> hset myhash username lisi
(integer) 1
127.0.0.1:6379> hset myhash password 123
(integer) 1
2. 获取:
* hget key field: 获取指定的field对应的值
127.0.0.1:6379> hget myhash username
"lisi"
* hgetall key:获取所有的field和value
127.0.0.1:6379> hgetall myhash
1) "username"
2) "lisi"
3) "password"
4) "123"
3. 删除: hdel key field
127.0.0.1:6379> hdel myhash username
(integer) 1
4. 列表类型 list:可以添加一个元素到列表的头部(左边)或者尾部(右边)
1. 添加:
1. lpush key value: 将元素加入列表左表
2. rpush key value:将元素加入列表右边
127.0.0.1:6379> lpush myList a
(integer) 1
127.0.0.1:6379> lpush myList b
(integer) 2
127.0.0.1:6379> rpush myList c
(integer) 3
2. 获取:
* lrange key start end :范围获取
127.0.0.1:6379> lrange myList 0 -1
1) "b"
2) "a"
3) "c"
3. 删除:
* lpop key: 删除列表最左边的元素,并将元素返回
* rpop key: 删除列表最右边的元素,并将元素返回
5. 集合类型 set : 不允许重复元素
1. 存储:sadd key value
127.0.0.1:6379> sadd myset a
(integer) 1
127.0.0.1:6379> sadd myset a
(integer) 0
2. 获取:smembers key:获取set集合中所有元素
127.0.0.1:6379> smembers myset
1) "a"
3. 删除:srem key value:删除set集合中的某个元素
127.0.0.1:6379> srem myset a
(integer) 1
6. 有序集合类型 sortedset:不允许重复元素,且元素有顺序.每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。
1. 存储:zadd key score value
127.0.0.1:6379> zadd mysort 60 zhangsan
(integer) 1
127.0.0.1:6379> zadd mysort 50 lisi
(integer) 1
127.0.0.1:6379> zadd mysort 80 wangwu
(integer) 1
2. 获取:zrange key start end [withscores]
127.0.0.1:6379> zrange mysort 0 -1
1) "lisi"
2) "zhangsan"
3) "wangwu"
127.0.0.1:6379> zrange mysort 0 -1 withscores
1) "zhangsan"
2) "60"
3) "wangwu"
4) "80"
5) "lisi"
6) "500"
3. 删除:zrem key value
127.0.0.1:6379> zrem mysort lisi
(integer) 1
7. 通用命令
1. keys * : 查询所有的键
2. type key : 获取键对应的value的类型
3. del key:删除指定的key value
4. 持久化
1. redis是一个内存数据库,当redis服务器重启,获取电脑重启,数据会丢失,我们可以将redis内存中的数据持久化保存到硬盘的文件中。
2. redis持久化机制:
1. RDB:默认方式,不需要进行配置,默认就使用这种机制
* 在一定的间隔时间中,检测key的变化情况,然后持久化数据
1. 编辑redis.windwos.conf文件
# after 900 sec (15 min) if at least 1 key changed
save 900 1
# after 300 sec (5 min) if at least 10 keys changed
save 300 10
# after 60 sec if at least 10000 keys changed
save 60 10000
2. 重新启动redis服务器,并指定配置文件名称
D:\JavaWeb2018\day23_redis\资料\redis\windows-64\redis-2.8.9>redis-server.exe redis.windows.conf
2. AOF:日志记录的方式,可以记录每一条命令的操作。可以每一次命令操作后,持久化数据
1. 编辑redis.windwos.conf文件
appendonly no(关闭aof) --> appendonly yes (开启aof)
# appendfsync always : 每一次操作都进行持久化
appendfsync everysec : 每隔一秒进行一次持久化
# appendfsync no : 不进行持久化
5. Java客户端 Jedis
* Jedis: 一款java操作redis数据库的工具.
* 使用步骤:
1. 下载jedis的jar包
Jedis操作redis
Jedis: 一款java操作redis数据库的工具,下面是使用方法
* Jedis操作各种redis中的数据结构
1) 字符串类型 string
set
get
//1. 获取连接
Jedis jedis = new Jedis();//如果使用空参构造,默认值 "localhost",6379端口
//2. 操作
//存储
jedis.set("username","zhangsan");
//获取
String username = jedis.get("username");
System.out.println(username);
//可以使用setex()方法存储可以指定过期时间的 key value
jedis.setex("activecode",20,"hehe");//将activecode:hehe键值对存入redis,并且20秒后自动删除该键值对
//3. 关闭连接
jedis.close();
2) 哈希类型 hash : map格式
hset
hget
hgetAll
//1. 获取连接
Jedis jedis = new Jedis();//如果使用空参构造,默认值 "localhost",6379端口
//2. 操作
// 存储hash
jedis.hset("user","name","lisi");
jedis.hset("user","age","23");
jedis.hset("user","gender","female");
// 获取hash
String name = jedis.hget("user", "name");
System.out.println(name);
// 获取hash的所有map中的数据
Map<String, String> user = jedis.hgetAll("user");
// keyset
Set<String> keySet = user.keySet();
for (String key : keySet) {
//获取value
String value = user.get(key);
System.out.println(key + ":" + value);
}
//3. 关闭连接
jedis.close();
3) 列表类型 list : linkedlist格式。支持重复元素
lpush / rpush
lpop / rpop
lrange start end : 范围获取
//1. 获取连接
Jedis jedis = new Jedis();//如果使用空参构造,默认值 "localhost",6379端口
//2. 操作
// list 存储
jedis.lpush("mylist","a","b","c");//从左边存
jedis.rpush("mylist","a","b","c");//从右边存
// list 范围获取
List<String> mylist = jedis.lrange("mylist", 0, -1);
System.out.println(mylist);
// list 弹出
String element1 = jedis.lpop("mylist");//c
System.out.println(element1);
String element2 = jedis.rpop("mylist");//c
System.out.println(element2);
// list 范围获取
List<String> mylist2 = jedis.lrange("mylist", 0, -1);
System.out.println(mylist2);
//3. 关闭连接
jedis.close();
4) 集合类型 set : 不允许重复元素
sadd
smembers:获取所有元素
//1. 获取连接
Jedis jedis = new Jedis();//如果使用空参构造,默认值 "localhost",6379端口
//2. 操作
// set 存储
jedis.sadd("myset","java","php","c++");
// set 获取
Set<String> myset = jedis.smembers("myset");
System.out.println(myset);
//3. 关闭连接
jedis.close();
5) 有序集合类型 sortedset:不允许重复元素,且元素有顺序
zadd
zrange
//1. 获取连接
Jedis jedis = new Jedis();//如果使用空参构造,默认值 "localhost",6379端口
//2. 操作
// sortedset 存储
jedis.zadd("mysortedset",3,"亚瑟");
jedis.zadd("mysortedset",30,"后裔");
jedis.zadd("mysortedset",55,"孙悟空");
// sortedset 获取
Set<String> mysortedset = jedis.zrange("mysortedset", 0, -1);
System.out.println(mysortedset);
//3. 关闭连接
jedis.close();
jedis连接池: JedisPool
* jedis连接池: JedisPool
* 使用:
1. 创建JedisPool连接池对象
2. 调用方法 getResource()方法获取Jedis连接
//0.创建一个配置对象
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(50);
config.setMaxIdle(10);
//1.创建Jedis连接池对象
JedisPool jedisPool = new JedisPool(config,"localhost",6379);
//2.获取连接
Jedis jedis = jedisPool.getResource();
//3. 使用
jedis.set("hehe","heihei");
//4. 关闭 归还到连接池中
jedis.close();
* 连接池工具类
public class JedisPoolUtils {
private static JedisPool jedisPool;
static{
//读取配置文件
InputStream is = JedisPoolUtils.class.getClassLoader().getResourceAsStream("jedis.properties");
//创建Properties对象
Properties pro = new Properties();
//关联文件
try {
pro.load(is);
} catch (IOException e) {
e.printStackTrace();
}
//获取数据,设置到JedisPoolConfig中
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(Integer.parseInt(pro.getProperty("maxTotal")));
config.setMaxIdle(Integer.parseInt(pro.getProperty("maxIdle")));
//初始化JedisPool
jedisPool = new JedisPool(config,pro.getProperty("host"),Integer.parseInt(pro.getProperty("port")));
标签:缓存,java,spring,redis,project,jedis,key,return,gl
From: https://www.cnblogs.com/my-global/p/17182365.html