首页 > 其他分享 >gorm.io/sharding改造:赋能单表,灵活支持多分表策略(下)

gorm.io/sharding改造:赋能单表,灵活支持多分表策略(下)

时间:2024-10-25 08:52:53浏览次数:8  
标签:err db 查询 分表 单表 io sharding id

背景

 分表组件改造的背景,我在这篇文章《gorm.io/sharding改造:赋能单表,灵活支持多分表策略(上)》中已经做了详细的介绍——这个组件不支持单表多个分表策略,为了突破这个限制做的改造。

在上一篇文章中,我们讨论了注册的改造,注册的改造修改逻辑比较简单,但是,上一篇文章中遗留了一个很重要的议题——在增删改查的实际业务操作中,分表组件究竟如何精准地定位到对应的分表策略,以确保业务逻辑的顺利执行?这篇文章,我们重点讨论这个逻辑。

源码解读

首先,我们需要看一下当我们执行查询,新增,更新或是删除逻辑,其执行流程是什么。比如,这么一个查询。

err := db.Model(&Order{}).Where("user_id = ?", userID).Find(&orders).Error

我们大概梳理一下其执行流程。

  1. 初始化查询
    • 当我们执行查询 err := db.Model(&Order{}).Where("user_id = ?", userID).Find(&orders).Error,首先会通过 db.Model(&Order{}) 初始化一个查询实例,设置相关的模型信息。
  2. 构建查询条件
    • 接着,通过 .Where("user_id = ?", userID) 方法,将查询条件 user_id = ? 以及对应的参数 userID 添加到查询实例中。
  3. 执行查询
    • 调用 .Find(&orders) 方法时,开始执行查询流程。
    • 在 Find 方法中,首先通过 db.getInstance() 获取数据库实例。
    • 然后,检查是否存在查询条件,如果有,则构建 SQL 条件表达式,并将其添加到查询语句中。
    • 设置查询结果的目标对象 dest,即 &orders
    func (db *DB) Find(dest interface{}, conds ...interface{}) (tx *DB) {
    	tx = db.getInstance()
    	if len(conds) > 0 {
    		if exprs := tx.Statement.BuildCondition(conds[0], conds[1:]...); len(exprs) > 0 {
    			tx.Statement.AddClause(clause.Where{Exprs: exprs})
    		}
    	}
    	tx.Statement.Dest = dest
    	return tx.callbacks.Query().Execute(tx)
    }
  4. 执行回调和处理
    • 调用 tx.callbacks.Query().Execute(tx) 执行查询回调链。
    • 在 Execute 方法中,会遍历并执行所有注册的查询前和查询后的回调函数。
    func (p *processor) Execute(db *DB) *DB {
    	
    //省略其他代码逻辑
    ......	
    for _, f := range p.fns {
    		f(db)
    	}
    
    //省略其他代码逻辑
    ......	
    	return db
    }
  5. 分片和查询执行
    • 最终,调用 pool.QueryContext 方法,根据上下文、SQL 查询语句和参数执行实际的数据库查询。
    • 在 QueryContext 方法中,会调用 pool.sharding.resolve 方法解析并修改查询语句,以处理数据库分片逻辑。
    • resolve 方法解析 SQL 查询语句,提取表名,并根据表名获取相应的分片配置。
    • 根据分片配置,可能会修改原始查询语句,以适应分片策略。
    func (pool ConnPool) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
    	var (
    		curTime = time.Now()
    	)
        
       //该方法根据传入的SQL查询(及其参数)和上下文信息,动态地解析、修改并返回最终的分片 
       //查询、原始查询、目标表名以及可能出现的错误。
    	_, stQuery, _, err := pool.sharding.resolve(query, args...)
    	if err != nil {
    		return nil, err
    	}
    
    	// 省略
        ......
    
    	return rows, err
    }
    func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableName string, err error) {
    	ftQuery = query
    	stQuery = query
    	if len(s.configs) == 0 {
    		return
    	}
    
    	expr, err := sqlparser.NewParser(strings.NewReader(query)).ParseStatement()
    	if err != nil {
    		return ftQuery, stQuery, tableName, nil
    	}
    
    	// 省略
        ......
     
    	tableName = table.Name.Name
    	r, ok := s.configs[tableName]
    	if !ok {
    		return
    	} 
    
        // 省略
        ......
      return
    }
    
  6. 返回结果
    • 执行查询后,将结果填充到目标对象 &orders 中,并返回查询结果或错误。

