首页 > 编程语言 >【Node.js】RabbitMQ 不同交换器类型的使用

【Node.js】RabbitMQ 不同交换器类型的使用

时间:2024-09-15 19:20:04浏览次数:3  
标签:Node 交换器 const exchange await RabbitMQ js error channel

RabbitMQ 是一个强大的消息队列系统,支持多种消息路由模式(Exchange Types),也可以说是发布订阅模式,包括 Direct、Topic、Headers 和 Fanout。

实际上这四种模式的区别在代码层面只是参数配置不同。


1. RabbitMQ 概述

RabbitMQ 使用交换器(Exchange)来接收生产者发送的消息,并根据交换器类型和绑定规则将消息路由到一个或多个队列(Queue)。四种主要的交换器类型分别是:

  • Direct Exchange:基于完全匹配的路由键进行消息路由。
  • Topic Exchange:基于模式匹配的路由键进行消息路由,支持通配符等(使用"*“匹配一个单词或使用”#"匹配多个单词)。
  • Headers Exchange:基于消息头属性进行消息路由。
  • Fanout Exchange:将消息广播到所有绑定的队列,无视路由键。

使用 MQ 的时候,记得先启动 MQ 服务。

2. Direct Exchange

Direct Exchange 根据消息的路由键(Routing Key)与队列的绑定键(Binding Key)进行严格匹配,将消息路由到对应的队列。

适用场景

  • 任务分发:将任务发送到特定的工作队列。
  • 日志记录:根据日志级别将日志发送到不同的日志处理队列。

示例:日志系统

假设我们有一个日志系统,根据日志级别(info、warning、error)将日志消息发送到不同的队列进行处理。

3.1 生产者(Publisher)
// direct_publisher.js
const amqp = require('amqplib');

async function publishMessage() {
  try {
    // 1. 连接 MQ
    const connection = await amqp.connect('amqp://localhost:5672');
    // 2. 创建一个通道
    const channel = await connection.createChannel();
    // 3. 创建交换机 如果创建了就不会创建 否则创建
    const exchange = 'direct_logs';
    // 参数:交换机名称(随便写) 交换机类型(direct、fanout、topic、headers) 配置项 
    await channel.assertExchange(exchange, 'direct', { durable: false });

    const args = process.argv.slice(2);
    const severity = args[0] || 'info';
    const message = args.slice(1).join(' ') || 'Hello World!';
    // 4. 发送消息 
    // 参数:交换机名称 匹配路由的key 内容(Buffer)
    channel.publish(exchange, severity, Buffer.from(message));
    console.log(` [x] Sent '${severity}':'${message}'`);

    setTimeout(() => {
      // 5. 断开连接
      channel.close();
      connection.close();
    }, 500);
  } catch (error) {
    console.error(error);
  }
}

publishMessage();

使用方法

node direct_publisher.js error "This is an error message"
node direct_publisher.js info "This is an info message"
node direct_publisher.js warning "This is a warning message"
3.2 消费者(Consumer)

我们将创建三个消费者,分别处理不同级别的日志。

3.2.1 Error Logger
// direct_consumer_error.js
const amqp = require('amqplib');

async function consumeErrorLogs() {
  try {
    const connection = await amqp.connect('amqp://localhost:5672');
    const channel = await connection.createChannel();

    const exchange = 'direct_logs';
    const queue = 'error_logs';
    const severity = 'error';
    // 创建交换机 保证在消息队列前一定会有交换机(而且上面说过交换机一旦创建就不会重复创建)
    await channel.assertExchange(exchange, 'direct', { durable: false });
    // 创建队列
    const q = await channel.assertQueue(queue, { durable: false });
    // 交换机和队列要绑定 
    // 参数:队列名称 交换机名称 匹配路由的key
    await channel.bindQueue(q.queue, exchange, severity);

    console.log(` [*] Waiting for ${severity} logs. To exit press CTRL+C`);
	// 消费消息
    channel.consume(q.queue, (msg) => {
      if (msg.content) {
        console.log(` [x] Received ${severity}: ${msg.content.toString()}`);
      }
    }, { noAck: true });  // 自动消费
  } catch (error) {
    console.error(error);
  }
}

