首页 > 编程语言 >PHP Amqp 封装类

PHP Amqp 封装类

时间:2023-05-26 14:57:48浏览次数:34  
标签:return Amqp exchange param queue flags 封装 PHP throws

  1 <?php
  2 
  3 /**
  4  *  使用示例
  5 #定义名称
  6 define('ExchangeName', 'exchange_name_15');
  7 define('QueueName',    'queue_name_15');
  8 define('RoutingKey',   'routing_key_15');
  9 
 10 if(count($argv) > 1){
 11 
 12     $amqp = new Amqp();
 13     $amqp->channel();
 14     $amqp->exchange(ExchangeName, AMQP_EX_TYPE_DIRECT, AMQP_DURABLE)->declareExchange();
 15     $amqp->queue(QueueName, AMQP_DURABLE)->declareQueue();
 16     $amqp->bindQueue(RoutingKey);
 17     $amqp->publish($argv[1], RoutingKey);
 18     $amqp->disconnect();
 19 
 20 }else{
 21 
 22     $amqp = new Amqp();
 23     $amqp->channel();
 24     $amqp->exchange(ExchangeName, AMQP_EX_TYPE_DIRECT, AMQP_DURABLE)->declareExchange();
 25     $amqp->queue(QueueName, AMQP_DURABLE)->declareQueue();
 26     $amqp->bindQueue(RoutingKey);
 27 
 28     //接收消息
 29     $amqp->consume(function($envelope, $queue){
 30     $msg = $envelope->getBody();
 31     echo $msg . "\n";
 32     }, AMQP_AUTOACK); //自动应答
 33 
 34 }
 35  */
 36 
 37 class Amqp extends AMQPConnection
 38 {
 39     
 40     protected $channel;
 41     protected $exchange;
 42     protected $queue;
 43 
 44     public function __construct($host='127.0.0.1', $port=5672, $user='guest', $passwd='guest', $vhost='/')
 45     {
 46         $credentials = is_array($host) ? $host : [
 47             'host'      => $host,
 48             'port'      => $port,
 49             'login'     => $user,
 50             'password'  => $passwd,
 51             'vhost'     => $vhost
 52         ];
 53         parent::__construct($credentials);
 54     }
 55 
 56 
 57     /**
 58      * 创建频道
 59      * @return AMQPChannel
 60      * @throws AMQPConnectionException
 61      */
 62     public function channel()
 63     {
 64         if (!$this->channel) {
 65             parent::connect();
 66             $this->channel = new \AMQPChannel($this);
 67         }
 68         return $this->channel;
 69     }
 70 
 71     /**
 72      * 创建交换器
 73      * @param string $name
 74      * @param string $type
 75      * @param null $flags
 76      * @return AMQPExchange
 77      * @throws AMQPConnectionException
 78      * @throws AMQPExchangeException
 79      */
 80     public function exchange($name='', $type='', $flags=null)
 81     {
 82         if (!$this->exchange) {
 83             $this->exchange = new \AMQPExchange($this->channel());
 84         }
 85         $name && $this->exchange->setName($name);
 86         $type && $this->exchange->setType($type);
 87         is_integer($flags) && $this->exchange->setFlags($flags);
 88 
 89         return $this->exchange;
 90     }
 91 
 92     /**
 93      * 创建队列
 94      * @param string $name
 95      * @param null $flags
 96      * @return AMQPQueue
 97      * @throws AMQPConnectionException
 98      * @throws AMQPQueueException
 99      */
100     public function queue($name='', $flags=null)
101     {
102         if (!$this->queue) {
103             $this->queue = new \AMQPQueue($this->channel());
104         }
105 
106         $name && $this->queue->setName($name);
107         is_integer($flags) && $this->queue->setFlags($flags);
108 
109         return $this->queue;
110     }
111 
112     /**
113      * 发布交换器
114      * @return bool
115      * @throws AMQPChannelException
116      * @throws AMQPConnectionException
117      * @throws AMQPExchangeException
118      */
119     public function declareExchange()
120     {
121         return $this->exchange()->declareExchange();
122     }
123 
124     /**
125      * 发布队列
126      * @return int
127      * @throws AMQPChannelException
128      * @throws AMQPConnectionException
129      * @throws AMQPQueueException
130      */
131     public function declareQueue()
132     {
133         return $this->queue()->declareQueue();
134     }
135 
136     /**
137      * 绑定队列
138      * @param string $routing_key
139      * @param array $arguments
140      * @return bool
141      * @throws AMQPChannelException
142      * @throws AMQPConnectionException
143      * @throws AMQPExchangeException
144      * @throws AMQPQueueException
145      */
146     public function bindQueue($routing_key='', $arguments=[])
147     {
148         return $this->queue()->bind($this->exchange()->getName(), $routing_key, $arguments);
149     }
150 
151 
152     /**
153      * 发布消息
154      * @param callable|null $callback
155      * @param int $flags
156      * @param null $consumerTag
157      * @return bool
158      * @throws AMQPChannelException
159      * @throws AMQPConnectionException
160      * @throws AMQPExchangeException
161      */
162     public function publish($message, $routing_key=null, $flags=AMQP_NOPARAM, array $attributes=[])
163     {
164         return $this->exchange()->publish($message, $routing_key, $flags, $attributes);
165     }
166 
167     public function consume(callable $callback=null, $flags=AMQP_NOPARAM, $consumerTag = null)
168     {
169         $this->queue()->consume($callback, $flags, $consumerTag);
170     }
171 
172     /**
173      * 删除交换器
174      * @param null $name
175      * @param int $flags
176      * @throws AMQPChannelException
177      * @throws AMQPConnectionException
178      * @throws AMQPExchangeException
179      */
180     public function deleteExchange($name = null, $flags=AMQP_NOPARAM)
181     {
182         $this->exchange()->delete($name, $flags);
183     }
184 
185     /**
186      * 删除队列
187      * @param int $flags
188      * @throws AMQPChannelException
189      * @throws AMQPConnectionException
190      * @throws AMQPQueueException
191      */
192     public function deleteQueue($flags=AMQP_NOPARAM)
193     {
194         $this->queue()->delete($flags);
195     }
196 }

 

