// ------- 延迟队列 php 需要安装好几个扩展 php 生产者
$connection = new AMQPStreamConnection('xxxxx', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 发送消息到交换机,并设置 x-delay 属性
$messageData = ['msg' => json_encode(['a'=>111])];
$messageBody = json_encode($messageData);
$headers = ['x-delay' => 10000]; // 10 秒延迟
$message = new AMQPMessage($messageBody, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'application_headers' => new \PhpAmqpLib\Wire\AMQPTable(['x-delay' => $headers['x-delay']])
]);
// 推送延迟消息到延迟交换机
$channel->basic_publish($message, 'delayed-exchange', 'delayed-queue');
// 关闭连接
$channel->close();
$connection->close(); // php 消费者 单独一个文件 php -d display_errors=on mq.php 这个命令可以查看具体报错信息等错误 <?php
require __DIR__ . '/vendor/autoload.php';
// 消费者进行消费 需要这个扩展 "php-amqplib/php-amqplib": "*",
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('xxxxx', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('delayed-queue', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('delayed-queue', '', false, false, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
// java 消费者 @RabbitListener(queues = "delayed-queue")
public void receiveDelayedMessage(Map<String, String> message) {
String msg = message.get("msg");
System.out.println("Received delayed message: " + msg);
} 标签:false,delayed,rabbitmq,connection,msg,java,php,channel From: https://www.cnblogs.com/huliangqing/p/17585807.html