我们重点关注resolve方法,这个方法包含了分表逻辑的处理逻辑:r, ok := s.configs[tableName]获取对应表的分表策略。

通过上述代码的解析,我们现在应该有了解决方案。原来的逻辑获取分表策略是根据表明获取的。那我们只要修改这个逻辑,根据表名+分表键名作为唯一键获取对应的分表策略就能实现我们的目标。

方案

接下来,我们需要思考的是,如何把分表键传进来呢?

我一开始想的是通过解析query获取查询条件中的分表键。但是,当我深入的看了这个逻辑之后,发现这个设想不能实现,因为value, id, keyFind, err = s.nonInsertValue(r.ShardingKey, condition, args...)这个方法中获取查询条件的字段是在这个函数内部实现的,不能保持一个统一的结构,而且改造复杂度比较高。

context在go语言有着广泛的使用场景,所以,我想着通过context的方式把分表键传递进来。有了这个想法,改造起来就很简单了。我们只需要resolve方法增加一个context的传参,并且r, ok := s.configs[tableName]这个获取分表策略,改成用表名+从context中获取的分表键作为键来获取分表策略即可。

如此,我们就实现了根据表名+分表键获取对应分表策略的逻辑,至此,我们的改造任务完成。

案例

我目前也只是简单的测试了两种分表策略的场景,仅仅只覆盖了查询和插入的场景。更复杂的场景还没有测试。诸如并发情况下的场景。

package test

import (
	"context"
	"fmt"
	"testing"
	"time"

	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	"gorm.io/sharding"
)
var globalDB *gorm.DB

type Order struct {
	ID        int64  `gorm:"primaryKey"`
	OrderId   string `gorm:"sharding:order_id"` // 指明 OrderId 是分片键
	UserID    int64  `gorm:"sharding:user_id"`
	ProductID int64
	OrderDate time.Time
	OrderYear int
}
// 自定义 ShardingAlgorithm
func customShardingAlgorithm4(value any) (suffix string, err error) {
	if year, ok := value.(int); ok {
		return fmt.Sprintf("_%d", year), nil
	}
	return "", fmt.Errorf("invalid order_date")
}

func customShardingAlgorithmUserId(value any) (suffix string, err error) {
	if userId, ok := value.(int64); ok {
		return fmt.Sprintf("_%d", userId%4), nil
	}
	return "", fmt.Errorf("invalid user_id")
}

// customePrimaryKeyGeneratorFn 自定义主键生成函数
func customePrimaryKeyGeneratorFn(tableIdx int64) int64 {
	var id int64
	seqTableName := "gorm_sharding_orders_id_seq" // 序列表名
	db := globalDB
	// 使用事务来确保主键生成的原子性
	tx := db.Begin()
	defer func() {
		if r := recover(); r != nil {
			tx.Rollback()
		}
	}()

	// 锁定序列表以确保并发安全(可选,取决于你的 MySQL 配置和并发级别)
	// 注意:在某些 MySQL 版本和配置中,使用 LOCK TABLES 可能不是最佳选择
	// 这里仅作为示例,实际应用中可能需要更精细的并发控制策略
	tx.Exec("LOCK TABLES " + seqTableName + " WRITE")

	// 查询当前的最大 ID
	tx.Raw("SELECT id FROM " + seqTableName + " ORDER BY id DESC LIMIT 1").Scan(&id)

	// 更新序列表(这里直接递增 1,实际应用中可能需要更复杂的逻辑)
	newID := id + 1
	tx.Exec("INSERT INTO "+seqTableName+" (id) VALUES (?)", newID) // 这里假设序列表允许插入任意 ID,实际应用中可能需要其他机制来确保 ID 的唯一性和连续性

	// 释放锁定
	tx.Exec("UNLOCK TABLES")

	// 提交事务
	if err := tx.Commit().Error; err != nil {
		panic(err) // 实际应用中应该使用更优雅的错误处理机制
	}

	return newID
}

