首页 > 其他分享 >RabbitMQ的几种应用场景

RabbitMQ的几种应用场景

时间:2024-01-24 19:23:30浏览次数:22  
标签:场景 pika exchange RabbitMQ 几种 queue message channel

之前的几篇文章介绍了一下RabbitMQ的概念以及环境的搭建和配置,有了RabbitMQ环境就可以基于其实现一些特殊的任务场景了。RabbitMQ官方有个很好的Tutorials基本覆盖了RabbitMQ的各中常见应用场景,现以代码加注释的方式以其Python客户端pika为例简单介绍如下。更详尽的信息可参阅:http://www.rabbitmq.com/getstarted.html 。

之前的几篇文章:
RabbitMQ概念及环境搭建(一)单节点安装与配置
RabbitMQ概念及环境搭建(二)RabbitMQ Broker管理
RabbitMQ概念及环境搭建(三)RabbitMQ cluster
RabbitMQ概念及环境搭建(四)RabbitMQ High Availability
RabbitMQ概念及环境搭建(五)与web的整合

RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。

应用场景1-“Hello Word”

一个P向queue发送一个message,一个C从该queue接收message并打印。

send.py 
producer,连接至RabbitMQ Server,声明队列,发送message,关闭连接,退出。

#!/usr/bin/python27
#encoding:utf8
import pika
 
#与RabbitMQ Server建立连接
#连接到的broker在本机-localhost上
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
#声明队列以向其发送消息消息
#向不存在的位置发送消息时RabbitMQ将消息丢弃
#queue='hello'指定队列名字
channel.queue_declare(queue='hello', durable=True)
 
#message不能直接发送给queue,需经exchange到达queue,此处使用以空字符串标识的默认的exchange
#使用默认exchange时允许通过routing_key明确指定message将被发送给哪个queue
#body参数指定了要发送的message内容
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
 
print " [x] Sent 'Hello World!'"
 
#关闭与RabbitMq Server间的连接
connection.close()
  receive.py 
consumer,连接至RabbitMQ Server,声明队列,接收消息并进行处理这里为打印出消息,退出。
#!/usr/bin/env python
#encoding:utf8
import pika
 
#建立到达RabbitMQ Server的connection
#此处RabbitMQ Server位于本机-localhost
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
#声明queue,确认要从中接收message的queue
#queue_declare函数是幂等的,可运行多次,但只会创建一次
#若可以确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue
#但在producer和consumer中重复声明queue是一个好的习惯
channel.queue_declare(queue='hello')
 
print ' [*] Waiting for messages. To exit press CTRL+C'
 
#定义回调函数
#一旦从queue中接收到一个message回调函数将被调用
#ch:channel
#method:
#properties:
#body:message
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
 
#从queue接收message的参数设置
#包括从哪个queue接收message,用于处理message的callback,是否要确认message
#默认情况下是要对消息进行确认的,以防止消息丢失。
#此处将no_ack明确指明为True,不对消息进行确认。
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
 
#开始循环从queue中接收message并使用callback进行处理
channel.start_consuming()

 

测试
python send.py
python receive.py

应用场景2-work queues

将耗时的消息处理通过队列分配给多个consumer来处理,我们称此处的consumer为worker,我们将此处的queue称为Task Queue,其目的是为了避免资源密集型的task的同步处理,也即立即处理task并等待完成。相反,调度task使其稍后被处理。也即把task封装进message并发送到task queue,worker进程在后台运行,从task queue取出task并执行job,若运行了多个worker,则task可在多个worker间分配。


new_task.py
建立连接,声明队列,发送可以模拟耗时任务的message,断开连接、退出。

#!/usr/bin/env python
#encoding:utf8
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
#仅仅对message进行确认不能保证message不丢失,比如RabbitMQ崩溃了queue就会丢失
#因此还需使用durable=True声明queue是持久化的,这样即便Rabb崩溃了重启后queue仍然存在
channel.queue_declare(queue='task_queue', durable=True)
 
#从命令行构造将要发送的message
message = ' '.join(sys.argv[1:]) or "Hello World!"
 
