rabbitmq数据不丢失需要满足以下几点:(开启持久化后rabbitmq性能会下降)
- 生产者confirm 消息确认机制
- rabbitmq的交换机,队列,消息设置为持久化
- 关闭消费者的自动ack换为手动ack
1 public function producer(){ 2 $exchange = "topic-text"; 3 $type = "topic"; 4 $msg = [11111]; 5 $routerKey = "topic.info"; 6 $queue_name = "hyperf-t"; 7 // 创建rabbitmq连接 8 $config = [ 9 'host' => env('AMQP_IP'), 10 'port' => env('AMQP_PORT'), 11 'user' => env('AMQP_USER'), 12 'password' => env('AMQP_PASS'), 13 ]; 14 $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], env('AMQP_VHOST')); 15 // 创建Channel 16 $channel = $connection->channel(); 17 //设置一个消息没有处理完成之前不会给同一个消费者发送第二个 18 $channel->basic_qos(0, 1,true); 19 //设置发送端必须ack应答 20 $channel->confirm_select(); 21 //成功回调 22 $channel->set_ack_handler(function (AMQPMessage $message){ 23 echo "这是成功回调".$message->body; 24 file_put_contents("1.txt","这是成功回调".$message->body) ; 25 }); 26 //失败回调 27 $channel->set_nack_handler(function (AMQPMessage $message){ 28 echo "这是失败回调".$message->body; 29 file_put_contents("2.txt","这是失败回调".$message->body) ; 30 }); 31 // 声明交换机 32 $channel->exchange_declare( 33 $exchange, // 交换机名,需要唯一,不能重复 34 $type, // 交换机类型 35 false, 36 true, // 是否持久化 37 false 38 ); 39 //生命和绑定队列 40 /* 41 * 参数说明: 42 * $queue:(必需)队列的名称。 43 * $passive:(可选)如果设置为 true,则仅返回有关队列是否存在的信息,不创建队列。默认为 false。 44 * $durable:(可选)如果设置为 true,则队列将在服务器重启后持久存在。默认为 false。 45 * $exclusive:(可选)如果设置为 true,则队列将仅对首次声明它的连接可见,并在连接断开时自动删除。默认为 false。 46 * $auto_delete:(可选)如果设置为 true,则当所有消费者都已离开队列时,队列将自动被删除。默认为 false。 47 * $nowait:(可选)如果设置为 true,则不会等待服务器的响应。默认为 false。 48 * $arguments:(可选)一个关联数组,包含了队列的其他参数,例如:x-message-ttl 或 x-expires 等。 49 * $ticket:(可选)可用于确认持久化队列的创建。 50 */ 51 $channel->queue_declare($queue_name, false, true, false, false); 52 53 $channel->queue_bind($queue_name, $exchange,$routerKey); 54 // 消息对象并设置持久化,参数是消息内容 55 $msg = new AMQPMessage(json_encode($msg), ['delivery_mode' => 2]); 56 57 // 注意第三个参数,路由参数 58 $channel->basic_publish( 59 $msg, // 消息对象 60 $exchange, // 交换机名字 61 $routerKey// 路由参数,可以根据需求,任意定义。 62 ); 63 //等待rabbitmq返回消息 64 $channel->wait_for_pending_acks(); 65 // 释放资源 66 $channel->close(); 67 $connection->close(); 68 }
标签:queue,false,Ack,Rabbitmq,发送者,队列,message,true,channel From: https://www.cnblogs.com/yue97843/p/18149640