先搭建nats集群
version: "3.5"
services:
nats:
image: nats
ports:
- "8222:8222"
- "4222:4222"
- "6222:6222"
command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --http_port 8222"
networks: ["nats"]
nats-1:
image: nats
command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222 -js -n node2"
networks: ["nats"]
depends_on: ["nats"]
nats-2:
image: nats
command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222 -js -n node3"
networks: ["nats"]
depends_on: ["nats"]
networks:
nats:
name: nats
检测集群状态
curl http://127.0.0.1:8222/routez
go代码测试Work-queue Stream,其他测试代码在 https://natsbyexample.com/
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
func main() {
url := "nats://192.168.252.128:4222"
nc, err := nats.Connect(url)
defer nc.Drain()
if err != nil {
fmt.Println(err)
}
js, _ := jetstream.New(nc)
cfg := jetstream.StreamConfig{
Name: "EVENTS",
Retention: jetstream.WorkQueuePolicy,
Subjects: []string{"events.>"},
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, _ := js.CreateStream(ctx, cfg)
fmt.Println("created the stream")
js.Publish(ctx, "events.us.page_loaded", nil)
js.Publish(ctx, "events.eu.mouse_clicked", nil)
js.Publish(ctx, "events.us.input_focused", nil)
fmt.Println("published 3 messages")
fmt.Println("# Stream info without any consumers")
printStreamState(ctx, stream)
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Name: "processor-1",
})
msgs, _ := cons.Fetch(3)
for msg := range msgs.Messages() {
msg.DoubleAck(ctx)
}
fmt.Println("\n# Stream info with one consumer")
printStreamState(ctx, stream)
_, err = stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Name: "processor-2",
})
fmt.Println("\n# Create an overlapping consumer")
fmt.Println(err)
stream.DeleteConsumer(ctx, "processor-1")
_, err = stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Name: "processor-2",
})
fmt.Printf("created the new consumer? %v\n", err == nil)
stream.DeleteConsumer(ctx, "processor-2")
fmt.Println("\n# Create non-overlapping consumers")
cons1, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Name: "processor-us",
FilterSubject: "events.us.>",
})
cons2, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Name: "processor-eu",
FilterSubject: "events.eu.>",
})
js.Publish(ctx, "events.eu.mouse_clicked", nil)
js.Publish(ctx, "events.us.page_loaded", nil)
js.Publish(ctx, "events.us.input_focused", nil)
js.Publish(ctx, "events.eu.page_loaded", nil)
fmt.Println("published 4 messages")
msgs, _ = cons1.Fetch(2)
for msg := range msgs.Messages() {
fmt.Printf("us sub got: %s\n", msg.Subject())
msg.Ack()
}
msgs, _ = cons2.Fetch(2)
for msg := range msgs.Messages() {
fmt.Printf("eu sub got: %s\n", msg.Subject())
msg.Ack()
}
}
func printStreamState(ctx context.Context, stream jetstream.Stream) {
info, _ := stream.Info(ctx)
b, _ := json.MarshalIndent(info.State, "", " ")
fmt.Println(string(b))
}
标签:stream,nats,ctx,fmt,jetstream,js,Go
From: https://www.cnblogs.com/qcy-blog/p/18409663