首页 > 其他分享 >消息队列中间件nsq安装与使用

消息队列中间件nsq安装与使用

时间:2022-11-22 23:31:45浏览次数:90  
标签:return consumers 队列 中间件 Println nsq order er


安装与运行

nsq的镜像开启容器时并不是默认开启三个服务的,而是需要手动开启。

docker pull nsqio/nsq
docker run -itd --restart=on-failure:20 -p 4150:4150 -p 4151:4151 -p 4160:4160 -p 4161:4161 -p 4170:4170 -p 4171:4171 nsqio/nsq
docker exec -itd 容器id nsqlookupd
docker exec -itd 容器id nsqd --lookupd-tcp-address=127.0.0.1:4160
docker exec -itd 容器id nsqadmin --lookupd-http-address=127.0.0.1:4161

执行完毕后,在主机浏览器输入:localhost:4171, 可以看到nsq的topic后台。

PS

怀疑代码内部可能存在隐式端口未说明,目前docker方式是只能连nsqd,lookupd有点问题,不能给消费者寻找topic,暂时没找到原因,实体部署nsq如下:

  1. ​https://nsq.io/deployment/installing.html​​ 安装指定版本
  2. 将安装文件里的bin路径,写入PATH
  3. 执行​​nsq --version​​ 得到有效信息,则为成功安装

基本上都是release免安版,不需要编译, 解压就好了.~

使用详解

​https://nsq.io/overview/quick_start.html​

go使用实例

package main

import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
"runtime"
"sync"
"time"
)

//handler
type ConsumerT struct{}

var consumeMessageNumber int
var l sync.RWMutex
//处理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))

l.Lock()
defer l.Unlock()
consumeMessageNumber ++
return nil
}

var producer *nsq.Producer
var consumers []*nsq.Consumer

var conf *nsq.Config
var nsqdAddrTCP = "localhost:4150" // tcp used to publish topics and messages
var nsqdAddrHTTP = "localhost:4151" // http publish topics,not exampled
var nsqAdminAddr = "localhost:4171" // backend ui
var nsqlookupdAddr = "localhost:4161" // help consumer find topics
func init() {
log.SetFlags(log.Llongfile | log.LstdFlags)
//init a producer
var er error
conf = nsq.NewConfig()
producer, er = nsq.NewProducer(nsqdAddrTCP, conf)
if er != nil {
log.Println(er.Error())
return
}
er = producer.Ping()
if er != nil {
log.Println(er.Error())
return
}
//
init consumer
conf.LookupdPollInterval = 5 * time.Second
}
func main() {
defer func(){
producer.Stop()
for i,_:=range consumers {
consumers[i].Stop()
}
}()
runtime.GOMAXPROCS(runtime.NumCPU())
// 创建100个消费者
consumers = make([]*nsq.Consumer, 100)
var e error
for i, _ := range consumers {
consumers[i], e = nsq.NewConsumer("go-nsq_testcase", "channel_1", conf)
if e != nil {
log.Println(e.Error())
return
}
consumers[i].SetLogger(nil, 0)
consumers[i].AddHandler(&ConsumerT{}) // 添加消费者接口

if e = consumers[i].ConnectToNSQLookupd(nsqlookupdAddr);e!=nil {
log.Println(e)
return
}

}

// 并发发布3000个消息
for i := 0; i < 3000; i++ {
go func(i int) {
er := producer.Publish("go-nsq_testcase", []byte(fmt.Sprintf("hello,everyone_%d", i)))
if er != nil {
log.Println(er.Error())
return
}
}(i)
}

time.Sleep(20 * time.Second)
fmt.Println(consumeMessageNumber)
select {}
}

解决消息多次投递,单服务如何确保单次

假设从nsq拿到订单,需要避免订单重复处理

