1.订阅的关系维护
redis的所有频道的订阅关系都维护在pubsub_channels字典里面,这个字典的key是被订阅的频道,而value是订阅客户端的链表。
struct redisServer {
// ...
// 保存所有订阅关系
dict *pubsub_channels;
// ...
}
字典示例:
与此类似,服务器也将所有模式的订阅关系保存在pubsub_patterns属性内:
struct redisServer {
// ...
// 链表保存所有订阅模式关系
list *pubsub_patterns;
// ...
}
2.订阅频道与模式
每当客户端执行SUBSCRIBLE命令订阅频道的时候,会将频道订阅关系记录到pubsub_channels字典中进行跟踪,且同时会记录模式订阅关系
频道订阅关系可以使用如下伪代码来描述,模式订阅类似就不再赘述:
def subscrible(*all_input_channels):
# 如果该频道没有被订阅,则直接加入字典,如果已有订阅,则在当前的key追加client
for input_channel in all_input_channels:
if input_channel not in server.pubsub_channels:
server.pubsub_channels[input_channel] = []
server.pubsub_channels[input_channel].append(client)
3.发送消息
当一个redis客户端执行PUBLIASH 发送消息到对应的channel的时候,服务器会执行两个操作:
-
将消息发送给频道的所有订阅者
-
如果有一个或多个模式与频道匹配,那么将发送给这些模式的订阅者
下面举例说明一下频道订阅的发送:
def channel_publish(channel, message):
# 如果频道不在 pubsub_channels内说明无人订阅直接返回
if channel not in server.pubsub_channels:
return
# 如果在的话说明至少有一个订阅者,遍历将消息发送
for subcrible in server.pubsub_channels[channel]:
send_message(subcrible, message)
4.利用有序集合来实现延迟队列
Redis对于MQ来说,其实没有什么高级特性,也没有ACK机制,如果对消息的准确性要求较高,还是建议使用MQ。
Redis的有序集合叫做ZSET,zset的结构如下:
struct zset {
skiplist *skiplist;
dict *dict;
}
zset同时使用了字典和跳表,同时使用这两种特性同时保证了范围查询和单个SCORE查询的效率,skiplist可以很好的支持ZRANGE查询,而dict可以很好的支持查询某个对象的SCORE。
可以看一下ZSET的数据结构
如果要使用该特性来实现延迟队列,那么我们需要在SCORE内存储对应的延迟时间,我们通过客户端取数据的时候需要用SCORE来进行过滤,获取当前时间该处理的数据。
标签:订阅,频道,redis,channels,发布,input,channel,pubsub From: https://www.cnblogs.com/xu-xiaofeng/p/18180888