暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Thrift RPC实战.(四) thrift连接池的实现

程序员小阳的代码人生 2018-03-30
823

对象池是一种很实用的技术,经典的例子就是数据库连接池。本篇直接在apache开源项目commons-pool的基础上开发。

步骤:

1、定义对象工厂

  1. package com.yangyang.thrift.connection;


  2. import org.apache.commons.pool2.BasePooledObjectFactory;

  3. import org.apache.commons.pool2.PooledObject;

  4. import org.apache.commons.pool2.impl.DefaultPooledObject;

  5. import org.apache.thrift.protocol.TCompactProtocol;

  6. import org.apache.thrift.protocol.TProtocol;

  7. import org.apache.thrift.transport.TFastFramedTransport;

  8. import org.apache.thrift.transport.TSocket;

  9. import org.apache.thrift.transport.TTransport;

  10. import org.apache.thrift.transport.TTransportException;


  11. /**

  12. * 定义对象工厂

  13. * Created by chenshunyang on 16/8/15.

  14. */

  15. public class TProtocolFactory extends BasePooledObjectFactory<TProtocol> {


  16.    private String host;

  17.    private  int port;

  18.    private  boolean keepAlive =true;


  19.    public TProtocolFactory(String host, int port, boolean keepAlive) {

  20.        this.host = host;

  21.        this.port = port;

  22.        this.keepAlive = keepAlive;

  23.    }


  24.    @Override

  25.    public TProtocol create() throws Exception {

  26.        TSocket socket = new TSocket(host,port);

  27.        TTransport tTransport = new TFastFramedTransport(socket);

  28.        tTransport.open();

  29.        return new TCompactProtocol(tTransport);

  30.    }


  31.    @Override

  32.    public PooledObject<TProtocol> wrap(TProtocol protocol) {

  33.        return new DefaultPooledObject<TProtocol>(protocol);

  34.    }


  35.    /**

  36.     *  对象钝化(即:从激活状态转入非激活状态,returnObject时触发)

  37.     * @param pooledObject

  38.     * @throws Exception

  39.     */

  40.    @Override

  41.    public void passivateObject(PooledObject<TProtocol> pooledObject) throws TTransportException {

  42.        if (keepAlive){

  43.            pooledObject.getObject().getTransport().flush();

  44.            pooledObject.getObject().getTransport().close();

  45.        }

  46.    }


  47.    /**

  48.     * 对象激活(borrowObject时触发)

  49.     * @param pooledObject

  50.     * @throws TTransportException

  51.     */

  52.    public void activateObject(PooledObject<TProtocol> pooledObject) throws TTransportException {

  53.        if (!pooledObject.getObject().getTransport().isOpen()){

  54.            pooledObject.getObject().getTransport().open();

  55.        }

  56.    }


  57.    /**

  58.     * 对象销毁(clear时会触发)

  59.     * @param pooledObject

  60.     * @throws TTransportException

  61.     */

  62.    public void destroyObject(PooledObject<TProtocol> pooledObject) throws TTransportException{

  63.        passivateObject(pooledObject);

  64.        pooledObject.markAbandoned();

  65.    }


  66.    /**

  67.     * 验证对象有效性

  68.     * @param pooledObject

  69.     * @return

  70.     */

  71.    public boolean validateObject(PooledObject<TProtocol> pooledObject){

  72.        if (pooledObject.getObject() != null){

  73.            if (pooledObject.getObject().getTransport().isOpen()){

  74.                return true;

  75.            }

  76.            try {

  77.                pooledObject.getObject().getTransport().open();

  78.                return  true;

  79.            } catch (TTransportException e) {

  80.                e.printStackTrace();

  81.            }

  82.        }

  83.        return false;

  84.    }

  85. }

复制

有二个关键的方法,需要重写:activateObject(对象激活) 及 passivateObject(对象钝化)

2、定义对象池

  1. package com.yangyang.thrift.connection;


  2. import org.apache.commons.pool2.PooledObjectFactory;

  3. import org.apache.commons.pool2.impl.GenericObjectPool;

  4. import org.apache.commons.pool2.impl.GenericObjectPoolConfig;


  5. /**

  6. * 定义对象池,GenericObjectPool为对象池的默认实现

  7. * Created by chenshunyang on 16/8/15.

  8. */

  9. public class AutoClearGenericObjectPool<T> extends GenericObjectPool<T>{


  10.    public AutoClearGenericObjectPool(PooledObjectFactory<T> factory) {

  11.        super(factory);

  12.    }


  13.    public AutoClearGenericObjectPool(PooledObjectFactory<T> factory, GenericObjectPoolConfig config) {

  14.        super(factory, config);

  15.    }


  16.    @Override

  17.    public void returnObject(T obj) {

  18.        super.returnObject(obj);

  19.        //空闲数>=激活数时,清理掉空闲连接

  20.        if (getNumIdle() >= getNumActive()) {

  21.            clear();

  22.        }

  23.    }

  24. }

复制

common-pools提供了对象池的默认实现:GenericObjectPool 但是该对象池中,对于处于空闲的对象,需要手动调用clear来释放空闲对象,如果希望改变这一行为,可以自己派生自己的子类,重写returnObject方法,上面的代码中,每次归还对象时,如果空闲的对象比激活的对象还要多(即:一半以上的对象都在打酱油),则调用clear方法。

