问题描述
由于MySQL PXC集群的所有节点均可读写,因此当PXC集群中节点增加和减少时,PXC集群会自动调整集群各节点的自增ID步长,避免不同集群节点生成相同自增ID值产生冲突。
当PXC集群中读节点数量发生变化时,客户端执行BatchInsert方法可能返回错误的自增ID值。
问题原因
由于MySQL数据库仅提供LAST_INSERT_ID()函数来获取最后插入记录的自增ID,如果批量插入多条记录时,则返回批量插入记录的第一个自增ID值,如:
# 获取MySQL服务器的自增步长
mysql> SHOW VARIABLES LIKE '%auto_increment%';
+--------------------------+-------+
| Variable_name | Value |
+--------------------------+-------+
| auto_increment_increment | 1 |
| auto_increment_offset | 1 |
+--------------------------+-------+
2 rows in set (0.00 sec)
mysql> CREATE TABLE tb102(id INT AUTO_INCREMENT PRIMARY KEY ,c1 INT);
Query OK, 0 rows affected (0.00 sec)
mysql> INSERT INTO tb102(c1)VALUES(1);
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO tb102(c1)VALUES(2);
Query OK, 1 row affected (0.01 sec)
# 获取上一条INSERT插入操作产生的自增ID。
mysql> SELECT LAST_INSERT_ID();
+------------------+
| LAST_INSERT_ID() |
+------------------+
| 2 |
+------------------+
1 row in set (0.00 sec)
mysql> INSERT INTO tb102(c1)VALUES(3),(4),(5);
Query OK, 3 rows affected (0.00 sec)
Records: 3 Duplicates: 0 Warnings: 0
# 获取上一条INSERT插入操作产生的自增ID。
# 如果一次INSERT插入多条记录,则返回第一个自增ID。
mysql> SELECT LAST_INSERT_ID();
+------------------+
| LAST_INSERT_ID() |
+------------------+
| 3 |
+------------------+
1 row in set (0.00 sec)
mysql> SELECT * FROM tb102;
+----+------+
| id | c1 |
+----+------+
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
| 4 | 4 |
| 5 | 5 |
+----+------+
5 rows in set (0.00 sec)
当应用程序使用BatchInsert方式批量插入多条记录且需要返回多条记录对应的自增ID时,客户端会:
执行LAST_INSERT_ID() 获取批量插入的第一个自增ID
按照批量插入操作的影响行数循环,在第一自增ID值上依次增加自增步长
当应用程序采用批量插入多条记录时,会通过executeBatch-->executeBatchInternal-->getBatchedGeneratedKeys-->getGeneratedKeysInternal来获取到批量插入记录的自增ID:
protected ResultSetInternalMethods getGeneratedKeysInternal(long numKeys) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
String encoding = this.session.getServerSession().getCharsetSettings().getMetadataEncoding();
int collationIndex = this.session.getServerSession().getCharsetSettings().getMetadataCollationIndex();
Field[] fields = new Field[1];
fields[0] = new Field("", "GENERATED_KEY", collationIndex, encoding, MysqlType.BIGINT_UNSIGNED, 20);
ArrayList<Row> rowSet = new ArrayList<>();
long beginAt = getLastInsertID();
if (this.results != null) {
String serverInfo = this.results.getServerInfo();
//
// Only parse server info messages for 'REPLACE' queries
//
if ((numKeys > 0) && (this.results.getFirstCharOfQuery() == 'R') && (serverInfo != null) && (serverInfo.length() > 0)) {
numKeys = getRecordCountFromInfo(serverInfo);
}
if ((beginAt != 0 /* BIGINT UNSIGNED can wrap the protocol representation */) && (numKeys > 0)) {
for (int i = 0; i < numKeys; i++) {
byte[][] row = new byte[1][];
if (beginAt > 0) {
row[0] = StringUtils.getBytes(Long.toString(beginAt));
} else {
byte[] asBytes = new byte[8];
asBytes[7] = (byte) (beginAt & 0xff);
asBytes[6] = (byte) (beginAt >>> 8);
asBytes[5] = (byte) (beginAt >>> 16);
asBytes[4] = (byte) (beginAt >>> 24);
asBytes[3] = (byte) (beginAt >>> 32);
asBytes[2] = (byte) (beginAt >>> 40);
asBytes[1] = (byte) (beginAt >>> 48);
asBytes[0] = (byte) (beginAt >>> 56);
BigInteger val = new BigInteger(1, asBytes);
row[0] = val.toString().getBytes();
}
rowSet.add(new ByteArrayRow(row, getExceptionInterceptor()));
beginAt += this.connection.getAutoIncrementIncrement();
}
}
}
ResultSetImpl gkRs = this.resultSetFactory.createFromResultsetRows(ResultSet.CONCUR_READ_ONLY, ResultSet.TYPE_SCROLL_INSENSITIVE,
new ResultsetRowsStatic(rowSet, new DefaultColumnDefinition(fields)));
return gkRs;
}
}
客户端使用LAST_INSERT_ID()获取最后插入的自增列的值
/**
* getLastInsertID returns the value of the auto_incremented key after an
* executeQuery() or excute() call.
*
* <p>
* This gets around the un-threadsafe behavior of "select LAST_INSERT_ID()" which is tied to the Connection that created this Statement, and therefore could
* have had many INSERTS performed before one gets a chance to call "select LAST_INSERT_ID()".
* </p>
*
* @return the last update ID.
*/
public long getLastInsertID() {
synchronized (checkClosed().getConnectionMutex()) {
return this.lastInsertId;
}
}
客户端在连接初始化时获取MySQL服务器端的参数变量auto_increment_increment并缓存到本地重复使用:
@Override
public int getAutoIncrementIncrement() {
return this.autoIncrementIncrement;
}
/**
* Sets varying properties that depend on server information. Called once we
* have connected to the server.
*
* @throws SQLException
* if a database access error occurs
*/
private void initializePropsFromServer() throws SQLException {
this.autoIncrementIncrement = this.session.getServerSession().getServerVariable("auto_increment_increment", 1);
}
当MySQL服务器端自增步长随PXC节点变化而变化时,MySQL客户端仍使用缓存的自增步长(auto_increment_increment),则会导致MySQL客户端计算得到的自增值与MySQL服务器端实际产生的自增值不同。
优化建议
由于PXC节点数量变化无法控制(如硬件故障导致节点强制下线),因此无法保证PXC节点长期保持相同自增步长。
即使使用发布订阅机制在PXC集群节点发生变化时通知客户端重连并刷新本地缓存的自增步长,在PXC集群节点变化到MySQL客户端重连期间仍会存在该问题。
因此建议:
- 如果业务不依赖BatchInsert方法返回的批量自增列值时,可继续使用BatchInsert方法。
- 如果业务严重依赖BatchInsert方法返回的批量自增列值时,将批量插入操作改为多次单条插入操作,在每次单条记录插入后获取自增列值,并将多次插入插入封装到一个事务中。该方式与批量插入操作相比会增加多次请求的网络延时,可通过多并发方式来解决。