详解java中的对象池池化功能
背景
在我们实际开发中,存在一些对象,其创建和初始化所需资源较大,比如jdbc连接、redis连接等,然而在使用这些对象时,若不采用一定的优化技术,在服务在高并发情况下会造成服务性能瓶颈。笔者之前在工作中接手的一个RTB系统,由于该系统在操作redis时每次都需创建一个redis连接,导致在服务的QPS达到一定的量级后,通过观察redis服务的连接监控,发现存在大量的新建连接数,增加redis服务端的CPU负载,造成redis查询耗时增加,笔者对redis连接进行池化优化处理,通过一个指定的数量连接池管理redis连接,当每次需操作redis是,则从该连接池中获取一个空闲的redis连接,待操作完成后再归还该连接,期间无需重新建立/断开redis连接,减少了不必要的网络连接和端口的消耗。
commons-pool2 池化功能
笔者使用的apache开源的一款优秀的对象池管理组件apache-commons-pool2,commons-pool2中主要的核心类主要有:GenericObjectPool、PooledObjectFactory、GenericObjectPoolConfig等。
- GenericObjectPool类实现了对象池的管理,在实际使用中,需要指定GenericObjectPoolConfig类和PoolObjectFactory接口实现类,其核心方法有:borrowObject和returnObject,borrowObject方法用于从连接池中获取一个池化对象,returnObject方法用于归还池化对象,其源码如下:
```java
/* @param borrowMaxWaitMillis The time to wait in milliseconds for an object
* to become available
*
* @return object instance from the pool
*
* @throws NoSuchElementException if an instance cannot be returned
*
* @throws Exception if an object instance cannot be returned due to an
* error
*/
public T borrowObject(final long borrowMaxWaitMillis) throws Exception
/**
* {@inheritDoc}
* <p>
* If {@link #getMaxIdle() maxIdle} is set to a positive value and the
* number of idle instances has reached this value, the returning instance
* is destroyed.
* </p>
* <p>
* If {@link #getTestOnReturn() testOnReturn} == true, the returning
* instance is validated before being returned to the idle instance pool. In
* this case, if validation fails, the instance is destroyed.
* </p>
* <p>
* Exceptions encountered destroying objects for any reason are swallowed
* but notified via a {@link SwallowedExceptionListener}.
* </p>
*/
@Override
public void returnObject(final T obj)
```
- PoolObjectFactory接口用于使用者实现该接口创建自定义的PoolObject对象,该工厂接口用于使用者根据具体业务创建和管理池化对戏,其源码如下
```java
public interface PooledObjectFactory<T> {
/**
* Creates an instance that can be served by the pool and wrap it in a
* {@link PooledObject} to be managed by the pool.
*
* @return a {@code PooledObject} wrapping an instance that can be served by the pool
*
* @throws Exception if there is a problem creating a new instance,
* this will be propagated to the code requesting an object.
*/
PooledObject<T> makeObject() throws Exception;
/**
* Destroys an instance no longer needed by the pool.
* <p>
* It is important for implementations of this method to be aware that there
* is no guarantee about what state <code>obj</code> will be in and the
* implementation should be prepared to handle unexpected errors.
* </p>
* <p>
* Also, an implementation must take in to consideration that instances lost
* to the garbage collector may never be destroyed.
* </p>
*
* @param p a {@code PooledObject} wrapping the instance to be destroyed
*
* @throws Exception should be avoided as it may be swallowed by
* the pool implementation.
*
* @see #validateObject
* @see ObjectPool#invalidateObject
*/
void destroyObject(PooledObject<T> p) throws Exception;
/**
* Ensures that the instance is safe to be returned by the pool.
*
* @param p a {@code PooledObject} wrapping the instance to be validated
*
* @return <code>false</code> if <code>obj</code> is not valid and should
* be dropped from the pool, <code>true</code> otherwise.
*/
boolean validateObject(PooledObject<T> p);
/**
* Reinitializes an instance to be returned by the pool.
*
* @param p a {@code PooledObject} wrapping the instance to be activated
*
* @throws Exception if there is a problem activating <code>obj</code>,
* this exception may be swallowed by the pool.
*
* @see #destroyObject
*/
void activateObject(PooledObject<T> p) throws Exception;
/**
* Uninitializes an instance to be returned to the idle object pool.
*
* @param p a {@code PooledObject} wrapping the instance to be passivated
*
* @throws Exception if there is a problem passivating <code>obj</code>,
* this exception may be swallowed by the pool.
*
* @see #destroyObject
*/
void passivateObject(PooledObject<T> p) throws Exception;
}
```
- makeObject方法用于创建一个新的池化对象,在实际工作可通过继承PooledObjectFactory的默认实现类BasePooledObjectFactory并实现其create()方法创建自定义的池化对象
- destoryObject方法用于销毁一个池化对象,当对象池检测到某个对象的空闲时间超出设置的最大空闲时间maxIdle,或使用后归还对象时检测到该池化对象已被破坏(校验无效),因此需调用该方法销毁池化对象,释放资源
- validateObject方法用于检测对象是否有效,在对象池中的对象一定是有效的,比如从对象池中获取的redis client对象保证一定是连接可用
- activeObject方法用于**对象,比如对象是socket连接,若socket没有连接,则可通过该方法创建连接
2. GenericObjectPoolConfig类用于对象池参数配置,主要有
maxTotal:对象池大小
maxIdle:对象池中的最大空闲时间
minIdle:对象池中的最小空闲时间
- demo演示
本文基于常用的redis客户端连接jedis实现一个连接池demo演示,其中jedis、commons-pool2的maven依赖如下
```
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.8.0</version>
</dependency>
```
demo演示代码结构如下:

