1、发送端
步骤分解如下:
(1)建立连接
conn, err := amqp.Dial("amqp://admin:[email protected]:5672/")
(2)打开channel
这里的channel 是AMQP 里的概念,可以理解为 多路复用的一个tcp长连接。
(3)声明一个队列
q, err := ch.QueueDeclare( ... )
(4)创建消息
msg := amqp.Publishing{ ... }
(5)发布消息
err = ch.Publish( ... )
package main import ( "github.com/streadway/amqp" "log" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { // 连接 RabbitMQ conn, err := amqp.Dial("amqp://admin:[email protected]:5672/") failOnError(err, "连接失败") defer conn.Close() // 建立一个 channel ( 其实就是TCP连接 ) ch, err := conn.Channel() failOnError(err, "打开通道失败") defer ch.Close() // 创建一个名字叫 "hello" 的队列 q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "创建队列失败") // 构建一个消息 body := "Hello World!" msg := amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), } // 构建一个生产者,将消息 放入队列 err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate msg) log.Printf(" [x] Sent %s", body) failOnError(err, "Failed to publish a message") }
2、接收端
2.2 接收端
步骤分解如下:
(1)建立连接
conn, err := amqp.Dial("amqp://admin:[email protected]:5672/")
(2)打开channel
这里的channel 是AMQP 里的概念,可以理解为 多路复用的一个tcp长连接。
(3)声明一个队列
q, err := ch.QueueDeclare( ... )
(4)构建一个消费者
msgChan, err := ch.Consume( ... )
(5)不断的读取消息
for d := range msgChan {
log.Printf("收到消息: %s", d.Body)
}
package main import ( "github.com/streadway/amqp" "log" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { // 连接 RabbitMQ conn, err := amqp.Dial("amqp://admin:[email protected]:5672/") failOnError(err, "连接失败") defer conn.Close() // 建立一个 channel ( 其实就是TCP连接 ) ch, err := conn.Channel() failOnError(err, "打开通道失败") defer ch.Close() // 创建一个名字叫 "hello" 的队列 q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "创建队列失败") // 开启一个 消费者 // 返回值是 ch 类型 msgChan, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "注册消费者 ,失败") //帮助阻塞 forever := make(chan bool) // 开启一个 go程 go func() { for d := range msgChan { log.Printf("收到消息: %s", d.Body) } }() log.Printf(" 等待消息...") <-forever }
3. 可能遇到的问题
遇到 “Reason: "username or password not allowed"”
缺少权限,可能账户密码错误,也可能使用了 guest 账户未处理远程连接。考虑新建一个高权限的用户。
新建账户的方法请参考我的另一篇文章。
遇到 “no access to this vhost”
为 admin 赋予权限,使之可以访问 vhost
下面的指令 为 admin 赋予权限,使得可以访问 vhost 名字为 / 的资源。
rabbitmqctl set_permissions -p / admin "." "." "."
说明:
/ 是个 vhost 资源名称
"." "." "." 标识权限的类型,和读写权限。