首页 > 数据库 >MySQL AutoIncrement--PXC集群批量插入操作获取自增ID异常问题

MySQL AutoIncrement--PXC集群批量插入操作获取自增ID异常问题

时间:2023-04-05 12:00:09浏览次数:54  
标签:INSERT 自增 -- AutoIncrement beginAt 插入 byte ID

问题描述

由于MySQL PXC集群的所有节点均可读写,因此当PXC集群中节点增加和减少时,PXC集群会自动调整集群各节点的自增ID步长,避免不同集群节点生成相同自增ID值产生冲突。

当PXC集群中读节点数量发生变化时,客户端执行BatchInsert方法可能返回错误的自增ID值。

问题原因

由于MySQL数据库仅提供LAST_INSERT_ID()函数来获取最后插入记录的自增ID,如果批量插入多条记录时,则返回批量插入记录的第一个自增ID值,如:

# 获取MySQL服务器的自增步长
mysql> SHOW VARIABLES LIKE '%auto_increment%';
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| auto_increment_increment | 1     |
| auto_increment_offset    | 1     |
+--------------------------+-------+
2 rows in set (0.00 sec)
 
 
 
 
mysql> CREATE TABLE tb102(id INT AUTO_INCREMENT PRIMARY KEY ,c1 INT);
Query OK, 0 rows affected (0.00 sec)
 
mysql> INSERT INTO tb102(c1)VALUES(1);
Query OK, 1 row affected (0.00 sec)
 
mysql> INSERT INTO tb102(c1)VALUES(2);
Query OK, 1 row affected (0.01 sec)
 
# 获取上一条INSERT插入操作产生的自增ID。
mysql> SELECT LAST_INSERT_ID();
+------------------+
| LAST_INSERT_ID() |
+------------------+
|                2 |
+------------------+
1 row in set (0.00 sec)
 
 
mysql> INSERT INTO tb102(c1)VALUES(3),(4),(5);
Query OK, 3 rows affected (0.00 sec)
Records: 3  Duplicates: 0  Warnings: 0
 
# 获取上一条INSERT插入操作产生的自增ID。
# 如果一次INSERT插入多条记录,则返回第一个自增ID。
mysql> SELECT LAST_INSERT_ID();
+------------------+
| LAST_INSERT_ID() |
+------------------+
|                3 |
+------------------+
1 row in set (0.00 sec)
 
mysql> SELECT * FROM tb102;
+----+------+
| id | c1   |
+----+------+
|  1 |    1 |
|  2 |    2 |
|  3 |    3 |
|  4 |    4 |
|  5 |    5 |
+----+------+
5 rows in set (0.00 sec)

当应用程序使用BatchInsert方式批量插入多条记录且需要返回多条记录对应的自增ID时,客户端会:

执行LAST_INSERT_ID() 获取批量插入的第一个自增ID
按照批量插入操作的影响行数循环,在第一自增ID值上依次增加自增步长

当应用程序采用批量插入多条记录时,会通过executeBatch-->executeBatchInternal-->getBatchedGeneratedKeys-->getGeneratedKeysInternal来获取到批量插入记录的自增ID:

protected ResultSetInternalMethods getGeneratedKeysInternal(long numKeys) throws SQLException {
    synchronized (checkClosed().getConnectionMutex()) {
        String encoding = this.session.getServerSession().getCharsetSettings().getMetadataEncoding();
        int collationIndex = this.session.getServerSession().getCharsetSettings().getMetadataCollationIndex();
        Field[] fields = new Field[1];
        fields[0] = new Field("", "GENERATED_KEY", collationIndex, encoding, MysqlType.BIGINT_UNSIGNED, 20);
 
        ArrayList<Row> rowSet = new ArrayList<>();
 
        long beginAt = getLastInsertID();
 
        if (this.results != null) {
            String serverInfo = this.results.getServerInfo();
 
            //
            // Only parse server info messages for 'REPLACE' queries
            //
            if ((numKeys > 0) && (this.results.getFirstCharOfQuery() == 'R') && (serverInfo != null) && (serverInfo.length() > 0)) {
                numKeys = getRecordCountFromInfo(serverInfo);
            }
 
            if ((beginAt != 0 /* BIGINT UNSIGNED can wrap the protocol representation */) && (numKeys > 0)) {
                for (int i = 0; i < numKeys; i++) {
                    byte[][] row = new byte[1][];
                    if (beginAt > 0) {
                        row[0] = StringUtils.getBytes(Long.toString(beginAt));
                    } else {
                        byte[] asBytes = new byte[8];
                        asBytes[7] = (byte) (beginAt & 0xff);
                        asBytes[6] = (byte) (beginAt >>> 8);
                        asBytes[5] = (byte) (beginAt >>> 16);
                        asBytes[4] = (byte) (beginAt >>> 24);
                        asBytes[3] = (byte) (beginAt >>> 32);
                        asBytes[2] = (byte) (beginAt >>> 40);
                        asBytes[1] = (byte) (beginAt >>> 48);
                        asBytes[0] = (byte) (beginAt >>> 56);
 
                        BigInteger val = new BigInteger(1, asBytes);
 
                        row[0] = val.toString().getBytes();
                    }
                    rowSet.add(new ByteArrayRow(row, getExceptionInterceptor()));
                    beginAt += this.connection.getAutoIncrementIncrement();
                }
            }
        }
 
        ResultSetImpl gkRs = this.resultSetFactory.createFromResultsetRows(ResultSet.CONCUR_READ_ONLY, ResultSet.TYPE_SCROLL_INSENSITIVE,
                new ResultsetRowsStatic(rowSet, new DefaultColumnDefinition(fields)));
 
        return gkRs;
    }
}

