首页 > 编程语言 >rabbitmq php 代码示例

rabbitmq php 代码示例

时间:2023-07-13 11:22:22浏览次数:46  
标签:AMQP 示例 队列 exchange rabbitmq queue 交换机 message php

交换机类型

direct : 直连交换机, 根据路由键投递到与绑定键匹配的队列。
fanout : 扇形交换机,采用广播模式,投递到所有与之绑定的队列。
topic  : 主题交换机,对路由键与绑定键进行模式匹配后再投递到相应的队列。
headers : 头交换机,不处理路由键, 而是根据发送的消息内容中的 headers 属性进行匹配。


路由键与绑定键
routing_key : 发布消息时用的到 key 叫路由键, 它是一个精确的字符串
binding_key : 队列绑定到交换机时用到的 key 叫绑定键, 当交换机类型为 topic 时,绑定键可以包含通配符( * : 通配一个单词, # : 通配多个单词)。

 

docker 容器

docker run -–name mq  -e RABBITMQ_DEFAULT_USER=test -e RABBITMQ_DEFAULT_PASS=12345  -p 15672:15672 -p 5672:5672 -d rabbitmq

docker exec -it mq /bin/bash

# 启用管理插件
rabbitmq-plugins enable rabbitmq_management

# 管理界面 channels 标签无法点击
echo management_agent.disable_metrics_collector = false > /etc/rabbitmq/conf.d/management_agent.disable_metrics_collector.conf

 

浏览器访问 localhost:15672 输入 test 12345  登录, 自己尝试创建交换机和队列,能更理解, 特别是代码里一笔带过的 Arguments 参数

 

测试 topic  交换机

<?php

$connect = new AMQPConnection(['login'=>'test', 'password'=>12345]); //创建连接
$connect->connect();

if(!$connect->isConnected()){
    exit("连接失败");
}

$channel  = new AMQPChannel($connect); //创建频道
$exchange = new AMQPExchange($channel); //创建交换机
$queue    = new AMQPQueue($channel); //创建队列

// 配置交换机
$exchange->setName('zbs.topic');
$exchange->setType(AMQP_EX_TYPE_TOPIC); //交换机类型 AMQP_EX_TYPE_DIRECT,AMQP_EX_TYPE_FANOUT,AMQP_EX_TYPE_HEADERS
$exchange->setFlags(AMQP_DURABLE);// 设置标志: AMQP_DURABLE 持久化, AMQP_PASSIVE 不创建,但不存在时抛出异常
$exchange->declareExchange();// 发布交换机,不存在时,自动创建

// 配置队列
$queue->setName('zbs.queue');
$queue->setFlags(AMQP_DURABLE); //设置标志: AMQP_DURABLE 持久化, AMQP_PASSIVE 不创建,但不存在时抛出异常, AMQP_EXCLUSIVE 独占队列, AMQP_AUTODELETE 没有监听时自动删除
$queue->declareQueue(); //发布队列,不存在时,自动创建.注意:发布队列后才能绑定到交换机
$queue->bind($exchange->getName(), 'news.*.*'); // 绑定键可以用通配符


// 发布消息, 通过交换机来发布消息
for($i = 0; $i < 10; $i++) {
    $exchange->publish("message{$i}", 'news.sport.china', AMQP_MANDATORY); // AMQP_MANDATORY 消息投递失败将报错误  AMQP_IMMEDIATE 消息应该立即被处理,高优先级
}

// 消费消息, 通过队列来消费消息
$queue->consume(function(AMQPEnvelope $message, AMQPQueue $queue){

    $queue->ack($message->getDeliveryTag()); // 消息确认

    echo $message->getBody() . "\n";

    // dump(
    //     $message->getExchangeName(),
    //     $message->getBody(),
    //     $message->getConsumerTag(),
    //     $queue->getConsumerTag(),
    //     $queue->getName(),
    //     $message->getConsumerTag() == $queue->getConsumerTag() ? 'valid' : 'not valid'
    // );

    sleep(1);

});

 

测试 headers 交换机

$connect = new AMQPConnection(['login'=>'test', 'password'=>12345]); //创建连接
$connect->connect();

if(!$connect->isConnected()){
    exit("连接失败");
}

$channel  = new AMQPChannel($connect); //创建频道

$exchange = new AMQPExchange($channel);
$exchange->setName('zbs.headers' );
$exchange->setType(AMQP_EX_TYPE_HEADERS);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName("zbs.headers");
$queue->declareQueue();

$arguments = array('x-match' => 'any', 'type' =>'custom', 'name'=>'tom'); // x-match: all 所有匹配, any 任意匹配
$queue->bind($exchange->getName(), null, $arguments);

// 发布消息
for($i = 0; $i < 10; $i++) {
    // 发布消息时 headers 头要匹配才能正确投递到队列
    $exchange->publish("headers message{$i}", null, AMQP_MANDATORY, ['headers'=> ['name' => 'tom' ]]);
}


// 消费消息, 为了表示消费功能在其他项目,这里我们重新创建对象
$queue = new AMQPQueue($channel);
$queue->setName("zbs.headers");
$queue->declareQueue();
$queue->consume(function(AMQPEnvelope $message, AMQPQueue $queue){

    $queue->ack($message->getDeliveryTag());
    echo $message->getBody() . "\n";
    sleep(1);


});

 

 总结一下:

1. 交换机和队列是多对多的关系

2. 交换机和队列之间的绑定键也可以有多个.

3. 创建,生产消息 和消费消息,其实是三个过程,如果已经明确知道交换机和队列的存在,那么你在生产消息时,只要声明交换机,而在消费消息时只需声明队列.

 

标签:AMQP,示例,队列,exchange,rabbitmq,queue,交换机,message,php
From: https://www.cnblogs.com/zbseoag/p/17549895.html

相关文章

  • C# 选择文件选择设置类型示例
     例子:OpenFileDialogdialog=newOpenFileDialog();dialog.Multiselect=false;//该值确定是否可以选择多个文件dialog.Title="请选择文件";dialog.Filter="图像文件(*.jpg;*.png;*.bmp)|*.jpg;*.png;*.bmp;*.jpg......
  • Dockerfile与docker-compose搭建php环境
     目录结构 php.conf文件内容server{ listen 80; server_name localhost; location/{ root /usr/share/nginx/html/web; index index.phpindex.htmlindex.htm; } error_page 500502503504 /50x.html; location=/50x.html{ root /usr/share/nginx/......
  • PHP微信接入时的token验证
    微信接入时的token验证//微信token认证$signature=$_GET["signature"];$timestamp=$_GET["timestamp"];$nonce=$_GET["nonce"];$echostr=$_GET["echostr"];//你的设置Token$token="lingqiangkejiToken";//1)将......
  • 四、EAS代码代码示例
    4.EAS代码代码示例4.1单据新增代码//如:新增一行付款单//构造一条付款单信息(构造值对象)PaymentBillInfopayInfo=newPaymentBillInfo();payInfo.setNumber("1001")://关联用户UserInfouserInfo=newUserInfo();userInfo.setId(BOSUuid.read("867d5df6-00f8-1000-e......
  • 网站木马文件查找之-列出2023年网站根目录下以及子目录所有php文件
    本代码适合查找网站挂马(木马)文件使用<?phpfunctionlistPhpFiles($directory){$fileList=[];$iterator=newRecursiveIteratorIterator(newRecursiveDirectoryIterator($directory));foreach($iteratoras$file){if($file->isFile()&&$......
  • RabbitMQ简介
    简介消息队列提供一个异步通信机制,消息的发送者不必一直等待消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被缓存到队列中;当网络畅通时,消息将转发给相应的应用程序或服务。前提是这些服务或程序订阅了该消息。RabbitMQ是使用Erla......
  • PHP+Redis消息队列
    调用方式$redis=RedisManager::getInstance();$queue=json_encode(['queue_id'=>$queueId,'question'=>$question],256);if($redis->LPush('QA_wecom',$queue))returnResult::Success();单例<?phpnamespaceapp\admin\com......
  • spring-boot-webflux-servlet-websocket-2.x 示例响应式编程
    这里是一个使用SpringBootWebFlux结合Servlet和WebSocket的示例代码:首先,确保在pom.xml文件中添加以下依赖:<dependencies><!--SpringBootWebFlux--><dependency><groupId>org.springframework.boot</groupId><artifactId>sprin......
  • JavaScript加密:常见加密种类、优缺点和代码示例
    当涉及到JavaScript加密时,有多种加密算法和技术可供选择。下面我将列举一些常见的加密种类、它们的优缺点,并提供一些代码案例作为参考。对称加密算法:对称加密算法使用相同的密钥进行加密和解密。常见的对称加密算法包括AES(AdvancedEncryptionStandard)和DES(DataEncryptionS......
  • api接口技术开发分享,亚马逊国际获得AMAZON商品详情数据采集商品规格信息列表示例说明
    ​ 亚马逊商品详情API接口的作用是提供对亚马逊平台上商品的详细信息进行访问和操作的功能。通过调用该API接口,开发者可以获取亚马逊商品的标题、描述、价格、库存、销量、评价、图片等信息,并且可以对商品进行购买、加入购物车、添加评论等操作。这个API接口可以用于构建电商平......