记一次使用 org.apache.commons.pool2.impl.GenericObjectPool 过程中遇到的问题
背景:
因为一个古老的非Spring项目需要使用RabbitMq,些时需要自己封装客户端来使用。
要求:
- 不使用Spring框架
- 尽量不浪费资源,更少的connection和channel
- 全局connection最多只能有 3 个
- 每个connetion关联的 channel 最多 1000 个
方案:
使用org.apache.commons.pool2.impl.GenericObjectPool对connection和channel对象做池化管理
现象:
超过一点时间后,就会报如下错误,连接被从客户端主动关闭,消费者丢失。
com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:988) ~[amqp-client-5.14.3.jar:5.14.3]
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:978) ~[amqp-client-5.14.3.jar:5.14.3]
at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:916) ~[amqp-client-5.14.3.jar:5.14.3]
at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:871) ~[amqp-client-5.14.3.jar:5.14.3]
at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:268) ~[amqp-client-5.14.3.jar:5.14.3]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:185) ~[amqp-client-5.14.3.jar:5.14.3]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:117) ~[amqp-client-5.14.3.jar:5.14.3]
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:722) ~[amqp-client-5.14.3.jar:5.14.3]
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[amqp-client-5.14.3.jar:5.14.3]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:669) ~[amqp-client-5.14.3.jar:5.14.3]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
[ERROR] 2024-04-09 08:58:08,244 org.jinko.util.rabbitmq.pool.RabbitConnectionPool - connection State EVICTION BorrowedCount 1 ErrorTime PT28.119S
关键点:
-
从connection中获取 channel 的代码
@Override public Channel create() throws Exception { Connection connection = connectionPool.borrowObject(); connectionPool.returnObject(connection); return connection.createChannel(); }
-
连接池配置
## connection ??? connect_pool_max_idle=3 connect_pool_min_idle=0 connect_pool_max_total=3 connect_pool_test_on_borrow=true connect_pool_test_on_return=true connect_pool_test_while_idle=true connect_pool_time_between_eviction_runs_millis=30000 ## Channel ??? channel_pool_max_idle=100 channel_pool_min_idle=0 channel_pool_max_total=1000 channel_pool_test_on_borrow=true channel_pool_test_on_return=true channel_pool_test_while_idle=true channel_pool_time_between_eviction_runs_millis=30000
问题:
-
获取channel的代码,在获取到 channel 后,connection 对象就会被返回给 connectionPool ,此时这个对象就是处于闲置状态,当达到一定的条件后,这个对象就会被回收。
但是如果不返回这个这个 connection ,那这个对象就只能被使用一次,也就是一个 connection 只能获取一个 channel ,所以这个 connection 对象就必须返回给 connectionPool 。
问题分析:
-
池内对象默认回收条件
public class DefaultEvictionPolicy<T> implements EvictionPolicy<T> { @Override public boolean evict(final EvictionConfig config, final PooledObject<T> underTest, final int idleCount) { if ((config.getIdleSoftEvictTime() < underTest.getIdleTimeMillis() && config.getMinIdle() < idleCount) || config.getIdleEvictTime() < underTest.getIdleTimeMillis()) { return true; } return false; } } // underTest.getIdleTimeMillis() 对象闲置的时间, public long getIdleTimeMillis() { final long elapsed = System.currentTimeMillis() - lastReturnTime; return elapsed >= 0 ? elapsed : 0; } // config.getIdleSoftEvictTime() 配置的软驱逐时间,这个时间的默认值为 -1 ,但是在实际的逻辑中,因为值小于 0 ,会转换为 Long.MAX_VALUE // config.getMinIdle() 配置的最小闲置对象数量 idleCount 当前队列(闲置)的对象数量 // config.getIdleEvictTime() 配置的强制驱逐时间,默认值为 1000L * 60L * 30L // 条件分析 // A config.getIdleSoftEvictTime() < underTest.getIdleTimeMillis() 如果使用默认配置,这个基本上一直为 false // B config.getMinIdle() < idleCount 从上面的配置中可以知道,这个很容易变为 true // C config.getIdleEvictTime() < underTest.getIdleTimeMillis() 当超过 1800000 毫秒,这个connection没有获取过 channel ,这个条件就会为 true // 当 connection 对象闲置超过 1800000 毫秒时, (A&&B)||C = true 这个 connection 对象就会被强制回收
问题解决:
-
把参数调整为 (Long.MAX_VALUE) 这样基本不会出现强制回收,但是不严谨
poolConfig.setMinEvictableIdleTimeMillis(Long.MAX_VALUE);
-
自己实现回收逻辑
poolConfig.setEvictionPolicy(new EvictionPolicy<Connection>() { @Override public boolean evict(EvictionConfig config, PooledObject<Connection> underTest, int idleCount) { if (underTest.getObject().isOpen()) { /* 不主动回收任何正常连接对象 */ CONNECTIONS_CLOSE_TIME.put(underTest, Instant.now()); return false; } Duration between = Duration.between(CONNECTIONS_CLOSE_TIME.get(underTest), Instant.now()); if (between.compareTo(TIME) > 0) { log.error("connection State {} BorrowedCount {} ErrorTime {}", underTest.getState(), underTest.getBorrowedCount(), between); /* 只有超过一定的时间,这个 connection 一直处于关闭状态,这时候才会回收这个 connection 对象 */ return true; } return false; } });
总结:
这个connection 对象是需要持久化的对象,但是又不能被某一个线程一直占用(不符合池化的理念),正常来说池内的对象达到一定条件后需要被回收,默认的回收逻辑跟这个业务场景又不契合,所以需要自己来做一些修改。
-