PHP操作RabbitQM
标签(空格分隔): amqp,php
Fanout Exchange
多个queue都可以绑定到exchange,并且每个queue都会接受到消息
producer(client.php)
$exchangeName = "exchange_01";
try {
$connection = new AMQPConnection();
$connection->setHost('127.0.0.1');
$connection->setPort(5672);
$connection->setVhost("/");
$connection->setLogin('guest');
$connection->setPassword('guest');
$isConnect = $connection->connect();
if (!$isConnect) {
exit("connect rabbitmq error\n");
}
} catch (AMQPConnectionException $e) {
exit("connect rabbitmq error\n");
}
$channel = new AMQPChannel($connection);
try {
$exchange = new AMQPExchange($channel);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setName($exchangeName);
$exchange->declareExchange();
for ($i = 0; $i < 5; $i++) {
$bool = $exchange->publish("hello world ". $i);
echo $bool . "\n";
}
$connection->disconnect();
} catch(Exception $e) {
exit("rabbitmq error: " . $e->getMessage() . PHP_EOL);
}
consumer(server_1.php)
<?php
$exchangeName = "exchange_01";
$queueName = "queue_01";
$options = [
'host' => '127.0.0.1',
'port' => '5672',
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
];
$connect = new \AMQPConnection($options);
try {
$isConnect = $connect->connect();
if (!$isConnect) {
exit("connect rabbitmq error");
}
} catch (AMQPConnectionException $e) {
exit("connect rabbitmq error");
}
$chan = new \AMQPChannel($connect);
$queue = new \AMQPQueue($chan);
// 设置队列名称
$queue->setName($queueName);
// 声明队列
$queue->declareQueue();
// 绑定队列到指定的交换机
$queue->bind($exchangeName);
print "queue: {$queueName} " . PHP_EOL;
while (true) {
$queue->consume('processMessage');
}
function processMessage(\AMQPEnvelope $envelope, \AMQPQueue $queue) {
echo "Message: " . $envelope->getBody() . " \n";
$queue->ack($envelope->getDeliveryTag());
}
consumer(server_2)
<?php
// 不通的queue可以消费
$exchangeName = "exchange_01";
$queueName = "queue_02";
$options = [
'host' => '127.0.0.1',
'port' => '5672',
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
];
$connect = new \AMQPConnection($options);
try {
$isConnect = $connect->connect();
if (!$isConnect) {
exit("connect rabbitmq error");
}
} catch (AMQPConnectionException $e) {
exit("connect rabbitmq error");
}
$chan = new \AMQPChannel($connect);
$queue = new \AMQPQueue($chan);
// 设置队列名称
$queue->setName($queueName);
// 声明队列
$queue->declareQueue();
// 绑定队列到指定的交换机
$queue->bind($exchangeName);
print "queue: {$queueName} " . PHP_EOL;
while (true) {
$queue->consume('processMessage');
}
function processMessage(\AMQPEnvelope $envelope, \AMQPQueue $queue) {
echo "Message: " . $envelope->getBody() . " \n";
$queue->ack($envelope->getDeliveryTag());
}
DIRECT TYPE
direct 有点像一对一处理
producer (client.php)
$exChangeName = "exchange_02";
$queueName = 'queue_02';
$options = [
'host' => '127.0.0.1',
'port' => '5672',
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
];
$connect = new \AMQPConnection($options);
try {
$isConnect = $connect->connect();
if (!$isConnect) {
exit("connect rabbitmq error");
}
} catch (AMQPConnectionException $e) {
exit("connect rabbitmq error");
}
try {
$chan = new \AMQPChannel($connect);
$ex = new \AMQPExchange($chan);
$ex->setName($exChangeName);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE); // 持久化
$ex->declareExchange();
// 队列
$queue = new \AMQPQueue($chan);
$queue->setName($queueName);
$queue->declareQueue(); // 生成队列
$queue->setFlags(AMQP_DURABLE); // 持久化
$queue->bind($exChangeName, $queueName); // 绑定
for ($i=0; $i<2;$i++) {
$bool = $ex->publish("HELLO WORLD" . $i, $queueName, AMQP_NOPARAM, ['delivery_mode' => 2]);
echo $bool . PHP_EOL;
}
} catch (Exception $e) {
var_dump($e->getMessage());
}
consumer(server.php)
$exChangeName = "exchange_02";
$queueName = "queue_02";
$options = [
'host' => '127.0.0.1',
'port' => '5672',
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
];
$connect = new \AMQPConnection($options);
try {
$isConnect = $connect->connect();
if (!$isConnect) {
exit("connect rabbitmq error");
}
} catch (AMQPConnectionException $e) {
exit("connect rabbitmq error");
}
$chan = new \AMQPChannel($connect);
$queue = new \AMQPQueue($chan);
// 设置队列名称
$queue->setName($queueName);
// 声明队列
$queue->declareQueue();
$queue->setFlags(AMQP_DURABLE);
$queue->bind($exChangeName, $queueName); // 绑定
// 绑定队列到指定的交换机
print "queue: $queueName " . PHP_EOL;
while (true) {
$queue->consume('processMessage');
// $queue->consume('processMessage', AMQP_AUTOACK);// 自动ack
}
function processMessage(\AMQPEnvelope $envelope, \AMQPQueue $queue) {
echo "Message: " . $envelope->getBody() . " \n";
// 手动ack
$bool = $queue->ack($envelope->getDeliveryTag());
}
标签:RabbitQM,rabbitmq,queue,exit,connect,new,操作,PHP,queueName
From: https://www.cnblogs.com/yanweifeng/p/17431412.html