暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

NIFI使用的JDBC连接池(common-dbcp2)

NIFI实战 2020-11-21
671

本文common-dbcp2学习来源于:

https://www.iteye.com/blog/bsr1983-2092467

配置
默认
含义
username
通过JDBC建立一个连接所需的用户名
password
通过JDBC建立一个连接所需的密码
url
通过JDBC建立一个连接所需的URL
driverClassName
所使用的JDBC驱动的类全名
connectionProperties
连接参数是在建立一个新连接时发送给JDBC驱动的
字符串的格式必须是[参数名=参数值;]
提示:用户名和密码属性是需要明确指出的,所以这两个参数不需要包含在这里
defaultAutoCommitJDBC驱动的缺省值通过这个池创建连接的默认自动提交状态。如果不设置,则setAutoCommit 方法将不被调用。
defaultReadOnlyJDBC驱动的缺省值通过这个池创建连接的默认只读状态。如果不设置,则setReadOnly  方法将不被调用。(部分驱动不支持只读模式,如:Informix)
defaultTransactionIsolationJDBC驱动的缺省值通过这个池创建连接的默认事务策略,设置值为下列中的某一个:
  • NONE

  • READ_COMMITTED

  • READ_UNCOMMITTED

  • REPEATABLE_READ

  • SERIALIZABLE

defaultCatalog
通过这个池创建连接的默认缺省的catalog 
cacheStatetrue如果设置为true,池化的连接将在第一次读或写,以及随后的写的时候缓存当前的只读状态和自动提交设置。这样就省去了对getter的任何进一步的调用时对数据库的额外查询。如果直接访问底层连接,只读状态和/或自动提交设置改变缓存值将不会被反映到当前的状态,在这种情况下,应该将该属性设置为false以禁用缓存。
initialSize0当这个池被启动时初始化的创建的连接个数,起始生效版本:1.2
maxTotal8
可以在这个池中同时被分配的有效连接数的最大值,如设置为负数,则不限制
maxIdle8
可以在池中保持空闲的最大连接数,超出设置值之外的空闲连接将被回收,如设置为负数,则不限制
minIdle0可以在池中保持空闲的最小连接数,超出设置值之外的空闲连接将被创建,如设置为0,则不创建
maxWaitMillisindefinitely(如果没有可用连接)池在抛出异常前等待的一个连接被归还的最大毫秒数,设置为-1则等待时间不确定
validationQuery
在连接池返回连接给调用者前用来进行连接校验的查询sql。如果指定,则这个查询必须是一个至少返回一行数据的SQL SELECT语句。如果没有指定,则连接将通过调用isValid() 方法进行校验。
testOnCreatefalse指明对象在创建后是否需要被校验,如果对象校验失败,则触发对象创建的租借尝试将失败。
testOnBorrowtrue指明在从池中租借对象时是否要进行校验,如果对象校验失败,则对象将从池子释放,然后我们将尝试租借另一个
testOnReturnfalse指明在将对象归还给连接池前是否需要校验。
testWhileIdlefalse指明对象是否需要通过对象驱逐者进行校验(如果有的话),假如一个对象校验失败,则对象将被从池中释放。
timeBetweenEvictionRunsMillis-1空闲对象驱逐线程运行时的休眠毫秒数,如果设置为非正数,则不运行空闲对象驱逐线程。
numTestsPerEvictionRun3在每个空闲对象驱逐线程运行过程中中进行检查的对象个数。(如果有的话)
minEvictableIdleTimeMillis1000 * 60 * 30符合对象驱逐对象驱逐条件的对象在池中最小空闲毫秒总数(如果有的话)
softMiniEvictableIdleTimeMillis-1符合对象驱逐对象驱逐条件的对象在池中最小空闲毫秒总数,额外的条件是池中至少保留有minIdle所指定的个数的连接。当miniEvictableIdleTimeMillis 被设置为一个正数,空闲连接驱逐者首先检测miniEvictableIdleTimeMillis,当空闲连接被驱逐者访问时,首先与miniEvictableIdleTimeMillis 所指定的值进行比较(而不考虑当前池中的空闲连接数),然后比较softMinEvictableIdleTimeMillis所指定的连接数,包括minIdle条件。
softMiniEvictableIdleTimeMillis-1符合对象驱逐对象驱逐条件的对象在池中最小空闲毫秒总数,额外的条件是池中至少保留有minIdle所指定的个数的连接。当miniEvictableIdleTimeMillis 被设置为一个正数,空闲连接驱逐者首先检测miniEvictableIdleTimeMillis,当空闲连接被驱逐者访问时,首先与miniEvictableIdleTimeMillis 所指定的值进行比较(而不考虑当前池中的空闲连接数),然后比较softMinEvictableIdleTimeMillis所指定的连接数,包括minIdle条件。
onnectionInitSqlsnull在第一次创建时用来初始化物理连接的SQL语句集合。这些语句只在配置的连接工厂创建连接时被执行一次。
lifotrue置为true表明连接池(如果池中有可用的空闲连接时)将返回最后一次使用的租借对象(最后进入)。设置为false则表明池将表现为FIFO队列——将会按照它们被归还的顺序从空闲连接实例池中获取连接

 提示: 如果在高负载的系统中将maxIdle的值设置的很低,则你可能会发现在一个新的连接刚刚被创建的时候就立即被关闭了。这是活跃的线程及时关闭连接要比那些打开连接的线程要快,导致空闲的连接数大于maxIdle。高负载系统中maxIdle的最合适的配置值是多样的,但是缺省值是一个好的开始点。


