首页 > 其他分享 >Go操作nats的jetstream

Go操作nats的jetstream

时间:2024-09-12 10:28:01浏览次数:9  
标签:stream nats ctx fmt jetstream js Go

先搭建nats集群

version: "3.5"
services:
  nats:
    image: nats
    ports:
      - "8222:8222"
      - "4222:4222"
      - "6222:6222"  
    command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --http_port 8222"
    networks: ["nats"]
  nats-1:
    image: nats 
    command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222 -js -n node2"
    networks: ["nats"]
    depends_on: ["nats"]
  nats-2:
    image: nats
    command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222 -js -n node3"
    networks: ["nats"]
    depends_on: ["nats"]

networks:
  nats:
    name: nats

检测集群状态

curl http://127.0.0.1:8222/routez

go代码测试Work-queue Stream,其他测试代码在 https://natsbyexample.com/

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {

	url := "nats://192.168.252.128:4222"

	nc, err := nats.Connect(url)
	defer nc.Drain()
	if err != nil {
		fmt.Println(err)
	}

	js, _ := jetstream.New(nc)

	cfg := jetstream.StreamConfig{
		Name:      "EVENTS",
		Retention: jetstream.WorkQueuePolicy,
		Subjects:  []string{"events.>"},
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, _ := js.CreateStream(ctx, cfg)
	fmt.Println("created the stream")

	js.Publish(ctx, "events.us.page_loaded", nil)
	js.Publish(ctx, "events.eu.mouse_clicked", nil)
	js.Publish(ctx, "events.us.input_focused", nil)
	fmt.Println("published 3 messages")

	fmt.Println("# Stream info without any consumers")
	printStreamState(ctx, stream)

	cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Name: "processor-1",
	})

	msgs, _ := cons.Fetch(3)
	for msg := range msgs.Messages() {
		msg.DoubleAck(ctx)
	}

	fmt.Println("\n# Stream info with one consumer")
	printStreamState(ctx, stream)

	_, err = stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Name: "processor-2",
	})
	fmt.Println("\n# Create an overlapping consumer")
	fmt.Println(err)

	stream.DeleteConsumer(ctx, "processor-1")

	_, err = stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Name: "processor-2",
	})
	fmt.Printf("created the new consumer? %v\n", err == nil)
	stream.DeleteConsumer(ctx, "processor-2")

	fmt.Println("\n# Create non-overlapping consumers")
	cons1, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Name:          "processor-us",
		FilterSubject: "events.us.>",
	})
	cons2, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Name:          "processor-eu",
		FilterSubject: "events.eu.>",
	})

	js.Publish(ctx, "events.eu.mouse_clicked", nil)
	js.Publish(ctx, "events.us.page_loaded", nil)
	js.Publish(ctx, "events.us.input_focused", nil)
	js.Publish(ctx, "events.eu.page_loaded", nil)
	fmt.Println("published 4 messages")

	msgs, _ = cons1.Fetch(2)
	for msg := range msgs.Messages() {
		fmt.Printf("us sub got: %s\n", msg.Subject())
		msg.Ack()
	}

	msgs, _ = cons2.Fetch(2)
	for msg := range msgs.Messages() {
		fmt.Printf("eu sub got: %s\n", msg.Subject())
		msg.Ack()
	}
}

func printStreamState(ctx context.Context, stream jetstream.Stream) {
	info, _ := stream.Info(ctx)
	b, _ := json.MarshalIndent(info.State, "", " ")
	fmt.Println(string(b))
}

标签:stream,nats,ctx,fmt,jetstream,js,Go
From: https://www.cnblogs.com/qcy-blog/p/18409663

相关文章

  • 锋哥写一套前后端分离Python权限系统 基于Django5+DRF+Vue3.2+Element Plus+Jwt 视频
    大家好,我是java1234_小锋老师,最近写了一套【前后端分离Python权限系统基于Django5+DRF+Vue3.2+ElementPlus+Jwt】视频教程,持续更新中,计划月底更新完,感谢支持。视频在线地址:打造前后端分离Python权限系统基于Django5+DRF+Vue3.2+ElementPlus+Jwt视频教程(火爆连载更新中......
  • Python毕业设计基于Django的 校园菜鸟驿站管理系统
    文末获取资源,收藏关注不迷路文章目录一、项目介绍二、主要使用技术三、研究内容四、核心代码五、文章目录一、项目介绍首先,以需求为依据,根据需求分析结果进行了系统的设计,并将其划分为管理员和用户二种角色和多个主要模块:用户、快递类型、快递信息、取件信息等。......
  • Python毕业设计基于Django的毕业设计选题管理系统
    文末获取资源,收藏关注不迷路文章目录一、项目介绍二、主要使用技术三、研究内容四、核心代码五、文章目录一、项目介绍本文讲述了毕业设计选题管理系统。结合电子管理系统的特点,分析了毕业设计选题管理系统的背景,给出了毕业设计选题管理系统实现的设计方案。本论......
  • 2024-09-11:用go语言,给定一个从0开始的整数数组nums和一个正奇数整数k, 要求在nums数组
    2024-09-11:用go语言,给定一个从0开始的整数数组nums和一个正奇数整数k,要求在nums数组中选择k个不重叠的子数组,使得这些子数组的能量值之和最大。子数组的能量值是通过一定规则计算得到的,具体规则是对于某个子数组,将其每个元素乘以一个特定系数,并将这些结果相加,系数随着元素在子数组......
  • flask+python抗洪救灾管理系统 2rucy-django毕业设计项目
    目录技术栈和环境说明具体实现截图预期达到的目标系统设计详细视频演示技术路线解决的思路性能/安全/负载方面可行性分析论证python-flask核心代码部分展示python-django核心代码部分展示研究方法感恩大学老师和同学源码获取技术栈和环境说明本系统以Python开发语言......
  • django基于web的团员信息管理系统
    django基于web的团员信息管理系统。开发技术:Python语言;django框架;mysql数据库。项目内容:本系统分为四部分,别是团员、班级、院系、管理员。团员信息管理系统具有注册登录、个人中心、院系管理、班级管理、团员管理、团队活动管理、公告信息管理、系统日志管理、活动信息管......
  • AdaBoost算法(AdbBoost Algorithm)—有监督学习方法、非概率模型、判别模型、非线性模型
    定义输入:训练数据集T={(x1......
  • Go runtime 调度器精讲(二):调度器初始化
    原创文章,欢迎转载,转载请注明出处,谢谢。0.前言上一讲介绍了Go程序初始化的过程,这一讲继续往下看,进入调度器的初始化过程。接着上一讲的执行过程,省略一些不相关的代码,执行到runtime/asm_amd64.s:rt0_go:343L:(dlv)siasm_amd64.s:3430x45431c*8b442418......
  • golang 的录音库
    一、PortAudio的go绑定【不推荐】https://github.com/gordonklaus/portaudio这个库有热度,但在Windows上需要从源码使用 VisualStudio或MinGW编译,比较麻烦。Beep库并没有录音功能,但有一个相关库 https://github.com/MarkKremer/microphone.git可以实现录音,然后使......
  • Python系列(11)- 使用 Pipenv 搭建 Django + Rest Framework 开发环境
    Django是一个开放源代码的Web应用框架,用Python语言编写的。采用了MTV的框架模式,即模型Model,模版Template和视图View。它最初是被开发来用于管理劳伦斯出版集团旗下的一些以新闻内容为主的网站的,即是CMS(内容管理系统)软件。Django:http://www.djangoproject.comGitHub:htt......