首页 > 编程语言 >PHP操作RabbitQM

PHP操作RabbitQM

时间:2023-05-25 15:36:21浏览次数:24  
标签:RabbitQM rabbitmq queue exit connect new 操作 PHP queueName

PHP操作RabbitQM

标签(空格分隔): amqp,php

Fanout Exchange

多个queue都可以绑定到exchange,并且每个queue都会接受到消息
producer(client.php)


$exchangeName = "exchange_01";
try {
    $connection = new AMQPConnection();
    $connection->setHost('127.0.0.1');
    $connection->setPort(5672);
    $connection->setVhost("/");
    $connection->setLogin('guest');
    $connection->setPassword('guest');
    $isConnect = $connection->connect();
    if (!$isConnect) {
        exit("connect rabbitmq error\n");
    }
} catch (AMQPConnectionException $e) {
    exit("connect rabbitmq error\n");
}
$channel = new AMQPChannel($connection);
try {
    $exchange = new AMQPExchange($channel);
    $exchange->setType(AMQP_EX_TYPE_FANOUT);
    $exchange->setName($exchangeName);
    $exchange->declareExchange();
    for ($i = 0; $i < 5; $i++) {
        $bool = $exchange->publish("hello world ". $i);
        echo $bool . "\n";
    }
    $connection->disconnect();
} catch(Exception $e) {
    exit("rabbitmq error: " . $e->getMessage() . PHP_EOL);
}



consumer(server_1.php)

<?php
$exchangeName = "exchange_01";
$queueName = "queue_01";
$options = [
    'host' => '127.0.0.1',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
];
$connect = new \AMQPConnection($options);
try {
    $isConnect = $connect->connect();
    if (!$isConnect) {
        exit("connect rabbitmq error");
    }
} catch (AMQPConnectionException $e) {
    exit("connect rabbitmq error");
}
$chan = new \AMQPChannel($connect);
$queue = new \AMQPQueue($chan);
// 设置队列名称
$queue->setName($queueName);
// 声明队列
$queue->declareQueue();
// 绑定队列到指定的交换机
$queue->bind($exchangeName);
print "queue: {$queueName} " . PHP_EOL;
while (true) {
    $queue->consume('processMessage');
}
function processMessage(\AMQPEnvelope $envelope, \AMQPQueue $queue) {
    echo "Message: " . $envelope->getBody() . " \n";
    $queue->ack($envelope->getDeliveryTag());
}



consumer(server_2)

<?php
    // 不通的queue可以消费
    $exchangeName = "exchange_01";
    $queueName = "queue_02";
    $options = [
        'host' => '127.0.0.1',
        'port' => '5672',
        'vhost' => '/',
        'login' => 'guest',
        'password' => 'guest'
    ];
    $connect = new \AMQPConnection($options);
    try {
        $isConnect = $connect->connect();
        if (!$isConnect) {
            exit("connect rabbitmq error");
        }
    } catch (AMQPConnectionException $e) {
        exit("connect rabbitmq error");
    }
    $chan = new \AMQPChannel($connect);
    
    $queue = new \AMQPQueue($chan);
    // 设置队列名称
    $queue->setName($queueName);
    // 声明队列
    $queue->declareQueue();
    // 绑定队列到指定的交换机
    $queue->bind($exchangeName);
    print "queue: {$queueName} " . PHP_EOL;
    while (true) {
        $queue->consume('processMessage');
    }
    function processMessage(\AMQPEnvelope $envelope, \AMQPQueue $queue) {
        echo "Message: " . $envelope->getBody() . " \n";
        $queue->ack($envelope->getDeliveryTag());
    }

DIRECT TYPE

direct 有点像一对一处理
producer (client.php)


$exChangeName = "exchange_02";
$queueName = 'queue_02';
$options = [
    'host' => '127.0.0.1',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
];
$connect = new \AMQPConnection($options);
try {
    $isConnect = $connect->connect();
    if (!$isConnect) {
        exit("connect rabbitmq error");
    }
} catch (AMQPConnectionException $e) {
    exit("connect rabbitmq error");
}

try {
    $chan = new \AMQPChannel($connect);
    $ex = new \AMQPExchange($chan);
    $ex->setName($exChangeName);
    $ex->setType(AMQP_EX_TYPE_DIRECT);
    $ex->setFlags(AMQP_DURABLE); // 持久化
    $ex->declareExchange();
    // 队列
    $queue = new \AMQPQueue($chan);
    $queue->setName($queueName);
    $queue->declareQueue(); // 生成队列
    $queue->setFlags(AMQP_DURABLE); // 持久化
    $queue->bind($exChangeName, $queueName); // 绑定
    for ($i=0; $i<2;$i++) {
        $bool = $ex->publish("HELLO WORLD" . $i, $queueName, AMQP_NOPARAM, ['delivery_mode' => 2]);
        echo $bool . PHP_EOL;
    }
} catch (Exception $e) {
    var_dump($e->getMessage());
}


consumer(server.php)

