首页 > 其他分享 >Go使用rocketmq实现分布式事务-demo

Go使用rocketmq实现分布式事务-demo

时间:2024-03-26 17:25:57浏览次数:28  
标签:primitive return err demo fmt msg Printf Go rocketmq

本示例只是demo,没有接入mysql

rocketmq自行安装

sever

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
	"os"
	"strconv"
	"sync"
	"sync/atomic"
	"time"
)

// 事务消息的结构体
type DemoListener struct {
	localTrans       *sync.Map
	transactionIndex int32
}

func NewDemoListener() *DemoListener {
	return &DemoListener{
		localTrans: new(sync.Map),
	}
}

// 执行并发送
func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
	nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
	fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId)
	status := nextIndex % 3
	dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1))
	//在执行SendMessageInTransaction方法的时候会调用此方法ExecuteLocalTransaction,
	//如果ExecuteLocalTransaction 返回primitive.UnknowState 那么brocker就会调用CheckLocalTransaction方法检查消息状态
	// 如果返回  primitive.CommitMessageState 和primitive.RollbackMessageState 则不会调用CheckLocalTransaction
	return primitive.UnknowState
	//return primitive.RollbackMessageState
	//return primitive.CommitMessageState
}

// 检查本地事务是否成功
func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
	v, existed := dl.localTrans.Load(msg.TransactionId)
	if !existed {
		fmt.Printf("unknow msg: %v, return Commit", msg)
		return primitive.CommitMessageState
	}
	state := v.(primitive.LocalTransactionState)
	fmt.Printf("检查本地事务是否成功 msg transactionID : %v\n", msg.TransactionId)
	switch state {
	case 1:
		fmt.Printf("回滚: %v\n", msg.Body)
		return primitive.RollbackMessageState
	case 2:
		fmt.Printf("未知: %v\n", msg.Body)
		return primitive.UnknowState
	default:
		fmt.Printf("默认提交: %v\n", msg.Body)
		return primitive.CommitMessageState
	}
}

func main() {
	p, _ := rocketmq.NewTransactionProducer(
		NewDemoListener(), //自定义listener
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.3.98:9876"})), //连接地址
		producer.WithRetry(1),               //重试次数
		producer.WithGroupName("testGroup"), //生产者组,可有可无,跟client要对上
	)
	err := p.Start()
	if err != nil {
		fmt.Printf("开启失败: %s\n", err.Error())
		os.Exit(1)
	}

	topic := "test"
	for i := 0; i < 10; i++ {
		res, err := p.SendMessageInTransaction(
			context.Background(),
			primitive.NewMessage(topic, []byte("测试RocketMQ事务消息"+strconv.Itoa(i))),
		)

		if err != nil {
			fmt.Printf("发送消息失败: %s\n", err)
		} else {
			fmt.Printf("发送消息成功=%s\n", res.String())
		}
	}
	time.Sleep(5 * time.Minute)
	err = p.Shutdown()
	if err != nil {
		fmt.Printf("关闭失败: %s", err.Error())
	}
}

client

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"os"
	"time"
)

func main() {
	c, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup"), //组,跟服务端对上
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.3.98:9876"})), //地址
	)
	//消费对应topic
	err := c.Subscribe("test", consumer.MessageSelector{},
		func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
			for i := range msgs {
				fmt.Printf("订阅回调: %v \n", msgs[i])
			}
			//这个相当于消费者 消息ack,如果失败可以返回 consumer.ConsumeRetryLater
			return consumer.ConsumeSuccess, nil
			//这个相当于失败  要回滚
			//return consumer.ConsumeRetryLater, nil
		})
	if err != nil {
		fmt.Println(err.Error())
	}
	// Note: start after subscribe
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	time.Sleep(time.Hour)
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("关闭消费者失败: %s", err.Error())
	}
}

标签:primitive,return,err,demo,fmt,msg,Printf,Go,rocketmq
From: https://www.cnblogs.com/qcy-blog/p/18097120

相关文章

  • ENSP Demo 5 L3 Switch - Router
    可以将SW1的G0/0/1接口改为三层接口,也可以创建虚接口关联G0/0/1。syssysnsw1vlanbatch3040intg0/0/2portlink-typeaccessportdefaultvlan30intg0/0/1portlink-typeaccessportdefaultvlan40interfacevlanif30ipadd23.0.0.124interfacevlan......
  • go-buffer-pool
    go-buffer-poolgo-buffer-poolpackagepoolimport("math""math/bits""sync")//GlobalPoolisastaticPoolforreusingbyteslicesofvarioussizes.varGlobalPool=new(BufferPool)//MaxLengthisthemaximumlen......
  • 分享一个项目:go `file_line`,在编译器得到源码行号,减少运行期runtime消耗
    作者:张富春(ahfuzhang),转载时请注明作者和引用链接,谢谢!cnblogs博客zhihuGithub公众号:一本正经的瞎扯file_linehttps://github.com/ahfuzhang/file_lineLike__FILE__/__LINE__ofC:usegogeneratetogetsourcecodelinenumberatcompiletime.像C语言里面......
  • 【IT老齐054】MongoDB介绍
    【IT老齐054】MongoDB介绍场景特点多形性:同一个集合中可以包含不同字段(类型)的文档对象动态性:线上修改数据模式,修改是应用与数据库均无须下线数据治理:支持使用JSONSchema来规范数据模式。在保证模式灵活动态的前提下,提供数据治理能力速度优势数据库引擎只需要在......
  • Django_Restful_Framework视图与路由
    视图与路由drf除了在数据序列化部分简写代码以外,还在视图中提供了简写操作。所以在django原有的django.views.View类基础上,drf封装了多个子类出来提供给我们使用。**DjangoRESTframwork**提供的视图的主要作用:控制序列化器的执行(检验、保存、转换数据)控制数据库查询的执......
  • Go 切片
    Go切片切片结构源码包src/runtime/slice.go中定义slice的结构为typeslicestruct{arrayunsafe.Pointerlenintcapint}array指针指向底层数组len表示切片长度cap表示切片容量切片扩容机制go1.18之后:growslice方法中可以得知go切片......
  • 使用go语言, 如何 只需一步调用,创建支付宝代扣
    使用go语言,如何只需一步调用,创建支付宝代扣  目标原理快速使用gitclonehttps://github.com/284851828/alilite_go.gitgomodtidygorundemo.go"#alilite_go"packagemainimport( "log" "time" c"alilite/client"//Replacewith......
  • Django_Restful_Framework
    1.Web应用模式在开发Web应用中,有两种应用模式:前后端不分离前后端分离2.api接口为了在团队内部形成共识、防止个人习惯差异引起的混乱,我们需要找到一种大家都觉得很好的接口实现规范,而且这种规范能够让后端写的接口,用途一目了然,减少双方之间的合作成本。目前市面上大部......
  • nexus 代理 go
    创建 BlobStores创建Repositoriesnginx配置server{listen19000;server_namelocalhost;#设置代理访问日志access_loglogs/yum.access.log;error_loglogs/yum.error.log;location/goproxy/{ proxy_passhtt......
  • 理解 go mod init 命令
    gomodinit命令是Go1.11中引入的Go模块系统的基本组成部分。它用于创建或初始化Go模块,是管理Go项目中依赖关系和版本的一种方法。下面是关于gomodinit的全部内容:1.初始化Go模块gomodinit命令的主要用途是初始化项目中的Go模块。Go模块是指与版本相关......