// Test_Gorm_Sharding 用于测试 Gorm Sharding 插件
func Test_Gorm_Sharding6(t *testing.T) {
	// 连接到 MySQL 数据库
	dsn := "dev:xxxx@tcp(ip:port)/sharding_db2?charset=utf8mb4&parseTime=True&loc=Local"
	db, err := gorm.Open(mysql.New(mysql.Config{
		DSN: dsn,
	}), &gorm.Config{})
	if err != nil {
		panic("failed to connect database")
	}
	globalDB = db

	config1 := sharding.Config{
		ShardingKey:       "order_year",
		ShardingAlgorithm: customShardingAlgorithm4, // 使用自定义的分片算法
		//PrimaryKeyGenerator: sharding.PKMySQLSequence,
		PrimaryKeyGenerator:   sharding.PKCustom,
		PrimaryKeyGeneratorFn: customePrimaryKeyGeneratorFn,
	}
	config2 := sharding.Config{
		ShardingKey:         "user_id",
		NumberOfShards:      4,
		ShardingAlgorithm:   customShardingAlgorithmUserId, // 使用自定义的分片算法
		PrimaryKeyGenerator: sharding.PKSnowflake,          // 使用 Snowflake 算法生成主键
	}
	mapConfig := make(map[string]sharding.Config)
	mapConfig["orders_order_year"] = config1
	mapConfig["orders_user_id"] = config2
	// 配置 Gorm Sharding 中间件,使用自定义的分片算法
	middleware := sharding.RegisterWithKeys(mapConfig) // 逻辑表名为 "orders"
	db.Use(middleware)

	// 查询示例
	var orders []Order

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ctx = context.WithValue(ctx, "sharding_key", "order_year")
	db = db.WithContext(ctx)
	err = db.Model(&Order{}).Where("order_year=? and product_id=?", 2025, 102).Find(&orders).Error
	if err != nil {
		fmt.Println("Error querying orders:", err)
	}
	fmt.Printf("sharding key order_year Selected orders: %#v\n", orders)

	// 查询示例
	FindByUserID2(db, int64(1))
	// 示例:插入订单数据
	InsertOrderByUserId(db)
	InsertOrderByOrderYear(db)
}

func FindByUserID2(db *gorm.DB, userID int64) ([]Order, error) {
	var orders []Order
	// 查询示例
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ctx = context.WithValue(ctx, "sharding_key", "user_id")
	db = db.WithContext(ctx)
	err := db.Model(&Order{}).Where("user_id = ?", userID).Find(&orders).Error
	if err != nil {
		fmt.Println("Error querying orders:", err)
	}
	fmt.Printf("no sharding key user_id Selected orders: %#v\n", orders)
	return orders, err
}

type OrderByUserId struct {
	ID        int64  `gorm:"primaryKey"`
	OrderId   string `gorm:"sharding:order_id"` // 指明 OrderId 是分片键
	UserID    int64  `gorm:"sharding:user_id"`
	ProductID int64
	OrderDate time.Time
}

func InsertOrderByUserId(db *gorm.DB) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ctx = context.WithValue(ctx, "sharding_key", "user_id")
	db = db.WithContext(ctx)
	// 示例:插入订单数据
	order := OrderByUserId{
		OrderId:   "20240101ORDER0001",
		UserID:    100,
		ProductID: 100,
		OrderDate: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
	}
	err := db.Table("orders").Create(&order).Error
	if err != nil {
		fmt.Println("Error creating order:", err)
	}
	order2 := OrderByUserId{
		OrderId:   "20250101ORDER0001",
		UserID:    105,
		ProductID: 100,
		OrderDate: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
	}
	err = db.Table("orders").Create(&order2).Error
	if err != nil {
		fmt.Println("Error creating order:", err)
	}
	return err
}

func InsertOrderByOrderYear(db *gorm.DB) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ctx = context.WithValue(ctx, "sharding_key", "order_year")
	db = db.WithContext(ctx)
	orderYear := 2024
	// 示例:插入订单数据
	order := Order{
		OrderId:   "20240101ORDER0002",
		UserID:    1,
		ProductID: 100,
		OrderDate: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
		OrderYear: orderYear,
	}
	err := db.Create(&order).Error
	if err != nil {
		fmt.Println("Error creating order:", err)
	}
	orderYear = 2025
	order2 := Order{
		OrderId:   "20250101ORDER0002",
		UserID:    1,
		ProductID: 100,
		OrderDate: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
		OrderYear: orderYear,
	}
	err = db.Create(&order2).Error
	if err != nil {
		fmt.Println("Error creating order:", err)
	}
	return err
}

总结

通过改造gorm.io/sharding组件,我们实现了根据表名+分表键获取对应分表策略的逻辑。这一改造使得组件能够支持单表多个分表策略,更加灵活和强大。目前,我们已经简单测试了查询和插入场景,更复杂的场景和并发情况还需进一步测试和优化。通过这一改造,我们为业务逻辑的执行提供了更加精准和高效的分表策略定位。

