
Implementing Distributed Transactions with Go and Seata

Date: 2024-04-26 10:34:02
Tags: log, err, nil, sql, Go, distributed transactions, id, seata

Installing Seata with Docker

version: '3'
services:
  seata-server:
    image: seataio/seata-server:latest
    ports:
      - "8091:8091"
      - "7091:7091"
    environment:
      - SEATA_PORT=8091
      - STORE_MODE=file

  mysql:
    image: mysql:8.0.32
    container_name: mysql
    environment:
      - MYSQL_ROOT_PASSWORD=12345678
    command: --default-authentication-plugin=mysql_native_password --default-time-zone='+08:00'
    volumes:
      - ./mysql:/docker-entrypoint-initdb.d
      - ./mysql/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf
    ports:
      - "3306:3306"

MySQL schema file and MySQL configuration (Docker is optional here; any MySQL instance at version 8.0 or later will do)

CREATE database if NOT EXISTS seata_client default character set utf8mb4 collate utf8mb4_unicode_ci;
USE seata_client;

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

CREATE TABLE IF NOT EXISTS  order_tbl (
  id int(11) NOT NULL AUTO_INCREMENT,
  user_id varchar(255) DEFAULT NULL,
  commodity_code varchar(255) DEFAULT NULL,
  count int(11) DEFAULT '0',
  money int(11) DEFAULT '0',
  descs varchar(255) DEFAULT '',
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO seata_client.order_tbl (id, user_id, commodity_code, count, money, descs) VALUES (1, 'NO-100001', 'C100000', 100, 10, 'init desc');

DROP TABLE IF EXISTS undo_log;

CREATE TABLE undo_log (
  id bigint NOT NULL AUTO_INCREMENT,
  branch_id bigint NOT NULL,
  xid varchar(100) NOT NULL,
  context varchar(128) NOT NULL,
  rollback_info longblob NOT NULL,
  log_status int NOT NULL,
  log_created datetime NOT NULL,
  log_modified datetime NOT NULL,
  ext varchar(100) DEFAULT NULL,
  PRIMARY KEY (id),
  KEY idx_unionkey (xid,branch_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

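This walkthrough mainly tests the AT and XA modes. In the application code the two differ very little: the only change is which Seata driver is used to open the database connection.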
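To make the role of the undo_log table more concrete, here is a toy, in-memory model of what AT mode does conceptually: each update records a before image and an after image, and a rollback restores the before images in reverse order. This is only a sketch. The types table and undoRecord are invented for illustration and are not Seata's actual data structures. In XA mode the database's own XA protocol handles rollback instead, so there is no such log to replay.

```go
package main

import "fmt"

// undoRecord is a toy stand-in for one undo_log row: the value of
// order_tbl.count before and after a single update.
type undoRecord struct {
	id          int
	beforeCount int
	afterCount  int
}

// table is an in-memory stand-in for order_tbl, mapping id -> count.
type table map[int]int

// updateWithUndo applies the update and returns the matching undo record.
func (t table) updateWithUndo(id, newCount int) undoRecord {
	rec := undoRecord{id: id, beforeCount: t[id], afterCount: newCount}
	t[id] = newCount
	return rec
}

// rollback restores before images in reverse order. It refuses to proceed if
// the current value no longer matches the after image, which would mean the
// row was changed outside the transaction.
func (t table) rollback(records []undoRecord) error {
	for i := len(records) - 1; i >= 0; i-- {
		rec := records[i]
		if t[rec.id] != rec.afterCount {
			return fmt.Errorf("row %d changed outside the transaction", rec.id)
		}
		t[rec.id] = rec.beforeCount
	}
	return nil
}

func main() {
	orders := table{1: 100} // matches the initial row inserted by the schema script
	var records []undoRecord
	records = append(records, orders.updateWithUndo(1, 10))
	records = append(records, orders.updateWithUndo(1, 101))
	fmt.Println("after updates:", orders[1])

	if err := orders.rollback(records); err != nil {
		fmt.Println("rollback failed:", err)
		return
	}
	fmt.Println("after rollback:", orders[1])
}
```

The two updates mirror the ones issued by the server shown later (count set to 10, then to 101); after the rollback the count is back at its initial value of 100.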

Configuration file for connecting to Seata

# time units correspond to time.Duration(1)
seata:
  enabled: true
  # application id
  application-id: applicationName
  # service group
  tx-service-group: default_tx_group
  access-key: aliyunAccessKey
  secret-key: aliyunSecretKey
  enable-auto-data-source-proxy: true
  data-source-proxy-mode: AT
  client:
    rm:
      # Maximum cache length of asynchronous queue
      async-commit-buffer-limit: 10000
      # The maximum number of retries when report reports the status
      report-retry-count: 5
      # Whether to allow regular check of db metadata(AT)
      table-meta-check-enable: false
      # Whether to report the status if the transaction is successfully executed(AT)
      report-success-enable: false
      saga-branch-register-enable: false
      saga-json-parser: fastjson
      saga-retry-persist-mode-update: false
      saga-compensate-persist-mode-update: false
      #Ordered.HIGHEST_PRECEDENCE + 1000  #
      tcc-action-interceptor-order: -2147482648
      # Parse SQL parser selection
      sql-parser-type: druid
      lock:
        retry-interval: 30
        retry-times: 10
        retry-policy-branch-rollback-on-conflict: true
    tm:
      commit-retry-count: 5
      rollback-retry-count: 5
      default-global-transaction-timeout: 60s
      degrade-check: false
      degrade-check-period: 2000
      degrade-check-allow-times: 10s
      interceptor-order: -2147482648
    undo:
      # Judge whether the before image and after image are the same,If it is the same, undo will not be recorded
      data-validation: false
      # Serialization method
      log-serialization: json
      # undo log table name
      log-table: undo_log
      # Only store modified fields
      only-care-update-columns: true
      compress:
        # Compression type. Allowed Options: None, Gzip, Zip, Sevenz, Bzip2, Lz4, Zstd, Deflate
        type: None
        #  Compression threshold Unit: k
        threshold: 64k
    load-balance:
      type: RandomLoadBalance
      virtual-nodes: 10
  service:
    vgroup-mapping:
      # Prefix for Print Log
      default_tx_group: default
    grouplist:
      default: 127.0.0.1:8091
    enable-degrade: false
    # close the transaction
    disable-global-transaction: false
  transport:
    shutdown:
      wait: 3s
    # Netty related configurations
    # type
    type: TCP
    server: NIO
    heartbeat: true
    # Encoding and decoding mode
    serialization: seata
    # Message compression mode
    compressor: none
    # Allow batch sending of requests (TM)
    enable-tm-client-batch-send-request: false
    # Allow batch sending of requests (RM)
    enable-rm-client-batch-send-request: true
    # RM send request timeout
    rpc-rm-request-timeout: 30s
    # TM send request timeout
    rpc-tm-request-timeout: 30s
  # Configuration Center
  config:
    type: file
    file:
      name: config.conf
    nacos:
      namespace: ""
      server-addr: 127.0.0.1:8848
      group: SEATA_GROUP
      username: ""
      password: ""
      ##if use MSE Nacos with auth, mutex with username/password attribute
      #access-key: ""
      #secret-key: ""
      data-id: seata.properties
  # Registration Center
  registry:
    type: file
    file:
      name: registry.conf
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      group: "SEATA_GROUP"
      namespace: ""
      username: ""
      password: ""
      ##if use MSE Nacos with auth, mutex with username/password attribute  #
      #access-key: ""  #
      #secret-key: ""  #
  log:
    exception-rate: 100
  tcc:
    fence:
      # Anti suspension table name
      log-table-name: tcc_fence_log_test
      clean-period: 60s
  # getty configuration
  getty:
    reconnect-interval: 0
    # temporary not supported connection-num
    connection-num: 1
    session:
      compress-encoding: false
      tcp-no-delay: true
      tcp-keep-alive: true
      keep-alive-period: 120s
      tcp-r-buf-size: 262144
      tcp-w-buf-size: 65536
      tcp-read-timeout: 1s
      tcp-write-timeout: 5s
      wait-timeout: 1s
      max-msg-len: 16498688
      session-name: client_test
      cron-period: 1s

utils.go

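package util

import (
	"database/sql"

	sql2 "github.com/seata/seata-go/pkg/datasource/sql"
)

// dsn points at the seata_client database. The user, password, and host are
// specific to the author's environment; adjust them to match your MySQL
// instance (the compose file above sets the root password to 12345678).
const dsn = "root:root@tcp(192.168.252.1:3306)/seata_client?multiStatements=true&interpolateParams=true"

// GetAtMySqlDb opens a connection pool through the Seata AT-mode MySQL driver.
func GetAtMySqlDb() *sql.DB {
	dbAt, err := sql.Open(sql2.SeataATMySQLDriver, dsn)
	if err != nil {
		panic("init seata at mysql driver error: " + err.Error())
	}
	return dbAt
}

// GetXAMySqlDb opens a connection pool through the Seata XA-mode MySQL driver.
func GetXAMySqlDb() *sql.DB {
	dbXa, err := sql.Open(sql2.SeataXAMySQLDriver, dsn)
	if err != nil {
		panic("init seata xa mysql driver error: " + err.Error())
	}
	return dbXa
}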

Client, simulating a user request

package main

import (
	"context"
	"flag"
	"fmt"
	"github.com/parnurzeal/gorequest"
	"github.com/seata/seata-go/pkg/constant"
	"github.com/seata/seata-go/pkg/tm"
	"github.com/seata/seata-go/pkg/util/log"
	"net/http"
	"time"

	"github.com/seata/seata-go/pkg/client"
)

var serverIpPort = "http://127.0.0.1:8080"

func main() {
	flag.Parse()
	client.InitPath("../../../conf/seatago.yml")

	bgCtx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
	defer cancel()
	sampleUpdate(bgCtx)

}
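// updateData calls the branch service and passes the global transaction XID
// along in a request header. Any returned error makes tm.WithGlobalTx roll back.
func updateData(ctx context.Context) error {
	request := gorequest.New()
	log.Infof("branch transaction begin")
	// Use End's return values so that transport errors (where there is no
	// response at all) also fail the transaction.
	resp, _, errs := request.Post(serverIpPort+"/updateDataSuccess").
		Set(constant.XidKey, tm.GetXID(ctx)).
		End()
	if len(errs) > 0 {
		return fmt.Errorf("update data fail: %v", errs)
	}
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("update data fail: status %d", resp.StatusCode)
	}
	return nil
}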

func sampleUpdate(ctx context.Context) {
	if err := tm.WithGlobalTx(ctx, &tm.GtxConfig{
		Name:    "ATSampleLocalGlobalTx_Update",
		Timeout: time.Second * 30,
	}, updateData); err != nil {
		log.Info(err)
	}
}

Server, simulating an API gateway that sends requests to different microservices

package main

import (
	"context"
	"database/sql"
	"fmt"
	"github.com/gin-gonic/gin"
	"github.com/seata/seata-go-samples/util"
	"github.com/seata/seata-go/pkg/client"
	ginmiddleware "github.com/seata/seata-go/pkg/integration/gin"
	"github.com/seata/seata-go/pkg/util/log"
	"net/http"
)

var db *sql.DB

func main() {
	client.InitPath("../../../conf/seatago.yml")
	// TODO: choose the transaction mode here
	// db = util.GetXAMySqlDb() // XA
	db = util.GetAtMySqlDb() // AT
	r := gin.Default()
	r.Use(ginmiddleware.TransactionMiddleware())
	r.POST("/updateDataSuccess", updateDataSuccessHandler)
	if err := r.Run(":8080"); err != nil {
		log.Fatalf("start server fatal: %v", err)
	}
}

func updateDataSuccessHandler(c *gin.Context) {
	log.Infof("get tm updateData")
	// Use the request context: the Seata gin middleware attaches the XID to it.
	ctx := c.Request.Context()
	if err := updateDataSuccess(ctx); err != nil {
		c.JSON(http.StatusBadRequest, "updateData failure")
		return
	}
	if err := updateDataSuccess2(ctx); err != nil {
		c.JSON(http.StatusBadRequest, "updateData2 failure")
		return
	}
	// c.JSON(http.StatusOK, "updateData ok") // success
	c.JSON(http.StatusBadRequest, "updateData failure") // TODO: simulated failure to test rollback
}

// updateDataSuccess sets count to 10 for the row with id 1.
func updateDataSuccess(ctx context.Context) error {
	query := "update order_tbl set count=? where id=?"
	ret, err := db.ExecContext(ctx, query, 10, 1)
	if err != nil {
		fmt.Printf("update failed, err:%v\n", err)
		return err // propagate the error so the handler can report the failure
	}

	rows, err := ret.RowsAffected()
	if err != nil {
		fmt.Printf("update failed, err:%v\n", err)
		return err
	}
	fmt.Printf("update success: %d.\n", rows)
	return nil
}

// updateDataSuccess2 sets count to 101 for the row with id 1.
func updateDataSuccess2(ctx context.Context) error {
	query := "update order_tbl set count=? where id=?"
	ret, err := db.ExecContext(ctx, query, 101, 1)
	if err != nil {
		fmt.Printf("update failed, err:%v\n", err)
		return err // propagate the error so the handler can report the failure
	}

	rows, err := ret.RowsAffected()
	if err != nil {
		fmt.Printf("update failed, err:%v\n", err)
		return err
	}
	fmt.Printf("update success: %d.\n", rows)
	return nil
}

Switching from a raw SQL connection to a GORM connection

package main

import (
	"context"
	"database/sql"
	"log"
	"time"

	"github.com/seata/seata-go/pkg/client"
	sql2 "github.com/seata/seata-go/pkg/datasource/sql"
	"github.com/seata/seata-go/pkg/tm"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
)

// OrderTblModel maps to the order_tbl table.
type OrderTblModel struct {
	Id            int64  `gorm:"column:id" json:"id"`
	UserId        string `gorm:"column:user_id" json:"user_id"`
	CommodityCode string `gorm:"column:commodity_code" json:"commodity_code"`
	Count         int64  `gorm:"column:count" json:"count"`
	Money         int64  `gorm:"column:money" json:"money"`
	Descs         string `gorm:"column:descs" json:"descs"`
}

func main() {
	initConfig()
	// insert
	if err := tm.WithGlobalTx(context.Background(), &tm.GtxConfig{
		Name:    "ATSampleLocalGlobalTx",
		Timeout: time.Second * 30,
	}, insertData); err != nil {
		log.Printf("global transaction failed: %v", err)
	}
	// Block forever so the process keeps running (stop it with Ctrl+C).
	<-make(chan struct{})
}

func initConfig() {
	// init seata client config
	client.InitPath("../../conf/seatago.yml")
	// init db object
	initDB()
}

var gormDB *gorm.DB

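func initDB() {
	sqlDB, err := sql.Open(sql2.SeataATMySQLDriver, "root:12345678@tcp(127.0.0.1:3306)/seata_client?multiStatements=true&interpolateParams=true")
	if err != nil {
		panic("init service error")
	}

	// Hand the Seata-wrapped *sql.DB to GORM so its statements go through the AT driver.
	gormDB, err = gorm.Open(mysql.New(mysql.Config{
		Conn: sqlDB,
	}), &gorm.Config{})
	if err != nil {
		panic("init gorm error: " + err.Error())
	}
}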

// insertData insert one data
func insertData(ctx context.Context) error {
	data := OrderTblModel{
		UserId:        "NO-100003",
		CommodityCode: "C100001",
		Count:         101,
		Money:         11,
		Descs:         "insert desc",
	}

	return gormDB.WithContext(ctx).Table("order_tbl").Create(&data).Error
}

// deleteData delete one data
// The table name is set explicitly; GORM would otherwise derive it from the struct name.
func deleteData(ctx context.Context) error {
	return gormDB.WithContext(ctx).Table("order_tbl").Where("id = ?", 1).Delete(&OrderTblModel{}).Error
}

// updateData update one data
func updateData(ctx context.Context) error {
	return gormDB.WithContext(ctx).Table("order_tbl").Where("id = ?", 1).Update("commodity_code", "C100002").Error
}

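Finally, if you get the error "first phase error: undo log parser type jackson not found", you need to modify the statement that selects the undo log parser.

That statement is hard-coded and does not read the value from the configuration.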
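The error message suggests a lookup by name in a set of registered parsers. The toy registry below illustrates how such a failure can arise when only a "json" parser is registered but some code asks for "jackson" regardless of the configured log-serialization value. Everything here (parser, parsers, lookupParser) is hypothetical and is not taken from the seata-go source; it only shows the general shape of the problem.

```go
package main

import "fmt"

// parser is a minimal stand-in for an undo log parser.
type parser interface {
	Name() string
}

type jsonParser struct{}

func (jsonParser) Name() string { return "json" }

// parsers is a toy registry in which only the "json" parser is registered.
var parsers = map[string]parser{
	"json": jsonParser{},
}

// lookupParser finds a parser by name and reports an error if none is registered.
func lookupParser(name string) (parser, error) {
	p, ok := parsers[name]
	if !ok {
		return nil, fmt.Errorf("undo log parser type %s not found", name)
	}
	return p, nil
}

func main() {
	// A hard-coded name that was never registered fails the lookup.
	if _, err := lookupParser("jackson"); err != nil {
		fmt.Println("hard-coded name:", err)
	}

	// Using the configured value (log-serialization: json) succeeds.
	configured := "json"
	p, err := lookupParser(configured)
	if err != nil {
		fmt.Println("configured name:", err)
		return
	}
	fmt.Println("configured name resolves to parser:", p.Name())
}
```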

From: https://www.cnblogs.com/qcy-blog/p/18159440
