nsq最初是由bitly公司开源出来的一款简单易用的消息中间件,它可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息。它有以下特性:
- 分布式。它提供了分布式的、去中心化且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和高可用特性。
- 易于扩展。它支持水平扩展,没有中心化的消息代理(Broker),内置的发现服务让集群中增加节点非常容易。
- 运维方便。它非常容易配置和部署,灵活性高。
- 高度集成。现在已经有官方的Golang、Python和JavaScript客户端,社区也有了其他各个语言的客户端库方便接入,自定义客户端也非常容易。
组件
nsq一共有四种组件
nsqlookupd
nsqlookupd是负责管理拓扑信息并提供最终一致性的发现服务的守护进程(daemon)。在终端1启动它:
❯ nsqlookupd [nsqlookupd] 2019/07/18 11:42:16.876296 INFO: nsqlookupd v1.1.0 (built w/go1.11) [nsqlookupd] 2019/07/18 11:42:16.876864 INFO: HTTP: listening on [::]:4161 [nsqlookupd] 2019/07/18 11:42:16.876868 INFO: TCP: listening on [::]:4160
默认HTTP接口监听4161,TCP接口监听4160。
nsqd
nsqd是一个负责接收、排队、投递消息给客户端的守护进程。客户端通过查询 nsqlookupd 来发现指定话题(topic)的nsqd生产者,nsqd节点会广播话题(topic)和通道(channel)信息。数据流模型如下:
单个nsqd可以有多个topic,每个topic可以有多个channel。channel接收这个topic所有消息的副本,从而实现多播分发,而channel上的每个消息被分发给它的订阅者,从而实现负载均衡。
在终端2启动nsqd:
❯ nsqd --lookupd-tcp-address=127.0.0.1:4160 ... [nsqd] 2019/07/18 11:47:46.427184 INFO: HTTP: listening on [::]:4151 [nsqd] 2019/07/18 11:47:46.427195 INFO: TCP: listening on [::]:4150 [nsqd] 2019/07/18 11:47:46.427203 INFO: LOOKUP(127.0.0.1:4160): adding peer [nsqd] 2019/07/18 11:47:46.427355 INFO: LOOKUP connecting to 127.0.0.1:4160 ...
nsqd通过tcp端口连接到了nsqlookupd,它自己在4151接受HTTP请求,在4150接受TCP请求。
nsqadmin
nsqadmin 是一套WEB管理UI,用来汇集集群的实时统计,并执行不同的管理任务。在终端3启动它:
❯ nsqadmin --lookupd-http-address=127.0.0.1:4161 [nsqadmin] 2019/07/18 11:54:23.125392 INFO: nsqadmin v1.1.0 (built w/go1.11) [nsqadmin] 2019/07/18 11:54:23.128755 INFO: HTTP: listening on [::]:4171
浏览器打开http://localhost:4171就能访问了,需要注意,管理UI可以按需启动。
nsq的Go客户端使用
首先安装go-nsq:
go get github.com/nsqio/go-nsq
先看生产者:
package main import ( "github.com/nsqio/go-nsq" "log" "math/rand" "time" ) func main() { config := nsq.NewConfig() w, err := nsq.NewProducer("127.0.0.1:4150", config) if err != nil { log.Panic(err) } chars := []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ") for { buf := make([]byte, 4) for i := 0; i < 4; i++ { buf[i] = chars[rand.Intn(len(chars))] } log.Printf("Pub: %s", buf) err = w.Publish("test", buf) if err != nil { log.Panic(err) } time.Sleep(time.Second * 1) } w.Stop() }
NewProducer的第一个参数就是nsqd的地址,在这里做了个无限for循环,每次随机4个byte发布到test话题里面。
再看消费者代码:
package main import ( "log" "sync" "github.com/nsqio/go-nsq" ) func main() { wg := &sync.WaitGroup{} wg.Add(1000) config := nsq.NewConfig() q, _ := nsq.NewConsumer("test", "ch", config) q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { log.Printf("Got a message: %s", message.Body) wg.Done() return nil })) err := q.ConnectToNSQD("127.0.0.1:4150") if err != nil { log.Panic(err) } wg.Wait() }
一开始通过sync.WaitGroup安排了1000个待执行的等待组,NewConsumer的第一个参数是话题test,第二是通道名字,然后用AddHandler添加一个消费处理函数,在处理函数中会打印这个消息。
首先启动消费者,再启动发布者:
附:代码地址
标签:INFO,07,使用,nsqd,2019,nsqlookupd,nsq,NSQ From: https://www.cnblogs.com/beatle-go/p/18104467