consumeErrorLogs();
3.2.2 Info Logger
// direct_consumer_info.js
const amqp = require('amqplib');

async function consumeInfoLogs() {
  try {
    const connection = await amqp.connect('amqp://localhost:5672');
    const channel = await connection.createChannel();

    const exchange = 'direct_logs';
    const queue = 'info_logs';
    const severity = 'info';

    await channel.assertExchange(exchange, 'direct', { durable: false });
    const q = await channel.assertQueue(queue, { durable: false });
    await channel.bindQueue(q.queue, exchange, severity);

    console.log(` [*] Waiting for ${severity} logs. To exit press CTRL+C`);

    channel.consume(q.queue, (msg) => {
      if (msg.content) {
        console.log(` [x] Received ${severity}: ${msg.content.toString()}`);
      }
    }, { noAck: true });
  } catch (error) {
    console.error(error);
  }
}

consumeInfoLogs();
3.2.3 Warning Logger
// direct_consumer_warning.js
const amqp = require('amqplib');

async function consumeWarningLogs() {
  try {
    const connection = await amqp.connect('amqp://localhost:5672');
    const channel = await connection.createChannel();

    const exchange = 'direct_logs';
    const queue = 'warning_logs';
    const severity = 'warning';

    await channel.assertExchange(exchange, 'direct', { durable: false });
    const q = await channel.assertQueue(queue, { durable: false });
    await channel.bindQueue(q.queue, exchange, severity);

    console.log(` [*] Waiting for ${severity} logs. To exit press CTRL+C`);

    channel.consume(q.queue, (msg) => {
      if (msg.content) {
        console.log(` [x] Received ${severity}: ${msg.content.toString()}`);
      }
    }, { noAck: true });
  } catch (error) {
    console.error(error);
  }
}

consumeWarningLogs();
3.3 运行示例
  1. 启动消费者:
node direct_consumer_error.js
node direct_consumer_info.js
node direct_consumer_warning.js
  1. 发送日志消息:
node direct_publisher.js error "This is an error message"
node direct_publisher.js info "This is an info message"
node direct_publisher.js warning "This is a warning message"

预期输出

  • Error Logger 会接收到 “This is an error message”
  • Info Logger 会接收到 “This is an info message”
  • Warning Logger 会接收到 “This is a warning message”

注:这里使用的是 cjs,不可以使用顶层 await 语法,顶层 await 只有在 esm 中才可以使用。

3.4 总结

Direct Exchange 适用于需要基于精确匹配路由键来路由消息的场景。通过不同的消费者绑定不同的路由键,可以实现消息的定向分发。


3. Topic Exchange

Topic Exchange 基于路由键的模式匹配进行消息路由,支持通配符(*#),允许更灵活的消息分发。

适用场景

  • 日志系统:根据不同的日志类别和子类别进行路由。
  • 实时消息分发:根据不同的主题进行消息分发,如股票价格、天气更新等。

示例:实时新闻发布系统

假设我们有一个实时新闻系统,根据新闻的类别(如 sports, weather, politics)和子类别(如 football, basketball)将新闻消息发送到不同的队列。

4.1 生产者(Publisher)
// topic_publisher.js
const amqp = require('amqplib');

async function publishMessage() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const exchange = 'topic_news';
    await channel.assertExchange(exchange, 'topic', { durable: false });

    const args = process.argv.slice(2);
    const routingKey = args[0] || 'sports.football';
    const message = args.slice(1).join(' ') || 'Breaking News!';

    channel.publish(exchange, routingKey, Buffer.from(message));
    console.log(` [x] Sent '${routingKey}':'${message}'`);

    setTimeout(() => {
      channel.close();
      connection.close();
    }, 500);
  } catch (error) {
    console.error(error);
  }
}

