首页 > 数据库 >GO语言使用redis stream队列demo

GO语言使用redis stream队列demo

时间:2023-11-03 15:35:00浏览次数:37  
标签:stream err demo fmt redis client key Println

GO语言使用redis stream队列demo

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
	"time"
)

var client *redis.Client
var ctx context.Context
var key = "my_streamKey"       //key
var myConsumer = "my_consumer" //消费者
var group = "my_group"         // 消费者组的名称

func main() {
	Init()
	for {
		XAdd()
		time.Sleep(time.Second * 1)
	}

}

func Init() {
	// 创建Redis客户端
	client = redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379", // Redis服务器地址和端口
		Password: "123456",          // Redis服务器密码,如果有的话
		DB:       12,                   // Redis数据库索引
		PoolSize: 200,                  // 连接池大小
	})
	ctx = client.Context()
	groupInit()
	// 启动消费者
	go func() {
		for {
			streams, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
				Group:    group,              // 消费者组的名称
				Consumer: myConsumer,         // 消费者的名称
				Streams:  []string{key, ">"}, // Stream的名称和ID
				Count:    100,                // 要读取的消息数量
				Block:    time.Second * 1,    // 阻塞时间,0表示不阻塞
			}).Result()

			if err != nil {
				//fmt.Println("XReadGroup error:", err)
				continue
			}

			for _, stream := range streams {
				streamName := stream.Stream
				for _, message := range stream.Messages {
					messageID := message.ID
					messageValues := message.Values
					fmt.Printf("Stream: %s, Message ID: %s, Values: %v\n", streamName, messageID, messageValues)

					// 标记消息已经被消费者读取
					err := client.XAck(ctx, key, group, messageID).Err()
					if err != nil {
						fmt.Println("XAck error:", err)
						return
					}
					fmt.Println("消息已经被标记为已读取")
				}
			}
		}
	}()

}

func XAdd() {
	// 发布消息到Stream
	streamID, err := client.XAdd(ctx, &redis.XAddArgs{
		Stream: key, // Stream的名称
		Values: map[string]interface{}{
			"key1": "value1",
			"key2": "value2",
		},
	}).Result()

	if err != nil {
		fmt.Println("XAdd error:", err)
		return
	}

	fmt.Println("Stream ID:", streamID)
}

func groupInit() {
	// 判断key是否存在
	existsKey, err := client.Exists(ctx, key).Result()
	if err != nil {
		fmt.Println("Exists error:", err)
		return
	}

	if existsKey == 1 {
		fmt.Println("Key存在")
		// 获取所有消费者组信息
		groups, err := client.XInfoGroups(ctx, key).Result()
		if err != nil {
			fmt.Println("XInfoGroups error:", err)
			return
		}

		// 判断目标消费者组是否存在

		exists := false
		for _, g := range groups {
			if g.Name == group {
				exists = true
				break
			}
		}
		if exists {
			fmt.Println("消费者组存在")
		} else {
			fmt.Println("消费者组不存在")
			// 创建Stream
			streamCreated, err := client.XGroupCreateMkStream(ctx, key, group, "0").Result()
			if err != nil {
				fmt.Println("创建消费组 XGroupCreateMkStream error:", err)
			}
			fmt.Println("创建消费组:", streamCreated)
		}
	} else if existsKey == 0 {
		fmt.Println("Key不存在")
		// 创建Stream
		streamCreated, err := client.XGroupCreateMkStream(ctx, key, group, "0").Result()
		if err != nil {
			fmt.Println("创建消费组 XGroupCreateMkStream error:", err)
		}
		fmt.Println("创建消费组:", streamCreated)

	} else {
		fmt.Println("Exists返回值异常")
	}

}

标签:stream,err,demo,fmt,redis,client,key,Println
From: https://blog.51cto.com/u_13626606/8171438

相关文章

  • redis单线程
    一,redis单线程是什么意思 Redis的单线程指的是Redis的网络IO和键值对读写是由一个线程来完成的,这是Redis对外提供键值存储服务的主要流程。然而,请注意,Redis的其他功能,如持久化、异步删除、集群数据同步等,实际上是由额外的线程执行的。Redis的单线程模型主要是为了避免资源共享......
  • 带你了解 Stream 的使用,提升集合开发效率
    当涉及Java编程时,JavaStream是一个功能强大且高效的工具,用于处理集合数据。它提供了一种声明式的方式来操作数据,可以显著简化代码并提高可读性。在本文中,我们将深入探讨JavaStream,介绍其基本概念、常用操作和用例。什么是JavaStreamJavaStream是Java8引入的一种新的抽......
  • 分布式锁【Redission】
    一、简介    Redission,一个基于Redis实现的分布式工具,为Redis官网分布式解决方案。    Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(SeparationofConcern),从而让使用者能够将精力更集中地放在处理业务逻辑......
  • Java 8: 异步利器 CompletableFuture vs Parallel Stream 选哪个
    应人们对性能和体验的要求,异步在项目中用的越来越多,CompletableFuture和ParallelStream无疑是异步并发的利器。既然两者都可以实现异步并发,那么带来一个问题:什么时候该使用哪个呢,哪个场景下使用哪个会更好呢?这篇文章因此出现,旨在当执行异步进行编程时CompletableFuture与Parall......
  • 62.redis5安装
    1.依赖包安装yum-yinstallcppbinutilsglibcglibc-kernheadersglibc-commonglibc-develgccmakegcc-c++libstdc++-develtcl2.安装包准备官网https://redis.io下载最新版rediswgethttp://download.redis.io/releases/redis-4.0.14.tar.gztar-zxvfredis-5.0.14.......
  • 查找附近店铺(Redis GEO数据结构实现)
    附近店铺(RedisGEO数据结构实现)GEO数据结构GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)GEO......
  • 美团面试:Redis 除了缓存还能做什么?可以做消息队列吗?
    这是一道面试中常见的Redis基础面试题,主要考察求职者对于Redis应用场景的了解。即使不准备面试也建议看看,实际开发中也能够用到。内容概览:Redis除了做缓存,还能做什么?分布式锁:通过Redis来做分布式锁是一种比较常见的方式。通常情况下,我们都是基于Redisson来实现分布......
  • 如何使用 Redis 实现后台房间的数据管理?
    ​ ​摘要:利用Redis实现房间业务管理的实践与思考。文|即构业务后台开发团队在一些互动场景中,比如语音聊天室、电商直播等,成员控制、连麦、献花、发弹幕等互动功能,通常要求后台服务器能够储存管理房间及房间内成员的数据。那么如何组织、存储、操作这些数据以完成既定的业......
  • spring boot中redis的使用
    1.添加Redis依赖首先,需要在pom.xml文件中添加Redis依赖: <dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-data-redis</artifactId></dependency>这个依赖包含了SpringDataRedis,以及Jedis和Lettuce这两......
  • Stream流的操作
       传参 List<ApproveJobContent>contentsOptional<String>value=contents.stream().filter(ctx->"DEMAND_APPLY_EXT_ATTRBUTES".equals(ctx.getContentName()).map(ApproveJobContent::getContentValue).findAny(); value.orElse("def......