背景
分表组件改造的背景,我在这篇文章《gorm.io/sharding改造:赋能单表,灵活支持多分表策略(上)》中已经做了详细的介绍——这个组件不支持单表多个分表策略,为了突破这个限制做的改造。
在上一篇文章中,我们讨论了注册的改造,注册的改造修改逻辑比较简单,但是,上一篇文章中遗留了一个很重要的议题——在增删改查的实际业务操作中,分表组件究竟如何精准地定位到对应的分表策略,以确保业务逻辑的顺利执行?这篇文章,我们重点讨论这个逻辑。
源码解读
首先,我们需要看一下当我们执行查询,新增,更新或是删除逻辑,其执行流程是什么。比如,这么一个查询。
err := db.Model(&Order{}).Where("user_id = ?", userID).Find(&orders).Error
我们大概梳理一下其执行流程。
- 初始化查询:
- 当我们执行查询
err := db.Model(&Order{}).Where("user_id = ?", userID).Find(&orders).Error
,首先会通过db.Model(&Order{})
初始化一个查询实例,设置相关的模型信息。
- 当我们执行查询
- 构建查询条件:
- 接着,通过
.Where("user_id = ?", userID)
方法,将查询条件user_id = ?
以及对应的参数userID
添加到查询实例中。
- 接着,通过
- 执行查询:
- 调用
.Find(&orders)
方法时,开始执行查询流程。 - 在
Find
方法中,首先通过db.getInstance()
获取数据库实例。 - 然后,检查是否存在查询条件,如果有,则构建 SQL 条件表达式,并将其添加到查询语句中。
- 设置查询结果的目标对象
dest
,即&orders
。
func (db *DB) Find(dest interface{}, conds ...interface{}) (tx *DB) { tx = db.getInstance() if len(conds) > 0 { if exprs := tx.Statement.BuildCondition(conds[0], conds[1:]...); len(exprs) > 0 { tx.Statement.AddClause(clause.Where{Exprs: exprs}) } } tx.Statement.Dest = dest return tx.callbacks.Query().Execute(tx) }
- 调用
- 执行回调和处理:
- 调用
tx.callbacks.Query().Execute(tx)
执行查询回调链。 - 在
Execute
方法中,会遍历并执行所有注册的查询前和查询后的回调函数。
func (p *processor) Execute(db *DB) *DB { //省略其他代码逻辑 ...... for _, f := range p.fns { f(db) } //省略其他代码逻辑 ...... return db }
- 调用
- 分片和查询执行:
- 最终,调用
pool.QueryContext
方法,根据上下文、SQL 查询语句和参数执行实际的数据库查询。 - 在
QueryContext
方法中,会调用pool.sharding.resolve
方法解析并修改查询语句,以处理数据库分片逻辑。 resolve
方法解析 SQL 查询语句,提取表名,并根据表名获取相应的分片配置。- 根据分片配置,可能会修改原始查询语句,以适应分片策略。
func (pool ConnPool) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) { var ( curTime = time.Now() ) //该方法根据传入的SQL查询(及其参数)和上下文信息,动态地解析、修改并返回最终的分片 //查询、原始查询、目标表名以及可能出现的错误。 _, stQuery, _, err := pool.sharding.resolve(query, args...) if err != nil { return nil, err } // 省略 ...... return rows, err }
func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableName string, err error) { ftQuery = query stQuery = query if len(s.configs) == 0 { return } expr, err := sqlparser.NewParser(strings.NewReader(query)).ParseStatement() if err != nil { return ftQuery, stQuery, tableName, nil } // 省略 ...... tableName = table.Name.Name r, ok := s.configs[tableName] if !ok { return } // 省略 ...... return }
- 最终,调用
- 返回结果:
- 执行查询后,将结果填充到目标对象
&orders
中,并返回查询结果或错误。
- 执行查询后,将结果填充到目标对象
我们重点关注resolve方法,这个方法包含了分表逻辑的处理逻辑:r, ok := s.configs[tableName]获取对应表的分表策略。
通过上述代码的解析,我们现在应该有了解决方案。原来的逻辑获取分表策略是根据表明获取的。那我们只要修改这个逻辑,根据表名+分表键名作为唯一键获取对应的分表策略就能实现我们的目标。
方案
接下来,我们需要思考的是,如何把分表键传进来呢?
我一开始想的是通过解析query获取查询条件中的分表键。但是,当我深入的看了这个逻辑之后,发现这个设想不能实现,因为value, id, keyFind, err = s.nonInsertValue(r.ShardingKey, condition, args...)这个方法中获取查询条件的字段是在这个函数内部实现的,不能保持一个统一的结构,而且改造复杂度比较高。
context在go语言有着广泛的使用场景,所以,我想着通过context的方式把分表键传递进来。有了这个想法,改造起来就很简单了。我们只需要resolve方法增加一个context的传参,并且r, ok := s.configs[tableName]这个获取分表策略,改成用表名+从context中获取的分表键作为键来获取分表策略即可。
如此,我们就实现了根据表名+分表键获取对应分表策略的逻辑,至此,我们的改造任务完成。
案例
我目前也只是简单的测试了两种分表策略的场景,仅仅只覆盖了查询和插入的场景。更复杂的场景还没有测试。诸如并发情况下的场景。
package test
import (
"context"
"fmt"
"testing"
"time"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/sharding"
)
var globalDB *gorm.DB
type Order struct {
ID int64 `gorm:"primaryKey"`
OrderId string `gorm:"sharding:order_id"` // 指明 OrderId 是分片键
UserID int64 `gorm:"sharding:user_id"`
ProductID int64
OrderDate time.Time
OrderYear int
}
// 自定义 ShardingAlgorithm
func customShardingAlgorithm4(value any) (suffix string, err error) {
if year, ok := value.(int); ok {
return fmt.Sprintf("_%d", year), nil
}
return "", fmt.Errorf("invalid order_date")
}
func customShardingAlgorithmUserId(value any) (suffix string, err error) {
if userId, ok := value.(int64); ok {
return fmt.Sprintf("_%d", userId%4), nil
}
return "", fmt.Errorf("invalid user_id")
}
// customePrimaryKeyGeneratorFn 自定义主键生成函数
func customePrimaryKeyGeneratorFn(tableIdx int64) int64 {
var id int64
seqTableName := "gorm_sharding_orders_id_seq" // 序列表名
db := globalDB
// 使用事务来确保主键生成的原子性
tx := db.Begin()
defer func() {
if r := recover(); r != nil {
tx.Rollback()
}
}()
// 锁定序列表以确保并发安全(可选,取决于你的 MySQL 配置和并发级别)
// 注意:在某些 MySQL 版本和配置中,使用 LOCK TABLES 可能不是最佳选择
// 这里仅作为示例,实际应用中可能需要更精细的并发控制策略
tx.Exec("LOCK TABLES " + seqTableName + " WRITE")
// 查询当前的最大 ID
tx.Raw("SELECT id FROM " + seqTableName + " ORDER BY id DESC LIMIT 1").Scan(&id)
// 更新序列表(这里直接递增 1,实际应用中可能需要更复杂的逻辑)
newID := id + 1
tx.Exec("INSERT INTO "+seqTableName+" (id) VALUES (?)", newID) // 这里假设序列表允许插入任意 ID,实际应用中可能需要其他机制来确保 ID 的唯一性和连续性
// 释放锁定
tx.Exec("UNLOCK TABLES")
// 提交事务
if err := tx.Commit().Error; err != nil {
panic(err) // 实际应用中应该使用更优雅的错误处理机制
}
return newID
}
// Test_Gorm_Sharding 用于测试 Gorm Sharding 插件
func Test_Gorm_Sharding6(t *testing.T) {
// 连接到 MySQL 数据库
dsn := "dev:xxxx@tcp(ip:port)/sharding_db2?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.New(mysql.Config{
DSN: dsn,
}), &gorm.Config{})
if err != nil {
panic("failed to connect database")
}
globalDB = db
config1 := sharding.Config{
ShardingKey: "order_year",
ShardingAlgorithm: customShardingAlgorithm4, // 使用自定义的分片算法
//PrimaryKeyGenerator: sharding.PKMySQLSequence,
PrimaryKeyGenerator: sharding.PKCustom,
PrimaryKeyGeneratorFn: customePrimaryKeyGeneratorFn,
}
config2 := sharding.Config{
ShardingKey: "user_id",
NumberOfShards: 4,
ShardingAlgorithm: customShardingAlgorithmUserId, // 使用自定义的分片算法
PrimaryKeyGenerator: sharding.PKSnowflake, // 使用 Snowflake 算法生成主键
}
mapConfig := make(map[string]sharding.Config)
mapConfig["orders_order_year"] = config1
mapConfig["orders_user_id"] = config2
// 配置 Gorm Sharding 中间件,使用自定义的分片算法
middleware := sharding.RegisterWithKeys(mapConfig) // 逻辑表名为 "orders"
db.Use(middleware)
// 查询示例
var orders []Order
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = context.WithValue(ctx, "sharding_key", "order_year")
db = db.WithContext(ctx)
err = db.Model(&Order{}).Where("order_year=? and product_id=?", 2025, 102).Find(&orders).Error
if err != nil {
fmt.Println("Error querying orders:", err)
}
fmt.Printf("sharding key order_year Selected orders: %#v\n", orders)
// 查询示例
FindByUserID2(db, int64(1))
// 示例:插入订单数据
InsertOrderByUserId(db)
InsertOrderByOrderYear(db)
}
func FindByUserID2(db *gorm.DB, userID int64) ([]Order, error) {
var orders []Order
// 查询示例
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = context.WithValue(ctx, "sharding_key", "user_id")
db = db.WithContext(ctx)
err := db.Model(&Order{}).Where("user_id = ?", userID).Find(&orders).Error
if err != nil {
fmt.Println("Error querying orders:", err)
}
fmt.Printf("no sharding key user_id Selected orders: %#v\n", orders)
return orders, err
}
type OrderByUserId struct {
ID int64 `gorm:"primaryKey"`
OrderId string `gorm:"sharding:order_id"` // 指明 OrderId 是分片键
UserID int64 `gorm:"sharding:user_id"`
ProductID int64
OrderDate time.Time
}
func InsertOrderByUserId(db *gorm.DB) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = context.WithValue(ctx, "sharding_key", "user_id")
db = db.WithContext(ctx)
// 示例:插入订单数据
order := OrderByUserId{
OrderId: "20240101ORDER0001",
UserID: 100,
ProductID: 100,
OrderDate: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
}
err := db.Table("orders").Create(&order).Error
if err != nil {
fmt.Println("Error creating order:", err)
}
order2 := OrderByUserId{
OrderId: "20250101ORDER0001",
UserID: 105,
ProductID: 100,
OrderDate: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
}
err = db.Table("orders").Create(&order2).Error
if err != nil {
fmt.Println("Error creating order:", err)
}
return err
}
func InsertOrderByOrderYear(db *gorm.DB) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = context.WithValue(ctx, "sharding_key", "order_year")
db = db.WithContext(ctx)
orderYear := 2024
// 示例:插入订单数据
order := Order{
OrderId: "20240101ORDER0002",
UserID: 1,
ProductID: 100,
OrderDate: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
OrderYear: orderYear,
}
err := db.Create(&order).Error
if err != nil {
fmt.Println("Error creating order:", err)
}
orderYear = 2025
order2 := Order{
OrderId: "20250101ORDER0002",
UserID: 1,
ProductID: 100,
OrderDate: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
OrderYear: orderYear,
}
err = db.Create(&order2).Error
if err != nil {
fmt.Println("Error creating order:", err)
}
return err
}
总结
通过改造gorm.io/sharding
组件,我们实现了根据表名+分表键获取对应分表策略的逻辑。这一改造使得组件能够支持单表多个分表策略,更加灵活和强大。目前,我们已经简单测试了查询和插入场景,更复杂的场景和并发情况还需进一步测试和优化。通过这一改造,我们为业务逻辑的执行提供了更加精准和高效的分表策略定位。