publishMessage();

使用方法

node topic_publisher.js sports.football "Football match results"
node topic_publisher.js weather.rain "Heavy rain in the city"
node topic_publisher.js politics.election "Election results announced"
4.2 消费者(Consumer)

我们将创建多个消费者,根据不同的主题和子主题进行绑定。

4.2.1 Sports Subscriber
// topic_consumer_sports.js
const amqp = require('amqplib');

async function consumeSportsNews() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const exchange = 'topic_news';
    const queue = 'sports_news';

    await channel.assertExchange(exchange, 'topic', { durable: false });
    const q = await channel.assertQueue(queue, { durable: false });

    // 订阅所有体育相关的新闻 只匹配前面内容 后面内容随便
    const bindingKey = 'sports.*';
    await channel.bindQueue(q.queue, exchange, bindingKey);

    console.log(` [*] Waiting for sports news with binding key '${bindingKey}'. To exit press CTRL+C`);

    channel.consume(q.queue, (msg) => {
      if (msg.content) {
        console.log(` [x] Received sports news: ${msg.content.toString()}`);
      }
    }, { noAck: true });
  } catch (error) {
    console.error(error);
  }
}

consumeSportsNews();
4.2.2 Weather Subscriber
// topic_consumer_weather.js
const amqp = require('amqplib');

async function consumeWeatherNews() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const exchange = 'topic_news';
    const queue = 'weather_news';

    await channel.assertExchange(exchange, 'topic', { durable: false });
    const q = await channel.assertQueue(queue, { durable: false });

    // 订阅所有天气相关的新闻
    const bindingKey = 'weather.*';
    await channel.bindQueue(q.queue, exchange, bindingKey);

    console.log(` [*] Waiting for weather news with binding key '${bindingKey}'. To exit press CTRL+C`);

    channel.consume(q.queue, (msg) => {
      if (msg.content) {
        console.log(` [x] Received weather news: ${msg.content.toString()}`);
      }
    }, { noAck: true });
  } catch (error) {
    console.error(error);
  }
}

consumeWeatherNews();
4.2.3 All News Subscriber
// topic_consumer_all.js
const amqp = require('amqplib');

async function consumeAllNews() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const exchange = 'topic_news';
    const queue = 'all_news';

    await channel.assertExchange(exchange, 'topic', { durable: false });
    const q = await channel.assertQueue(queue, { durable: false });

    // 订阅所有类别的新闻
    const bindingKey = '#';
    await channel.bindQueue(q.queue, exchange, bindingKey);

    console.log(` [*] Waiting for all news with binding key '${bindingKey}'. To exit press CTRL+C`);

    channel.consume(q.queue, (msg) => {
      if (msg.content) {
        console.log(` [x] Received news: ${msg.content.toString()}`);
      }
    }, { noAck: true });
  } catch (error) {
    console.error(error);
  }
}

consumeAllNews();
4.3 运行示例
  1. 启动消费者:
node topic_consumer_sports.js
node topic_consumer_weather.js
node topic_consumer_all.js
  1. 发送新闻消息:
node topic_publisher.js sports.football "Football match results"
node topic_publisher.js weather.rain "Heavy rain in the city"
node topic_publisher.js politics.election "Election results announced"

预期输出

  • Sports Subscriber 会接收到 “Football match results”
  • Weather Subscriber 会接收到 “Heavy rain in the city”
  • All News Subscriber 会接收到所有三条消息

4.4 总结

Topic Exchange 提供了基于模式匹配的灵活消息路由机制。通过使用通配符,可以实现复杂的消息分发逻辑,适用于需要多层次分类的场景。


4. Headers Exchange

Headers Exchange 根据消息头属性进行路由,而不是依赖路由键。它使用消息的头部(headers)来匹配队列的绑定规则。

