首页 > 其他分享 >使用NSQ

使用NSQ

时间:2024-03-29 19:23:13浏览次数:24  
标签:INFO 07 使用 nsqd 2019 nsqlookupd nsq NSQ

nsq最初是由bitly公司开源出来的一款简单易用的消息中间件,它可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息。它有以下特性:

  • 分布式。它提供了分布式的、去中心化且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和高可用特性。
  • 易于扩展。它支持水平扩展,没有中心化的消息代理(Broker),内置的发现服务让集群中增加节点非常容易。
  • 运维方便。它非常容易配置和部署,灵活性高。
  • 高度集成。现在已经有官方的Golang、Python和JavaScript客户端,社区也有了其他各个语言的客户端库方便接入,自定义客户端也非常容易。

组件

nsq一共有四种组件

nsqlookupd

nsqlookupd是负责管理拓扑信息并提供最终一致性的发现服务的守护进程(daemon)。在终端1启动它:

❯ nsqlookupd
[nsqlookupd] 2019/07/18 11:42:16.876296 INFO: nsqlookupd v1.1.0 (built w/go1.11)
[nsqlookupd] 2019/07/18 11:42:16.876864 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2019/07/18 11:42:16.876868 INFO: TCP: listening on [::]:4160

默认HTTP接口监听4161,TCP接口监听4160。

nsqd

nsqd是一个负责接收、排队、投递消息给客户端的守护进程。客户端通过查询 nsqlookupd 来发现指定话题(topic)的nsqd生产者,nsqd节点会广播话题(topic)和通道(channel)信息。数据流模型如下:

单个nsqd可以有多个topic,每个topic可以有多个channel。channel接收这个topic所有消息的副本,从而实现多播分发,而channel上的每个消息被分发给它的订阅者,从而实现负载均衡。

在终端2启动nsqd:

❯ nsqd --lookupd-tcp-address=127.0.0.1:4160
...
[nsqd] 2019/07/18 11:47:46.427184 INFO: HTTP: listening on [::]:4151
[nsqd] 2019/07/18 11:47:46.427195 INFO: TCP: listening on [::]:4150
[nsqd] 2019/07/18 11:47:46.427203 INFO: LOOKUP(127.0.0.1:4160): adding peer
[nsqd] 2019/07/18 11:47:46.427355 INFO: LOOKUP connecting to 127.0.0.1:4160
...

nsqd通过tcp端口连接到了nsqlookupd,它自己在4151接受HTTP请求,在4150接受TCP请求。

nsqadmin

nsqadmin 是一套WEB管理UI,用来汇集集群的实时统计,并执行不同的管理任务。在终端3启动它:

❯ nsqadmin --lookupd-http-address=127.0.0.1:4161
[nsqadmin] 2019/07/18 11:54:23.125392 INFO: nsqadmin v1.1.0 (built w/go1.11)
[nsqadmin] 2019/07/18 11:54:23.128755 INFO: HTTP: listening on [::]:4171

浏览器打开http://localhost:4171就能访问了,需要注意,管理UI可以按需启动。

nsq的Go客户端使用

首先安装go-nsq:

go get github.com/nsqio/go-nsq

先看生产者:

package main

import (
    "github.com/nsqio/go-nsq"
    "log"
    "math/rand"
    "time"
)

func main() {
    config := nsq.NewConfig()
    w, err := nsq.NewProducer("127.0.0.1:4150", config)

    if err != nil {
        log.Panic(err)
    }

    chars := []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")

    for {
        buf := make([]byte, 4)
        for i := 0; i < 4; i++ {
            buf[i] = chars[rand.Intn(len(chars))]
        }
        log.Printf("Pub: %s", buf)
        err = w.Publish("test", buf)
        if err != nil {
            log.Panic(err)
        }
        time.Sleep(time.Second * 1)
    }

    w.Stop()
}

NewProducer的第一个参数就是nsqd的地址,在这里做了个无限for循环,每次随机4个byte发布到test话题里面。

再看消费者代码:

package main

import (
    "log"
    "sync"

    "github.com/nsqio/go-nsq"
)

func main() {

    wg := &sync.WaitGroup{}
    wg.Add(1000)

    config := nsq.NewConfig()
    q, _ := nsq.NewConsumer("test", "ch", config)
    q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        log.Printf("Got a message: %s", message.Body)
        wg.Done()
        return nil
    }))
    err := q.ConnectToNSQD("127.0.0.1:4150")
    if err != nil {
        log.Panic(err)
    }
    wg.Wait()

}

一开始通过sync.WaitGroup安排了1000个待执行的等待组,NewConsumer的第一个参数是话题test,第二是通道名字,然后用AddHandler添加一个消费处理函数,在处理函数中会打印这个消息。