参数 缺省值 描述

poolPreparedStatementsfalse设置该连接池的预处理语句池是否生效
maxOpenPreparedStatementsunlimited可以在语句池中同时分配的最大语句数。设置为负数则不限制。

 这个设置同时作用于预处理语句池. 当一个可用的语句池被创建给每一个连接时,通过以下方法创建的预处理语句将被池化。

  • public PreparedStatement prepareStatement(String sql)

  • public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)

 提示 -要确保你的连接会留下一些资源给其他语句。池化预处理语句可能会在数据库中保持他们的游标,可能会引起连接的游标越界,尤其是maxOpenPreparedStatements的值被设置为默认值(无限的),而且一个应用程序可能会为每个连接打开大量不同的预处理语句。为了避免这个问题maxOpenPreparedStatements应该被设置为一个小于连接可以打开的最大游标数的值。

参数 缺省值 描述

accessToUnderlyingConnectionAllowedfalse控制PoolGuard是否可以访问底层连接 

如果允许访问的话,使用如下代码结构:

    Connection conn = ds.getConnection();
    Connection dconn =((DelegatingConnection) conn).getInnermostDelegate();
    conn.close()

     默认值为false,这是一个有着潜在风险的操作,使用不当可能会导致非常严重的后果。(在守卫连接已被关闭的情况下,关闭底层连接或者继续使用它),只有在你需要直接访问驱动的特有扩展是可以谨慎使用。

     NOTE: 除了最原始那个之外,不要关闭底层连接

    参数 缺省值 描述




    removeAbandonedfalse标记是否删除超过removeAbandonedTimout所指定时间的被遗弃的连接。
    如果设置为true,则一个连接在超过removeAbandonedTimeout所设定的时间未使用即被认为是应该被抛弃并应该被移除的。创建一个语句,预处理语句,可调用语句或使用它们其中的一个执行查询(使用执行方法中的某一个)会重新设置其父连接的lastUsed 属性。
    在写操作较少的应用程序中将该参数设置为true可以将数据库连接从连接关闭失败中恢复。
    removeAbandonedTimeout300一个被抛弃连接可以被移除的超时时间,单位为秒
    logAbandonedfalse标志是否为应用程序中遗弃语句或连接的代码开启日志堆栈追踪。
    因为一个堆栈跟踪已被创建,被抛弃的语句和连接相关的日志将被覆盖到打开每个连接或者创建一个Statement时

    如果你启用了removeAbandoned,则一个连接被池回收再利用是可能的,因为它被认为是已遗弃 在(getNumIdle() < 2) and (getNumActive() > getMaxTotal() - 3)成立时,这个机制将被触发。

     例如, maxTotal=20 ,这里有18个活跃连接,一个限制连接,将触发 "removeAbandoned"。但是只有在活动连接超过 "removeAbandonedTimeout" 所指定的秒数内未使用才会被删除(默认为300秒)。遍历一个结果集并不被统计为被使用,创建一个语句,预处理语句,可调用语句或使用它们其中的一个执行查询(使用执行方法中的某一个)会重新设置其父连接的lastUsed 属性。

    实战:从nifi中剥离出的连接池使用的核心代码

      import org.apache.commons.dbcp2.BasicDataSource;


      import javax.xml.bind.DatatypeConverter;
      import java.io.File;
      import java.io.FilenameFilter;
      import java.io.UnsupportedEncodingException;
      import java.net.*;
      import java.security.MessageDigest;
      import java.security.NoSuchAlgorithmException;
      import java.sql.*;
      import java.util.*;
      import java.util.logging.Logger;
      import java.util.stream.Collectors;


      public class DBCP2Test {
      public static void main(String[] args) throws Exception {
      // https://www.iteye.com/blog/bsr1983-2092467
      // 通过JDBC建立一个连接所需的URL
      String DATABASE_URL = "jdbc:mysql://192.168.42.6:3306/datax";
      // 所使用的JDBC驱动的类全名
      String DB_DRIVERNAME = "com.mysql.jdbc.Driver";
      // JDBC 驱动路径
      String DB_DRIVER_LOCATION = "mysql-connector-java-5.1.44.jar";
      // 通过JDBC建立一个连接所需的用户名
      String DB_USER = "root";
      // 通过JDBC建立一个连接所需的密码
      String DB_PASSWORD = "123456";
      // (如果没有可用连接)池在抛出异常前等待的一个连接被归还的最大毫秒数,设置为-1则等待时间不确定
      String MAX_WAIT_TIME = "500";
      // 可以在这个池中同时被分配的有效连接数的最大值,如设置为负数,则不限制
      String MAX_TOTAL_CONNECTIONS = "8";
      // 在连接池返回连接给调用者前用来进行连接校验的查询sql。如果指定,则这个查询必须是一个至少返回一行数据的SQL SELECT语句。如果没有指定,则连接将通过调用isValid() 方法进行校验
      String VALIDATION_QUERY = "select 1";
      // 可以在池中保持空闲的最小连接数,超出设置值之外的空闲连接将被创建,如设置为0,则不创建
      String MIN_IDLE = "0";
      // 可以在池中保持空闲的最大连接数,超出设置值之外的空闲连接将被回收,如设置为负数,则不限制
      String MAX_IDLE = "8";
      // 一个连接的最大存活毫秒数。如果超过这个时间,则连接在下次激活、钝化、校验时都将会失败。如果设置为0或小于0的值,则连接的存活时间是无限的。
      String MAX_CONN_LIFETIME = "-1";
      // 空闲对象驱逐线程运行时的休眠毫秒数,如果设置为非正数,则不运行空闲对象驱逐线程。
      String EVICTION_RUN_PERIOD = "-1";
      // 符合对象驱逐对象驱逐条件的对象在池中最小空闲毫秒总数(如果有的话)
      String MIN_EVICTABLE_IDLE_TIME = "30";
      // 符合对象驱逐对象驱逐条件的对象在池中最小空闲毫秒总数,额外的条件是池中至少保留有minIdle所指定的个数的连接。当miniEvictableIdleTimeMillis 被设置为一个正数,空闲连接驱逐者首先检测miniEvictableIdleTimeMillis,当空闲连接被驱逐者访问时,首先与miniEvictableIdleTimeMillis 所指定的值进行比较(而不考虑当前池中的空闲连接数),然后比较softMinEvictableIdleTimeMillis所指定的连接数,包括minIdle条件。
      String SOFT_MIN_EVICTABLE_IDLE_TIME = "-1";
      BasicDataSource dataSource = null;
      Connection connection = null;
      Statement statement = null;
      ResultSet rs = null;
      try {
      dataSource = new BasicDataSource();
      dataSource.setDriverClassName(DB_DRIVERNAME);
      dataSource.setDriverClassLoader(new DBCP2Test().getDriverClassLoader(DB_DRIVER_LOCATION, DB_DRIVERNAME));
      dataSource.setMaxWaitMillis(Long.parseLong(MAX_WAIT_TIME));
      dataSource.setMaxTotal(Integer.parseInt(MAX_TOTAL_CONNECTIONS));
      dataSource.setMinIdle(Integer.parseInt(MIN_IDLE));
      dataSource.setMaxIdle(Integer.parseInt(MAX_IDLE));
      dataSource.setMaxConnLifetimeMillis(Long.parseLong(MAX_CONN_LIFETIME));
      dataSource.setTimeBetweenEvictionRunsMillis(Long.parseLong(EVICTION_RUN_PERIOD));
      dataSource.setMinEvictableIdleTimeMillis(Long.parseLong(MIN_EVICTABLE_IDLE_TIME) * 60 * 1000);
      dataSource.setSoftMinEvictableIdleTimeMillis(Long.parseLong(SOFT_MIN_EVICTABLE_IDLE_TIME));
      dataSource.setValidationQuery(VALIDATION_QUERY);
      dataSource.setTestOnBorrow(true);
      dataSource.setUrl(DATABASE_URL);
      dataSource.setUsername(DB_USER);
      dataSource.setPassword(DB_PASSWORD);
      connection = dataSource.getConnection();
      statement = connection.createStatement();
      String sql = "SELECT * from achievement_source LIMIT 1";
      rs = statement.executeQuery(sql);
      while (rs.next()) {
      int id = rs.getInt(1);
      String name = rs.getString(2);
      String gender = rs.getString(3);
      System.out.println("id:" + id + " 姓名:" + name + " 性别:" + gender);
      }
      } catch (Exception e) {
      System.out.println(e.getMessage());
      } finally {
      try {
      if (rs != null) {
      rs.close();
      }
      } catch (Exception e) {
      System.out.println(e.getMessage());
      }
      try {
      if (statement != null) {
      statement.close();
      }
      } catch (Exception e) {
      System.out.println(e.getMessage());
      }
      try {
      if (connection != null) {
      connection.close();
      }
      } catch (Exception e) {
      System.out.println(e.getMessage());
      }
      try {
      if (dataSource != null) {
      dataSource.close();
      }
      } catch (Exception e) {
      System.out.println(e.getMessage());
      }
      }
      }


      public ClassLoader getDriverClassLoader(String locationString, String drvName) throws Exception {
      if (locationString != null && locationString.length() > 0) {
      try {
      final ClassLoader classLoader = getCustomClassLoader(
      locationString,
      this.getClass().getClassLoader(),
      (dir, name) -> name != null && name.endsWith(".jar")
      );
      final Class<?> clazz = Class.forName(drvName, true, classLoader);
      if (clazz == null) {
      System.out.println("Can't load Database Driver " + drvName);
      }
      final Driver driver = (Driver) clazz.newInstance();
      DriverManager.registerDriver(new DriverShim(driver));
      return classLoader;
      } catch (final MalformedURLException e) {
      System.out.println("Invalid Database Driver Jar Url" + e.getMessage());
      throw e;
      } catch (final Exception e) {
      System.out.println("Can't load Database Driver" + e.getMessage());
      throw e;
      }
      } else {
      return Thread.currentThread().getContextClassLoader();
      }
      }


      public static ClassLoader getCustomClassLoader(String modulePath, ClassLoader parentClassLoader, FilenameFilter filenameFilter) throws MalformedURLException {
      URL[] classpaths = getURLsForClasspath(modulePath, filenameFilter, false);
      return createModuleClassLoader(classpaths, parentClassLoader);
      }




      public static URL[] getURLsForClasspath(String modulePath, FilenameFilter filenameFilter, boolean suppressExceptions) throws MalformedURLException {
      return getURLsForClasspath(modulePath == null ? Collections.emptySet() : Collections.singleton(modulePath), filenameFilter, suppressExceptions);
      }




      public static URL[] getURLsForClasspath(Set<String> modulePaths, FilenameFilter filenameFilter, boolean suppressExceptions) throws MalformedURLException {
      // use LinkedHashSet to maintain the ordering that the incoming paths are processed
      Set<String> modules = new LinkedHashSet<>();
      if (modulePaths != null) {
      modulePaths.stream()
      .flatMap(path -> Arrays.stream(path.split(",")))
      .filter(path -> isNotBlank(path))
      .map(String::trim)
      .forEach(m -> modules.add(m));
      }
      return toURLs(modules, filenameFilter, suppressExceptions);
      }


      private static boolean isNotBlank(final String value) {
      return value != null && !value.trim().isEmpty();
      }


      protected static URL[] toURLs(Set<String> modulePaths, FilenameFilter filenameFilter, boolean suppressExceptions) throws MalformedURLException {
      List<URL> additionalClasspath = new LinkedList<>();
      if (modulePaths != null) {
      for (String modulePathString : modulePaths) {
      boolean isUrl = true;
      try {
      additionalClasspath.add(new URL(modulePathString));
      } catch (MalformedURLException mue) {
      isUrl = false;
      }
      if (!isUrl) {
      try {
      File modulePath = new File(modulePathString);


      if (modulePath.exists()) {


      additionalClasspath.add(modulePath.toURI().toURL());


      if (modulePath.isDirectory()) {
      File[] files = modulePath.listFiles(filenameFilter);


      if (files != null) {
      for (File classpathResource : files) {
      if (classpathResource.isDirectory()) {
      System.out.println("Recursive directories are not supported, skipping " + classpathResource.getAbsolutePath());
      } else {
      additionalClasspath.add(classpathResource.toURI().toURL());
      }
      }
      }
      }
      } else {
      throw new MalformedURLException("Path specified does not exist");
      }
      } catch (MalformedURLException e) {
      if (!suppressExceptions) {
      throw e;
      }
      }
      }
      }
      }
      return additionalClasspath.toArray(new URL[additionalClasspath.size()]);
      }


      public static String generateAdditionalUrlsFingerprint(Set<URL> urls) {
      List<String> listOfUrls = urls.stream().map(Object::toString).collect(Collectors.toList());
      StringBuffer urlBuffer = new StringBuffer();


      Collections.sort(listOfUrls);
      try {
      MessageDigest md = MessageDigest.getInstance("MD5");
      listOfUrls.forEach(url -> {
      urlBuffer.append(url).append("-").append(getLastModified(url)).append(";");
      });
      byte[] bytesOfAdditionalUrls = urlBuffer.toString().getBytes("UTF-8");
      byte[] bytesOfDigest = md.digest(bytesOfAdditionalUrls);


      return DatatypeConverter.printHexBinary(bytesOfDigest);
      } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
      System.out.println("Unable to generate fingerprint for the provided additional resources {}" + Arrays.toString(new Object[]{urls, e}));
      return null;
      }
      }


      private static long getLastModified(String url) {
      File file = null;
      try {
      file = new File(new URI(url));
      } catch (URISyntaxException e) {
      System.out.println("Error getting last modified date for " + url);
      }
      return file != null ? file.lastModified() : 0;
      }


      protected static ClassLoader createModuleClassLoader(URL[] modules, ClassLoader parentClassLoader) {
      return new URLClassLoader(modules, parentClassLoader);
      }


      static class DriverShim implements Driver {
      private Driver driver;


      DriverShim(Driver d) {
      this.driver = d;
      }


      @Override
      public boolean acceptsURL(String u) throws SQLException {
      return this.driver.acceptsURL(u);
      }


      @Override
      public Connection connect(String u, Properties p) throws SQLException {
      return this.driver.connect(u, p);
      }


      @Override
      public int getMajorVersion() {
      return this.driver.getMajorVersion();
      }


      @Override
      public int getMinorVersion() {
      return this.driver.getMinorVersion();
      }


      @Override
      public DriverPropertyInfo[] getPropertyInfo(String u, Properties p) throws SQLException {
      return this.driver.getPropertyInfo(u, p);
      }


      @Override
      public boolean jdbcCompliant() {
      return this.driver.jdbcCompliant();
      }


      @Override
      public Logger getParentLogger() throws SQLFeatureNotSupportedException {
      return driver.getParentLogger();
      }


      }
      }



      文章转载自NIFI实战,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论