适用场景

  • 复杂路由逻辑:当路由逻辑涉及多个属性时,Headers Exchange 可以更方便地处理。
  • 属性驱动的消息分发:如基于消息的多个属性组合进行路由。

示例:多条件订单处理系统

假设我们有一个订单处理系统,根据订单的属性(如国家、货币)将订单路由到不同的处理队列。

5.1 生产者(Publisher)
// headers_publisher.js
const amqp = require('amqplib');

async function publishMessage() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const exchange = 'headers_orders';
    await channel.assertExchange(exchange, 'headers', { durable: false });

    const message = { orderId: 1234, item: 'Laptop', quantity: 1 };
    const headers = {
      country: 'US',
      currency: 'USD'
    };

    channel.publish(exchange, '', Buffer.from(JSON.stringify(message)), { headers });
    console.log(` [x] Sent message with headers ${JSON.stringify(headers)}: ${JSON.stringify(message)}`);

    setTimeout(() => {
      channel.close();
      connection.close();
    }, 500);
  } catch (error) {
    console.error(error);
  }
}

publishMessage();

使用方法

node headers_publisher.js
5.2 消费者(Consumer)

我们将创建两个消费者,分别处理不同国家和货币的订单。

5.2.1 US Orders Consumer
// headers_consumer_us.js
const amqp = require('amqplib');

async function consumeUSOrders() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const exchange = 'headers_orders';
    const queue = 'us_orders';

    await channel.assertExchange(exchange, 'headers', { durable: false });
    const q = await channel.assertQueue(queue, { durable: false });

    // 绑定队列,匹配 country=US 和 currency=USD
    const bindingHeaders = {
      country: 'US',
      currency: 'USD'
    };

    await channel.bindQueue(q.queue, exchange, '', { headers: bindingHeaders, 'x-match': 'all' });

    console.log(` [*] Waiting for US orders with headers ${JSON.stringify(bindingHeaders)}. To exit press CTRL+C`);

    channel.consume(q.queue, (msg) => {
      if (msg.content) {
        console.log(` [x] Received US order: ${msg.content.toString()}`);
      }
    }, { noAck: true });
  } catch (error) {
    console.error(error);
  }
}

consumeUSOrders();
5.2.2 EU Orders Consumer
// headers_consumer_eu.js
const amqp = require('amqplib');

async function consumeEUOrders() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const exchange = 'headers_orders';
    const queue = 'eu_orders';

    await channel.assertExchange(exchange, 'headers', { durable: false });
    const q = await channel.assertQueue(queue, { durable: false });

    // 绑定队列,匹配 country=EU 和 currency=EUR
    const bindingHeaders = {
      country: 'EU',
      currency: 'EUR'
    };

    await channel.bindQueue(q.queue, exchange, '', { headers: bindingHeaders, 'x-match': 'all' });

    console.log(` [*] Waiting for EU orders with headers ${JSON.stringify(bindingHeaders)}. To exit press CTRL+C`);

    channel.consume(q.queue, (msg) => {
      if (msg.content) {
        console.log(` [x] Received EU order: ${msg.content.toString()}`);
      }
    }, { noAck: true });
  } catch (error) {
    console.error(error);
  }
}

consumeEUOrders();
5.3 运行示例
  1. 启动消费者:
node headers_consumer_us.js
node headers_consumer_eu.js
  1. 发送订单消息:
node headers_publisher.js

预期输出

  • US Orders Consumer 会接收到订单消息。
  • EU Orders Consumer 不会接收到该消息,除非发送的消息包含 country: EUcurrency: EUR

发送 EU 订单消息

修改 headers_publisher.js 中的 headers 为:

const headers = {
  country: 'EU',
  currency: 'EUR'
};

然后重新发送:

node headers_publisher.js

预期输出

  • EU Orders Consumer 会接收到订单消息。
  • US Orders Consumer 不会接收到该消息。