首先启动消费者,再启动发布者:

 附:代码地址

标签:INFO,07,使用,nsqd,2019,nsqlookupd,nsq,NSQ
From: https://www.cnblogs.com/beatle-go/p/18104467

相关文章

  • 【GitLab】Ubuntu使用宝塔安装GitLab最新社区版
    首先在Ubuntu安装宝塔面板在官网可以找到脚本一键安装安装GitLab社区版然后在宝塔面板的“软件商店”里面找到GitLab最新社区版12.8.1一键安装安装过程中可能出现以下问题:1.卡在ruby_block[waitforlogrotateservicesocket]actionrun解决办法:在Ubuntu终端中运行......
  • KingbaseES 索引unusable的使用
    前言KingbaseES中,索引不可用原则介绍oracle数据库中,如果索引不可用(unusable),在进行DML操作时,会触发报错:索引不可用。在KES中如果设置索引不可用,插入数据不会报错,因为当索引被置为不可用状态后,如果要重新启动索引需要rebuild索引,此时会在新索引中重新组织表tuple数据。insert......
  • LangChain SQL介绍以及使用Qwen1.5执行SQL查询教程
    该模块可以让我们向LLM提问时从数据库中查询数据并做出回答。架构SQLchain和agent的高层抽象架构:问题转查询SQL:使用LLM将用户输入转成SQL查询执行SQL查询:执行SQL语句回答问题:LLM根据数据库查询结果返回回答内容环境安装安装必要环境和包pipinstall--upgrade--quiet......
  • WPF中使用PDF模板实现PDF导出和预览-来自GPT4
    在C#和WPF项目中实现加载不同的PDF模板、查看报告和导出PDF文件的功能,可以通过以下步骤完成:1.选择PDF库首先,选择一个合适的.NETPDF库。有许多库可以帮助你处理PDF文件,包括但不限于:iTextSharp:一个功能强大的和灵活的库,适用于创建和修改PDF文件。它是iText的一个.NET端口。......
  • nbtstat /?命令参数 显示使用NBT(NetBIOS over TCP/IP)的协议统计信息和当前TCP/IP连接
    NBTSTAT(NetBIOSoverTCP/IPStatistics)是一个用于显示和更新NetBIOS名称缓存、NetBIOS名称表和NetBIOS会话表的Windows命令行实用程序。它允许用户查看当前网络上的NetBIOS名称信息,以便诊断网络连接问题和执行基本的网络故障排除。NetBIOS是一种用于在局域网中进行通信的协议,它......
  • ios使用openlayer地图缩放时卡顿
    问题描述h5项目使用openlayer展示地图,并且使用VectorLayer铺点,安卓完全没问题,但是ios上缩放后会突然触发无法缩放并且无法点击拖动缓慢等问题。经排查,是VectorLayer的minZoommaxZoom导致,但不理解原因。问题代码如下importVectorLayerfrom'ol/layer/Vector'vectorLayer......
  • linux离线安装jenkins及使用教程
    本教程采用jenkins.war的方式离线安装部署,在线下载的方式会遇到诸多问题,不宜采用一、下载地址地址:Jenkinsdownloadanddeployment下载最新的长期支持版由于jenkins使用java开发的,所以需要安装的linux服务器装有jdk环境,并且jdk版本支持你所安装的jenkins版本点击 Hard......
  • 【前端】使用Web Audio API 技术播放音乐
    简言记录下使用webaudio播放音乐的方法。WebAudioAPIWebAudioAPI提供了在Web上控制音频的一个非常有效通用的系统,允许开发者来自选音频源,对音频添加特效,使音频可视化,添加空间效果(如平移),等等。你可以先看下api接口介绍文章WebAudioAPI接口介绍。html的<au......
  • 【Nuxt3】使用内置方法获取网络数据和使用场景
    简言记录下如何使用useFetch和$fetch的使用方法和它们的使用场景。获取数据nuxt3内置了很多方法来获取网络数据。这些方法有:useAsyncData—useAsyncData可以访问以SSR友好的可组合方式异步解析的数据。useFetch—使用SSR友好型可组合程序从API端点获取数据......
  • JavaScript代码安全性提升:选择和使用JS混淆工具的指南
    引言在Web开发中,JavaScript是一种常用的脚本语言,然而,由于其代码容易被他人轻易获取和修改,为了保护JavaScript代码的安全性和版权,我们需要使用JS混淆工具。本文将介绍什么是JS混淆工具、为什么要使用以及如何选择合适的JS混淆工具,同时还会列举一些常用的JS混淆工具。 正文什......