标签:err,db,查询,分表,单表,io,sharding,id
From: https://blog.csdn.net/ezreal_pan/article/details/143213425

相关文章

  • 深入浅出理解BLE AUDIO CSIS
    CSIS是CoordinateSetsIdentificationservice,翻译过来就是协调集识别服务。什么是协调集,可以理解为具有相同特征的一伙设备,最典型的就是左右两个蓝牙耳机是一个协调集,所以它们具有相同的协调集标志,但是具有相同协调集的设备要如何识别,这就是本篇需要讲解的内容,其实还是比......
  • 【源码】Sharding-JDBC源码分析之Sql解析的原理
     Sharding-JDBC系列1、Sharding-JDBC分库分表的基本使用2、Sharding-JDBC分库分表之SpringBoot分片策略3、Sharding-JDBC分库分表之SpringBoot主从配置4、SpringBoot集成Sharding-JDBC-5.3.0分库分表5、SpringBoot集成Sharding-JDBC-5.3.0实现按月动态建表分表6、【源码......
  • IO进程_day5
    目录线程Thread1.什么是线程1.1概念1.2进程和线程区别1.3线程资源2.函数接口2.1创建线程:pthread_create2.2退出线程:pthread_exit2.3回收线程资源练习:通过父子进程完成对文件的拷贝(cp)练习:输入输出,quit结束3.同步3.1概念3.2同步机制3.3函数接......
  • WPF No imaging component suitable to complete this operation was found.
    System.NotSupportedExceptionHResult=0x80131515Message=Noimagingcomponentsuitabletocompletethisoperationwasfound.Source=PresentationCoreStackTrace:Thisisbecausetheimagefileisnotcompletedorcorrupted,thealternativeistoa......
  • 在 Windows Server 2008 R2 中,您可以使用批处理(.bat)文件来查询 Win32_NetworkAdapterC
    在WindowsServer2008R2中,Win32_NetworkAdapterConfiguration类是Windows管理工具(WMI)基础结构的一部分。它提供了有关服务器上网络适配器配置的详细信息。您可以使用此类查询各种网络设置,例如IP地址、子网掩码、DNS服务器和DHCP设置。Win32_NetworkAdapterConfigurat......
  • io流
    输入流:用于读取数据,输入源从数据库读取数据,如文件,网络连接等。常见的输入流类有fileInputStream,byterArrayInputStream.socketInputStream.输出流:用于读写数据,输出流将数据写入到目标,如文件,数据库等常见的有fileOutStream,ByteArrayOutSteaqm,SocketOutStream.IO场景:读写文件......
  • 使用 Windows Management Instrumentation (WMI) 更新用户的 DNS 设置可以通过几种方
    使用WindowsManagementInstrumentation(WMI)更新用户的DNS设置可以通过几种方法实现,主要是使用Win32_NetworkAdapterConfiguration类来修改网络适配器的DNS设置。以下是一个使用PowerShell脚本的示例,展示如何通过WMI更新DNS设置。使用PowerShell更新DNS设置......
  • [Coci2011]kamion 题解
    前言题目链接:Hydro&bzoj;黑暗爆炸。题意简述给你一张\(n\)个点\(m\)条边的有向图。有\(p\)种括号,每条边的边权可以是这\(p\)种括号中某一种的左括号或者右括号,也可以为空。问你有多少条从\(1\)开始到\(n\)的长度小于等于\(k\)的路径,满足括号匹配,或者剩余若干未......
  • 论文翻译 | Bounding the Capabilities of Large Language Models in Open Text Gener
    摘要        开放式生成模型的局限性尚不清楚,但却越来越重要。是什么让他们成功,又是什么让他们失败?在本文中,我们采用了一种以提示为中心的方法来分析和限定开放式生成模型的能力。我们提出了两种具有挑战性的提示约束类型的通用分析方法:结构和风格。这些约束类型被归......
  • SciTech-Mathmatics-Probability+Statistics-Bayes Formula: Application:
    SciTech-Mathmatics-Probability+StatisticsBayesFormula:Application:DescriptiveStatistics(ofSamples)\[\large\begin{array}{lll}\\\text{SamplesData}\begin{cases}\\\bm{Center\Tendency}\overset{\bm{Mean}}{\right......