#除了要声明queue是持久化的外,还需声明message是持久化的
#basic_publish的properties参数指定message的属性
#此处pika.BasicProperties中的delivery_mode=2指明message为持久的
#这样一来RabbitMQ崩溃重启后queue仍然存在其中的message也仍然存在
#需注意的是将message标记为持久的并不能完全保证message不丢失,因为
#从RabbitMQ接收到message到将其存储到disk仍需一段时间,若此时RabbitMQ崩溃则message会丢失
#况且RabbitMQ不会对每条message做fsync动作
#可通过publisher confirms实现更强壮的持久性保证
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()
  worker.py
建立连接,声明队列,不断的接收message,处理任务,进行确认。
#!/usr/bin/env python
#encoding:utf8
import pika
import time
 
#默认情况RabbirMQ将message以round-robin方式发送给下一个consumer
#每个consumer接收到的平均message量是一样的
#可以同时运行两个或三个该程序进行测试
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
#仅仅对message进行确认不能保证message不丢失,比如RabbitMQ崩溃了
#还需使用durable=True声明queue是持久化的,这样即便Rabb崩溃了重启后queue仍然存在其中的message不会丢失
#RabbitMQ中不允许使用不同的参数定义同名queue
channel.queue_declare(queue='task_queue', durable=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
 
#回调函数,函数体模拟耗时的任务处理:以message中'.'的数量表示sleep的秒数
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    #对message进行确认
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
#若存在多个consumer每个consumer的负载可能不同,有些处理的快有些处理的慢
#RabbitMQ并不管这些,只是简单的以round-robin的方式分配message
#这可能造成某些consumer积压很多任务处理不完而一些consumer长期处于饥饿状态
#可以使用prefetch_count=1的basic_qos方法可告知RabbitMQ只有在consumer处理并确认了上一个message后才分配新的message给他
#否则分给另一个空闲的consumer
channel.basic_qos(prefetch_count=1)
 
#这里移除了no_ack=True这个参数,也即需要对message进行确认(默认行为)
#否则consumer在偶然down后其正在处理和分配到该consumer还未处理的message可能发生丢失
#因为此时RabbitMQ在发送完message后立即从内存删除该message
#假如没有设置no_ack=True则consumer在偶然down掉后其正在处理和分配至该consumer但还未来得及处理的message会重新分配到其他consumer
#没有设置no_ack=True则consumer在收到message后会向RabbitMQ反馈已收到并处理了message告诉RabbitMQ可以删除该message
#RabbitMQ中没有超时的概念,只有在consumer down掉后重新分发message
channel.basic_consume(callback,
                      queue='task_queue')
 
channel.start_consuming()

 

测试

python new_task.py "A very hard task which takes two seconds.."
python worker.py

 

应用场景3-Publish/Subscribe

在应用场景2中一个message(task)仅被传递给了一个comsumer(worker)。现在我们设法将一个message传递给多个consumer。这种模式被称为publish/subscribe。此处以一个简单的日志系统为例进行说明。该系统包含一个log发送程序和一个log接收并打印的程序。由log发送者发送到queue的消息可以被所有运行的log接收者接收。因此,我们可以运行一个log接收者直接在屏幕上显示log,同时运行另一个log接收者将log写入磁盘文件。

 


receive_logs.py
日志消息接收者:建立连接,声明exchange,将exchange与queue进行绑定,开始不停的接收log并打印。

#!/usr/bin/env python
#encoding:utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
#作为好的习惯,在producer和consumer中分别声明一次以保证所要使用的exchange存在
channel.exchange_declare(exchange='logs',
                         type='fanout')
 
#在不同的producer和consumer间共享queue时指明queue的name是重要的
#但某些时候,比如日志系统,需要接收所有的log message而非一个子集
#而且仅对当前的message 流感兴趣,对于过时的message不感兴趣,那么
#可以申请一个临时队列这样,每次连接到RabbitMQ时会以一个随机的名字生成
#一个新的空的queue,将exclusive置为True,这样在consumer从RabbitMQ断开后会删除该queue
result = channel.queue_declare(exclusive=True)
 
#用于获取临时queue的name
queue_name = result.method.queue
 
#exchange与queue之间的关系成为binding
#binding告诉exchange将message发送该哪些queue
channel.queue_bind(exchange='logs',
                   queue=queue_name)
 
print ' [*] Waiting for logs. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
    print " [x] %r" % (body,)
 
#从指定地queue中consume message且不确认
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()
  emit_log.py
日志消息发送者:建立连接,声明fanout类型的exchange,通过exchage向queue发送日志消息,消息被广播给所有接收者,关闭连接,退出。
#!/usr/bin/env python
#encoding:utf8
 
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
#producer只能通过exchange将message发给queue
#exchange的类型决定将message路由至哪些queue
#可用的exchange类型:direct\topic\headers\fanout
#此处定义一个名称为'logs'的'fanout'类型的exchange,'fanout'类型的exchange简单的将message广播到它所知道的所有queue
channel.exchange_declare(exchange='logs',
                         type='fanout')
 
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
 
#将message publish到名为log的exchange中
#因为是fanout类型的exchange,这里无需指定routing_key
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
 
print " [x] Sent %r" % (message,)
 
connection.close()
  测试
python receive_logs.py
python emit_log.py "info: This is the log message"

 

应用场景4-Routing

应用场景3中构建了简单的log系统,可以将log message广播至多个receiver。现在我们将考虑只把指定的message类型发送给其subscriber,比如,只把error message写到log file而将所有log message显示在控制台。


receive_logs_direct.py
log message接收者:建立连接,声明direct类型的exchange,声明queue,使用提供的参数作为routing_key将queue绑定到exchange,开始循环接收log message并打印。

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
#声明一个名为direct_logs类型为direct的exchange
#同时在producer和consumer中声明exchage或queue是个好习惯,以保证其存在
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
#从命令行获取参数:routing_key
severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)
    sys.exit(1)
 
