暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

用Java获取PostgreSQL变更数据

3058

上期示例了一下 Oracle CDC的配置 过程,本期我们再来看一下 用 Java 程序实现 PostgreSQL  如何实现变更数据的获取。


数据同步方式

PostgreSQL数据库提供了两种复制方式:物理复制和逻辑复制。

  • 物理复制

物理复制是指将主库 WAL 日志的日志页直接发到备机,备机完全应用的一种复制方式。

  • 逻辑复制

PostgreSQL 逻辑复制是事务级别的复制,使用订阅复制槽技术,通过在订阅端回放 WAL 日志中的逻辑条目。

物理复制和逻辑复制有各自的适用场景以及优缺点,这部分不是本篇讨论范围。

本篇我们主要介绍如何通过 Java 程序,实现 PostgreSQL 的逻辑复制。

PostgreSQL 配置

要使用 PostgreSQL 的逻辑复制功能,首先需要对数据库进行相应的配置以支持逻辑复制功能。

  • 在 postgres.conf 中加入以下配置项

    wal_level = logical
    max_wal_senders = 10
    max_worker_processes = 10
    max_replication_slots = 10
    复制
    • 创建复制账号

      CREATE USER repuser REPLICATION LOGIN
      CONNECTION LIMIT 8 ENCRYPTED PASSWORD 'repuser';
      复制
      • 创建逻辑复制槽

        SELECT * FROM pg_create_logical_replication_slot(
        'regression_slot', 'test_decoding');
        复制

        Java 代码实现

        在进行逻辑复制时,我们需要使用到 LSN 号进行复制。具体步骤如下:

        1、获取 LSN

        在 PostgreSQL 9.x 版本中,执行以下查询即可:

          SELECT pg_current_xlog_location();
          复制

          在 PostgreSQL 10.x 及以上版本中,执行以下查询即可:

            SELECT pg_current_wal_lsn();
            复制

            具体 Java 实现如下:

            代码中的 queryLSN(String sql,String column) 方法就是简单的数据库查询,在这里不再列出具体代码。

            下面的代码中用了个偷懒的办法,没有先判断数据库版本,而是先执行 pg_current_xlog_location() 如果报错代码为 42883 ,也就是未定义的函数,说明数据库版本不是9.x,则再执行 pg_current_wal_lsn() 。

              /**
              * 得到开始的LSN
              * @return
              * @throws SQLException
              */
              public LogSequenceNumber getStartLSN() throws SQLException {
              LogSequenceNumber currentLSN = null;
              try {
              final String lsn = this.queryLSN(
              "SELECT pg_current_xlog_location();",
              "pg_current_xlog_location");
              currentLSN = LogSequenceNumber.valueOf(lsn);
              }catch (SQLException exFunction) {
              if (!exFunction.getSQLState().equals("42883")) {
              throw exFunction;
              }
              final String lsn2 = this.queryforLSN(
              "SELECT pg_current_wal_lsn();",
              "pg_current_wal_lsn");
              currentLSN = LogSequenceNumber.valueOf(lsn2);
              }
              return currentLSN;
              }
              复制

              2、开启复制槽

              以下代码中的 dbConnection 就是一个 PostgreSQL 的数据库连接。具体数据库连接代码不再列出。
              下面代码中的 regression_slot 即最开始配置数据库时创建的逻辑复制槽名称。

                /**
                * 开启复制槽
                * @throws Exception
                */
                @Override
                void startSlot() throws Exception {
                final LogSequenceNumber startLSN = this.getStartLSN();
                this.replConnection = this.dbConnection.unwrap(PGConnection.class);
                this.stream = ((this.replConnection.getReplicationAPI()
                .replicationStream()
                .logical()
                .withSlotName('regression_slot'))
                .withSlotOption("include-xids", true)
                .withSlotOption("include-timestamp",true)
                .withStartPosition(startLSN)).start();


                }
                复制

                3、获取数据库变更数据

                开启复制槽之后,我们就可以获取实时变更数据。具体获取变更数据代码如下:

                  /**
                  * 读取变更数据
                  * @return
                  * @throws Exception
                  */
                  @Override
                  public String processRecords() throws Exception {
                  String event = null;
                  final ByteBuffer record = this.stream.read();
                  if (record != null) {
                  final int offset = record.arrayOffset();
                  final byte[] source = record.array();
                  final int length = source.length - offset;
                  event = new String(source, offset, length);
                  }
                  return event;
                  }
                  复制

                  我们可以使用一个线程,循环获取需要数据库的实时变更,并将得到的数据加入到一个队列中。

                  到这里,我们就完成了用 Java 程序获取 PostgreSQL 数据变更的关键代码。

                  现在就可以到数据库里插入数据。

                  读取到的数据格式如下:

                    BEGIN 36652
                    table test.source: INSERT: id[integer]:1 name[character varying]:'grainger' address[character varying]:'China'
                    COMMIT 36652 (at 2020-09-19 12:00:41.005607+08)
                    复制

                    是不是很简单呢?

                    关键代码都给你了,剩下的就看你的了。


                    打个广告

                    帮家里卖点茶,打个广告,感谢各位朋友的大力支持。

                    中秋送礼,日照绿茶。



                    文章转载自山东Oracle用户组,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                    评论