首页 > 其他分享 >go-RabbitMQ

go-RabbitMQ

时间:2023-05-31 12:01:09浏览次数:45  
标签:false err exchange 队列 rabbitmq RabbitMQ go true

erlang 安装

编译依赖:yum install make gcc gcc-c++ build-essential openssl openssl-devel unixODBC unixODBC-devel kernel-devel m4 ncurses-devel

解压:tar -zxvf

创建存放环境目录:mkdir /opt/rabbitMq/erlang

进入 erlang 解压目录执行命令:./configure --prefix=/opt/rabbitmq/erlang --without-javac

安装:make && make install

配置环境变量:vim /etc/profile

#set erlang environment
export PATH=$PATH:/路径和上面黄色部分对应/erlang/bin

刷新配置文件:source profile

测试安装是否成功:erl

RabbitMQ 安装

安装:rpm -ivh --nodeps rabbitmq-server-xxx.noarch.rpm

启动:rabbitmq-server start &

// 如果没有权限,使用 find / -name .erlang.cookie,找到后授权
find / -name .erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookie

停止:rabbitmqctl stop

插件添加:rabbitmq-plugins enable {插件名}

插件卸载:rabbitmq-plugins disable {插件名}

查看插件:rabbitmq-plugins list

rabbitmq_management 为 web 管理端插件

本地的话可以使用 username:guest password:guest 进行登录,但是远程就不行了,需要新建用户(必须是 rabbitmq 启动状态下才能新建)

# 第一步:添加 admin 用户并设置密码
rabbitmqctl add_user admin 1234

# 第二步:添加 admin 用户为 administrator 角色
rabbitmqctl set_user_tags admin administrator

# 第三步:设置 admin 用户的权限,指定允许访问的 vhost 以及 write/read
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

# 第四步:查看 vhost (/) 允许哪些用户访问
rabbitmqctl list_permissions -p /

# 第五步:查看用户列表
rabbitmqctl list_users

# 第六步:重启 RabbitMQ,使用设置好的账户和密码登录。

go -> RabbitMQ

package rabbitmq

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

// 连接信息标准格式: amqp://用户名:密码@ RabbitMQ 服务器名称:RabbitMQ 所在服务器的端口/virtual hosts
const MQURL = "amqp://admin:admin@[email protected]:5672/mq1"

type RabbitMQ struct {
	conn      *amqp.Connection
	channel   *amqp.Channel
	QueueName string // 队列名称
	Exchange  string // 交换机
	Key       string // key
	MQURL     string // 连接信息
}

// NewRabbitMQ - 创建 RabbitMQ 结构体实例
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
	rabbitmq := &RabbitMQ{
		QueueName: queueName,
		Exchange:  exchange,
		Key:       key,
		MQURL:     MQURL,
	}
	var err error
	// 创建 RabbitMQ 连接
	rabbitmq.conn, err = amqp.Dial(rabbitmq.MQURL)
	rabbitmq.failOnErr(err, "创建连接错误")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "获取 channel 失败")
	return rabbitmq
}

// Destory - 断开 channel 和 connection
func (r *RabbitMQ) Destory() {
	r.channel.Close()
	r.conn.Close()
}

// failOnErr - 错误处理函数
func (r *RabbitMQ) failOnErr(err error, message string) {
	if err != nil {
		log.Fatalf("%s:%s", message, err)
		panic(fmt.Sprintf("%s:%s", message, err))
	}
}

Simple

最简单常用的工作模式。

![[Simple.png]]

一个生产者对应一个消费者

获取实例

// NewRabbitMQSimple - 创建 Simple 模式下的 RabbitMQ 实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
	return NewRabbitMQ(queueName, "", "")
}

生产者