for severity in severities:
    #exchange和queue之间的binding可接受routing_key参数
    #该参数的意义依赖于exchange的类型
    #fanout类型的exchange直接忽略该参数
    #direct类型的exchange精确匹配该关键字进行message路由
    #对多个queue使用相同的binding_key是合法的
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
 
print ' [*] Waiting for logs. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

 

emit_log_direct.py
log message发送者:建立连接,声明direct类型的exchange,生成并发送log message到exchange,关闭连接,退出。

#!/usr/bin/env python
#encoding:utf8
 
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
#声明一个名为direct_logs的direct类型的exchange
#direct类型的exchange
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
#从命令行获取basic_publish的配置参数
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
 
#向名为direct_logs的exchage按照设置的routing_key发送message
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
 
print " [x] Sent %r:%r" % (severity, message)
connection.close()

 

测试:

python receive_logs_direct.py info
python emit_log_direct.py info "The message"

 

应用场景5-topic

应用场景4中改进的log系统中用direct类型的exchange替换应用场景3中的fanout类型exchange实现将不同的log message发送给不同的subscriber(也即分别通过不同的routing_key将queue绑定到exchange,这样exchange便可将不同的message根据message内容路由至不同的queue)。但仍然存在限制,不能根据多个规则路由消息,比如接收者要么只能收error类型的log message要么只能收info类型的message。如果我们不仅想根据log的重要级别如info、warning、error等来进行log message路由还想同时根据log message的来源如auth、cron、kern来进行路由。为了达到此目的,需要topic类型的exchange。topic类型的exchange中routing_key中可以包含两个特殊字符:“*”用于替代一个词,“#”用于0个或多个词。

receive_logs_topic.py
log message接收者:建立连接,声明topic类型的exchange,声明queue,根据程序参数构造routing_key,根据routing_key将queue绑定到exchange,循环接收并处理message。

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
#声明一个名为direct_logs类型为direct的exchange
#同时在producer和consumer中声明exchage或queue是个好习惯,以保证其存在
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
#从命令行获取参数:routing_key
severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)
    sys.exit(1)
 
for severity in severities:
    #exchange和queue之间的binding可接受routing_key参数
    #该参数的意义依赖于exchange的类型
    #fanout类型的exchange直接忽略该参数
    #direct类型的exchange精确匹配该关键字进行message路由
    #对多个queue使用相同的binding_key是合法的
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
 
print ' [*] Waiting for logs. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

 

emit_log_topic.py
log message发送者:建立连接、声明topic类型的exchange、根据程序参数构建routing_key和要发送的message,以构建的routing_key将message发送给topic类型的exchange,关闭连接,退出。