$exChangeName = "exchange_02";
$queueName = "queue_02";
$options = [
    'host' => '127.0.0.1',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
];
$connect = new \AMQPConnection($options);
try {
    $isConnect = $connect->connect();
    if (!$isConnect) {
        exit("connect rabbitmq error");
    }
} catch (AMQPConnectionException $e) {
    exit("connect rabbitmq error");
}
$chan = new \AMQPChannel($connect);

$queue = new \AMQPQueue($chan);
// 设置队列名称
$queue->setName($queueName);
// 声明队列
$queue->declareQueue();
$queue->setFlags(AMQP_DURABLE);
$queue->bind($exChangeName, $queueName); // 绑定

// 绑定队列到指定的交换机

print "queue: $queueName " . PHP_EOL;
while (true) {
    $queue->consume('processMessage');
//    $queue->consume('processMessage', AMQP_AUTOACK);// 自动ack
}


function processMessage(\AMQPEnvelope $envelope, \AMQPQueue $queue) {
    echo "Message: " . $envelope->getBody() . " \n";
    // 手动ack
    $bool = $queue->ack($envelope->getDeliveryTag());
}

标签:RabbitQM,rabbitmq,queue,exit,connect,new,操作,PHP,queueName
From: https://www.cnblogs.com/yanweifeng/p/17431412.html

相关文章

  • 【全国产龙芯平台】迅为iTOP-LS3A5000_7A2000开发板+银河麒麟操作系统
       硬件准备 1.M.2.ssd硬盘(最好大于等于128G);2.迅为LS3A5000开发板;  3.U盘(需大于8g),制作启动盘使用;4.hdmi显示器;5.搭载linux环境的计算机。   安装步骤1制作启动盘我们首先使用命令查看U盘挂载节点位置。我们将iso镜像放入linux操作系统中,使用dd命令......
  • 怎么快速学会操作mac?
    学会操作Mac的关键是熟悉macOS操作系统的各种功能和特性。下面是一些快速学习如何操作Mac的技巧:熟悉macOS界面:了解菜单栏、Dock和Finder窗口等基本界面元素。掌握常用快捷键:像Command+C、Command+V、Command+W等常用系统级别的快捷键,可以大大提高你使用电脑的效率。学......
  • 最新版本的操作系统DiskStation Manager 7.2
        数据安全问题是大数据时代备受关注的领域,而针对这些问题,群晖科技发布最新的操作系统,以便解决各种数据管理难题。据悉,群晖科技发布最新版本的操作系统DiskStationManager7.2(DSM7.2),而这次更新为企业的数据管理提供了一系列新功能和改进,旨在解决各种数据管理挑战。......
  • UOS操作系统添加右键刷新功能
    一、需求   国产化已经势不可挡,为了UOS操作系统也能有windows的操作体验,让大家继续有右键刷新的体验,今天给大家分享一下UOS的右键刷新功能添加二、操作步骤2.1进入开发者模式   设置进入开发者模式  2.2添加刷新配置文件 2.3把刷新配置文件考到执行目录......
  • DDL操作数据表
    一个数据库中可以n个表,表中有字段(列)在建表的时候就需要定义字段,指定字段属性-字段数据类型+字段长度等复制表结构:like用法createtable course1 like course;查看表结构desc  course1;常用的数据类型:int整数double浮点型varchar字符串型date日期类型年月日注......
  • java函数式编程stream流操作lambda表达式使用方法引用用法等练习
    java函数式编程stream流操作lambda表达式使用方法引用用法等练习 @Testvoidtest01(){System.out.println("111");List<Author>authors=getAuthor();//stream流打对象中一个字段authors.stream().distinct().forEach(author......
  • Linux系统,使用PHP连接ORACLE数据库
    首先安装ORACLE的InstantClient环境:在这里下载因为我需要连接11.2.0.4.0版本的ORACLE数据库,所以选取下载了basic-11.2.0.4.0-1.x86_64.rpm和devel-11.2.0.4.0-1.x86_64.rpm,使用如下命令安装:rpm-ivhoracle-instantclient11.2-basic-11.2.0.4.0-1.x86_64.rpmrpm-ivhoracle......
  • 信创操作系统–麒麟Kylin桌面操作系统 (项目九 实用工具软件:截图、字体、扫描、打印机)
    截图(1)在桌面左下角的【开始菜单】中找到【截图】工具,如图1-1所示。图1-1打开【截图】工具(2)单击运行,即可拖动鼠标选择截图区域,如图1-2所示。图1-2选择截图区域(3)将鼠标放到所选择的区域内,拖动鼠标可以移动选择区域;将鼠标放到所选区域的边界线上,可以放大或缩小选择区域;如图1-3所示。......
  • 【操作系统入门到成神系列 七】Linux 内核 VS Window 内核
    ......
  • linux命令全部失效+无法登录 极限操作
    背景在kali上大概用了有两个月,这两个月可以说喜忧参半喜:kali系统优化的很好,不管是开机关机还是运行应用程序,都非常快,非常流畅,配合占用超小的xcfe桌面,爽飞忧:由于我是直接安装在物理机上的,原本是打算做开发和逆向用,但这个系统做开发和逆向,bug实在是太多,而且缺少很多必要的so文件(......