// PublishSimple - Smiple 模式下的生产者
func (r *RabbitMQ) PublishSimple(message string) {
	// 1. 申请队列,找不到就创建,存在就跳过创建
	// 保证队列存在,消息能放到队列中
	_, err := r.channel.QueueDeclare(
		r.QueueName,
		false, // 是否持久化
		false, // 是否自动删除
		false, // 是否具有排他性
		false, // 是否阻塞
		nil,   // 额外属性
	)

	if err != nil {
		fmt.Println(err)
		return
	}

	// 2. 发送消息到队列中
	r.channel.Publish(
		r.Exchange,  // 交换机
		r.QueueName, // 队列名称
		false,       // 如果为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
		false,       // 如果为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
}

消费者

// ConsumeSimple - Smiple 模式下的消费者
func (r *RabbitMQ) ConsumeSimple() {
	// 1. 申请队列,找不到就创建,存在就跳过创建
	// 保证队列存在,消息能放到队列中
	_, err := r.channel.QueueDeclare(
		r.QueueName,
		false, // 是否持久化
		false, // 是否自动删除
		false, // 是否具有排他性
		false, // 是否阻塞
		nil,   // 额外属性
	)

	if err != nil {
		fmt.Println(err)
		return
	}

	// 2. 接收消息
	msgs, err := r.channel.Consume(
		r.QueueName, // 队列名称
		"",          // 用来区分多个消费者
		true,        // 是否自动应答
		false,       // 是否具有排他性
		false,       // 如果为 true,表示不能将同一个 connections 中发送的消息传递给这个 connections 中的消费者
		false,       // 队列消费是否阻塞
		nil,         // 额外属性
	)

	if err != nil {
		fmt.Println(err)
		return
	}

	forever := make(chan bool)
	// 启用协程处理消息
	go func() {
		for d := range msgs {
			// 实现我们要处理的逻辑判断
			fmt.Printf("Received a message: %s\n", d.Body)
		}
	}()

	log.Printf("[*] Waiting for messages, To exit press CTRL + C")
	<-forever
}

示例

mainSimplePublish

package main

import (
	"fmt"
	"rabbitmq-base/rabbitmq"
)

func main() {
	r := rabbitmq.NewRabbitMQSimple("simple")
	r.PublishSimple("aaaa")
	fmt.Println("发送成功")
}

mainSimpleRecieve

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQSimple("simple")
	r.ConsumeSimple()
}

Work

工作模式

一个消息只能被一个消费者获取,也就是消息只能被消费一次。

一个生产者对应多个消费者。

![[Work.png]]

与 Simple 模式最大的区别就是多个消费者

Publish/Subscribe

订阅模式

消息被路由投递给多个队列,一个消息被多个消费者获取。

![[Publish&Subscribe.png]]

获取实例

// NewRabbitMQPubSub - 创建订阅模式下的 RabbitMQ 实例
func NewRabbitMQPubSub(exchange string) *RabbitMQ {
	return NewRabbitMQ("", exchange, "")
}

生产者

// PublishPub - 订阅模式下的生产者
func (r *RabbitMQ) PublishPub(message string) {
	// 1. 尝试创建交换机,不存在创建
	err := r.channel.ExchangeDeclare(
		r.Exchange, // 交换机名称
		"fanout",   // 交换机类型,fanout 是广播类型
		true,       // 是否持久化
		false,      // 自动删除
		false,      // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
		false,      // 是否阻塞
		nil,        // 额外属性
	)

	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 发送消息
	err = r.channel.Publish(
		r.Exchange,
		"",
		false, // 如果为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
		false, // 如果为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
}

消费者

// RecieveSub - 订阅模式下的消费者
func (r *RabbitMQ) RecieveSub() {
	// 1. 试探性创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange, // 交换机名称
		"fanout",   // 交换机类型,fanout 是广播类型
		true,       // 是否持久化
		false,      // 自动删除
		false,      // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
		false,      // 是否阻塞
		nil,        // 额外属性
	)

	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 试探性创建队列,这里注意队列名称不要写,使用随机名称
	q, err := r.channel.QueueDeclare(
		"",    // 随机生产队列名称
		false, // 是否持久化
		false, // 自动删除
		true,  // 是否排他
		false, // 是否阻塞
		nil,   // 额外属性
	)

	r.failOnErr(err, "Failed to declare a queue")

	// 绑定队列到 exchange 中
	err = r.channel.QueueBind(
		q.Name,
		"", // 在 Pub/Sub 模式下,这里的 key 要为空
		r.Exchange,
		false, // 是否阻塞
		nil,   // 额外属性
	)

	// 3. 消费消息
	message, err := r.channel.Consume(
		q.Name, // 队列名称
		"",     // 用来区分多个消费者
		true,   // 是否自动应答
		false,  // 是否具有排他性
		false,  // 如果为 true,表示不能将同一个 connections 中发送的消息传递给这个 connections 中的消费者
		false,  // 队列消费是否阻塞
		nil,    // 额外属性
	)

	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message: %s\n", d.Body)
		}
	}()

	fmt.Println("退出请按 CTRL + C")
	<-forever
}