type Unique struct{
M *sync.RWMutex
mp map[string]interface{}
}
func NewUnique() Unique{
return Unique{
M: &sync.RWMutex{},
mp: make(map[string]interface{})
}
}
// 已处理过,true
// 未处理过,false
func (u *Unique)HasDealed(uniqueID string)bool {
u.RLock()
defer u.RUnlock()
_, ok := u.mp[uniqueID]

return ok
}

// 未处理时,添加
// 处理过时,无视
func (u *Unique) UnDealedAndWrite(uniqueID string, order interface{}) (existed bool) {
u.Lock()
defer u.Unlock()
_, ok := u.mp[uniqueID]
if !ok {
u.mp[uniqueID] = order
return false
}
return true
}

func main() {
unique := NewUnique()
// get an order
order := GetFromNSQ()
if unique.UnDealeAndWrite(order.UniqueID, order) {
fmt.Println("order 已被消费,无须添加处理,可通知nsq停止推送")
// 停止nsq推送该条order
StopNSQTopic(topicName)
return
}
fmt.Println("order未被消费,已添加")
// 消费order
handle(order)
}

服务群如何保证消息被消费一次

在思考,可以用数据库id唯一保证,比如处理以前,先插入记录,如果插入成功,则未处理,否则处理过


标签:return,consumers,队列,中间件,Println,nsq,order,er
From: https://blog.51cto.com/u_11553781/5878751

相关文章

  • freertos消息队列的值传递和指针传递
    消息队列的使用方法总结:1、消息队列初始化(定义一个消息队列的结构体),一般在main.c中完成。2、消息队列的发送:  aextern消息队列   b定义一个结构体的指针指向消......
  • 4.队列、栈、链表
    目录一、队列1.什么是队列2.抽象数据类型Queue3.python实现ADTQueue4.举例热土豆问题(约瑟夫问题)5.举例:打印队列二、双端队列1.什么是双端队列?2.抽象数据类型Deque3.pytho......
  • Java双向链表实现队列
    将双向链表做简单的改造,即可实现一个FIFO(FirstInputFirstOut)队列,该队列只在头节点出队,尾节点入队。一般来说定义节点类只需一个后驱节点next即可。这里保留pre节......
  • 优先队列(std_priority_queue)
    title:优先队列(std::priority_queue)date:2022-11-1715:50:12tags:算法本文章遵守知识共享协议CC-BY-NC-SA,转载时需要署名,推荐在我的个人博客阅读。优先队列是......
  • Java 同步锁ReentrantLock与抽象同步队列AQS
    AbstractQueuedSynchronizer抽象同步队列,它是个模板类提供了许多以锁相关的操作,常说的AQS指的就是它。AQS继承了​​AbstractOwnableSynchronizer​​类,AOS用于保存线程对......
  • Java 同步锁ReentrantLock与抽象同步队列AQS
    AbstractQueuedSynchronizer抽象同步队列,它是个模板类提供了许多以锁相关的操作,常说的AQS指的就是它。AQS继承了​​AbstractOwnableSynchronizer​​类,AOS用于保存线程对......
  • 如何一秒钟从头构建一个 ASP.NET Core 中间件
    前言其实地上本没有路,走的人多了,也便成了路。--鲁迅就像上面鲁迅说的那样,其实在我们开发中间件的过程中,微软并没有制定一些策略或者文档来约束你如何编写一个中间件......
  • ASP.NET Core 中间件之压缩、缓存
    前言今天给大家介绍一下在ASP.NETCore日常开发中用的比较多的两个中间件,它们都是出自于微软的ASP.NET团队,他们分别是Microsoft.AspNetCore.ResponseCompression和......
  • django---中间件
    中间件当用户发送请求时,其实时候是将请求发送给wsgi(一种协议),django使用的是wsgiref,然后再将请求发送给django的各个中间件(settings里的MIDDLEWARE表示使用的中间件),再由......
  • Java——集合——数据结构——栈、队列、数组、链表、红黑树
                                                      ......