标签:return,Amqp,exchange,param,queue,flags,封装,PHP,throws
From: https://www.cnblogs.com/zbseoag/p/17434724.html

相关文章

  • 超低功耗段码LCD液晶显示屏驱动IC-VKL144A/B QFN48 超小体积封装,可完全替代PCF8551适
    VKL144A/B概述:VKL144A/B是一个点阵式存储映射的LCD驱动器,可支持最大144点(36SEGx4COM)的LCD屏。单片机可通过I2C接口配置显示参数和读写显示数据,可配置4种功耗模式,也可通过关显示和关振荡器进入省电模式。其高抗干扰,低功耗的特性适用于水电气表以及工控仪表类产品。QT921功能特点......
  • PHP 反射机制打印对象
    1<?php2/**34echoObj::new('Redis');5echoObj::new('Redis')->method('set')?->isPublic();67*/89classObjextendsReflectionClass10{11publicstaticfunctionstd(?array$attrs=null)1......
  • elementplus vue3 ts 单图预览封装
    <divclass="demo-image__preview"><el-imagestyle="width:50px":src="props.image":zoom-rate="1.2":preview-src-list="[props.image]":initial-index="4&quo......
  • Graphpad Prism9.5.1 科学绘图软件 下载安装教程(含Win/Mac)
    软件下载AutodeskFlame2024formac软件介绍AutodeskFlame破解版由是Autodesk公司(译作欧特克)开发的一款高端电影剪辑和特效制作系统。Flame是用于高速合成、高级图形和客户驱动的交互设计的终极视觉特效制作系统。AutodeskFlame提供出色的性能和荣获奥斯卡奖的工具。从......
  • php语言开发宝塔部署thinkphp考试平台管理系统
    技术架构PHP7.2+Thinkphp6+React+UmiJs+nginx+mysql5.7cnetos7以上+宝塔面板搭建教程1.下载源码,宝塔添加一个站点,PHP版本选择7.2。2.压缩codes后台代码目录,上传后台代码到宝塔的网站根目录当中解压缩,关闭防跨站,运行目录指向public,伪静态选择thinkphp,设置为:locat......
  • Dapper-OracleSQLHelper 的通用封装
    ///<summary>///OracleHelp封装使用///</summary>///<typeparamname="T"></typeparam>publicclassOracleSQLDapperHelper<T>whereT:class{///<summary>///数据库连接字符串......
  • PHP中的AMQP类
    PHP中的AMQP类标签(空格分隔):php,amqp官网地址:http://docs.php.net/manual/da/book.amqp.phpAMQPConnectionAMQPConnection{/*Methods*///与AMQP代理建立连接publicboolconnect(void)//构造函数$credentials可以理解为配置项的数组//$co......
  • PHP操作RabbitQM
    PHP操作RabbitQM标签(空格分隔):amqp,phpFanoutExchange多个queue都可以绑定到exchange,并且每个queue都会接受到消息producer(client.php)$exchangeName="exchange_01";try{$connection=newAMQPConnection();$connection->setHost('127.0.0.1');$......
  • 用go封装一下封禁功能
    用go封装一下封禁功能本篇为用go设计开发一个自己的轻量级登录库/框架吧-秋玻-博客园(cnblogs.com)的封禁业务篇,会讲讲封禁业务的实现,给库/框架增加新的功能。源码:https://github.com/weloe/token-go思路封禁业务也是在一般项目中比较常见的业务。我们也将它封装在库中作......
  • GB28181-2022中的封装编码要求
    术语: GB28181的传输要求:   国标协议的封装和编码要求:注意国标GB28181只支持RTP+PS;尽管RTP内的内容可以是PS/TS/ES,但是国标协议传输的只是RTP+PS,PS封装的编码类型可以有多种; 国标码流RTP-PS内部的一些参数【PSM,PT等】:  如果不限制国标流,RTP内部可以是PS......