示例

mainPub

package main

import (
	"fmt"
	"rabbitmq-base/rabbitmq"
	"strconv"
	"time"
)

func main() {
	r := rabbitmq.NewRabbitMQPubSub("exchange1")
	for i := 0; i < 100; i++ {
		r.PublishPub("订阅模式生产第" + strconv.Itoa(i) + "条数据")
		fmt.Println("第" + strconv.Itoa(i) + "条消息")
		time.Sleep(1 * time.Second)
	}
}

mainSub1

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQPubSub("exchange1")
	r.RecieveSub()
}

mainSub2

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQPubSub("exchange1")
	r.RecieveSub()
}

Routing

路由模式

一个消息被多个消费者获取。并且消息的目标队列可被生产者指定。

![[Routing.png]]

获取实例

// NewRabbitMQRouting - 创建路由模式下的 RabbitMQ 实例
func NewRabbitMQRouting(exchange string, routingKey string) *RabbitMQ {
	return NewRabbitMQ("", exchange, routingKey)
}

生产者

// PublishRouting - 路由模式下的生产者
func (r *RabbitMQ) PublishRouting(message string) {
	// 1. 尝试创建交换机,不存在创建
	err := r.channel.ExchangeDeclare(
		r.Exchange, // 交换机名称
		"direct",   // 交换机类型,direct
		true,       // 是否持久化
		false,      // 自动删除
		false,      // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
		false,      // 是否阻塞
		nil,        // 额外属性
	)

	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 发送消息
	err = r.channel.Publish(
		r.Exchange,
		r.Key, // 设置 routingKey
		false, // 如果为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
		false, // 如果为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
}

消费者

// RecieveRouting - 路由模式下的消费者
func (r *RabbitMQ) RecieveRouting() {
	// 1. 试探性创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange, // 交换机名称
		"direct",   // 交换机类型,direct
		true,       // 是否持久化
		false,      // 自动删除
		false,      // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
		false,      // 是否阻塞
		nil,        // 额外属性
	)

	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 试探性创建队列,这里注意队列名称不要写,使用随机名称
	q, err := r.channel.QueueDeclare(
		"",    // 随机生产队列名称
		false, // 是否持久化
		false, // 自动删除
		true,  // 是否排他
		false, // 是否阻塞
		nil,   // 额外属性
	)

	r.failOnErr(err, "Failed to declare a queue")

	// 绑定队列到 exchange 中
	err = r.channel.QueueBind(
		q.Name,
		r.Key, // 需要绑定 routingKey
		r.Exchange,
		false, // 是否阻塞
		nil,   // 额外属性
	)

	// 3. 消费消息
	message, err := r.channel.Consume(
		q.Name, // 队列名称
		"",     // 用来区分多个消费者
		true,   // 是否自动应答
		false,  // 是否具有排他性
		false,  // 如果为 true,表示不能将同一个 connections 中发送的消息传递给这个 connections 中的消费者
		false,  // 队列消费是否阻塞
		nil,    // 额外属性
	)

	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message: %s\n", d.Body)
		}
	}()

	fmt.Println("退出请按 CTRL + C")
	<-forever
}

示例

mainRoutingPublish

package main