#!/usr/bin/env python
#encoding:utf8
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
#声明一个名为topic_logs的topic类型的exchange
#topic类型的exchange可通过通配符对message进行匹配从而路由至不同queue
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
 
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
 
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()

 

测试:

python receive_logs_topic.py "*.rabbit"
python emit_log_topic.py red.rabbit Hello


应用场景6-PRC

在应用场景2中描述了如何使用work queue将耗时的task分配到不同的worker中。但是,如果我们task是想在远程的计算机上运行一个函数并等待返回结果呢。这根场景2中的描述是一个完全不同的故事。这一模式被称为远程过程调用。现在,我们将构建一个RPC系统,包含一个client和可扩展的RPC server,通过返回斐波那契数来模拟RPC service。


rpc_server.py
RPC server:建立连接,声明queue,定义了一个返回指定数字的斐波那契数的函数,定义了一个回调函数在接收到包含参数的调用请求后调用自己的返回斐波那契数的函数并将结果发送到与接收到message的queue相关联的queue,并进行确认。开始接收调用请求并用回调函数进行请求处理。

#!/usr/bin/env python
#encoding:utf8
import pika
 
#建立到达RabbitMQ Server的connection
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
#声明一个名为rpc_queue的queue
channel.queue_declare(queue='rpc_queue')
 
#计算指定数字的斐波那契数
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)
 
#回调函数,从queue接收到message后调用该函数进行处理
def on_request(ch, method, props, body):
    #由message获取要计算斐波那契数的数字
    n = int(body)
 
    print " [.] fib(%s)"  % (n,)
    #调用fib函数获得计算结果
    response = fib(n)
    
    #exchage为空字符串则将message发送个到routing_key指定的queue
    #这里queue为回调函数参数props中reply_ro指定的queue
    #要发送的message为计算所得的斐波那契数
    #properties中correlation_id指定为回调函数参数props中co的rrelation_id
    #最后对消息进行确认
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
#只有consumer已经处理并确认了上一条message时queue才分派新的message给它
channel.basic_qos(prefetch_count=1)
 
#设置consumeer参数,即从哪个queue获取消息使用哪个函数进行处理,是否对消息进行确认
channel.basic_consume(on_request, queue='rpc_queue')
 
print " [x] Awaiting RPC requests"
 
#开始接收并处理消息
channel.start_consuming()
  rpc_client.py
RPC client:远程过程调用发起者:定义了一个类,类中初始化到RabbitMQ Server的连接、声明回调queue、开始在回调queue上等待接收响应、定义了在回调queue上接收到响应后的处理函数on_response根据响应关联的correlation_id属性作出响应、定义了调用函数并在其中向调用queue发送包含correlation_id等属性的调用请求、初始化一个client实例,以30为参数发起远程过程调用。
#!/usr/bin/env python
#encoding:utf8
import pika
import uuid
 
#在一个类中封装了connection建立、queue声明、consumer配置、回调函数等
class FibonacciRpcClient(object):
    def __init__(self):
        #建立到RabbitMQ Server的connection
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
 
        self.channel = self.connection.channel()
        
        #声明一个临时的回调队列
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
 
        #此处client既是producer又是consumer,因此要配置consume参数
        #这里的指明从client自己创建的临时队列中接收消息
        #并使用on_response函数处理消息
        #不对消息进行确认
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)
    
    #定义回调函数
    #比较类的corr_id属性与props中corr_id属性的值
    #若相同则response属性为接收到的message
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body
 
    def call(self, n):
        #初始化response和corr_id属性
        self.response = None
        self.corr_id = str(uuid.uuid4())
       
        #使用默认exchange向server中定义的rpc_queue发送消息
        #在properties中指定replay_to属性和correlation_id属性用于告知远程server
        #correlation_id属性用于匹配request和response
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   #message需为字符串
                                   body=str(n))
 
        while self.response is None:
            self.connection.process_data_events()
        
        return int(self.response)
 
#生成类的实例
fibonacci_rpc = FibonacciRpcClient()
 
print " [x] Requesting fib(30)"
#调用实例的call方法
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)

 

测试:

 

python rpc_server.py
python rpc_client.py

 

 

 

标签:场景,pika,exchange,RabbitMQ,几种,queue,message,channel
From: https://www.cnblogs.com/-hz01/p/17985675

