首页 > 其他分享 >RabbitMQ 入门教程

RabbitMQ 入门教程

时间:2024-08-26 09:54:43浏览次数:21  
标签:pika exchange 入门教程 RabbitMQ queue connection message channel

RabbitMQ 入门教程

1. 引言

RabbitMQ 是一个开源的消息代理和队列服务器,实现高级消息队列协议 (AMQP)。它能帮助开发者实现应用程序间的解耦、异步处理、流量削峰等需求。

2. 安装与配置

2.1 安装RabbitMQ

2.1.1 Ubuntu

```bash

sudo apt-get update

sudo apt-get install rabbitmq-server

```

2.1.2 Windows

1. 下载安装包: [RabbitMQ Download](https://www.rabbitmq.com/download.html)

2. 运行安装向导。

2.2 启动服务

```bash

sudo service rabbitmq-server start

```

2.3 配置管理插件

```bash

sudo rabbitmq-plugins enable rabbitmq_management

```

2.4 访问管理界面

- 浏览器访问: `http://localhost:15672/`

- 默认用户名/密码: `guest/guest`

3. Python 开发环境准备

3.1 安装pika库

```bash

pip install pika

```

4. Hello World 示例

4.1 发送端 - send.py

```python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',

routing_key='hello',

body='Hello World!')

print(" [x] Sent 'Hello World!'")

connection.close()

```

4.2 接收端 - receive.py

```python

import pika

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_consume(queue='hello',

on_message_callback=callback,

auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

```

5. 工作队列

5.1 创建工作队列

```python

import pika

import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = "A message"

channel.basic_publish(exchange='',

routing_key='task_queue',

body=message,

properties=pika.BasicProperties(

delivery_mode=2, # make message persistent

))

print(" [x] Sent %r" % message)

connection.close()

```

5.2 消费者

```python

import pika

import time

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

time.sleep(body.count(b'.'))

print(" [x] Done")

ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.basic_qos(prefetch_count=1)

channel.basic_consume(queue='task_queue',

on_message_callback=callback)

channel.start_consuming()

```

6. 发布/订阅模式

6.1 发布者

```python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = "Log message"

channel.basic_publish(exchange='logs',

routing_key='',

body=message)

print(" [x] Sent %r" % message)

connection.close()

```

6.2 订阅者

```python

import pika

import sys

def callback(ch, method, properties, body):

print(" [x] %r" % body)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)

queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(queue=queue_name,

on_message_callback=callback,

auto_ack=True)

channel.start_consuming()

```

7. 路由模式

7.1 发布者

```python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = "info"

message = "Info message"

channel.basic_publish(exchange='direct_logs',

routing_key=severity,

body=message)

print(" [x] Sent %r:%r" % (severity, message))

connection.close()

```

7.2 订阅者

```python

import pika

def callback(ch, method, properties, body):

print(" [x] %r:%r" % (method.routing_key, body))

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severities = ["info", "warning"]

result = channel.queue_declare(queue='', exclusive=True)

queue_name = result.method.queue

for severity in severities:

channel.queue_bind(exchange='direct_logs',

queue=queue_name,

routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(queue=queue_name,

on_message_callback=callback,

auto_ack=True)

channel.start_consuming()

```

8. 主题模式

8.1 发布者

```python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = "kern.critical"

message = "Kernel critical message"

channel.basic_publish(exchange='topic_logs',

routing_key=routing_key,

body=message)

print(" [x] Sent %r:%r" % (routing_key, message))

connection.close()

```

8.2 订阅者

```python

import pika

def callback(ch, method, properties, body):

print(" [x] %r:%r" % (method.routing_key, body))

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)

queue_name = result.method.queue

binding_keys = ["kern.*", "*.critical"]

for binding_key in binding_keys:

channel.queue_bind(exchange='topic_logs',

queue=queue_name,

routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(queue=queue_name,

on_message_callback=callback,

auto_ack=True)

channel.start_consuming()

```

标签:pika,exchange,入门教程,RabbitMQ,queue,connection,message,channel
From: https://blog.csdn.net/qq_40698086/article/details/141553637

相关文章

  • RabbitMQ 入门教程
    RabbitMQ入门教程1.引言RabbitMQ是一个开源的消息代理和队列服务器,实现了AMQP0-9-1标准。本教程将指导你如何安装、配置和使用RabbitMQ进行消息传递。2.安装RabbitMQ2.1安装RabbitMQ服务器2.1.1Ubuntu/Debian```bashsudoapt-getupdatesudoapt-getins......
  • RabbitMQ 相关概念及简述
    总结自:BV15k4y1k7EpRabbitMQ是一款常用的消息队列(MQ)。什么是消息队列MQ全称为MessageQueue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信。消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,......
  • RabbitMQ 入门示例
    参考:BV15k4y1k7EpRabbitMQ相关概念及简述中简单介绍了RabbitMQ提供的6种工作模式。下面以简单模式为例,介绍RabbitMQ的使用。新建工程先新建Maven工程RabbitMQ作为父工程,在父工程下新建三个子模块:common:公共包producer:生产者consumer:消费者在三个模块中添加......
  • 消息队列-RabbitMQ学习笔记(一)
    1.什么是消息队列消息队列(MessageQueue,简称MQ)是一种用于在应用程序之间传递消息的技术,通常在分布式系统中使用。它提供了一种异步通信机制,使得应用程序可以通过发送和接收消息来进行数据交换。消息队列可以用来存储消息,这就涉及到消息队列的三个关键字:存储、消息、队列......
  • CentOS7 安装及配置 RabbitMQ
    主要总结自:Centos7安装RabbitMQ1、安装前准备由于RabbitMQ使用的是Erlang语言开发的,因此在安装RabbitMQ之前需要安装Erlang环境,Erlang与RabbitMQ的下载地址分别为:Erlang:https://github.com/rabbitmq/erlang-rpm/releasesRabbitMQ:https://github.com/rabbitmq/rabbitmq-ser......
  • RabbitMQ 从原理到实战—golang版本
    1.MQ1.1概念MQ(MessageQueue,消息队列)是一种用于在分布式系统中实现消息传递和异步通信的技术。它充当了发送方和接收方之间的中间人,用于在应用程序或服务之间传递消息。MQ允许系统中的不同组件彼此独立运行,而无需直接通信或相互依赖,从而提高系统的可扩展性、可靠性和灵......
  • AI绘画SD三分钟入门教程!秋叶大佬8月最新的Stable Diffusion整合包V4.9来了,完整安装部
    大家好,我是画画的小强前几天8月15日,国内AI绘画工具开源大佬更新了StableDiffusion整合包最新版本4.9,相关信息从图中能看到,本次更新后SDWebUI已经能够支持最新的AI绘画大模型SD3.0,以及更新了SD最强的控制插件ControlNet的ControlNetUnion模型,在使用方面上,新版本对比旧版......
  • ansible自动化部署rabbitmq
    一、准备环境1、准备机器本次以192.168.190.155,192.168.190.156,192.168.190.157,192.168.190.158四台虚机为例,本文操作全在管理机中。机器IP节点CPU&内存系统盘存储192.168.190.155Ansible管理机2C4G50GB50GB192.168.190.158Master/Worker(被管理机)2......
  • java 入门教程(非常详细!1.6w+ 文字)
    先序:学习编程语言要先学个轮廓,刚开始只用学核心的部分,一些细节、不常用的内容先放着,现用现查即可;把常用的东西弄熟练了在慢慢补充。1.Java概述Java是一种面向对象的编程语言,由SunMicrosystems(现在的Oracle)在1995年推出。Java程序可以在任何支持Java虚拟机(J......