3、使用示例:

  1. package com.yangyang.thrift.connection;


  2. import org.apache.commons.pool2.ObjectPool;

  3. import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

  4. import org.apache.thrift.protocol.TProtocol;


  5. import java.util.ArrayList;

  6. import java.util.List;

  7. import java.util.Random;


  8. /**

  9. * Created by chenshunyang on 16/8/15.

  10. */

  11. public class ProtocolPoolTest {

  12.    public static void main(String[] args) throws Exception{

  13.        GenericObjectPoolConfig config = new GenericObjectPoolConfig();

  14.        config.setMaxIdle(10);

  15.        config.setMinIdle(1);

  16.        config.setTestOnBorrow(true);


  17.        ObjectPool<TProtocol> pool = new AutoClearGenericObjectPool<TProtocol>(

  18.                new TProtocolFactory("127.0.0.1", 2181, true), config);


  19.        List<TProtocol> list = new ArrayList<TProtocol>();

  20.        for (int i = 1; i <= 10; i++) {

  21.            TProtocol protocol = pool.borrowObject();

  22.            System.out.println(protocol.toString());

  23.            if (i % 2 == 0) {

  24.                //10个连接中,将偶数归还

  25.                pool.returnObject(protocol);

  26.            } else {

  27.                list.add(protocol);

  28.            }

  29.        }


  30.        Random rnd = new Random();

  31.        while (true) {

  32.            System.out.println(String.format("active:%d,idea:%d", pool.getNumActive(), pool.getNumIdle()));

  33.            Thread.sleep(5000);

  34.            //每次还一个

  35.            if (list.size() > 0) {

  36.                int i = rnd.nextInt(list.size());

  37.                pool.returnObject(list.get(i));

  38.                list.remove(i);

  39.            }


  40.            //直到全部还完

  41.            if (pool.getNumActive() <= 0) {

  42.                break;

  43.            }

  44.        }


  45.        System.out.println("clear all------------------------");


  46.        list.clear();

  47.        //连接池为空,测试是否能重新创建新连接

  48.        for (int i = 1; i <= 10; i++) {

  49.            TProtocol protocol = pool.borrowObject();

  50.            System.out.println(protocol.toString());

  51.            if (i % 2 == 0) {

  52.                pool.returnObject(protocol);

  53.            } else {

  54.                list.add(protocol);

  55.            }

  56.        }


  57.        while (true) {

  58.            System.out.println(String.format("active:%d,idea:%d", pool.getNumActive(), pool.getNumIdle()));

  59.            Thread.sleep(5000);

  60.            if (list.size() > 0) {

  61.                int i = rnd.nextInt(list.size());

  62.                pool.returnObject(list.get(i));

  63.                list.remove(i);

  64.            }


  65.            if (pool.getNumActive() <= 0) {

  66.                pool.close();

  67.                break;

  68.            }

  69.        }



  70.    }

  71. }

复制

注:需要从对象池取一个对象时,调用borrowObject(背后会调用activeObject激活对象),类似的,对象使用完之后,需要调用returnObject将对象放回对象池(背后会调用passivateObject使对象钝化)

输出:

  1. org.apache.thrift.protocol.TCompactProtocol@6ae40994


  2. org.apache.thrift.protocol.TCompactProtocol@1a93a7ca


  3. org.apache.thrift.protocol.TCompactProtocol@4dcbadb4


  4. org.apache.thrift.protocol.TCompactProtocol@4e515669


  5. org.apache.thrift.protocol.TCompactProtocol@4e515669


  6. org.apache.thrift.protocol.TCompactProtocol@17d10166


  7. org.apache.thrift.protocol.TCompactProtocol@17d10166


  8. org.apache.thrift.protocol.TCompactProtocol@1b9e1916


  9. org.apache.thrift.protocol.TCompactProtocol@1b9e1916


  10. org.apache.thrift.protocol.TCompactProtocol@ba8a1dc


  11. active:5,idea:1


  12. active:4,idea:2


  13. active:3,idea:0


  14. active:2,idea:1


  15. active:1,idea:0


  16. clear all------------------------


  17. org.apache.thrift.protocol.TCompactProtocol@1b701da1


  18. org.apache.thrift.protocol.TCompactProtocol@726f3b58


  19. org.apache.thrift.protocol.TCompactProtocol@442d9b6e


  20. org.apache.thrift.protocol.TCompactProtocol@ee7d9f1


  21. org.apache.thrift.protocol.TCompactProtocol@ee7d9f1


  22. org.apache.thrift.protocol.TCompactProtocol@15615099


  23. org.apache.thrift.protocol.TCompactProtocol@15615099


  24. org.apache.thrift.protocol.TCompactProtocol@1edf1c96


  25. org.apache.thrift.protocol.TCompactProtocol@1edf1c96


  26. org.apache.thrift.protocol.TCompactProtocol@368102c8


  27. active:5,idea:1


  28. active:4,idea:2


  29. active:3,idea:0


  30. active:2,idea:1


  31. active:1,idea:0

复制

从输出上看,归还对象后,再次取出时,并没有创建新对象,而是直接使用了对象池中已经空闲的对象。当对象池中的所有对象都归还变成空闲并被clear后,再次从对象池中借对象时,会重新创建对象。

参考文章:

http://www.cnblogs.com/yjmyzz/p/the-implementation-of-thrift-pool-using-common-pool2.html http://www.blogways.net/blog/2014/01/15/apache-commons-pool.html

---------------------------------------------

欢迎关注我的公众号:


文章转载自程序员小阳的代码人生,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论