首页 > 其他分享 >RabbitMQ RPC 实现

RabbitMQ RPC 实现

时间:2022-09-30 11:23:53浏览次数:52  
标签:false nil err 实现 fmt RabbitMQ queue RPC

  RPC,是 Remote Procedure Call 的简称,即远程过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术。RPC 的主要功用是让构建分布式计算更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。   一般在 RabbitMQ 中进行 RPC 是很简单的。客户端发送请求消息,服务端回复响应的消息。为了客户端接收响应的消息,需要在请求消息中发送一个回调队列,在消息的 replyTo 属性中指定。为每个 RPC 请求创建一个回调队列是非常低效的,所以通常为每个客户端创建一个单一的回调队列。为了区分同一个回调队列中的不同请求,需要在消息的 correlationId 属性中设置当前请求的唯一识别ID。 RPC server 代码示例:

package main

import (
    "fmt"

    "github.com/streadway/amqp"
)

func main() {
    // connect to rabbitmq
    conn, err := amqp.Dial("amqp://root:shiajun666@192.168.10.4:5672")
    if err != nil {
        fmt.Println("Connect to RabbitMQ failed: ", err)
        return
    }
    defer conn.Close()

    // open a channel
    ch, err := conn.Channel()
    if err != nil {
        fmt.Println("Open channel failed: ", err)
        return
    }
    defer ch.Close()

    // declare rpc request message queue
    _, err = ch.QueueDeclare(
        "rpc_queue", //name
        true,        //durable
        false,       //autoDelete
        false,       //exclusive
        false,       //noWait
        nil,         //args
    )
    if err != nil {
        fmt.Println("Declare rpc request message queue failed: ", err)
        return
    }

    // consume request messages
    reqCh, err := ch.Consume(
        "rpc_queue", //queue
        "",          //consumer
        false,       //autoAck
        false,       //exclusive
        false,       //noLocal
        false,       //noWait
        nil,         //args
    )
    if err != nil {
        fmt.Println("Consume request messages failed: ", err)
        return
    }
    
    // deal with requests
    go func() {
        for req := range reqCh {
            fmt.Printf("Receive request[%s]: %s", req.CorrelationId, string(req.Body))

            // send response
            err = ch.Publish(
                "",                //exchange
                req.ReplyTo,       //key
                false,             //mandatory
                false,             //immediate
                amqp.Publishing{
                    ContentType:   "text/plain",
                    CorrelationId: req.CorrelationId,
                    Body:          []byte(fmt.Sprintf("this is the response messsge for request %s", req.CorrelationId)),
                },
            )
            // send ack
            req.Ack(false)
        }
    }()
    
    for {
        select {}
    }
}
RPC client 代码示例:
package main

import (
    "fmt"

    "github.com/streadway/amqp"
)

func main() {
    // connect to rabbitmq
    conn, err := amqp.Dial("amqp://root:shiajun666@192.168.10.4:5672")
    if err != nil {
        fmt.Println("Connect to RabbitMQ failed: ", err)
        return
    }
    defer conn.Close()

    // open a channel
    ch, err := conn.Channel()
    if err != nil {
        fmt.Println("Open channel failed: ", err)
        return
    }
    defer ch.Close()

    // declare rpc response message queue
    q, err := ch.QueueDeclare(
        "",          //name
        true,        //durable
        false,       //autoDelete
        false,       //exclusive
        false,       //noWait
        nil,         //args
    )
    if err != nil {
        fmt.Println("Declare rpc response message queue failed: ", err)
        return
    }

    // consume response messages
    respCh, err := ch.Consume(
        q.Name,      //queue
        "",          //consumer
        false,       //autoAck
        false,       //exclusive
        false,       //noLocal
        false,       //noWait
        nil,         //args
    )
    if err != nil {
        fmt.Println("Consume response messages failed: ", err)
        return
    }

    // send request to rpc request message queue
    corrId := "1234567890abcdefghigklmnopqrstuv"
    err = ch.Publish(
        "",                //exchange
        "rpc_queue",       //key
        false,             //mandatory
        false,             //immediate
        amqp.Publishing{
            ContentType:   "text/plain",
            CorrelationId: corrId,
            ReplyTo:       q.Name,
            Body:          []byte(fmt.Sprintf("this is the request message for request %s", corrId)),
        },
    )
    
    // deal with responses
    go func() {
        for resp := range respCh {
            fmt.Printf("Receive response for request[%s]: %s", resp.CorrelationId, string(resp.Body))
            // send ack
            resp.Ack(false)
        }
    }()
    
    for {
        select {}
    }
}
  以上代码示例的大致流程为: (1)RPC server 声明了一个名为 rpc_queue 的队列,用于存储 rpc 请求消息,该队列没有显式绑定交换器,直接使用 RabbitMQ 的默认交换器 "",然后调用 Channel.Consume() 开始接收请求消息; (2)RPC client 声明了一个回调队列,队列名称由 RabbitMQ 随机生成,也是直接使用 RabbitMQ 的默认交换器 "",然后调用 Channel.Consume() 开始接收返回消息; (3)RPC client 向请求队列 rpc_queue 发送请求消息,在消息的 replyTo 属性中设置回调队列的名称,在消息的 correlationId 属性中设置请求的唯一标识ID; (4)RPC server 收到请求消息后,向消息的 replyTo 属性中指定的回调队列发送返回信息,在消息的 correlationId 属性中设置当前请求的唯一标识ID; (5)RPC client 收到返回信息,根据消息中的 correlationId 属性将返回与请求对应起来。 注:   以上代码示例中,由于 RPC 请求消息队列只在 RPC server 中声明,故而应该先运行 RPC server 再运行 RPC client。RPC 请求消息队列和 RPC 响应消息队列都直接使用了 RabbitMQ 默认交换器,所以 Routing Key 为各自队列名称。   大致流程图:

 