- JedisConnebtPoolFactory是基于BasePooledObjectFactory的子类,用于创建连接池池化对象,代码如下:
```java
/**
* @Description: jedis connection pool factory class
* @Author: ruands
* @Date: 2020-04-29 22:44:54
*/
public class JedisConnectPoolFactory extends BasePooledObjectFactory<Jedis> {
private JedisConnectPropeties propeties;
private JedisConnect jedisConnect;
public JedisConnectPoolFactory(JedisConnectPropeties propeties) {
this.propeties = propeties;
}
/**
* create a jedis instance
*
* @return
* @throws Exception
*/
@Override
public Jedis create() throws Exception {
this.jedisConnect = new JedisConnectImpl();
return jedisConnect.create(propeties);
}
/**
* wrap jedis object as a pool object
*
* @param obj
* @return
*/
@Override
public PooledObject<Jedis> wrap(Jedis obj) {
return new DefaultPooledObject<>(obj);
}
/**
* destroy jedis object
*
* @param p
*/
@Override
public void destroyObject(PooledObject<Jedis> p){
if (null != this.jedisConnect) {
this.jedisConnect.close();
}
if (null != p){
p.deallocate();
}
}
}
```
- JedisPoolConfig是jedis连接池的配置类,代码如下
```java
/**
* @Description: jedis pool config
* @Author: ruands
* @Date: 2020-04-29 23:05:26
*/
public class JedisPoolConfig extends GenericObjectPoolConfig {
public JedisPoolConfig(){
setTestWhileIdle(true);
setMinEvictableIdleTimeMillis(60000);
setTimeBetweenEvictionRunsMillis(30000);
setNumTestsPerEvictionRun(-1);
setMaxTotal(5);
}
}
```
- JedisPoolTest是连接池测试了,代码如下
```java
/**
* @Description: jedis pool test
* @Author: ruands
* @Date: 2020-04-29 23:18:05
*/
public class JedisPoolTest {
@Test
public void test() throws Exception {
AtomicLong index = new AtomicLong();
JedisConnectPropeties propeties = new JedisConnectPropeties();
propeties.setIp("192.168.10.215");
propeties.setPort("6505");
JedisConnectPoolFactory jedisConnectPoolFactory = new JedisConnectPoolFactory(propeties);
GenericObjectPool<Jedis> objectPool = new GenericObjectPool<>(jedisConnectPoolFactory, new JedisPoolConfig());
ExecutorService executorService = Executors.newFixedThreadPool(20, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("thread-name-" + index.incrementAndGet());
thread.setDaemon(true);
return thread;
}
});
CountDownLatch latch = new CountDownLatch(50);
for (int i = 0; i < 100; i++) {
executorService.submit(new TestTask(objectPool, String.valueOf(i), String.valueOf(i), latch));
}
latch.await();
System.err.println("finish");
}
private class TestTask implements Runnable {
private GenericObjectPool<Jedis> pool;
private String key;
private String value;
private CountDownLatch latch;
public TestTask(GenericObjectPool<Jedis> pool, String key, String value, CountDownLatch latch) {
this.pool = pool;
this.key = key;
this.value = value;
this.latch = latch;
}
@Override
public void run() {
Jedis jedis = null;
try {
jedis = pool.borrowObject();
jedis.setex(key, 600, value);
System.err.println("thread name: " + Thread.currentThread().getName() + ",jedis.name:" + jedis.toString() +
",key: " + key + ",value:" + jedis.get(key));
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != jedis) {
pool.returnObject(jedis);
}
latch.countDown();
}
}
}
}
```
执行结果如下
连接池配置的最大连接数MaxTotal为5,因此创建了5个jedis连接,同时用10个线程的线程池循环执行TestTask执行20次,在并发执行过程中,通过复用连接池中的5个jedis连接,避免了多次建立jedis连接