import (
	"rabbitmq-base/rabbitmq"
	"strconv"
	"time"
)

func main() {
	r1 := rabbitmq.NewRabbitMQRouting("exchange2", "error")
	r2 := rabbitmq.NewRabbitMQRouting("exchange2", "info")

	for i := 0; i < 5; i++ {
		r1.PublishRouting("Hello exchange1 error" + strconv.Itoa(i))
		r2.PublishRouting("Hello exchange1 info" + strconv.Itoa(i))
		time.Sleep(1 * time.Second)
	}
}

mainRoutingRecieve1

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQRouting("exchange2", "error")
	r.RecieveRouting()
}

mainRoutingRecieve2

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQRouting("exchange2", "info")
	r.RecieveRouting()
}

Topic

话题模式

一个消息被多个消费者获取。消息的目标 queue 可用 BindingKey 以通配符,(#:一个或多个词,*:一个词)的方式指定。

![[Topic.png]]

获取实例

// NewRabbitMQTopic - 创建话题模式下的 RabbitMQ 实例
func NewRabbitMQTopic(exchange string, routingKey string) *RabbitMQ {
	return NewRabbitMQ("", exchange, routingKey)
}

生产者

// PublishTopic - 话题模式下的生产者
func (r *RabbitMQ) PublishTopic(message string) {
	// 1. 尝试创建交换机,不存在创建
	err := r.channel.ExchangeDeclare(
		r.Exchange, // 交换机名称
		"topic",    // 交换机类型,topic
		true,       // 是否持久化
		false,      // 自动删除
		false,      // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
		false,      // 是否阻塞
		nil,        // 额外属性
	)

	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 发送消息
	err = r.channel.Publish(
		r.Exchange,
		r.Key, // 设置 routingKey
		false, // 如果为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
		false, // 如果为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
}

消费者

// RecieveTopic - 话题模式下的消费者
func (r *RabbitMQ) RecieveTopic() {
	// 1. 试探性创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange, // 交换机名称
		"topic",    // 交换机类型,direct
		true,       // 是否持久化
		false,      // 自动删除
		false,      // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
		false,      // 是否阻塞
		nil,        // 额外属性
	)

	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 试探性创建队列,这里注意队列名称不要写,使用随机名称
	q, err := r.channel.QueueDeclare(
		"",    // 随机生产队列名称
		false, // 是否持久化
		false, // 自动删除
		true,  // 是否排他
		false, // 是否阻塞
		nil,   // 额外属性
	)

	r.failOnErr(err, "Failed to declare a queue")

	// 绑定队列到 exchange 中
	err = r.channel.QueueBind(
		q.Name,
		r.Key, // 需要绑定 routingKey
		r.Exchange,
		false, // 是否阻塞
		nil,   // 额外属性
	)

	// 3. 消费消息
	message, err := r.channel.Consume(
		q.Name, // 队列名称
		"",     // 用来区分多个消费者
		true,   // 是否自动应答
		false,  // 是否具有排他性
		false,  // 如果为 true,表示不能将同一个 connections 中发送的消息传递给这个 connections 中的消费者
		false,  // 队列消费是否阻塞
		nil,    // 额外属性
	)

	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message: %s\n", d.Body)
		}
	}()

	fmt.Println("退出请按 CTRL + C")
	<-forever
}

示例

mainTopicPublish

package main

import (
	"rabbitmq-base/rabbitmq"
	"strconv"
	"time"
)

func main() {
	r1 := rabbitmq.NewRabbitMQTopic("exchange3", "xylx.topic1")
	r2 := rabbitmq.NewRabbitMQTopic("exchange3", "xylx.topic2")

	for i := 0; i <= 100; i++ {
		r1.PublishTopic("Hello exchange3 xylx.topic1 -> " + strconv.Itoa(i))
		r2.PublishTopic("Hello exchange3 xylx.topic2 -> " + strconv.Itoa(i))
		time.Sleep(1 * time.Second)
	}
}

mainTopicRecieve1

