首页 > 其他分享 >NSQ demo

NSQ demo

时间:2023-08-22 15:00:08浏览次数:39  
标签:err nil producer demo fmt nsqd nsq NSQ

Docker

docker pull nsqio/nsq

nsqd

​nsqd​ 是接收、队列和向客户端传递消息的守护进程。它可以独立运行,但通常在具有nsqlookupd​实例的集群中进行配置(这种情况下,他将会发布主题和频道以便发现)

配置及api:https://nsq.io/components/nsqd.html

docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=172.18.0.1

nsqlookup

​nsqlookupd​ 是管理拓扑信息的守护进程。客户端可以查询 nsqlookupd​ 来发现特定主题的 nsqd​ 生产者,而 nsqd​ 节点会广播主题和通道的信息。

配置及api:https://nsq.io/components/nsqlookupd.html

nsqadmin

​nsqadmin​ 是一个 Web 用户界面,可以实时查看汇总的集群统计信息,并执行各种管理任务。

docker run --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin  --nsqd-http-address=172.18.0.1:4151

Demo

 

// producer.go
package main

import (
	"bufio"
	"fmt"
	"github.com/nsqio/go-nsq"
	"os"
	"strings"
)

var producer *nsq.Producer

func initProducer(str string) (err error) {
	config := nsq.NewConfig()
	producer, err = nsq.NewProducer(str, config)
	if err != nil {
		fmt.Println("create producer failed, err:", err)
		return
	}
	return nil
}

func main() {
	nsqAddress := "localhost:4150"
	err := initProducer(nsqAddress)
	if err != nil {
		fmt.Printf("init producer failed, err:%v\n", err)
		return
	}

	reader := bufio.NewReader(os.Stdin)
	for {
		data, err := reader.ReadString('\n')
		if err != nil {
			fmt.Printf("read string failed, err:%v\n", err)
			continue
		}
		data = strings.TrimSpace(data)
		if strings.ToUpper(data) == "Q" {
			break
		}

		err = producer.Publish("topic_demo", []byte(data))
		if err != nil {
			fmt.Printf("publish msg to nsq failed, err:%v\n", err)
			continue
		}
	}
}

 

// consumer.go
package main

import (
	"fmt"
	"time"

	"github.com/nsqio/go-nsq"
)

type ConsumerT struct{}

func main() {
	InitConsumer("topic_demo", "localhost:4150")
	for {
		time.Sleep(1 * time.Second)
	}
}

func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
	fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
	return nil
}

func InitConsumer(topic string, address string) {
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = 15 * time.Second
	c, err := nsq.NewConsumer(topic, "ch", cfg)
	if err != nil {
		panic(err)
	}
	c.SetLogger(nil, 0)
	c.AddHandler(&ConsumerT{})

	if err := c.ConnectToNSQD(address); err != nil {
		panic(err)
	}
}

 

 

标签:err,nil,producer,demo,fmt,nsqd,nsq,NSQ
From: https://www.cnblogs.com/kanx1blog/p/17648542.html

相关文章

  • hibernate_demo
    参考:ORM----hibernate入门Demo(无敌详细版)-Old-凯-博客园(cnblogs.com)Hibernate-基础入门详解_51CTO博客_hibernate入门hmb.xml:Hibernate框架之hbm.xml映射文件(详解)_hibernate映射文件详解_hestyle的博客-CSDN博客 新建testdb数据库,创建tb_users表: 模块整体目录......
  • OpenTiny Vue 3.10.0 版本发布:组件 Demo 支持 Composition 写法,新增4个新组件
    我们非常高兴地宣布,2023年8月14日,OpenTinyVue发布了v3.10.0 ......
  • FastApi-1-结合sql 增/查demo
    目录FastAPI学习记录项目结构部分接口/代码展示感受全部代码FastAPI学习记录fastapi已经学习有一段时间,今天抽时间简单整理下。官网介绍:FastAPI是一个用于构建API的现代、快速(高性能)的web框架,使用Python3.6+并基于标准的Python类型提示。快速:可与NodeJS和Go......
  • tk的一个布局demo
    效果代码importtkinterastkfromtkinterimportscrolledtextimportrequestsdefQ_A(data):headers={'origin':'https://chat2.jinshutuan.com','referer':'https://chat2.jinshutuan.com/&......
  • python监控redis demo
    下载aioredis为了提升性能我们使用一部redispipinstallaioredisdemo:importasyncioimportaioredis#每隔10s获取redis信息asyncdefmonitor_redis(host,port,interval):#建立异步Redis连接redis_uri=f"redis://{host}:{port}"redis=awaitai......
  • 适老化demo
    --------------------------------------------------------html------------------------------------------------------<!doctypehtml><htmllang="en"> <head> <metacharset="UTF-8"> <title>鼠标移入视频播放,鼠标移出播放停止,恢......
  • excel 输出demo(outputstream 转inputstr)
    protectedvoidresponseExcel(HSSFWorkbookworkbook)throwsIOException{ByteArrayOutputStreamos=newByteArrayOutputStream();try{workbook.write(os);}catch(IOExceptione){e.printStackTrace();}byte[]content......
  • django乐观锁、悲观锁商品秒杀简单demo
    悲观锁总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读......
  • seata学习-简单demo入门
    概述学习一个框架,我喜欢从demo中了解该框架所能达到的效果再进行深入地学习。本篇文章将会介绍seata的一个入门使用demo,作为使用seata的入门学习文章。使用案例首先到github中下载一个RM的运行服务,本例中使用的是:https://github.com/seata/seata/releases/download......
  • Mybatis配置文件的空白模板和联系demo所用到的依赖
    核心配置文件模板<?xmlversion="1.0"encoding="UTF-8"?><!DOCTYPEconfigurationPUBLIC"-//mybatis.org//DTDConfig3.0//EN""https://mybatis.org/dtd/mybatis-3-config.dtd"><configuration><envir......