首页 > 其他分享 >RabbitMQ-消息队列之topic使用

RabbitMQ-消息队列之topic使用

时间:2024-08-18 13:52:51浏览次数:13  
标签:queue false name 队列 RabbitMQ topic php channel

1、安装rabbitmq

怎么安装rabbitmq请查看之前课程,如果已经安装,请略过此步。

2、创建vendor文件夹或是直接采用PHP框架

mkdir vendor

3、进入文件

cd vendor

4、安装php扩展

composer require php-amqplib/php-amqplib

5、进入上级创建topic文件夹

cd ../
mkdir topic

6、进入topic文件并创建生产者php

cd topic
touch publish.php

7、输入topic生产者内容

<?php

require_once  '../vendor/autoload.php';


use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$v_host = 'order';

$exc_name ='topic_log';

$routing_key = 'goods.warn';

$connection = new AMQPStreamConnection('localhost',5672,'wangyulong','wangyulong',$v_host);

$channel = $connection->channel();


$channel->exchange_declare($exc_name,'topic',false,false,false);

$data = 'this is '.$routing_key.' message';

$msg = new AMQPMessage($data,['delivery_mode'=>AMQPMEssage::DELIVERY_MODE_PERSISTENT]);


$channel->basic_publish($msg,$exc_name,$routing_key);

$channel->close();

$connection->close();

8、创建消费者php

touch all.php
touch user.php
touch warn.php

9、输入topic消费者内容

vi all.php
<?php

require_once  '../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$v_host ='order';

$exc_name = 'topic_log';

$routing_key = '#';

$connection = new AMQPStreamConnection('localhost',5672,'wangyulong','wangyulong',$v_host);

$channel = $connection->channel();

$channel->exchange_declare($exc_name,'topic',false,false,false);


list($queue_name,,) = $channel->queue_declare('',false,false,true,false);

$channel->queue_bind($queue_name,$exc_name,$routing_key);

$callback = function($msg){
	echo 'received ' ,$msg->body,"\n";
	$msg->ack();
};

$channel->basic_qos(null,1,null);

$channel->basic_consume($queue_name,'',false,false,false,false,$callback);

while($channel->is_open()){
	$channel->wait();
}

$channel->close();

$connection->close();
vi user.php
<?php

require_once  '../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$v_host ='order';

$exc_name = 'topic_log';

$routing_key = 'user.*';

$connection = new AMQPStreamConnection('localhost',5672,'wangyulong','wangyulong',$v_host);

$channel = $connection->channel();

$channel->exchange_declare($exc_name,'topic',false,false,false);


list($queue_name,,) = $channel->queue_declare('',false,false,true,false);

$channel->queue_bind($queue_name,$exc_name,$routing_key);

$callback = function($msg){
	echo 'received ' ,$msg->body,"\n";
	$msg->ack();
};

$channel->basic_qos(null,1,null);

$channel->basic_consume($queue_name,'',false,false,false,false,$callback);

while($channel->is_open()){
	$channel->wait();
}

$channel->close();

$connection->close();
vi warn.php
<?php

require_once  '../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$v_host ='order';

$exc_name = 'topic_log';

$routing_key = '*.warn';

$connection = new AMQPStreamConnection('localhost',5672,'wangyulong','wangyulong',$v_host);

$channel = $connection->channel();

$channel->exchange_declare($exc_name,'topic',false,false,false);


list($queue_name,,) = $channel->queue_declare('',false,false,true,false);

$channel->queue_bind($queue_name,$exc_name,$routing_key);

$callback = function($msg){
	echo 'received ' ,$msg->body,"\n";
	$msg->ack();
};

$channel->basic_qos(null,1,null);

$channel->basic_consume($queue_name,'',false,false,false,false,$callback);

while($channel->is_open()){
	$channel->wait();
}

$channel->close();

$connection->close();

10、执行消费者

php all.php
php user.php
php warn.php

11、执行生产者

php publish.php

