详解java中的对象池池化功能

 背景


在我们实际开发中,存在一些对象,其创建和初始化所需资源较大,比如jdbc连接、redis连接等,然而在使用这些对象时,若不采用一定的优化技术,在服务在高并发情况下会造成服务性能瓶颈。笔者之前在工作中接手的一个RTB系统,由于该系统在操作redis时每次都需创建一个redis连接,导致在服务的QPS达到一定的量级后,通过观察redis服务的连接监控,发现存在大量的新建连接数,增加redis服务端的CPU负载,造成redis查询耗时增加,笔者对redis连接进行池化优化处理,通过一个指定的数量连接池管理redis连接,当每次需操作redis是,则从该连接池中获取一个空闲的redis连接,待操作完成后再归还该连接,期间无需重新建立/断开redis连接,减少了不必要的网络连接和端口的消耗。

 commons-pool2 池化功能

笔者使用的apache开源的一款优秀的对象池管理组件apache-commons-pool2,commons-pool2中主要的核心类主要有:GenericObjectPool、PooledObjectFactory、GenericObjectPoolConfig等。
- GenericObjectPool类实现了对象池的管理,在实际使用中,需要指定GenericObjectPoolConfig类和PoolObjectFactory接口实现类,其核心方法有:borrowObject和returnObject,borrowObject方法用于从连接池中获取一个池化对象,returnObject方法用于归还池化对象,其源码如下:

```java
  /* @param borrowMaxWaitMillis The time to wait in milliseconds for an object
     *                            to become available
     *
     * @return object instance from the pool
     *
     * @throws NoSuchElementException if an instance cannot be returned
     *
     * @throws Exception if an object instance cannot be returned due to an
     *                   error
     */
    public T borrowObject(final long borrowMaxWaitMillis) throws Exception
    
     /**
     * {@inheritDoc}
     * <p>
     * If {@link #getMaxIdle() maxIdle} is set to a positive value and the
     * number of idle instances has reached this value, the returning instance
     * is destroyed.
     * </p>
     * <p>
     * If {@link #getTestOnReturn() testOnReturn} == true, the returning
     * instance is validated before being returned to the idle instance pool. In
     * this case, if validation fails, the instance is destroyed.
     * </p>
     * <p>
     * Exceptions encountered destroying objects for any reason are swallowed
     * but notified via a {@link SwallowedExceptionListener}.
     * </p>
     */
    @Override
    public void returnObject(final T obj)

```

  1.  PoolObjectFactory接口用于使用者实现该接口创建自定义的PoolObject对象,该工厂接口用于使用者根据具体业务创建和管理池化对戏,其源码如下

```java
public interface PooledObjectFactory<T> {

  /**
   * Creates an instance that can be served by the pool and wrap it in a
   * {@link PooledObject} to be managed by the pool.
   *
   * @return a {@code PooledObject} wrapping an instance that can be served by the pool
   *
   * @throws Exception if there is a problem creating a new instance,
   *    this will be propagated to the code requesting an object.
   */
  PooledObject<T> makeObject() throws Exception;

  /**
   * Destroys an instance no longer needed by the pool.
   * <p>
   * It is important for implementations of this method to be aware that there
   * is no guarantee about what state <code>obj</code> will be in and the
   * implementation should be prepared to handle unexpected errors.
   * </p>
   * <p>
   * Also, an implementation must take in to consideration that instances lost
   * to the garbage collector may never be destroyed.
   * </p>
   *
   * @param p a {@code PooledObject} wrapping the instance to be destroyed
   *
   * @throws Exception should be avoided as it may be swallowed by
   *    the pool implementation.
   *
   * @see #validateObject
   * @see ObjectPool#invalidateObject
   */
  void destroyObject(PooledObject<T> p) throws Exception;

  /**
   * Ensures that the instance is safe to be returned by the pool.
   *
   * @param p a {@code PooledObject} wrapping the instance to be validated
   *
   * @return <code>false</code> if <code>obj</code> is not valid and should
   *         be dropped from the pool, <code>true</code> otherwise.
   */
  boolean validateObject(PooledObject<T> p);

  /**
   * Reinitializes an instance to be returned by the pool.
   *
   * @param p a {@code PooledObject} wrapping the instance to be activated
   *
   * @throws Exception if there is a problem activating <code>obj</code>,
   *    this exception may be swallowed by the pool.
   *
   * @see #destroyObject
   */
  void activateObject(PooledObject<T> p) throws Exception;

  /**
   * Uninitializes an instance to be returned to the idle object pool.
   *
   * @param p a {@code PooledObject} wrapping the instance to be passivated
   *
   * @throws Exception if there is a problem passivating <code>obj</code>,
   *    this exception may be swallowed by the pool.
   *
   * @see #destroyObject
   */
  void passivateObject(PooledObject<T> p) throws Exception;
}

```

  •  makeObject方法用于创建一个新的池化对象,在实际工作可通过继承PooledObjectFactory的默认实现类BasePooledObjectFactory并实现其create()方法创建自定义的池化对象
  •  destoryObject方法用于销毁一个池化对象,当对象池检测到某个对象的空闲时间超出设置的最大空闲时间maxIdle,或使用后归还对象时检测到该池化对象已被破坏(校验无效),因此需调用该方法销毁池化对象,释放资源
  • validateObject方法用于检测对象是否有效,在对象池中的对象一定是有效的,比如从对象池中获取的redis client对象保证一定是连接可用
  •  activeObject方法用于**对象,比如对象是socket连接,若socket没有连接,则可通过该方法创建连接

 2.  GenericObjectPoolConfig类用于对象池参数配置,主要有