标签:false,nil,err,实现,fmt,RabbitMQ,queue,RPC
From: https://www.cnblogs.com/wujuntian/p/16744342.html

相关文章

  • 基于AM5728 DSP+ARM平台实现无线地磁车辆检测网关
     一、 无线 地磁检测技术原理在没有外物扰动的情况下,地球磁场处于一个相对稳定的状态,当有物体经过或停靠在上方时,磁场值就会发生一些细微的变化,金属物体对磁场的扰动相......
  • 【C语言练习_2】用C语言实现凯撒密码加密解密
    1.凯撒密码简介又叫循环移位密码.它的加密方法是将明文中的每个字母用此字符在字母表中后面第k个字母替代.它的加密过程可以表示为下面的函数:E(m)=m+k(modn)其中:m为明文字......
  • B/S大文件(百M以上)的上传下载实现技术
    ​文件夹数据库处理逻辑public class DbFolder{    JSONObjectroot;       public DbFolder()    {        this.root= new JSONOb......
  • 【排序】快速排序C++实现总结
    一、算法步骤快速排序的基本思想是:1.先从数列中取出一个数作为基准数。2.分区过程,将比这个数大的数全放到它的右边,小于或等于它的数全放到它的左边。3.再对左右区间重复第二步,......
  • 1588分析和实现总纲
    1588博客总体分为两部分:协议分析和协议实现。1588协议有一定难度,主要是因为它涉及的面较广,同步算法较为复杂。(一)协议分析主要基于《IEEE1588-2008》分析1588协议中的一些重......
  • 基于OMAP-L138 DSP+ARM处理器与FPGA实现SDR软件无线电系统
    信迈公司的某客户需要针对多个应用开发一个扩频无线电收发器。该客户已经开发出一套算法,准备用于对信号进行调制和解调,但他们却缺少构建完整系统的资源和专业知识。客户希望......
  • 水声通信软件无线电OMAP平台的硬件设计与实现
    水声通信作为水下唯一远距离无线通信方式,是实现水下综合信息感知与信息交互的主要手段。而水声信道的时延、多普勒双重扩展以及时变特性给高速率高可靠性通信带来了极大的......
  • 基于C6748 DSP+FPGA MIMO雷达验证系统模块化设计与实现
    MIMO雷达和传统雷达不同,因为其本身特有的优点,使得这些年很多科研人员对其进行研究。MIMO雷达的优点是能够不受天线大小的束缚,有着很高的方位分辨率。实现了MIMO雷达验证......
  • 基于OMAPL138 +FPGA 48通道采集器的设计与实现
    当今局势下,世界人口形势进一步加剧,由于陆地资源和环境的压力,海洋客观上已成为世界后备资源基地及某些主要战略资源的接替区。人类为了更加深入的探索海洋,在水声领域引入......
  • AM5728 Opencl 案例汇总:实现sobel算法,计算向量和,矩阵转置
    案例一:实现sobel算法OpenCV(Open Source Computer Vision Library)是一个基于BSD许可开源发行的跨平台计算机视觉库。实现图像处理和计算机视觉方面的很多通用计算。......