5.4 总结

Headers Exchange 通过匹配消息头属性进行路由,适用于复杂的路由需求。它提供了更高的灵活性,但相比其他交换器类型,配置和使用稍显复杂。


5. Fanout Exchange

Fanout Exchange 会将接收到的消息广播到所有绑定的队列,无视路由键。它类似于广播机制,适用于需要将消息发送给所有订阅者的场景。

适用场景

  • 实时通知:如系统广播通知、聊天消息。
  • 日志收集:将日志消息发送到多个日志处理系统。

示例:实时聊天系统

假设我们有一个实时聊天系统,所有用户都能接收到所有发送的消息。

6.1 生产者(Publisher)
// fanout_publisher.js
const amqp = require('amqplib');

async function publishMessage() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const exchange = 'fanout_chat';
    await channel.assertExchange(exchange, 'fanout', { durable: false });

    const message = process.argv.slice(2).join(' ') || 'Hello Chat!';

    channel.publish(exchange, '', Buffer.from(message));
    console.log(` [x] Sent: ${message}`);

    setTimeout(() => {
      channel.close();
      connection.close();
    }, 500);
  } catch (error) {
    console.error(error);
  }
}

publishMessage();

使用方法

node fanout_publisher.js "Hello everyone!"
6.2 消费者(Consumer)

我们将创建多个消费者,模拟不同的聊天客户端接收消息。

6.2.1 Chat Client 1
// fanout_consumer1.js
const amqp = require('amqplib');

async function consumeChat() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const exchange = 'fanout_chat';
    await channel.assertExchange(exchange, 'fanout', { durable: false });

    const q = await channel.assertQueue('', { exclusive: true });
    await channel.bindQueue(q.queue, exchange, '');

    console.log(` [*] Chat Client 1 waiting for messages. To exit press CTRL+C`);

    channel.consume(q.queue, (msg) => {
      if (msg.content) {
        console.log(` [Chat Client 1] Received: ${msg.content.toString()}`);
      }
    }, { noAck: true });
  } catch (error) {
    console.error(error);
  }
}

consumeChat();
6.2.2 Chat Client 2
// fanout_consumer2.js
const amqp = require('amqplib');

async function consumeChat() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const exchange = 'fanout_chat';
    await channel.assertExchange(exchange, 'fanout', { durable: false });

    const q = await channel.assertQueue('', { exclusive: true });
    await channel.bindQueue(q.queue, exchange, '');

    console.log(` [*] Chat Client 2 waiting for messages. To exit press CTRL+C`);

    channel.consume(q.queue, (msg) => {
      if (msg.content) {
        console.log(` [Chat Client 2] Received: ${msg.content.toString()}`);
      }
    }, { noAck: true });
  } catch (error) {
    console.error(error);
  }
}

consumeChat();
6.3 运行示例
  1. 启动消费者:
node fanout_consumer1.js
node fanout_consumer2.js
  1. 发送聊天消息:
node fanout_publisher.js "Hello everyone!"

预期输出

  • Chat Client 1Chat Client 2 都会接收到 “Hello everyone!” 消息。

6.4 总结

Fanout Exchange 提供了简单的广播机制,适用于需要将消息发送给所有订阅者的场景。它不需要路由键,配置简单,但缺乏灵活性。


6. 总结

  • Direct Exchange:适用于基于精确路由键匹配的场景,如任务分发和日志记录。
  • Topic Exchange:适用于需要基于模式匹配的灵活路由场景,如实时新闻发布和分类消息系统。
  • Headers Exchange:适用于需要基于多个消息头属性进行复杂路由的场景,如多条件订单处理。
  • Fanout Exchange:适用于需要广播消息给所有订阅者的场景,如实时聊天系统和日志广播。

标签:Node,交换器,const,exchange,await,RabbitMQ,js,error,channel
From: https://blog.csdn.net/XiugongHao/article/details/142287008

