遇到一个很奇怪的异常,通过 JDBC batch insert 时,会报 `Unknown command(27)` 的异常。
![exception.png](https://upload-images.jianshu.io/upload_images/13187386-c3138cbb820d3f21.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
而且这个问题很容易复现,复现例子:
1. 建表语句
```SQL
create table t_selection_test (
a VARCHAR(96),
b VARCHAR(20),
c VARCHAR(96),
d DECIMAL(38, 10)
) DUPLICATE KEY (a)
DISTRIBUTED BY HASH(a) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
);
```
2. 写入代码
```java
Connection conn = DriverManager.getConnection(
"jdbc:mysql://127.0.0.1:9030/test?rewriteBatchedStatements=true",
"root", "");
String sql = "INSERT INTO t_select_test (a, b, c, d) VALUES(?, ?, ?, ?)";
PreparedStatement ps = conn.prepareStatement(sql);
for (int i = 0; i < 4; i++) {
ps.setString(1, "a");
ps.setString(2, "b");
ps.setString(3, "c");
ps.setBigDecimal(4, BigDecimal.TEN);
ps.addBatch();
}
ps.executeBatch();
```
最开始怀疑是 Doris 的问题,查看 Fe 源码发现,Mysql 解析的代码在 `org.apache.doris.qe.MysqlConnectProcessor#dispatch` 方法
```java
private void dispatch() throws IOException {
int code = packetBuf.get();
MysqlCommand command = MysqlCommand.fromCode(code);
if (command == null) {
ErrorReport.report(ErrorCode.ERR_UNKNOWN_COM_ERROR);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, "Unknown command(" + code + ")");
LOG.warn("Unknown command(" + code + ")");
return;
}
......
}
```
其中 `MysqlCommand` 中确实没有 code 为 27 的命令定义,查看 MySQL 的命令报文定义,发现 27(16 进制的 0x1B)为 `COM_SET_OPTION` 指令,用于打开或关闭多语句执行,具体定义如下:
只有两个枚举值
- MYSQL_OPTION_MULTI_STATEMENTS_ON - 1
- MYSQL_OPTION_MULTI_STATEMENTS_OFF - 0
这个时候就感觉可能是 mysql-connector-java 的问题,又试了几个版本的 connector 包,最终发现在 8.0.28 以及之前的版本会有这个问题,然后就开始了我们的 debug 之旅。
## 问题分析
版本众多,我们就只来看看问题分界的两个版本:8.0.28 和 8.0.29
### 8.0.28
首先我们通过测试代码中的 `ps.executeBatch()` 方法进入
调用的是实现类方法 `com.mysql.cj.jdbc.StatementImpl#executeBatch`
```java
public int[] executeBatch() throws SQLException {
return Util.truncateAndConvertToInt(executeBatchInternal());
}
```
进来直接调用 `executeBatchInternal` 方法,实际是调用的实现类方法 `com.mysql.cj.jdbc.ClientPreparedStatement#executeBatchInternal`
```java
protected long[] executeBatchInternal() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
......
try {
......
if (!this.batchHasPlainStatements && this.rewriteBatchedStatements.getValue()) {
// line 425
if (getParseInfo().canRewriteAsMultiValueInsertAtSqlLevel()) {
return executeBatchedInserts(batchTimeout);
}
if (!this.batchHasPlainStatements && this.query.getBatchedArgs() != null
&& this.query.getBatchedArgs().size() > 3 /* cost of option setting rt-wise */) {
// line 431
return executePreparedBatchAsMultiStatement(batchTimeout);
}
}
return executeBatchSerially(batchTimeout);
} finally {
this.query.getStatementExecuting().set(false);
clearBatch();
}
}
}
```
在 425 行 `canRewriteAsMultiValueInsertAtSqlLevel` 判断为 false,继续向下执行(这个地方很重要,我们后面分析)
然后会最终走到 431 行的 `executePreparedBatchAsMultiStatement` 方法中
```java
protected long[] executePreparedBatchAsMultiStatement(int batchTimeout) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
......
try {
......
try {
if (!multiQueriesEnabled) {
// line 494
((NativeSession) locallyScopedConn.getSession()).enableMultiQueries();
}
......
} finally {
if (batchedStatement != null) {
batchedStatement.close();
batchedStatement = null;
}
}
......
}
}
```
然后我们继续执行,直到执行 494 行的 `enableMultiQueries` 异常发生
然后我们再次 debug 进入到这个方法,看看它干了什么
```java
public void enableMultiQueries() {
sendCommand(this.commandBuilder.buildComSetOption(((NativeProtocol) this.protocol).getSharedSendPacket(), 0), false, 0);
((NativeServerSession) getServerSession()).preserveOldTransactionState();
}
```
诶,就是它了,`buildComSetOption` ,顾名思义,构建了 `COM_SET_OPTION` 命令并发送。
找到了罪魁祸首,我们就来看看为什么他要发送这个指令
回到 425 行的这个判断,为什么明明我们在 JDBC url 上面开启了 `rewriteBatchedStatements`,但是却没有执行呢?
我们点开 `canRewriteAsMultiValueInsertAtSqlLevel` 方法,发现只是简单的返回了 `ParseInfo` 的 `canRewriteAsMultiValueInsert` 属性,默认值为 `false`, 那我们看看这个属性有没有被赋值
```java
public boolean canRewriteAsMultiValueInsertAtSqlLevel() {
return this.canRewriteAsMultiValueInsert;
}
```
经过 Debug 发现,在`ClientPreparedStatement` 初始化时,调用 `com.mysql.cj.ParseInfo#ParseInfo(java.lang.String, com.mysql.cj.Session, java.lang.String)` 构造方法后再调用`com.mysql.cj.ParseInfo#ParseInfo(java.lang.String, com.mysql.cj.Session, java.lang.String, boolean)` 这个构造方法构建 `ParseInfo` 对象
```java
public ParseInfo(String sql, Session session, String encoding) {
this(sql, session, encoding, true);
}
public ParseInfo(String sql, Session session, String encoding, boolean buildRewriteInfo) {
......
// line 185
if (buildRewriteInfo) {
this.canRewriteAsMultiValueInsert = this.numberOfQueries == 1 && !this.parametersInDuplicateKeyClause
&& canRewrite(sql, this.isOnDuplicateKeyUpdate, this.locationOfOnDuplicateKeyUpdate, this.statementStartPos);
if (this.canRewriteAsMultiValueInsert && session.getPropertySet().getBooleanProperty(PropertyKey.rewriteBatchedStatements).getValue()) {
buildRewriteBatchedParams(sql, session, encoding);
}
}
}
```
然后我们继续向下看
在 185 行,由于`buildRewriteInfo` 为 `true`,进入到 if 语句中
`canRewriteAsMultiValueInsert` 为 3 个判断条件的的与值, 其中:
1. `this.numberOfQueries == 1` 判断是不是只有一个语句,这个当然为 `true`
2. `!this.parametersInDuplicateKeyClause` 判断参数不在 `ON DUPLICATE KEY` 的语法中,这个也为 `true`
3. `canRewrite` 这个方法我们跟进去看
```java
protected static boolean canRewrite(String sql, boolean isOnDuplicateKeyUpdate, int locationOfOnDuplicateKeyUpdate, int statementStartPos) {
// Needs to be INSERT or REPLACE.
// Can't have INSERT ... SELECT or INSERT ... ON DUPLICATE KEY UPDATE with an id=LAST_INSERT_ID(...).
if (StringUtils.startsWithIgnoreCaseAndWs(sql, "INSERT", statementStartPos)) {
// line 660
if (StringUtils.indexOfIgnoreCase(statementStartPos, sql, "SELECT", OPENING_MARKERS, CLOSING_MARKERS, SearchMode.__MRK_COM_MYM_HNT_WS) != -1) {
return false;
}
if (isOnDuplicateKeyUpdate) {
int updateClausePos = StringUtils.indexOfIgnoreCase(locationOfOnDuplicateKeyUpdate, sql, " UPDATE ");
if (updateClausePos != -1) {
return StringUtils.indexOfIgnoreCase(updateClausePos, sql, "LAST_INSERT_ID", OPENING_MARKERS, CLOSING_MARKERS,
SearchMode.__MRK_COM_MYM_HNT_WS) == -1;
}
}
return true;
}
return StringUtils.startsWithIgnoreCaseAndWs(sql, "REPLACE", statementStartPos)
&& StringUtils.indexOfIgnoreCase(statementStartPos, sql, "SELECT", OPENING_MARKERS, CLOSING_MARKERS, SearchMode.__MRK_COM_MYM_HNT_WS) == -1;
}
```
有注释提示,只有 INSERT 或 REPLACE 语句才能被重写,而 `INSERT SELECT` 和 `INSERT ON DUPLICATE KEY UPDATE` 语法不能被重写
这个 `SELECT` 是不是有点似曾相识? 我们接着往下走
直到走到 660 行这个判断,然后返回了 `false`
恍然大悟,这个地方判断 sql 是不是 `INSERT SELECT` 语句,因为我们表名中包含 select 字串,所以这个 `StringUtils.indexOfIgnoreCase` 方法返回值不是 -1
而 `SearchMode.__MRK_COM_MYM_HNT_WS` 配合第 4、5 两个参数,不匹配两个标记所引用的内容,这也就是为什么表名通过重音标号引用后没有出现异常
出现问题的地方找到了,我们再来看执行正常的版本是如何处理的
### 8.0.29
还是通过测试代码中的 `ps.executeBatch()` 方法进入
一样走到`com.mysql.cj.jdbc.ClientPreparedStatement#executeBatchInternal`
```java
protected long[] executeBatchInternal() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
......
try {
......
if (!this.batchHasPlainStatements && this.rewriteBatchedStatements.getValue()) {
// line 408
if (getQueryInfo().isRewritableWithMultiValuesClause()) {
return executeBatchedInserts(batchTimeout);
}
if (!this.batchHasPlainStatements && this.query.getBatchedArgs() != null
&& this.query.getBatchedArgs().size() > 3 /* cost of option setting rt-wise */) {
return executePreparedBatchAsMultiStatement(batchTimeout);
}
}
return executeBatchSerially(batchTimeout);
} finally {
this.query.getStatementExecuting().set(false);
clearBatch();
}
}
}
```
走到 408 行,`isRewritableWithMultiValuesClause` 判断为 `true`,调用 `executeBatchedInserts` 执行批量写入,正常结束返回
这个地方和 8.0.28 版本明显不一样了,我们来看看 `isRewritableWithMultiValuesClause` 是如何判断的
这块也是相似的直接返回了 `isRewritableWithMultiValuesClause` 成员变量的值
```java
public boolean isRewritableWithMultiValuesClause() {
return this.isRewritableWithMultiValuesClause;
}
```
debug 看到在构建 `ClientPreparedStatement` 时, 通过 `com.mysql.cj.QueryInfo#QueryInfo(java.lang.String, com.mysql.cj.Session, java.lang.String)` 构造方法构造 `QueryInfo` 对象时进行了赋值
```java
public QueryInfo(String sql, Session session, String encoding) {
......
// line 97
boolean rewriteBatchedStatements = session.getPropertySet().getBooleanProperty(PropertyKey.rewriteBatchedStatements).getValue();
......
// Skip comments at the beginning of queries.
this.queryStartPos = strInspector.indexOfNextAlphanumericChar();
if (this.queryStartPos == -1) {
this.queryStartPos = this.queryLength;
} else {
this.numberOfQueries = 1;
this.statementFirstChar = Character.toUpperCase(strInspector.getChar());
}
// Only INSERT and REPLACE statements support multi-values clause rewriting.
// line 127
boolean isInsert = strInspector.matchesIgnoreCase(INSERT_STATEMENT) != -1;
if (isInsert) {
strInspector.incrementPosition(INSERT_STATEMENT.length()); // Advance to the end of "INSERT".
}
boolean isReplace = !isInsert && strInspector.matchesIgnoreCase(REPLACE_STATEMENT) != -1;
if (isReplace) {
strInspector.incrementPosition(REPLACE_STATEMENT.length()); // Advance to the end of "REPLACE".
}
// Check if the statement has potential to be rewritten as a multi-values clause statement, i.e., if it is an INSERT or REPLACE statement and
// 'rewriteBatchedStatements' is enabled.
boolean rewritableAsMultiValues = (isInsert || isReplace) && rewriteBatchedStatements;
......
// complex grammar check
// line 271
this.isRewritableWithMultiValuesClause = rewritableAsMultiValues;
......
}
```
首先在 97 行,获取到 session 中设置的 `rewriteBatchedStatements` 属性
在 127 行,判断是不是 `INSERT` 或 `REPLACE` 语句,并且开启了 `rewriteBatchedStatements` 属性,
然后进行了一系列复杂的匹配,来判断是否为 `INSERT INTO VALUES` 这样的语法,并且带有 ? 占位符
最后在 271 行把判断结果赋值给 `isRewritableWithMultiValuesClause`
## 总结
所以,这个异常的问题就在于,判断是否可以重写 batch statement sql 的时候,对于 `INSERT INTO VALUES` 语法的判断上,8.0.28 以及更早的版本的逻辑不够严谨,8.0.29 上面进行了更加复杂的判断,从而避免了这样的错误
标签:insert,JDBC,return,String,INSERT,......,batch,&&,sql From: https://www.cnblogs.com/gnehil/p/18432142