//rabbitmq 延时消息队列 生产端 demo
//1.将消息发送到延时交换机对应的队列上delay-queue,指定过期时间;过期后转发的交换机和绑定的key
//2.过期时间过期后将消息转发到新的队列上;
//3.绑定新队列的消费者消费消息,到达延时消费的目的
1.//rabiitmq 延时消息队列 生产端 demo
//1.将消息发送到延时交换机对应的队列上delay-queue,指定过期时间;过期后转发的交换机和绑定的key
//2.过期时间过期后将消息转发到新的队列上;
//3.绑定新队列的消费者消费消息,到达延时消费的目的
public static void SendYanShi(string i)
{
var factory = new ConnectionFactory();
factory.HostName = "IP";//RabbitMQ服务在本地运行
factory.UserName = "admin";//用户名
factory.Password = "***";//密码
using (var connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
Dictionary<string, object> dic = new Dictionary<string, object>();
dic = new Dictionary<string, object>();
dic.Add("x-dead-letter-exchange", "Marketpublish-cron-mercadolibre-republish-site-exchange");//过期消息转向到新消息的交换机
dic.Add("x-dead-letter-routing-key", "key_1");//过期消息转向路由相匹配routingkey
//美客多重刊地方站点:延时交换机,延时队列名称:
channel.QueueDeclare(queue: "Marketpublish-cron-mercadolibre-republish-site-delay-queue", durable: true, exclusive: false, autoDelete: false, arguments: dic);
channel.ExchangeDeclare(exchange: "Marketpublish-cron-mercadolibre-republish-site-delay-exchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//美客多重刊地方站点:交换机,队列名称:
channel.QueueDeclare(queue: "Marketpublish-cron-mercadolibre-republish-site-queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.ExchangeDeclare(exchange: "Marketpublish-cron-mercadolibre-republish-site-exchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
#region MyRegion
{
//绑定美客多重刊地方站点:延时交换机,延时队列名称:
channel.QueueBind(queue: "Marketpublish-cron-mercadolibre-republish-site-delay-queue",
exchange: "Marketpublish-cron-mercadolibre-republish-site-delay-exchange",
routingKey: "key_1");
//绑定美客多重刊地方站点:交换机,队列名称
channel.QueueBind(queue: "Marketpublish-cron-mercadolibre-republish-site-queue",
exchange: "Marketpublish-cron-mercadolibre-republish-site-exchange",
routingKey: "key_1");
string message = "这是一个延时消息" + i;
IBasicProperties properties = channel.CreateBasicProperties();
properties.Expiration = "5000"; //指定延时队列的过期时间,单位毫秒
channel.BasicPublish(exchange: "Marketpublish-cron-mercadolibre-republish-site-delay-exchange",
routingKey: "key_1",
basicProperties: properties,
body: Encoding.UTF8.GetBytes(message));
Console.WriteLine($"message 已发送~~");
}
#endregion
}
}
}
2.//rabiitmq 延时消息队列 消费端 demo
public static void Consumer()IP
var factory = new ConnectionFactory();
factory.HostName = "ip";//RabbitMQ服务在本地运行
factory.UserName = "admin";//用户名
factory.Password = "****";//密码
using (var connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "Marketpublish-cron-mercadolibre-republish-site-queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.ExchangeDeclare(exchange: "Marketpublish-cron-mercadolibre-republish-site-exchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
channel.QueueBind(queue: "Marketpublish-cron-mercadolibre-republish-site-queue",
exchange: "Marketpublish-cron-mercadolibre-republish-site-exchange",
routingKey: "key_1");
//rabbitMq消费消息是通过事件驱动的:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => //如果有消息进入到Rabbitmq,就会触发这个事件来完成消息的消费;
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"接受消息: {message}");
};
channel.BasicConsume(queue: "Marketpublish-cron-mercadolibre-republish-site-queue",
autoAck: true,
consumer: consumer);
};
}
}