相关文章

  • 【Node.js】初识 RabbitMQ
    概述MQ顾名思义,是消息队列。RabbitMQ是一个消息队列系统,用于实现异步通信。基于AMQP。AMQP(高级消息队列协议)实现了对于消息的排序,点对点通讯,和发布订阅,保持可靠性、保证安全性。在Node.js的微服务架构中,RabbitMQ可以作为服务之间的消息传递中介,帮助解耦系统组件......
  • 开发nodejs RESETful api 创建项目流程
    开发nodejsRESETfulapi创建项目流程1.安装vm-windows、node.js和npm安装Node.js时,建议使用版本管理器,因为版本变更速度非常快。你可能需要根据所使用的不同项目的需要在多个Node.js版本之间进行切换。Node版本管理器(通常称为nvm)是安装多个版本的Node.js的最......
  • MonoDevelop 的续集dotdevelop
    DotDevelop是一个跨平台的.NET集成开发环境(IDE),它原本是MonoDevelop的分支项目,这个项目更侧重于Linux支持和GTK3升级,github:https://github.com/dotdevelop/dotdevelop。MonoDevelop是一个开源的跨平台C#开发工具,而DotDevelop则是在此基础上进行改进和扩展的一个新版......
  • 基于Node.js+Vue的校园二手物品交易平台设计与实现
    ......
  • Node.js安装与配置(LTS 20.17.0版本)
    目录一、下载Node.js一、下载Node.js1、地址1️⃣下载地址官网:https://nodejs.org/en/download/2️⃣中文镜像地址:下载|Node.js中文网建议:如果没有科学上网,还是镜像好一些,有些时候官网进不去或者下载很慢个人比较喜欢压缩包的方式,没有多余操作,解压即可,弊端就是没有自......
  • 使用Kimi生成Node-RED的代码
    目录引言Kimi生成导入Node-RED  引言前面写过几篇博客介绍了Node-RED的使用。Node-RED使用起来已经很方便了,拖拉一下就可以生成程序流。当然,如果想偷懒,可以借助Kimi。Kimi生成Kimi不能生成图形,但是Node-RED支持JSON格式的保存和导入,我们可以让Kimi生成需要的JSON字......
  • k8s中master无法访问NodePort,普通节点可以
    我的是ens33下有两个ip:  移除不想要的ip(不是我们设置的静态ip): 临时(重启后失效):sudoipaddrdel192.168.87.132/24devens33 #删除所有pod让他们再创建 kubectldeletepods--all--all-namespaces 永久(不要dhcp):sudovi/etc/sysconfig/network-scripts/ifcf......
  • K8S怎么删除一个Node节点
    驱逐Pod本次node为172.16.5.103#kubectldrain172.16.5.103--force--ignore-daemonsets--delete-local-data使用参数--delete-local-data删除本地挂载数据查看该节点无法调度删除node#kubectldeletenode172.16.5.103重新设置为可调度#kubectluncordon172.16.5.103......
  • 解决 Node.js 项目中的 Yarn 安装错误
    在开发Node.js项目时,我们经常需要依赖于各种包来增强项目功能。Yarn是一个流行的包管理工具,由Facebook推出,旨在提供更快、更安全、更可靠的依赖管理。然而,如果你在尝试运行项目时遇到了关于Yarn未安装的错误,这可能会阻碍你的开发流程。本文将指导你如何通过简单的步......
  • 基于Node.js+vue职位智能匹配系统(开题+程序+论文) 计算机毕业设计
    本系统(程序+源码+数据库+调试部署+开发环境)带文档lw万字以上,文末可获取源码系统程序文件列表开题报告内容研究背景随着信息技术的飞速发展和互联网应用的普及,人才招聘市场迎来了前所未有的变革。传统的人才招聘方式往往效率低下,信息不对称,导致求职者难以快速找到合适的工......