. 用来分隔词

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQTopic("exchange3", "#.topic1")
	r.RecieveTopic()
}

mainTopicRecieve2

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQTopic("exchange3", "#.topic2")
	r.RecieveTopic()
}

标签:false,err,exchange,队列,rabbitmq,RabbitMQ,go,true
From: https://www.cnblogs.com/xylx1/p/17445730.html

相关文章

  • API NEWS | 三个Argo CD API漏洞
    欢迎大家围观小阑精心整理的API安全最新资讯,在这里你能看到最专业、最前沿的API安全技术和产业资讯,我们提供关于全球API安全资讯与信息安全深度观察。本周,我们带来的分享如下:关于三个ArgoCDAPI漏洞的文章Gartner对API安全的看法分布式标识是现代API安全的关键 关......
  • Golang扫盲式学习——GO并发 | (一)
    并发与并行......
  • Invalid prop: type check failed for prop “value”. Expected String, Number, got
    记录一个报错问题,之前别的同事写的代码,还看了半天有点无语!!下拉选择部门,联动动态赋值责任人下拉列表警告,导致选择责任人的时候无法正确赋值undefined。究其原因是封装的表单formItem项中传入了下拉选项的映射字段,如下: 而在选择部门的时候又已经把动态数据遍历处理成了标准的la......
  • go语言笔记——defer作用DB资源等free或实现调试
    defer和追踪关键字defer允许我们推迟到函数返回之前(或任意位置执行 return 语句之后)一刻才执行某个语句或函数(为什么要在返回之后才执行这些语句?因为 return 语句同样可以包含一些操作,而不是单纯地返回某个值)。关键字defer的用法类似于面向对象编程语言Java和C#的 fi......
  • MongoDB C++ gridfs worked example
    使用libmongoc,参考:http://mongoc.org/libmongoc/current/mongoc_gridfs_t.html#include<mongoc.h>#include<stdio.h>#include<stdlib.h>#include<fcntl.h>classMongoGridFS{public:MongoGridFS(constchar*db);~MongoGridFS();......
  • mongodb c++ driver安装踩坑记
     安装教程:https://mongodb.github.io/mongo-cxx-driver/mongocxx-v3/installation/(1)“initializer_list”filenotfoundhttp://stackoverflow.com/questions/19493671/initializer-list-no-such-file-or-directorySinceyouareusing GCC-4.8 andyourproblemisthatyoud......
  • go 执行ssh 报错ssh: handshake failed: read tcp xxx:->xxx:22: read: connection re
    需求:解决报错go执行ssh报错ssh:handshakefailed:readtcpxxx:->xxx:22:read:connectionresetbypeer 10个以内,没有问题。10以上就报错解决:我的远程(192.168.49.171)服务器ssh默认最大限制10解除限制,下面的操作都是在49.171上操作的。1.编辑sshd_confi......
  • RabbitMQ简单介绍
    RabbitMQ是一款开源的消息中间件具备的特点1.高可靠,易扩展,高可用2.支持大多数的编程语言客户端3.遵循AMQP协议,也支持MQTT协议,自身采用Erlang语法开发RabbitMQ整体逻辑结构大体可以由三部分组成:生产者,Broker,消费者 而消息者就是从指定的消息队列中进行消息的消费交换器需......
  • RabbitMq
    角色 生产者消息的创建者,负责创建和推送数据到消息服务器消费者消息的接收方,用于处理数据和确认消息组件交换机(exchange) 接受消息,分配消息  ,用于存储生产者的消息路由键(RountingKey) 用于那生成这的数据分配到固定的交换机上面 BingKey把交换机的消息绑定到队......
  • go exec.Command windows 参数引号转义问题
    Go在windows上调用本地进程传参时的一个天坑Golanggo在windows上exec.Command调用本地进程在传参的时候有一个天坑,举个栗子来说正常来说一般代码会这么写cmdLine:="notepad.exe"+`"D:\ProgramFiles\Notepad++\session.xml"`cmd:=exec.Command("cmd.exe","/c",cmdL......