Redis的发布订阅(Pub/Sub)是一种消息传递模式,允许消息的发布者(publisher)将消息发送给订阅者(subscriber)。
基本概念:
- 发布者(Publisher):发布消息的客户端。
- 订阅者(Subscriber):接收并处理发布者发送的消息的客户端。
- 频道(Channel):消息被发布到的特定通道。发布者将消息发布到指定频道,接受者从频道获取消息。
1、相关命令
订阅频道:subscribe aaaMsg
aaaMsg为自定义的频道的名称。
也可以同时订阅多个频道 subscribe aaaMsg bbbMsg
发布消息:publish aaaMsg 123456
aaaMsg为频道名称,123456为发布消息的内容
"1" 标识有一个订阅者接收到了信息
基于模式的频道订阅:psubscribe aaa? bbb* ccc?*
其中 ? 表示任意一个字符。 * 表示0个或任意多个字符。显然 ?* 就表示一个或任意多个字符
2、Jedis发布订阅
Jedis是Java中的Redis客户端,如何通过Jedis完成上述发布订阅过程呢?
添加依赖:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.1</version>
</dependency>
public class Main {
public static void main(String[] args) throws IOException {
String host = "xxx.xxx.xxx.xxx";
String pass = "xxxxx";
Jedis jedis = new Jedis(host, 6379);
jedis.auth(pass);
// 订阅会阻塞线程
new Thread(()->{
jedis.subscribe(new JedisPubSub() {
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("订阅的频道: "+channel+", 当前jedis实例已订阅的频道数: "+subscribedChannels);
}
@Override
public void onMessage(String channel, String message) {
System.out.println(channel+" :: "+message);
}
},"aaaMsg", "bbbMsg");
}).start();
// 注意一个 jedis 连接发起订阅后就不能执行 (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT 之外的命令了
Jedis jedis2 = new Jedis(host, 6379);
jedis2.auth(pass);
jedis2.publish("aaaMsg","消息内容xxxxx");
}
}
3、SpringBoot中使用发布订阅
创建消息监听器:
@Component
public class MyMsgListener implements MessageListener {
@Autowired
private RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
System.out.println("消息来自频道:"+new String(message.getChannel()));
// springboot 中消息是可以传递对象的,所以收到的是消息序列化后的二进制码
// 由于没有自定义redis的序列化方式可以通过下面的方式获取默认的
RedisSerializer defaultSerializer = redisTemplate.getValueSerializer();
// 当然默认的就是 JdkSerializationRedisSerializer ,自己new一个也行
// JdkSerializationRedisSerializer defaultSerializer = new JdkSerializationRedisSerializer();
System.out.println("消息内容:"+defaultSerializer.deserialize(message.getBody()));
System.out.println("命中频道的规则"+new String(pattern));
}
}
配置监听器:
@Configuration
public class RedisConfig {
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(MyMsgListener myMsgListener, RedisConnectionFactory factory){
// 监听器需要放到容器中
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
// PatternTopic 为模式的频道订阅(可以使用通配符),ChannelTopic为普通频道
container.addMessageListener(new MessageListenerAdapter(myMsgListener), List.of(new PatternTopic("aaaMsg*"), new ChannelTopic("bbb")));
return container;
}
}
发送消息:
@SpringBootApplication
public class App implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
@Autowired
private RedisTemplate redisTemplate;
@Override
public void run(String... args) throws Exception {
redisTemplate.convertAndSend("aaaMsg", "123456");
}
}
运行结果:
4、Redis发布订阅的特点
消息不会持久化
发布消息后就算没有订阅者接收,消息也会失效。就算之后再有订阅者订阅此频道,之前的消息也不会发送给他。故而redis的发布订阅适用于「实时但是可靠性要求不高的场景」。
消息可以跨库传播
如A服务连接1号数据库,B服务连接2号数据库,在A服务中发布消息,在B服务中的订阅者依然可以收到信息。
此特性可用于跨库缓存删除。如A服务维护Student数据(crud),B服务则是使用Student数据进行其他业务逻辑。两者属于不同的项目可能各自采用不同的数据库。这时就可以通过发布订阅解决缓存删除问题。