RPC,是 Remote Procedure Call 的简称,即远程过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术。RPC 的主要功用是让构建分布式计算更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。 一般在 RabbitMQ 中进行 RPC 是很简单的。客户端发送请求消息,服务端回复响应的消息。为了客户端接收响应的消息,需要在请求消息中发送一个回调队列,在消息的 replyTo 属性中指定。为每个 RPC 请求创建一个回调队列是非常低效的,所以通常为每个客户端创建一个单一的回调队列。为了区分同一个回调队列中的不同请求,需要在消息的 correlationId 属性中设置当前请求的唯一识别ID。 RPC server 代码示例:
package main import ( "fmt" "github.com/streadway/amqp" ) func main() { // connect to rabbitmq conn, err := amqp.Dial("amqp://root:shiajun666@192.168.10.4:5672") if err != nil { fmt.Println("Connect to RabbitMQ failed: ", err) return } defer conn.Close() // open a channel ch, err := conn.Channel() if err != nil { fmt.Println("Open channel failed: ", err) return } defer ch.Close() // declare rpc request message queue _, err = ch.QueueDeclare( "rpc_queue", //name true, //durable false, //autoDelete false, //exclusive false, //noWait nil, //args ) if err != nil { fmt.Println("Declare rpc request message queue failed: ", err) return } // consume request messages reqCh, err := ch.Consume( "rpc_queue", //queue "", //consumer false, //autoAck false, //exclusive false, //noLocal false, //noWait nil, //args ) if err != nil { fmt.Println("Consume request messages failed: ", err) return } // deal with requests go func() { for req := range reqCh { fmt.Printf("Receive request[%s]: %s", req.CorrelationId, string(req.Body)) // send response err = ch.Publish( "", //exchange req.ReplyTo, //key false, //mandatory false, //immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: req.CorrelationId, Body: []byte(fmt.Sprintf("this is the response messsge for request %s", req.CorrelationId)), }, ) // send ack req.Ack(false) } }() for { select {} } }RPC client 代码示例:
package main import ( "fmt" "github.com/streadway/amqp" ) func main() { // connect to rabbitmq conn, err := amqp.Dial("amqp://root:shiajun666@192.168.10.4:5672") if err != nil { fmt.Println("Connect to RabbitMQ failed: ", err) return } defer conn.Close() // open a channel ch, err := conn.Channel() if err != nil { fmt.Println("Open channel failed: ", err) return } defer ch.Close() // declare rpc response message queue q, err := ch.QueueDeclare( "", //name true, //durable false, //autoDelete false, //exclusive false, //noWait nil, //args ) if err != nil { fmt.Println("Declare rpc response message queue failed: ", err) return } // consume response messages respCh, err := ch.Consume( q.Name, //queue "", //consumer false, //autoAck false, //exclusive false, //noLocal false, //noWait nil, //args ) if err != nil { fmt.Println("Consume response messages failed: ", err) return } // send request to rpc request message queue corrId := "1234567890abcdefghigklmnopqrstuv" err = ch.Publish( "", //exchange "rpc_queue", //key false, //mandatory false, //immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: corrId, ReplyTo: q.Name, Body: []byte(fmt.Sprintf("this is the request message for request %s", corrId)), }, ) // deal with responses go func() { for resp := range respCh { fmt.Printf("Receive response for request[%s]: %s", resp.CorrelationId, string(resp.Body)) // send ack resp.Ack(false) } }() for { select {} } }以上代码示例的大致流程为: (1)RPC server 声明了一个名为 rpc_queue 的队列,用于存储 rpc 请求消息,该队列没有显式绑定交换器,直接使用 RabbitMQ 的默认交换器 "",然后调用 Channel.Consume() 开始接收请求消息; (2)RPC client 声明了一个回调队列,队列名称由 RabbitMQ 随机生成,也是直接使用 RabbitMQ 的默认交换器 "",然后调用 Channel.Consume() 开始接收返回消息; (3)RPC client 向请求队列 rpc_queue 发送请求消息,在消息的 replyTo 属性中设置回调队列的名称,在消息的 correlationId 属性中设置请求的唯一标识ID; (4)RPC server 收到请求消息后,向消息的 replyTo 属性中指定的回调队列发送返回信息,在消息的 correlationId 属性中设置当前请求的唯一标识ID; (5)RPC client 收到返回信息,根据消息中的 correlationId 属性将返回与请求对应起来。 注: 以上代码示例中,由于 RPC 请求消息队列只在 RPC server 中声明,故而应该先运行 RPC server 再运行 RPC client。RPC 请求消息队列和 RPC 响应消息队列都直接使用了 RabbitMQ 默认交换器,所以 Routing Key 为各自队列名称。 大致流程图:
标签:false,nil,err,实现,fmt,RabbitMQ,queue,RPC From: https://www.cnblogs.com/wujuntian/p/16744342.html