此时你会发现只有warn.php对应的消费者及all.php都有数据,如果你把publish.php生产者里面的$routing_key对应的值改成user.goods,你会发现user.php及all.php对应的消费者都有数据。这就是rabbitmq topic工作模式硬实力

标签:queue,false,name,队列,RabbitMQ,topic,php,channel
From: https://blog.csdn.net/m0_63603104/article/details/141292153

相关文章

  • 链式队列的实现方式
    C语言实现链队列LinkQueue.h////CreatedbyAdministratoron2024/8/17.//#ifndefLINKQUEUE_H#defineLINKQUEUE_H#include<stdbool.h>/**定义队列元素结构体*/typedefintDataType;typedefstructLinkQueueNode{DataTypedata;//数据域structLi......
  • RabbitMQ实现消息可靠性的三种方法(发送者可靠性,MQ可靠性,消费者可靠性)
    1.发送者可靠性1.1发送者重连RabbitMQ的发送者重连机制是一种应对网络不稳定或连接中断情况的策略,它能够自动尝试重新建立与RabbitMQ服务器的连接,以确保消息能够成功发送。发送者重连通常涉及到一些配置参数,如连接超时时间、重试间隔、最大重试次数等。例如,在Spring框架的......
  • [Redis]延迟消息队列
    延迟消息队列redis数据结构,用什么结构实现延迟消息队列延迟消息队列是一种消息队列系统,它允许消息的发布者在消息发送时指定消息的投递时间,使消息在未来的某个预定时间点被消费者接收。这种机制对于需要在稍后执行的任务或具有特定延迟需求的应用非常有用。对于实现延迟消息队......
  • RabbitMQ面试题
    一、RabbitMQ如何保证消息的可靠性RabbiMQ如果想要保证消息的可靠性有几种方式可以实现:1、消费端消息可靠性保证:1).消息确认在消费端可以设置手动ACK模式,为确保消息可靠性,手动确认消息是否被正常处理,若存在异常或者未运行,则消息超时后不会被删除,会被重新投递2).死信队列......
  • RabbitMQ-记录一个问题及解决
    我在配置文件中写入了rabbitmq:host:127.0.0.1port:5672username:guestpassport:guestvirtualhost:/switchFlag:truepool_size:10然后希望实现一个监听器,所以加上了注解形式@RabbitListener(queues=CommonConstants.QUERE_NAME_NOTIFY),由于我工作......
  • RabbitMQ的介绍
    一、什么是MQMQ(messagequeue),从字面意思上看就个FIFO先入先出的队列,只不过队列中存放的内容是message而已,它是一种具有接收数据、存储数据、发送数据等功能的技术服务。消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象......
  • DelayQueue 延迟队列使用
    一、DelayQueue是什么DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。二、DelayQueue能做什么1.淘宝订单业务:下......
  • 第七节:消息队列推拉模式选择、kafak如何提高吞吐量、超时关单和支付成功同时出现咋办
    一.        二.        三.         !作       者:Yaopengfei(姚鹏飞)博客地址:http://www.cnblogs.com/yaopengfei/声     明1:如有错误,欢迎讨论,请勿谩骂^_^。声     明2:原创博客请在转载......
  • BackgroundWorker和BlockingCollection配合实现有消息才发送的队列
    privateBackgroundWorkerm_MessageConsumer=newBackgroundWorker();privateBlockingCollection<string>m_BlockingQueue=newBlockingCollection<string>();构造函数{m_MessageConsumer.DoWork+=M_MessageConsumer_DoWork;m_MessageConsumer.Work......
  • 小X与队列
    题目题目描述小X正和同学们做列队的练习。有\(n\)名同学排成一路纵队,编号为i的同学排在从前往后数第i个位置上,即:初始时的队列为\(1,2,3,...,n\)。接下来小X会发出若干条指令,每条指令形如“请编号为x的同学排到最前面来”。(例如:若当前时刻的队列为\(5,4,3,2......