maxTotal:对象池大小
maxIdle:对象池中的最大空闲时间
minIdle:对象池中的最小空闲时间

  •  demo演示

本文基于常用的redis客户端连接jedis实现一个连接池demo演示,其中jedis、commons-pool2的maven依赖如下

```
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.8.0</version>
        </dependency>
        
```
demo演示代码结构如下:
![commons-pool2-demo](http://note.youdao.com/yws/res/5226/E364512011C340C388AEACED2FD0A13D)

- JedisConnebtPoolFactory是基于BasePooledObjectFactory的子类,用于创建连接池池化对象,代码如下:
```java


/**
 * @Description: jedis connection pool factory class
 * @Author: ruands
 * @Date: 2020-04-29 22:44:54
 */
public class JedisConnectPoolFactory extends BasePooledObjectFactory<Jedis> {

    private JedisConnectPropeties propeties;

    private JedisConnect jedisConnect;


    public JedisConnectPoolFactory(JedisConnectPropeties propeties) {
        this.propeties = propeties;
    }

    /**
     * create a jedis instance
     *
     * @return
     * @throws Exception
     */
    @Override
    public Jedis create() throws Exception {
        this.jedisConnect = new JedisConnectImpl();
        return jedisConnect.create(propeties);
    }

    /**
     * wrap jedis object as a pool object
     *
     * @param obj
     * @return
     */
    @Override
    public PooledObject<Jedis> wrap(Jedis obj) {
        return new DefaultPooledObject<>(obj);
    }

    /**
     * destroy jedis object
     *
     * @param p
     */
    @Override
    public void destroyObject(PooledObject<Jedis> p){
        if (null != this.jedisConnect) {
            this.jedisConnect.close();
        }
        if (null != p){
            p.deallocate();
        }
    }
}

```

  •  JedisPoolConfig是jedis连接池的配置类,代码如下

```java

/**
 * @Description: jedis pool config
 * @Author: ruands
 * @Date: 2020-04-29 23:05:26
 */
public class JedisPoolConfig extends GenericObjectPoolConfig {

    public JedisPoolConfig(){
        setTestWhileIdle(true);
        setMinEvictableIdleTimeMillis(60000);
        setTimeBetweenEvictionRunsMillis(30000);
        setNumTestsPerEvictionRun(-1);
        setMaxTotal(5);
    }
}
```

  •  JedisPoolTest是连接池测试了,代码如下

```java

/**
 * @Description: jedis pool test
 * @Author: ruands
 * @Date: 2020-04-29 23:18:05
 */
public class JedisPoolTest {

    @Test
    public void test() throws Exception {
        AtomicLong index = new AtomicLong();
        JedisConnectPropeties propeties = new JedisConnectPropeties();
        propeties.setIp("192.168.10.215");
        propeties.setPort("6505");
        JedisConnectPoolFactory jedisConnectPoolFactory = new JedisConnectPoolFactory(propeties);
        GenericObjectPool<Jedis> objectPool = new GenericObjectPool<>(jedisConnectPoolFactory, new JedisPoolConfig());
        ExecutorService executorService = Executors.newFixedThreadPool(20, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("thread-name-" + index.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            }
        });
        CountDownLatch latch = new CountDownLatch(50);
        for (int i = 0; i < 100; i++) {
            executorService.submit(new TestTask(objectPool, String.valueOf(i), String.valueOf(i), latch));
        }
        latch.await();
        System.err.println("finish");
    }


    private class TestTask implements Runnable {

        private GenericObjectPool<Jedis> pool;
        private String key;
        private String value;
        private CountDownLatch latch;

        public TestTask(GenericObjectPool<Jedis> pool, String key, String value, CountDownLatch latch) {
            this.pool = pool;
            this.key = key;
            this.value = value;
            this.latch = latch;
        }

        @Override
        public void run() {
            Jedis jedis = null;
            try {
                jedis = pool.borrowObject();
                jedis.setex(key, 600, value);
                System.err.println("thread name: " + Thread.currentThread().getName() + ",jedis.name:" + jedis.toString() +
                        ",key: " + key + ",value:" + jedis.get(key));
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (null != jedis) {
                    pool.returnObject(jedis);
                }
                latch.countDown();
            }
        }
    }
}

```
执行结果如下
详解java中的对象池池化功能
连接池配置的最大连接数MaxTotal为5,因此创建了5个jedis连接,同时用10个线程的线程池循环执行TestTask执行20次,在并发执行过程中,通过复用连接池中的5个jedis连接,避免了多次建立jedis连接