客户端使用LAST_INSERT_ID()获取最后插入的自增列的值

/**
 * getLastInsertID returns the value of the auto_incremented key after an
 * executeQuery() or excute() call.
 *
 * <p>
 * This gets around the un-threadsafe behavior of "select LAST_INSERT_ID()" which is tied to the Connection that created this Statement, and therefore could
 * have had many INSERTS performed before one gets a chance to call "select LAST_INSERT_ID()".
 * </p>
 *
 * @return the last update ID.
 */
public long getLastInsertID() {
    synchronized (checkClosed().getConnectionMutex()) {
        return this.lastInsertId;
    }
}

客户端在连接初始化时获取MySQL服务器端的参数变量auto_increment_increment并缓存到本地重复使用:

@Override
public int getAutoIncrementIncrement() {
    return this.autoIncrementIncrement;
}
 
 
 /**
 * Sets varying properties that depend on server information. Called once we
 * have connected to the server.
 *
 * @throws SQLException
 *             if a database access error occurs
 */
private void initializePropsFromServer() throws SQLException {
    this.autoIncrementIncrement = this.session.getServerSession().getServerVariable("auto_increment_increment", 1);
}

当MySQL服务器端自增步长随PXC节点变化而变化时,MySQL客户端仍使用缓存的自增步长(auto_increment_increment),则会导致MySQL客户端计算得到的自增值与MySQL服务器端实际产生的自增值不同。

优化建议

由于PXC节点数量变化无法控制(如硬件故障导致节点强制下线),因此无法保证PXC节点长期保持相同自增步长。

即使使用发布订阅机制在PXC集群节点发生变化时通知客户端重连并刷新本地缓存的自增步长,在PXC集群节点变化到MySQL客户端重连期间仍会存在该问题。

因此建议:

  • 如果业务不依赖BatchInsert方法返回的批量自增列值时,可继续使用BatchInsert方法。
  • 如果业务严重依赖BatchInsert方法返回的批量自增列值时,将批量插入操作改为多次单条插入操作,在每次单条记录插入后获取自增列值,并将多次插入插入封装到一个事务中。该方式与批量插入操作相比会增加多次请求的网络延时,可通过多并发方式来解决。

标签:INSERT,自增,--,AutoIncrement,beginAt,插入,byte,ID
From: https://www.cnblogs.com/gaogao67/p/17289087.html

相关文章

  • 简析反序列化漏洞
    反序列化漏洞反序列化漏洞一、漏洞原理相关概念什么是序列化与反序列化?漏洞成因常见魔术方法总结二、漏洞危害三、漏洞出现场景四、检测方法五、防御六、漏洞复现一、漏洞原理相关概念什么是序列化与反序列化?序列化:把对象的状态信息转换为可以存储或传输的形式的过程,一般......
  • js高级
    判断:typeof判断null、object、array的结果均是object,即判断返回均是对象类型,所以typeof是不能判断出null、object与array这三种数据类型的instanceof可以判断检测对象的具体类型,注意使用方法:要检测的对象 instanceof 相应对象类型 ,若要检测对象属于相应的对象类型,则......
  • linux 查看内存使用情况命令
    查看所有盘符的使用情况:df-h查看各个用户使用的存储空间大小:du-sh/home/*查看当前目录总共占的容量,而不单独列出各个子项占用的容量:du-sh查看当前目录下一级子文件和子目录所占用的磁盘容量:du-lh--max-depth=1统计当前文件夹|目录大小,并按文件大小排序:du-sh*|sort......
  • js dom 类型判断
    Node对象中的nodeName获取指定节点的节点名称(返回的是大写字母表示的)Node对象中的nodeType获取指定节点的节点类型元素节点属性节点文本节点123Node对象中的nodeValue获取指定节点的值详情见官网:https://developer.mozilla.org/......
  • 计网学习笔记七 IP protocol basic
    在这一节讲了IP协议的基本内容:包括IPv4提供的操作、数据报在IPv4下是怎么样的结构、数据报是怎样切片发送的、IPv4的编址方式有什么……IPv6在下一节讲网络层协议簇时细讲。IPv4协议的具体定义:RFC791Internet的地址分类按地址层级:物理上的网络地址:区分物理接口,同一个......
  • Leet Code 69. x 的平方根
    classSolution{public:intmySqrt(intx){longa=x;while(a*a>x){a=(a+x/a)/2;}returna;}};......
  • 科技大数据:如何利用科普信息来更好地理解技术
    科技在不断的发展,我们的生活方式也在不断地改变。从最初的人工智能到现在的云计算、大数据等,科技的发展已经成为了我们日常生活中不可或缺的一部分。然而,对于大多数人来说,这些新兴技术可能是非常难以理解的。因此,科普信息的传播和普及变得越来越重要,这不仅可以让人们更好地了解和......
  • 为什么我推荐你使用 systemd timer 替代 cronjob?
    概述前几天在使用Terraform+cloud-init批量初始化我的实验室Linux机器。正好发现有一些定时场景需要使用到cronjob,进一步了解到systemdtimer完全可以替换cronjob,并且systemdtimer有一些非常有趣的功能。回归话题:为什么我推荐你使用systemdtimer替代cronjo......
  • 用nvm管理nodejs版本
    1 nvmgithub下载地址:https://github.com/coreybutler/nvm-windows/releases,2 输入nvm-v或nvmv检查是否安装成功3.nvm的使用nvmoff//禁用node.js版本管理(不卸载任何东西)nvmon//启用node.js版本管理nvminstall<version......
  • 无穷大与无穷小
    M表示要多大有多大的数,一批西冷表示要多小有多小的数无穷小      无穷小性质性质一        性质二  性质三(重要)       无穷大  例题    例二  无穷小与无穷大的关系     ......