相关文章

  • Python三方库:Pika(RabbitMQ基础使用)
    Python有多种插件都支持RabbitMQ,本文介绍的是RabbitMQ推荐的Pika插件。使用pip直接安装即可pipinstallpika。一、RabbitMQ简介1.MQ简介MQ(MessageQueue,消息队列),是一个在消息传输过程中保存消息的容器,多用在分布式系统之间进行通信。MQ优势应用解耦:提高系统容错性和可......
  • 大模型系列|垂直大模型的几种训练策略(一)
     1目前垂直行业大模型的几种训练策略参考:大模型时代-行业落地的再思考重新训练:使用通用数据和领域数据混合,fromscratch(从头开始)训练了一个大模型,最典型的代表就是BloombergGPT。二次预训练:在一个通用模型的基础上做continuepretraining(继续预训练,二次预训练),像LawGPT就......
  • 数字孪生在能源电力行业的技术难点和应用场景
    数字孪生的关键技术数字孪生技术架构包括物理空间、数字空间和信息处理三个部分,基于物联网和虚拟仿真的底层技术,实现了真实空间和虚拟空间之间的双向数据交换、指挥控制和虚拟现实联动,以满足这些需求,必须依靠五项关键技术。一是智能传感技术,高精度、高灵敏度的传感监测系统是实现数......
  • 前端性能测试 - 页面运行性能测试场景拓展
    前言对于前端场景复杂且需保持常态开启的页面(例如在线表格、高频大屏),客户在实际应用场景中长时间使用会很容易出现性能问题。但是在测试场景下,并不是那么容易观察到,为了更好的评估和描述出现的问题,当前主要对页面性能场景作以下分类:下载页面性能表现运行时性能表现这篇文章主要跟大......
  • ERROR:Only one ConfirmCallback is supported by each RabbitTemplate] with root cau
     错误:OnlyoneConfirmCallbackissupportedbyeachRabbitTemplate]withrootcause 原因:因为Spring的Bean默认都是单例;而RabbitTemplate对象同样支持一个回调。 解决:使用@Scope("prototype")可通知Spring将被注解的Bean变为多例。代码: //改Ra......
  • 工业RTU串口网关有哪些使用用途和使用场景
    工业RTU串口网关主要以串口形式实现对设备的链接和数据采集、传输,具有设备对接方便、设备对接数量多、系统整体稳定性高、部署快捷等优势,可以广泛应用于各种工业领域。本篇就为大家简单介绍一下工业串口网关的使用用途和使用场景: 1、系统兼容集成在工业物联网发展过程中,可能已......
  • fMP4流媒体视频格式特点及其应用场景介绍
    近期我们在视频监控管理平台EasyCVR系统中新增了HTTP-FMP4播放协议,今天我们就来聊聊该协议的特点和应用。fMP4(FragmentedMPEG-4)是基于MPEG-4Part12的流媒体格式,是流媒体的一项重要技术,它能通过互联网传送高质量的视频内容。FMP4格式将整个视频文件分割为多个小片段,每个片段包含......
  • 有了Composition API后,有些场景或许你不需要pinia了
    前言日常开发时有些业务场景功能很复杂,如果将所有代码都写在一个vue组件中,那个vue文件的代码量可能就几千行了,维护极其困难。这时我们就需要将其拆分为多个组件,拆完组件后就需要在不同组件间共享数据和业务逻辑。有的小伙伴会选择将数据和业务逻辑都放到pinia中,这样虽然可以解决......
  • clickhouse 删除数据的几种常见的方式
    clickhouse数据库清理数据的方式很多,各有自己的优缺点,正面介绍几种常见的方式。一、执行delete方式此种方式为异步执行,并不是实时的。##DELETE操作--删除记录altertableck_dev01deletewhereid='33';--删除分片表数据altertableck_dev01onclustermain_clusterwher......
  • RabbitMq批量删除队列
    RabbitMq批量删除队列​ 由于部分公司同事使用RabbitMq时,没有将Client设置为autodelete,导致大量冗余队列。其中这些队列又是无routekey队列,收到了批量的订阅消息,占用服务器内存。​ 如何将这些无用的队列删除成为一个问题?经过多次摸索,在rabbitmqmanagementapi里面找到了方案:u......