首页 > 其他分享 >rabbitmq 延时消息队列

rabbitmq 延时消息队列

时间:2023-01-31 17:44:57浏览次数:44  
标签:cron republish exchange 队列 mercadolibre Marketpublish rabbitmq queue 延时

//rabbitmq 延时消息队列 生产端 demo
//1.将消息发送到延时交换机对应的队列上delay-queue,指定过期时间;过期后转发的交换机和绑定的key
//2.过期时间过期后将消息转发到新的队列上;
//3.绑定新队列的消费者消费消息,到达延时消费的目的

 

1.//rabiitmq 延时消息队列 生产端 demo


//1.将消息发送到延时交换机对应的队列上delay-queue,指定过期时间;过期后转发的交换机和绑定的key
//2.过期时间过期后将消息转发到新的队列上;
//3.绑定新队列的消费者消费消息,到达延时消费的目的
public static void SendYanShi(string i)
{
var factory = new ConnectionFactory();
factory.HostName = "IP";//RabbitMQ服务在本地运行
factory.UserName = "admin";//用户名
factory.Password = "***";//密码
using (var connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{

Dictionary<string, object> dic = new Dictionary<string, object>();
dic = new Dictionary<string, object>();
dic.Add("x-dead-letter-exchange", "Marketpublish-cron-mercadolibre-republish-site-exchange");//过期消息转向到新消息的交换机
dic.Add("x-dead-letter-routing-key", "key_1");//过期消息转向路由相匹配routingkey

//美客多重刊地方站点:延时交换机,延时队列名称:
channel.QueueDeclare(queue: "Marketpublish-cron-mercadolibre-republish-site-delay-queue", durable: true, exclusive: false, autoDelete: false, arguments: dic);
channel.ExchangeDeclare(exchange: "Marketpublish-cron-mercadolibre-republish-site-delay-exchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

//美客多重刊地方站点:交换机,队列名称:
channel.QueueDeclare(queue: "Marketpublish-cron-mercadolibre-republish-site-queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.ExchangeDeclare(exchange: "Marketpublish-cron-mercadolibre-republish-site-exchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);


#region MyRegion
{
//绑定美客多重刊地方站点:延时交换机,延时队列名称:
channel.QueueBind(queue: "Marketpublish-cron-mercadolibre-republish-site-delay-queue",
exchange: "Marketpublish-cron-mercadolibre-republish-site-delay-exchange",
routingKey: "key_1");

//绑定美客多重刊地方站点:交换机,队列名称
channel.QueueBind(queue: "Marketpublish-cron-mercadolibre-republish-site-queue",
exchange: "Marketpublish-cron-mercadolibre-republish-site-exchange",
routingKey: "key_1");

string message = "这是一个延时消息" + i;

IBasicProperties properties = channel.CreateBasicProperties();
properties.Expiration = "5000"; //指定延时队列的过期时间,单位毫秒

channel.BasicPublish(exchange: "Marketpublish-cron-mercadolibre-republish-site-delay-exchange",
routingKey: "key_1",
basicProperties: properties,
body: Encoding.UTF8.GetBytes(message));
Console.WriteLine($"message 已发送~~");

}
#endregion
}
}
}

 

2.//rabiitmq 延时消息队列 消费端 demo

public static void Consumer()IP
var factory = new ConnectionFactory();
factory.HostName = "ip";//RabbitMQ服务在本地运行
factory.UserName = "admin";//用户名
factory.Password = "****";//密码

using (var connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "Marketpublish-cron-mercadolibre-republish-site-queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.ExchangeDeclare(exchange: "Marketpublish-cron-mercadolibre-republish-site-exchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
channel.QueueBind(queue: "Marketpublish-cron-mercadolibre-republish-site-queue",
exchange: "Marketpublish-cron-mercadolibre-republish-site-exchange",
routingKey: "key_1");

//rabbitMq消费消息是通过事件驱动的:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => //如果有消息进入到Rabbitmq,就会触发这个事件来完成消息的消费;
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"接受消息: {message}");
};
channel.BasicConsume(queue: "Marketpublish-cron-mercadolibre-republish-site-queue",
autoAck: true,
consumer: consumer);
};

}
}

标签:cron,republish,exchange,队列,mercadolibre,Marketpublish,rabbitmq,queue,延时
From: https://www.cnblogs.com/csj007523/p/17080016.html

相关文章

  • 堆栈、队列、单调栈、单调队列1
    Stack和Queue——栈和队列栈的定义:栈是限定仅在表头进行插入和删除操作的线性表(先进后出)队列的定义:队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)......
  • rabbitmq 概念部署及应用
    概念RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种......
  • rabbitmq启动不起来解决方案
    1,用systemctlstartrabbitmq-server命令启动发现启动不起来2,查看journalctl-xe日志发现1月3015:05:34sit2--10-1-1-97rabbitmq-server[5088]:BOOTFAILED1......
  • 代码随想录 |栈与队列总结篇
    基础知识:栈与队列都是容器接口,而非容器;栈与队列可选容器,缺省状态下是deque;提供push,pop等操作,但不提供送代器,不提供走访功能,因为只能在一边进行插入,弹出操作;栈的经典题......
  • Rabbitmq 与kafka
    Rabbitmq比kafka可靠,kafka更适合IO高吞吐的处理,比如ELK日志收集Kafka和RabbitMq一样是通用意图消息代理,他们都是以分布式部署为目的。但是他们对消息语义模型的定义的假设......
  • RabbitMQ之五种消息模型
     首先什么是MQMQ全称是MessageQueue,即消息对列!消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生......
  • 7-6 银行业务队列简单模拟
    #include<stdio.h>#include<stack>#include<queue>#include<bits/stdc++.h>usingnamespacestd;intmain(){ intn; intflag=1; queue<int>s1,s2;//奇数s1a窗口 scanf("%......
  • 《RabbitMQ实战指南》笔记
    1.RabbitMQ简介1.1什么是消息中间件消息队列中间件(MessageQueueMiddleware,简称为MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行......
  • 硬盘测速工具中的队列深度是个什么东西——CrystalDiskMark中的Q32T16是什么意思
     ================================ 最近有使用CrystalDiskMark给自己的硬盘做测速,发现有个名词自己不是很理解,就是像Q32T16这样的词: 在网上找了好久,都说Q32T16......
  • Redis实现分布式阻塞队列
    1.Redis分布式锁实现原理分布式锁本质上要实现的目标就是在Redis里面占一个“茅坑”,当别的进程也要来占时,发现已经有人蹲在那里了,就只好放弃或者稍后再试。占坑一般是......