暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Seata 源码剖析之 TC TM

贝贝猫技术分享 2019-10-28
244


引言

书接上文,之前我们介绍 Seata 的理论思想,接下来,我们着重剖析一下 Seata 中的关键组件。

源码剖析

Seata 整体的代码比较多, 为了避免代码堆砌影响读感, 这里我只介绍核心内容, 我会先介绍 Seata 中的核心组件 TC 和 TM, 然后再分别介绍一下 Seata 中现存的两个分支事务模式 AT、TCC 的完整流程,RM 的内容会根据分支事务模式的不同分别介绍。

TC

Transaction Coordinator 整体的模块图如上所示:

  • Coordinator Core: 在最下面的模块是事务协调器核心代码,主要用来处理事务协调的逻辑,如分支的注册, commit, rollback 等协调活动。

  • Store: 存储模块,用来将我们的数据持久化,防止重启或者宕机数据丢失。

  • Discover: 服务注册/发现模块,用于将 Server 地址暴露给我们 Client。

  • Config: 用来存储和查找我们服务端的配置。

  • Lock: 锁模块,用于给 Seata 提供全局锁的功能。

  • Rpc: 用于和其他端通信。

  • HA-Cluster: 高可用集群,目前还没开源。为 Seata 提供可靠的高可用功能。

Discover

首先来讲讲比较基础的 Discover 模块,又称服务注册/发现模块。我们将 TC 启动之后,需要将自己的地址暴露给其他使用者 TM & RM, 这部分工作就是由 Discover 模块实现的。

    public interface RegistryService<T> {


    **
    * The constant PREFIX_SERVICE_MAPPING.
    */
    String PREFIX_SERVICE_MAPPING = "vgroup_mapping.";
    **
    * The constant PREFIX_SERVICE_ROOT.
    */
    String PREFIX_SERVICE_ROOT = "service";
    **
    * The constant CONFIG_SPLIT_CHAR.
    */
    String CONFIG_SPLIT_CHAR = ".";


    **
    * Register.
    *
    * @param address the address
    * @throws Exception the exception
    */
    void register(InetSocketAddress address) throws Exception;


    **
    * Unregister.
    *
    * @param address the address
    * @throws Exception the exception
    */
    void unregister(InetSocketAddress address) throws Exception;


    **
    * Subscribe.
    *
    * @param cluster the cluster
    * @param listener the listener
    * @throws Exception the exception
    */
    void subscribe(String cluster, T listener) throws Exception;


    **
    * Unsubscribe.
    *
    * @param cluster the cluster
    * @param listener the listener
    * @throws Exception the exception
    */
    void unsubscribe(String cluster, T listener) throws Exception;


    **
    * Lookup list.
    *
    * @param key the key
    * @return the list
    * @throws Exception the exception
    */
    List<InetSocketAddress> lookup(String key) throws Exception;


    **
    * Close.
    * @throws Exception
    */
    void close() throws Exception;
    }

    这个模块有个核心接口 RegistryService,如上图所示:

    • register:TC 使用,进行服务注册。

    • unregister:TC 使用,一般在 JVM 关闭钩子,ShutdownHook 中调用。

    • subscribe:TM RM 使用,注册监听事件,用来监听地址的变化。

    • unsubscribe:TM RM 使用,取消注册监听事件, 一般在 JVM 关闭钩子,ShutdownHook 中调用。。

    • lookup:TM RM 使用,根据 key 查找服务地址列表。

    • close:都可以使用,用于关闭 Register 资源。

    如果需要添加自己定义的服务注册/发现,那么实现这个接口即可。截止目前在社区的不断开发推动下,已经有七种服务注册/发现,分别是 consul,etcd3,sofa,redis, zk, nacos, eruka。下面简单介绍下 redis 的实现:

    register
      @Override
      public void register(InetSocketAddress address) {
      校验地址是否合法
      NetUtil.validAddress(address);
      String serverAddr = NetUtil.toStringAddress(address);
      获取 Redis 的实例
      Jedis jedis = jedisPool.getResource();
      try {
      将地址注册到当前 Redis 上面。
      jedis.hset(getRedisRegistryKey(), serverAddr, ManagementFactory.getRuntimeMXBean().getName());
      发送注册成功的通知
      jedis.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.REGISTER);
      } finally {
      jedis.close();
      }
      }

      流程如下:

      1. 校验地址是否合法

      2. 获取 Redis 的实例,然后将地址注册到当前 Redis 上面。

      3. 发送注册成功的通知

      unregister 接口类似,就是反方向操作, 这里不做详解。

      lookup
        @Override
        public List<InetSocketAddress> lookup(String key) {
        Configuration config = ConfigurationFactory.getInstance();
        获取当前 clusterName 名字
        String clusterName = config.getConfig(PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key);
        if (null == clusterName) {
        return null;
        }
        判断当前 cluster 是否已经获取过了,如果获取过就从map中取
        if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
        Jedis jedis = jedisPool.getResource();
        Map<String, String> instances = null;
        从 Redis 拿到地址数据,将其转换成我们所需要的
        try {
        instances = jedis.hgetAll(getRedisRegistryKey());
        } finally {
        jedis.close();
        }
        if (null != instances && !instances.isEmpty()) {
        Set<InetSocketAddress> newAddressSet = new HashSet<>();
        for (Map.Entry<String, String> instance : instances.entrySet()) {
        String serverAddr = instance.getKey();
        newAddressSet.add(NetUtil.toInetSocketAddress(serverAddr));
        }
        CLUSTER_ADDRESS_MAP.put(clusterName, newAddressSet);
        }
        将数据变更的 Listener 注册到 Redis
        subscribe(clusterName, new RedisListener() {
        @Override
        public void onEvent(String msg) {
        String[] msgr = msg.split("-");
        String serverAddr = msgr[0];
        String eventType = msgr[1];
        switch (eventType) {
        case RedisListener.REGISTER:
        CLUSTER_ADDRESS_MAP.get(clusterName).add(NetUtil.toInetSocketAddress(serverAddr));
        break;
        case RedisListener.UN_REGISTER:
        CLUSTER_ADDRESS_MAP.get(clusterName).remove(NetUtil.toInetSocketAddress(serverAddr));
        break;
        default:
        throw new ShouldNeverHappenException("unknown redis msg:" + msg);
        }
        }
        });
        }
        return new ArrayList<>(CLUSTER_ADDRESS_MAP.get(clusterName));
        }

        订阅的过程如下:

        1. 获取当前 clusterName 名字

        2. 判断当前 cluster 是否已经获取过了,如果获取过就从 map 中取。

        3. 从 Redis 拿到地址数据,将其转换成我们所需要的数据。

        4. 将数据变动的 Listener 注册到 Redis

        其实这里面有个问题, 如果获取了服务器列表, 但是还没来得及注册订阅时, 发生了服务器列表变化, 那么客户端会感知不到, 但是这个问题在 Redis 中确实没有什么好的办法解决, 毕竟 Redis 没有提供机制来解决这个问题。但是 etcd3 中是有机制来解决的, 获取数据时能拿到当时的版本号, 然后订阅时从该版本号开始即可。然后我看了一下基于 etcd3 的 RegistryService
        实现, 发现它并没有使用该机制。于是我提了一个 issue[1]PR[2], 感兴趣的同学可以去看一下。

        subscribe
          @Override
          public void subscribe(String cluster, RedisListener listener) {
          存储该 listener
          String redisRegistryKey = REDIS_FILEKEY_PREFIX + cluster;
          LISTENER_SERVICE_MAP.putIfAbsent(cluster, new ArrayList<>());
          LISTENER_SERVICE_MAP.get(cluster).add(listener);
          threadPoolExecutor.submit(new Runnable() {
          @Override
          public void run() {
          try {
          Jedis jedis = jedisPool.getResource();
          try {
          向 Redis 注册
          jedis.subscribe(new NotifySub(LISTENER_SERVICE_MAP.get(cluster)), redisRegistryKey);
          } finally {
          jedis.close();
          }
          } catch (Exception e) {
          LOGGER.error(e.getMessage(), e);
          }
          }
          });
          }

          流程如下:

          1. 存储该 listener

          2. 向 Redis 注册

          RegistryService
          的主要功能就这些了, TM 和 TC 是通过 lookup 找到服务器列表之后, 会根据设定的负载均衡策略请求 TC, 接下来我们看一看 loadbalance。

          loadbalance
            public interface LoadBalance {


            **
            * Select t.
            *
            * @param <T> the type parameter
            * @param invokers the invokers
            * @return the t
            * @throws Exception the exception
            */
            <T> T select(List<T> invokers) throws Exception;
            }

            这个接口的实现比较简单, 目前就只有随机和轮训。

            Config

            配置模块也是一个比较基础,比较简单的模块。我们需要配置一些常用的参数比如:Netty 的 select 线程数量,work 线程数量,session 允许最大为多少等等,当然这些参数在 Seata 中都有自己的默认设置。

            同样的在 Seata 中也提供了一个接口 Configuration,通过它来存取配置内容:

              public interface Configuration<T> {
              这里只保留了 getShort 其他都类似
              **
              * Gets short.
              *
              * @param dataId the data id
              * @param defaultValue the default value
              * @param timeoutMills the timeout mills
              * @return the short
              */
              short getShort(String dataId, int defaultValue, long timeoutMills);


              **
              * Gets short.
              *
              * @param dataId the data id
              * @param defaultValue the default value
              * @return the int
              */
              short getShort(String dataId, short defaultValue);


              **
              * Gets short.
              *
              * @param dataId the data id
              * @return the int
              */
              short getShort(String dataId);


              **
              * Gets config.
              *
              * @param dataId the data id
              * @param defaultValue the default value
              * @param timeoutMills the timeout mills
              * @return the config
              */
              String getConfig(String dataId, String defaultValue, long timeoutMills);


              **
              * Gets config.
              *
              * @param dataId the data id
              * @param defaultValue the default value
              * @return the config
              */
              String getConfig(String dataId, String defaultValue);


              **
              * Gets config.
              *
              * @param dataId the data id
              * @param timeoutMills the timeout mills
              * @return the config
              */
              String getConfig(String dataId, long timeoutMills);


              **
              * Gets config.
              *
              * @param dataId the data id
              * @return the config
              */
              String getConfig(String dataId);


              **
              * Put config boolean.
              *
              * @param dataId the data id
              * @param content the content
              * @param timeoutMills the timeout mills
              * @return the boolean
              */
              boolean putConfig(String dataId, String content, long timeoutMills);


              **
              * Put config boolean.
              *
              * @param dataId the data id
              * @param content the content
              * @return the boolean
              */
              boolean putConfig(String dataId, String content);


              **
              * Put config if absent boolean.
              *
              * @param dataId the data id
              * @param content the content
              * @param timeoutMills the timeout mills
              * @return the boolean
              */
              boolean putConfigIfAbsent(String dataId, String content, long timeoutMills);


              **
              * Put config if absent boolean.
              *
              * @param dataId the data id
              * @param content the content
              * @return the boolean
              */
              boolean putConfigIfAbsent(String dataId, String content);


              **
              * Remove config boolean.
              *
              * @param dataId the data id
              * @param timeoutMills the timeout mills
              * @return the boolean
              */
              boolean removeConfig(String dataId, long timeoutMills);


              **
              * Remove config boolean.
              *
              * @param dataId the data id
              * @return the boolean
              */
              boolean removeConfig(String dataId);


              **
              * Add config listener.
              *
              * @param dataId the data id
              * @param listener the listener
              */
              void addConfigListener(String dataId, T listener);


              **
              * Remove config listener.
              *
              * @param dataId the data id
              * @param listener the listener
              */
              void removeConfigListener(String dataId, T listener);


              **
              * Gets config listeners.
              *
              * @param dataId the data id
              * @return the config listeners
              */
              List<T> getConfigListeners(String dataId);


              **
              * Gets config from sys pro.
              *
              * @param dataId the data id
              * @return the config from sys pro
              */
              default String getConfigFromSysPro(String dataId) {
              return System.getProperty(dataId);
              }


              }
              • getShort/getInt/Long/Boolean/Config():通过 dataId 来获取对应的值。

              • putConfig:用于添加配置。

              • removeConfig:删除一个配置。

              • add/remove/get ConfigListener:添加/删除/获取 配置监听器,一般用来监听配置的变更。

              目前为止有四种方式获取 Config:File(文件获取), Nacos, Apollo, ZK,etcd。在 Seata 中首先现在项目 resources 下保存一个 registry.conf 文件,在该文件中配置具体使用 Config 接口哪个实现类。

                // registry.conf 相关内容
                config {
                # file、nacos 、apollo、zk、consul
                type = "file"


                file {
                name = "file.conf"
                }
                }

                config 相关的内容我们就不多描述了, 就是简单地存取数据, 发生变化时通知各个节点进行改变。

                Store

                存储层的实现对于 Seata 是否高性能,是否可靠非常关键。

                如果存储层没有实现好,那么如果发生宕机,在 TC 中正在进行分布式事务处理的数据将会被丢失,既然使用了分布式事务,那么其肯定不能容忍丢失。如果存储层实现好了,但是其性能有很大问题,RM 可能会发生频繁回滚那么其完全无法应对高并发的场景。

                在 Seata 中默认提供了文件方式的存储,下面我们定义我们存储的数据为 Session,而我们的 TM 创造的全局事务数据叫 GlobalSession,RM 创造的分支事务叫 BranchSession,一个 GlobalSession 可以拥有多个 BranchSession。我们的目的就是要将这么多 Session 存储下来。

                Seata 中目前有 2 种实现方案, 一种是基于文件的, 一种是基于 DB 的, 我们接下来会分别介绍。

                File

                基于文件的实现是 FileTransactionStoreManager
                , 它可以使用同步刷盘或异步刷盘的策略,每当有 Session 的状态的更新时,它都会将变化的内容存储起来。为了防止存储文件的无限增殖,当达到一定条件时,它会另打开一个文件从头开始记录,并将之前的文件保存起来。这里有一个非常巧妙的设计,就是该方案既能保证所有超时事务不丢,只有已完成的事务被清除,同时文件的大小也得到了控制。我们会结合代码来介绍 Seata 是如何做到的。

                  @Override
                  public boolean writeSession(LogOperation logOperation, SessionStorable session) {
                  靠锁保证安全
                  writeSessionLock.lock();
                  long curFileTrxNum;
                  try {
                  实际的写数据过程,将编码后的比特数组写入 FileChannel
                  if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {
                  return false;
                  }
                  lastModifiedTime = System.currentTimeMillis();
                  curFileTrxNum = FILE_TRX_NUM.incrementAndGet();
                  如果当前事务存储文件已经累计记录一定数量的事务,并且该文件使用时间达标,则进行当前文件的保存和新文件的创建
                  if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 &&
                  (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {
                  return saveHistory();
                  }
                  } catch (Exception exx) {
                  LOGGER.error("writeSession error," + exx.getMessage());
                  return false;
                  } finally {
                  writeSessionLock.unlock();
                  }
                  // 实际刷盘过程,根据配置,可以是同步也可以是异步
                  flushDisk(curFileTrxNum, currFileChannel);
                  return true;
                  }

                  上面的代码,就是存储 Session 的入口,其中 logOperation
                  可以是增加、删除、更新,session
                  可以是 GlobalSession,也可以是 BranchSession。其中就 3 个关键函数,writeDataFile
                  saveHistory
                  flushDisk
                  ,我们分别介绍一下它们。

                    private boolean writeDataFile(byte[] bs) {
                    if (bs == null || bs.length >= Integer.MAX_VALUE - 3) {
                    return false;
                    }
                    ByteBuffer byteBuffer = null;


                    // 有一个默认缓存,如果该缓存太小,则临时申请
                    if (bs.length + 4 > MAX_WRITE_BUFFER_SIZE) {
                    //allocateNew
                    byteBuffer = ByteBuffer.allocateDirect(bs.length + 4);
                    } else {
                    byteBuffer = writeBuffer;
                    //recycle
                    byteBuffer.clear();
                    }


                    byteBuffer.putInt(bs.length);
                    byteBuffer.put(bs);
                    return writeDataFileByBuffer(byteBuffer);
                    }


                    private boolean writeDataFileByBuffer(ByteBuffer byteBuffer) {
                    byteBuffer.flip();
                    for (int retry = 0; retry < MAX_WRITE_RETRY; retry++) {
                    try {
                    // 循环写入
                    while (byteBuffer.hasRemaining()) {
                    currFileChannel.write(byteBuffer);
                    }
                    return true;
                    } catch (IOException exx) {
                    LOGGER.error("write data file error:" + exx.getMessage());
                    }
                    }
                    LOGGER.error("write dataFile failed,retry more than :" + MAX_WRITE_RETRY);
                    return false;
                    }

                    writeDataFile
                    的实现很简单,就是同步写入 FileChannel。接下来看一下最妙的 saveHistory

                      private boolean saveHistory() throws IOException {
                      boolean result;
                      try {
                      // 找到内存中保存的所有超时的事务,这些事务是需要回滚的,不能清除,但是其他完成的事务是可以清除的,这个保存过程实际上就是将超时事务追加到当前文件的结尾
                      result = findTimeoutAndSave();
                      // 然后异步关闭文件
                      writeDataFileRunnable.putRequest(new CloseFileRequest(currFileChannel, currRaf));
                      // 同时,给文件改名为historyFullFileName,替换掉旧的historyFullFile,同一时刻,Seata 只会有 2 个事务存储文件,一个是currentDataFile代表正在使用的文件,一个是historyFullFile,存储了过往的所有可能有用的 Session
                      Files.move(currDataFile.toPath(), new File(hisFullFileName).toPath(), StandardCopyOption.REPLACE_EXISTING);
                      } catch (IOException exx) {
                      LOGGER.error("save history data file error," + exx.getMessage());
                      result = false;
                      } finally {
                      initFile(currFullFileName);
                      }
                      return result;
                      }
                      // 找到所有超时的 Session 存储起来
                      private boolean findTimeoutAndSave() throws IOException {
                      List<GlobalSession> globalSessionsOverMaxTimeout =
                      sessionManager.findGlobalSessions(new SessionCondition(MAX_TRX_TIMEOUT_MILLS));
                      if (CollectionUtils.isEmpty(globalSessionsOverMaxTimeout)) {
                      return true;
                      }
                      List<byte[]> listBytes = new ArrayList<>();
                      int totalSize = 0;
                      // 1. find all data and merge
                      for (GlobalSession globalSession : globalSessionsOverMaxTimeout) {
                      TransactionWriteStore globalWriteStore = new TransactionWriteStore(globalSession, LogOperation.GLOBAL_ADD);
                      byte[] data = globalWriteStore.encode();
                      listBytes.add(data);
                      totalSize += data.length + INT_BYTE_SIZE;
                      List<BranchSession> branchSessIonsOverMaXTimeout = globalSession.getSortedBranches();
                      if (null != branchSessIonsOverMaXTimeout) {
                      for (BranchSession branchSession : branchSessIonsOverMaXTimeout) {
                      TransactionWriteStore branchWriteStore =
                      new TransactionWriteStore(branchSession, LogOperation.BRANCH_ADD);
                      data = branchWriteStore.encode();
                      listBytes.add(data);
                      totalSize += data.length + INT_BYTE_SIZE;
                      }
                      }
                      }
                      // 2. batch write
                      ByteBuffer byteBuffer = ByteBuffer.allocateDirect(totalSize);
                      for (byte[] bytes : listBytes) {
                      byteBuffer.putInt(bytes.length);
                      byteBuffer.put(bytes);
                      }
                      if (writeDataFileByBuffer(byteBuffer)) {
                      currFileChannel.force(false);
                      return true;
                      }
                      return false;
                      }

                      现在我们知道 Seata 同时最多有 2 个存储文件,一个是 currentDataFile 一个是 historyFullFile,currentDataFile 存储了最新的数据,而 historyFullFile 相较于 currentDataFile,还存储了之前过期的所有 Session。任何时候,如果 TC 宕机,重启时只要先读取 historyFullFile,再读取 currentDataFile 就能恢复所有数据。

                      替换 historyFullFile 时,因为会将所有超时的 Session 信息先写入 currentDataFile,然后才会将 currentDataFile 改名为 historyFullFile 并替换掉之前的 oldHistoryFullFile,这样所有过期 Session 就被延续下去了,实际上 Session 过期时间和新建 currentDataFile 的时间是一致的,都是 30 分钟,这样再进行 historyFullFile 替换时,之前的 oldHistoryFullFile 实际上只会存在超时 Session 和完成的 Session,所有超时 Session 已经被记录在新的 historyFullFile 中了,而完成的 Session 会在替换时,随着 oldHistoryFullFile 一起被删除。这就是为什么我觉得这个地方的设计十分巧妙。

                      最后刷盘的过程也很简单。根据配置,如果是同步刷盘会用 Future#get
                      阻塞等待,否则异步进行,writeDataFileRunnable
                      内部有一个阻塞队列,会有一个线程循环从中提取任务并执行,应该不难理解吧。

                        private void flushDisk(long curFileNum, FileChannel currFileChannel) {
                        if (FLUSH_DISK_MODE == FlushDiskMode.SYNC_MODEL) {
                        SyncFlushRequest syncFlushRequest = new SyncFlushRequest(curFileNum, currFileChannel);
                        writeDataFileRunnable.putRequest(syncFlushRequest);
                        syncFlushRequest.waitForFlush(MAX_WAIT_FOR_FLUSH_TIME_MILLS);
                        } else {
                        writeDataFileRunnable.putRequest(new AsyncFlushRequest(curFileNum, currFileChannel));
                        }
                        }
                        DB

                        接下来,我们看一下基于 DB 的实现。

                          @Override
                          public boolean writeSession(LogOperation logOperation, SessionStorable session) {
                          if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
                          logStore.insertGlobalTransactionDO(convertGlobalTransactionDO(session));
                          } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
                          logStore.updateGlobalTransactionDO(convertGlobalTransactionDO(session));
                          } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
                          logStore.deleteGlobalTransactionDO(convertGlobalTransactionDO(session));
                          } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
                          logStore.insertBranchTransactionDO(convertBranchTransactionDO(session));
                          } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
                          logStore.updateBranchTransactionDO(convertBranchTransactionDO(session));
                          } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
                          logStore.deleteBranchTransactionDO(convertBranchTransactionDO(session));
                          } else {
                          throw new StoreException("Unknown LogOperation:" + logOperation.name());
                          }
                          return true;
                          }

                          基于 DB 的实现相较于基于文件的实现就显得朴实无华,logStore
                          实际上就是一个 DAO 层的接口,对应了数据的 CRUD,在重启恢复时只不过是按照条件遍历 DB 中的所有数据,进行 Session 恢复。

                          Lock

                          大家知道数据库实现隔离级别主要是通过锁来实现的,同样的在分布式事务框架 Seata 中要实现隔离级别也需要通过锁。一般在数据库中数据库的隔离级别一共有四种:读未提交,读已提交,可重复读,串行化。在 Seata 中可以保证写操作的互斥性,而读的隔离级别一般是读未提交,但是提供了达到读已提交隔离的手段。

                          Lock 模块也就是 Seata 实现隔离级别的核心模块。在 Lock 模块中提供了一个接口用于管理我们的锁:

                            public interface LockManager {


                            /**
                            * Acquire lock boolean.
                            *
                            * @param branchSession the branch session
                            * @return the boolean
                            * @throws TransactionException the transaction exception
                            */
                            boolean acquireLock(BranchSession branchSession) throws TransactionException;


                            /**
                            * Un lock boolean.
                            *
                            * @param branchSession the branch session
                            * @return the boolean
                            * @throws TransactionException the transaction exception
                            */
                            boolean releaseLock(BranchSession branchSession) throws TransactionException;


                            /**
                            * Un lock boolean.
                            *
                            * @param globalSession the global session
                            * @return the boolean
                            * @throws TransactionException the transaction exception
                            */
                            boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException;


                            /**
                            * Is lockable boolean.
                            *
                            * @param xid the xid
                            * @param resourceId the resource id
                            * @param lockKey the lock key
                            * @return the boolean
                            * @throws TransactionException the transaction exception
                            */
                            boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException;


                            /**
                            * Clean all locks.
                            *
                            * @throws TransactionException the transaction exception
                            */
                            void cleanAllLocks() throws TransactionException;


                            }
                            • acquireLock:用于对我们的 BranchSession 加锁,这里虽然是传的分支事务 Session,实际上是对分支事务操作的数据行加锁,成功返回 true。

                            • isLockable:根据事务 ID,资源 ID,锁住的 Key 来查询是否已经加锁。

                            • releaseLock: 释放分支事务的所有锁。

                            • releaseGlobalSessionLock:释放全局事务的所有分支事务的锁。

                            • cleanAllLocks:清除所有的锁。

                            在 Seata 中, LockManager 下层有使用的锁有两种实现, 一种是基于内存的锁(Session 存储模式为 File 时使用), 一种是基于 DB 的(Session 存储模式为 DB 时使用),它们都实现了 Locker
                            接口:

                              public interface Locker {


                              /**
                              * Acquire lock boolean.
                              *
                              * @param rowLock the row lock
                              * @return the boolean
                              */
                              boolean acquireLock(List<RowLock> rowLock) ;


                              /**
                              * Un lock boolean.
                              *
                              * @param rowLock the row lock
                              * @return the boolean
                              */
                              boolean releaseLock(List<RowLock> rowLock);


                              /**
                              * Is lockable boolean.
                              *
                              * @param rowLock the row lock
                              * @return the boolean
                              */
                              boolean isLockable(List<RowLock> rowLock);


                              /**
                              * Clean all locks boolean.
                              *
                              * @return the boolean
                              */
                              void cleanAllLocks();
                              }

                              我们可以看到, 在 Locker 中将 branchSession 的概念剥离出去了, 只保留了 RowLock 的概念, 责任更加单一, 接下来我们分别看看它的实现类。

                              MemoryLocker

                              内存锁的实现全都存在一个锁 Map 中, 它是整个 Locker 的实现核心, 我们先来看一下它的结构:

                                private static final ConcurrentHashMap<String /* resourceId */,
                                ConcurrentHashMap<String /* tableName */,
                                ConcurrentHashMap<Integer /* bucketId */,
                                ConcurrentHashMap<String /* pk */, Long/* transactionId */>>>>
                                LOCK_MAP
                                = new ConcurrentHashMap<>();

                                我们可以看到, 通过这个 Map 将锁的粒度控制的很小, 最外层 Map 的 key 是 resourceId, 也就是对应了一个 RM, 然后第二层 Map 的 key 是表名, 对应了 RM 上操作的一张表, 下一层 Map 的 key 是 BucketID, Seata 根据表主键哈希值进行了分桶, 让冲突的概率降低, 默认有 128 个桶, 最后一层 Map 的 key 才是主键, Value 是持有该主键锁的事务 ID。

                                明确了锁存储的数据结构后, 再分析加解锁过程就清晰多了:

                                  // MemoryLocker
                                  @Override
                                  public boolean acquireLock(List<RowLock> rowLocks) {
                                  if (CollectionUtils.isEmpty(rowLocks)) {
                                  //no lock
                                  return true;
                                  }
                                  String resourceId = branchSession.getResourceId();
                                  long transactionId = branchSession.getTransactionId();


                                  ConcurrentHashMap<ConcurrentHashMap<String, Long>, Set<String>> bucketHolder = branchSession.getLockHolder();
                                  ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConcurrentHashMap<String, Long>>> dbLockMap = LOCK_MAP.get(resourceId);
                                  // 确认 RM 对应的 Map 是否已经构建
                                  if (dbLockMap == null) {
                                  LOCK_MAP.putIfAbsent(resourceId,
                                  new ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConcurrentHashMap<String, Long>>>());
                                  dbLockMap = LOCK_MAP.get(resourceId);
                                  }


                                  for (RowLock lock : rowLocks) {
                                  String tableName = lock.getTableName();
                                  String pk = lock.getPk();
                                  ConcurrentHashMap<Integer, ConcurrentHashMap<String, Long>> tableLockMap = dbLockMap.get(tableName);
                                  // 确认表对应的 Map 是否已经构建好
                                  if (tableLockMap == null) {
                                  dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<Integer, ConcurrentHashMap<String, Long>>());
                                  tableLockMap = dbLockMap.get(tableName);
                                  }
                                  int bucketId = pk.hashCode() % BUCKET_PER_TABLE;
                                  ConcurrentHashMap<String, Long> bucketLockMap = tableLockMap.get(bucketId);
                                  // 确认 bucket map 是否已经构建好
                                  if (bucketLockMap == null) {
                                  tableLockMap.putIfAbsent(bucketId, new ConcurrentHashMap<String, Long>());
                                  bucketLockMap = tableLockMap.get(bucketId);
                                  }
                                  // 实际加锁过程
                                  Long previousLockTransactionId = bucketLockMap.putIfAbsent(pk, transactionId);
                                  if (previousLockTransactionId == null) {
                                  //No existing lock, and now locked by myself
                                  Set<String> keysInHolder = bucketHolder.get(bucketLockMap);
                                  if (keysInHolder == null) {
                                  bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<String>());
                                  keysInHolder = bucketHolder.get(bucketLockMap);
                                  }
                                  keysInHolder.add(pk);
                                  } else if (previousLockTransactionId == transactionId) {
                                  // Locked by me before
                                  continue;
                                  } else {
                                  LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + previousLockTransactionId);
                                  try {
                                  // Release all acquired locks.
                                  branchSession.unlock();
                                  } catch (TransactionException e) {
                                  throw new FrameworkException(e);
                                  }
                                  return false;
                                  }
                                  }
                                  return true;
                                  }

                                  我们可以看到, 加锁的过程无非就是确认各级 Map 中是否有自己要的数据, 如果没有就用 putIfAbsent 添加进去, 最后到主键所在的 bucketMap 时, 才是真正加锁并确认的过程:

                                  1. 使用 putIfAbsent 将自己的 transactionId 填入 bucketLockMap

                                  2. 如果 previousLockTransactionId 为空, 说明自己获得了锁, 把自己获得的锁记录在 branchSession 中, 方便释放时查找

                                  3. 如果 previousLockTransactionId 和自己的 transactionId 相同, 说明这个锁之前就被自己持有了, 直接返回即可

                                  4. 否则, 发生了锁冲突, 释放自己之前获取到的所有锁

                                  其实, 这个实现中原来有一个死锁的 bug, 之前给 bucket Map 的加锁过程, 使用了 Synchronized block, 如果两个分支 Session 要同时锁一个表的相同数据, 并且加锁的顺序不同(BS1: row1, row2, row3; BS2: row3, row2, row1), 就会发生死锁。

                                  导致这个 bug 的原因是 Synchronized block 的作用范围有误, 将解锁过程也包在了该代码块中。所以, 当时我就提了 issue[3]pr[4], 有兴趣的同学可以去看一看。

                                  释放锁的过程就很简单了, 遍历 branchSession 中持有的所有锁, 并依次释放它们。

                                    // MemoryLocker
                                    @Override
                                    public boolean releaseLock(List<RowLock> rowLock) {
                                    // 取出所有持有的锁
                                    ConcurrentHashMap<ConcurrentHashMap<String, Long>, Set<String>> lockHolder = branchSession.getLockHolder();
                                    if (lockHolder == null || lockHolder.size() == 0) {
                                    return true;
                                    }
                                    Iterator<Map.Entry<ConcurrentHashMap<String, Long>, Set<String>>> it = lockHolder.entrySet().iterator();
                                    // 挨个释放锁
                                    while (it.hasNext()) {
                                    Map.Entry<ConcurrentHashMap<String, Long>, Set<String>> entry = it.next();
                                    ConcurrentHashMap<String, Long> bucket = entry.getKey();
                                    Set<String> keys = entry.getValue();
                                    for (String key : keys) {
                                    // remove lock only if it locked by myself
                                    bucket.remove(key, branchSession.getTransactionId());
                                    }
                                    }
                                    lockHolder.clear();
                                    return true;
                                    }

                                    这里大家可能会有疑问, 存在内存中的锁, 如果发生了崩溃, 重启的时候锁不就没了么, 其实 Seata 在重启并恢复 Session 的同时, 也会按顺序恢复各个 Session 的锁, 下面只会展示核心代码。

                                      /**
                                      * io.seata.server.session.SessionHolder#reload
                                      */
                                      protected static void reload() {
                                      // ...
                                      Collection<GlobalSession> reloadedSessions = ROOT_SESSION_MANAGER.allSessions();
                                      if (reloadedSessions != null && !reloadedSessions.isEmpty()) {
                                      reloadedSessions.forEach(globalSession -> {
                                      GlobalStatus globalStatus = globalSession.getStatus();
                                      switch (globalStatus) {
                                      case UnKnown:
                                      case Committed:
                                      case CommitFailed:
                                      case Rollbacked:
                                      case RollbackFailed:
                                      case TimeoutRollbacked:
                                      case TimeoutRollbackFailed:
                                      case Finished:
                                      throw new ShouldNeverHappenException("Reloaded Session should NOT be " + globalStatus);
                                      case AsyncCommitting:
                                      try {
                                      // 恢复未完成的异步提交过程
                                      globalSession.addSessionLifecycleListener(getAsyncCommittingSessionManager());
                                      getAsyncCommittingSessionManager().addGlobalSession(globalSession);
                                      } catch (TransactionException e) {
                                      throw new ShouldNeverHappenException(e);
                                      }
                                      break;
                                      default: {
                                      ArrayList<BranchSession> branchSessions = globalSession.getSortedBranches();
                                      // Lock, 重新加锁
                                      branchSessions.forEach(branchSession -> {
                                      try {
                                      branchSession.lock();
                                      } catch (TransactionException e) {
                                      throw new ShouldNeverHappenException(e);
                                      }
                                      });
                                      // ...
                                      }
                                      }


                                      });
                                      }
                                      }
                                      DataBaseLocker

                                      和 SessionManager 的实现相同, DataBaseLocker 的加锁过程实际上就是一个对 DB 增删数据。因为这部分比较简单, 所以我们只展示加锁的最核心内容:

                                        // LockStoreDataBaseDAO
                                        protected boolean doAcquireLock(Connection conn, LockDO lockDO) {
                                        PreparedStatement ps = null;
                                        try {
                                        //insert
                                        String insertLockSQL = LockStoreSqls.getInsertLockSQL(lockTable, dbType);
                                        ps = conn.prepareStatement(insertLockSQL);
                                        ps.setString(1, lockDO.getXid());
                                        ps.setLong(2, lockDO.getTransactionId());
                                        ps.setLong(3, lockDO.getBranchId());
                                        ps.setString(4, lockDO.getResourceId());
                                        ps.setString(5, lockDO.getTableName());
                                        ps.setString(6, lockDO.getPk());
                                        ps.setString(7, lockDO.getRowKey());
                                        return ps.executeUpdate() > 0;
                                        } catch (SQLException e) {
                                        throw new StoreException(e);
                                        } finally {
                                        if (ps != null) {
                                        try {
                                        ps.close();
                                        } catch (SQLException e) {
                                        }
                                        }
                                        }
                                        }

                                        Rpc

                                        保证 Seata 高性能的关键之一也是使用了 Netty 作为 RPC 框架,采用默认配置的线程模型如下图所示:如果采用默认的基本配置, 那么会有一个 Acceptor 线程用于处理客户端的链接,会有 cpu*2 数量的 NIO-Thread,在这些 NIO-Thread 线程中不会做业务太重的事情,只会做一些速度比较快的事情,比如编解码,心跳事件,和 TM 注册。一些比较费时间的业务操作将会交给业务线程池,默认情况下业务线程池配置为最小线程为 100,最大为 500。

                                        关于 Netty 的使用基础, 我们这里就不详细介绍了, 简单说就是对于每个连接都会绑定上数据的 handler, 它会按照责任链的原则, 顺着 handler 的绑定顺序, 处理数据, 这里简单看下它都绑定了什么 handler:

                                          // Rpc Server 和 Rpc Client
                                          ch.pipeline()
                                          .addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                                          .addLast(new ProtocolV1Decoder())
                                          .addLast(new ProtocolV1Encoder())
                                          .addList(this);

                                          我们可以看到, 它们都绑定了心跳组件 IdleStateHandler, 然后是编解码器, 最后是 Server(TC) 和 Client(TM RM), 它们会拿到原始的请求和回应数据, 据此来进行业务交互。

                                          前面介绍 Discover 模块时, 我们知道 Server 是将自己注册到注册中心, 然后 Client 订阅更新, 并得到 Server 的列表, 最后通过负载均衡选择一个 Server 进行连接。当连接建立成功后, Server 会保存所有的连接, 在需要进行分支回滚和提交时, 从所有 RM 的连接记录中, 找到对应 RM 的所有连接, 它会首先寻找最原始的 RM 节点, 如果该节点宕机了, 它会找到该 RM 的其他节点, 然后发送分支提交请求。

                                          HA-Cluster

                                          尚未实现

                                          Metrics

                                          统计接口目前的实现也很简单, 就是在内存中计数, 然后支持通过 HTTP 获取统计数据,这部分很简单我就不展示了。

                                          Coordinator Core

                                          在 TC 端, 大部分工作都是响应 TM 的请求, 然后发送提交回滚请求给 RM,下达提交或回滚命令, 这些部分我们会在后面的 AT 模式串讲 和 TCC 模式串讲中介绍, 本节主要看一下在 TC 模块中, 自主进行的一些工作。

                                          当 TC 启动时, 先恢复本机的 Session, 然后启动 RPC Server, 最后注册自己的地址到注册中心, 这些我们前面已经介绍过了, 除此之外, TC 还会启动几个后台线程, 这些线程保证了 TC 的协调工作能够在发生错误时, 最终能顺利完成, 我们来看一下这部分的代码:

                                            /**
                                            * Init.
                                            */
                                            public void init() {
                                            retryRollbacking.scheduleAtFixedRate(() -> {
                                            try {
                                            handleRetryRollbacking();
                                            } catch (Exception e) {
                                            LOGGER.info("Exception retry rollbacking ... ", e);
                                            }
                                            }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);


                                            retryCommitting.scheduleAtFixedRate(() -> {
                                            try {
                                            handleRetryCommitting();
                                            } catch (Exception e) {
                                            LOGGER.info("Exception retry committing ... ", e);
                                            }
                                            }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);


                                            asyncCommitting.scheduleAtFixedRate(() -> {
                                            try {
                                            handleAsyncCommitting();
                                            } catch (Exception e) {
                                            LOGGER.info("Exception async committing ... ", e);
                                            }
                                            }, 0, ASYN_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);


                                            timeoutCheck.scheduleAtFixedRate(() -> {
                                            try {
                                            timeoutCheck();
                                            } catch (Exception e) {
                                            LOGGER.info("Exception timeout checking ... ", e);
                                            }
                                            }, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);


                                            undoLogDelete.scheduleAtFixedRate(() -> {
                                            try {
                                            undoLogDelete();
                                            } catch (Exception e) {
                                            LOGGER.info("Exception undoLog deleting ... ", e);
                                            }
                                            }, UNDOLOG_DELAY_DELETE_PERIOD, UNDOLOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
                                            }

                                            我们可以看到, 这些后台任务分别是回滚重试, 提交重试, 异步提交, 超时检测, 删除没用的 AT 模式 undo log。

                                            TM

                                            TM 和 TC 一样是一个共通的模块, 无论是 AT 模式还是 TCC 模式都需要使用 TM 模块。

                                            首先 TM 在启动的时候会去连接 TC Server, 然后然后通过该 TM Client 与 TC 模块进行通讯。在 TM 模块中最核心的接口就是 GlobalTransaction
                                            , 里面包含了全局事务的创建, 提交, 回滚过程, 其实质就是向 TC 发送 RPC 请求。

                                              public class DefaultGlobalTransaction implements GlobalTransaction {
                                              // 只保留核心内容...
                                              @Override
                                              public void begin(int timeout, String name) throws TransactionException {
                                              if (role != GlobalTransactionRole.Launcher) {
                                              check();
                                              if (LOGGER.isDebugEnabled()) {
                                              LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");
                                              }
                                              return;
                                              }
                                              if (xid != null) {
                                              throw new IllegalStateException();
                                              }
                                              if (RootContext.getXID() != null) {
                                              throw new IllegalStateException();
                                              }
                                              xid = transactionManager.begin(null, null, name, timeout);
                                              status = GlobalStatus.Begin;
                                              RootContext.bind(xid);
                                              if (LOGGER.isInfoEnabled()) {
                                              LOGGER.info("Begin new global transaction [" + xid + "]");
                                              }


                                              }


                                              @Override
                                              public void commit() throws TransactionException {
                                              if (role == GlobalTransactionRole.Participant) {
                                              // Participant has no responsibility of committing
                                              if (LOGGER.isDebugEnabled()) {
                                              LOGGER.debug("Ignore Commit(): just involved in global transaction [" + xid + "]");
                                              }
                                              return;
                                              }
                                              if (xid == null) {
                                              throw new IllegalStateException();
                                              }


                                              status = transactionManager.commit(xid);
                                              if (RootContext.getXID() != null) {
                                              if (xid.equals(RootContext.getXID())) {
                                              RootContext.unbind();
                                              }
                                              }
                                              if (LOGGER.isInfoEnabled()) {
                                              LOGGER.info("[" + xid + "] commit status:" + status);
                                              }


                                              }


                                              @Override
                                              public void rollback() throws TransactionException {
                                              if (role == GlobalTransactionRole.Participant) {
                                              // Participant has no responsibility of committing
                                              if (LOGGER.isDebugEnabled()) {
                                              LOGGER.debug("Ignore Rollback(): just involved in global transaction [" + xid + "]");
                                              }
                                              return;
                                              }
                                              if (xid == null) {
                                              throw new IllegalStateException();
                                              }


                                              status = transactionManager.rollback(xid);
                                              if (RootContext.getXID() != null) {
                                              if (xid.equals(RootContext.getXID())) {
                                              RootContext.unbind();
                                              }
                                              }
                                              if (LOGGER.isInfoEnabled()) {
                                              LOGGER.info("[" + xid + "] rollback status:" + status);
                                              }
                                              }
                                              }

                                              我们可以看到, 这个接口中实际上没做什么实际的事, 它调用 transactionManager 发送消息, 然后将涉及到的全局事务 XID 保存起来, 我们看看它把数据存在什么地方了:

                                                public class ThreadLocalContextCore implements ContextCore {


                                                private ThreadLocal<Map<String, String>> threadLocal = new ThreadLocal<Map<String, String>>() {
                                                @Override
                                                protected Map<String, String> initialValue() {
                                                return new HashMap<String, String>();
                                                }


                                                };


                                                @Override
                                                public String put(String key, String value) {
                                                return threadLocal.get().put(key, value);
                                                }


                                                @Override
                                                public String get(String key) {
                                                return threadLocal.get().get(key);
                                                }


                                                @Override
                                                public String remove(String key) {
                                                return threadLocal.get().remove(key);
                                                }
                                                }

                                                看上去, 它是将 XID 存在了 ThreadLocal 中, 这样在整个 RPC 调用的上下文中都能获取到 XID。接下来, 我们看看下层的 transactionManager 都做了什么:

                                                  public class DefaultTransactionManager implements TransactionManager {
                                                  // All PRCs here
                                                  @Override
                                                  public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
                                                  throws TransactionException {
                                                  GlobalBeginRequest request = new GlobalBeginRequest();
                                                  request.setTransactionName(name);
                                                  request.setTimeout(timeout);
                                                  GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
                                                  if (response.getResultCode() == ResultCode.Failed) {
                                                  throw new TransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
                                                  }
                                                  return response.getXid();
                                                  }


                                                  @Override
                                                  public GlobalStatus commit(String xid) throws TransactionException {
                                                  GlobalCommitRequest globalCommit = new GlobalCommitRequest();
                                                  globalCommit.setXid(xid);
                                                  GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);
                                                  return response.getGlobalStatus();
                                                  }


                                                  @Override
                                                  public GlobalStatus rollback(String xid) throws TransactionException {
                                                  GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
                                                  globalRollback.setXid(xid);
                                                  GlobalRollbackResponse response = (GlobalRollbackResponse)syncCall(globalRollback);
                                                  return response.getGlobalStatus();
                                                  }
                                                  }

                                                  可以看到, TransactionManager 才是真正做实事的, 消息的发送工作都在这里完成。好了, 至此我们知道了哪个接口管理着全局事务的记录, 哪个接口真正进行 RPC 调用, 那么谁才是这些接口的真正调用者呢? Seata 使用了模板方法模式来进行这部分工作:

                                                    public class TransactionalTemplate {


                                                    /**
                                                    * Execute object.
                                                    *
                                                    * @param business the business
                                                    * @return the object
                                                    * @throws TransactionalExecutor.ExecutionException the execution exception
                                                    */
                                                    public Object execute(TransactionalExecutor business) throws Throwable {
                                                    // 1. get or create a transaction
                                                    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();


                                                    // 1.1 get transactionInfo
                                                    TransactionInfo txInfo = business.getTransactionInfo();
                                                    if (txInfo == null) {
                                                    throw new ShouldNeverHappenException("transactionInfo does not exist");
                                                    }
                                                    try {


                                                    // 2. begin transaction
                                                    beginTransaction(txInfo, tx);


                                                    Object rs = null;
                                                    try {


                                                    // Do Your Business
                                                    rs = business.execute();


                                                    } catch (Throwable ex) {


                                                    // 3.the needed business exception to rollback.
                                                    completeTransactionAfterThrowing(txInfo,tx,ex);
                                                    throw ex;
                                                    }


                                                    // 4. everything is fine, commit.
                                                    commitTransaction(tx);


                                                    return rs;
                                                    } finally {
                                                    //5. clear
                                                    triggerAfterCompletion();
                                                    cleanUp();
                                                    }
                                                    }
                                                    }

                                                    该模板的工作流程如下:

                                                    1. 看看当前是不是已经在一个分布式事务中了, 如果是, 则复用现存的全局事务, 否则创建新的

                                                      • 什么时候会出现已经存在全局事务的情况呢? 假设 A 调用了 B, A 创建了全局事务 GT1, B 碰巧也执行了上述的模板, 这时候 B 就不会创建新的全局事务, 而是使用 GT1, 这实际上是前面提到的事物的传播

                                                    2. 如果是自己创建的全局事务, 则发 RPC 开始事务, 如果不是自己创建的则什么都不干

                                                    3. 执行真正的业务逻辑

                                                    4. 如果发生了异常, 如果自己创建全局事务, 才负责回滚, 否则就只管异常外抛

                                                    5. 如果没发生异常, 如果自己创建全局事务, 才负责提交, 否则就什么都不做

                                                    6. 清理工作

                                                    我们看到, 该模板实际上是业务服务的完整执行流程, 那我们每次都要自己在代码中通过该模板接口执行自己的业务代码吗? 当然不用, 实际上 Seata 基于 Spring 切面, 已经帮我们做了这些事, 我们只需要使用 GlobalTransactional 注解就够了, 接下来我们看看这部分内容:

                                                      public class GlobalTransactionalInterceptor implements MethodInterceptor {
                                                      // 只保留核心代码
                                                      @Override
                                                      public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
                                                      Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
                                                      Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
                                                      final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);


                                                      final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
                                                      final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
                                                      if (globalTransactionalAnnotation != null) {
                                                      return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                                                      } else if (globalLockAnnotation != null) {
                                                      return handleGlobalLock(methodInvocation);
                                                      } else {
                                                      return methodInvocation.proceed();
                                                      }
                                                      }


                                                      private Object handleGlobalLock(final MethodInvocation methodInvocation) throws Exception {
                                                      return globalLockTemplate.execute(() -> {
                                                      try {
                                                      return methodInvocation.proceed();
                                                      } catch (Throwable e) {
                                                      if (e instanceof Exception) {
                                                      throw (Exception)e;
                                                      } else {
                                                      throw new RuntimeException(e);
                                                      }
                                                      }
                                                      });
                                                      }


                                                      private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                                      final GlobalTransactional globalTrxAnno) throws Throwable {
                                                      try {
                                                      return transactionalTemplate.execute(new TransactionalExecutor() {
                                                      @Override
                                                      public Object execute() throws Throwable {
                                                      return methodInvocation.proceed();
                                                      }


                                                      public String name() {
                                                      String name = globalTrxAnno.name();
                                                      if (!StringUtils.isNullOrEmpty(name)) {
                                                      return name;
                                                      }
                                                      return formatMethod(methodInvocation.getMethod());
                                                      }


                                                      @Override
                                                      public TransactionInfo getTransactionInfo() {
                                                      TransactionInfo transactionInfo = new TransactionInfo();
                                                      transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                                                      transactionInfo.setName(name());
                                                      Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                                                      for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                                                      rollbackRules.add(new RollbackRule(rbRule));
                                                      }
                                                      for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                                                      rollbackRules.add(new RollbackRule(rbRule));
                                                      }
                                                      for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                                                      rollbackRules.add(new NoRollbackRule(rbRule));
                                                      }
                                                      for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                                                      rollbackRules.add(new NoRollbackRule(rbRule));
                                                      }
                                                      transactionInfo.setRollbackRules(rollbackRules);
                                                      return transactionInfo;
                                                      }
                                                      });
                                                      } catch (TransactionalExecutor.ExecutionException e) {
                                                      TransactionalExecutor.Code code = e.getCode();
                                                      switch (code) {
                                                      case RollbackDone:
                                                      throw e.getOriginalException();
                                                      case BeginFailure:
                                                      failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                                                      throw e.getCause();
                                                      case CommitFailure:
                                                      failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                                                      throw e.getCause();
                                                      case RollbackFailure:
                                                      failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                                                      throw e.getCause();
                                                      default:
                                                      throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);


                                                      }
                                                      }
                                                      }
                                                      }

                                                      我们可以看到, 上面的就是全局事务的拦截器, 它扫描 GlobalTransactional
                                                      GlobalLock
                                                      , 如果是 GlobalTransactional
                                                      则用 transactionalTemplate 来执行真正的业务代码, 此外还从注解中拿出配置的回滚条件, 超时时间等配置, 给 transactionalTemplate 使用。

                                                      大家也看到了这个拦截器中, 还有一个 globalLockTemplate, 实际上这个是在 RM 中使用的, 至于为什么, 使用我们在前面的理论环节已经介绍了, 这里就不赘述了, 而且该模板代码也很简单, 就是加锁->执行->放锁, 并且这里的加锁和放锁只是改变 Context 中的标志位, 真正通过 RPC 进行锁确认的过程, 我们后面会介绍。

                                                      至此, 我们知道了 TM 是如何觉察到全局事务需求(GlobalTransactional 注解), 如何创建全局事务(RPC 调用 TC 接口), 如何通过模板方法模式完成对业务的封装(GlobalTransactionTemplate)。那么还有一个问题, 事务信息是怎么传递的呢, TM 怎么将全局事务 XID 传递给 RPC 的提供者的呢? 这部分, 根据 RPC 框架的不同, 需要不同的实现, 但是本质上都是一样的, 拦截 RPC 的调用过程, 在 RPC 请求中加一个隐藏属性来存储 XID, RPC 的提供方从请求中获取到该隐藏属性, 然后存储在事务 Context 的 ThreadLocal 中, 我们以 Dubbo 为例, 看一看它是怎么做的:

                                                        @Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100)
                                                        public class TransactionPropagationFilter implements Filter {


                                                        private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);


                                                        @Override
                                                        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
                                                        String xid = RootContext.getXID();
                                                        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
                                                        if (LOGGER.isDebugEnabled()) {
                                                        LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
                                                        }
                                                        boolean bind = false;
                                                        if (xid != null) {
                                                        // 如果当前存在 xid 说明是 RPC 发起方, 将 xid 存在 RPC context 中, 它会随着 RPC request 一起发送
                                                        RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
                                                        } else {
                                                        // 否则他就是 RPC 提供方, 它从 rpc context 中拿到 xid, 并设置到事务 context 中
                                                        if (rpcXid != null) {
                                                        RootContext.bind(rpcXid);
                                                        bind = true;
                                                        if (LOGGER.isDebugEnabled()) {
                                                        LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                                                        }
                                                        }
                                                        }
                                                        try {
                                                        return invoker.invoke(invocation);


                                                        } finally {
                                                        // 最后清除事务 context 中绑定的 xid, 防止 ThreadLocal 内容污染下次调用
                                                        if (bind) {
                                                        String unbindXid = RootContext.unbind();
                                                        if (LOGGER.isDebugEnabled()) {
                                                        LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                                                        }
                                                        if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                                                        LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                                                        if (unbindXid != null) {
                                                        RootContext.bind(unbindXid);
                                                        LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                                                        }
                                                        }
                                                        }
                                                        }
                                                        }
                                                        }

                                                        看过我之前发的 Dubbo[5] 文章的同学, 可能还记得 Dubbo 是通过一个 Filter 的概念来表示 AOP 特性的,Filter 的注入基于 Dubbo 的 SPI, 而 Seata 这里所做的就是实现了一个 Dubbo Filter, 把事务 Context 和 RPCContext 中的数据做一下绑定。其他 RPC 框架的支持方案基本类似, 这里就不再赘述了, 值得一提的是, 如果全局事务中的各个微服务是通过 HTTP 请求来互相调用的话, 可以将 XID 存储在 HTTP Header 中, 在官方的提供的 Sample 中有一份样例代码:

                                                          // 实现 servlet filter, 这样服务提供者能从 Http request 中获取 xid 并绑定进事务 Context
                                                          @Component
                                                          public class SeataFilter implements Filter {
                                                          @Override
                                                          public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
                                                          HttpServletRequest req = (HttpServletRequest) servletRequest;
                                                          String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());
                                                          boolean isBind = false;
                                                          if (StringUtils.isNotBlank(xid)) {
                                                          RootContext.bind(xid);
                                                          isBind = true;
                                                          }
                                                          try {
                                                          filterChain.doFilter(servletRequest, servletResponse);
                                                          } finally {
                                                          if (isBind) {
                                                          RootContext.unbind();
                                                          }
                                                          }
                                                          }
                                                          }
                                                          // 实现 ClientHttpRequestInterceptor, 服务请求方讲 xid 塞入 http request header
                                                          public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {


                                                          public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
                                                          HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
                                                          String xid = RootContext.getXID();
                                                          if (StringUtils.isNotEmpty(xid)) {
                                                          requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
                                                          }


                                                          return clientHttpRequestExecution.execute(requestWrapper, bytes);
                                                          }
                                                          }
                                                          // 找到所有 RestTemplate 将 SeataRestTemplateInterceptor 注入
                                                          @Configuration
                                                          public class SeataRestTemplateAutoConfiguration {
                                                          @Autowired(required = false)
                                                          private Collection<RestTemplate> restTemplates;
                                                          @Autowired
                                                          private SeataRestTemplateInterceptor seataRestTemplateInterceptor;


                                                          @PostConstruct
                                                          public void init() {
                                                          if (this.restTemplates != null) {
                                                          Iterator var1 = this.restTemplates.iterator();
                                                          while (var1.hasNext()) {
                                                          RestTemplate restTemplate = (RestTemplate) var1.next();
                                                          List<ClientHttpRequestInterceptor> interceptors = new ArrayList(restTemplate.getInterceptors());
                                                          interceptors.add(this.seataRestTemplateInterceptor);
                                                          restTemplate.setInterceptors(interceptors);
                                                          }
                                                          }
                                                          }
                                                          }

                                                          至此, TM 的重要功能就介绍完了, 值得注意的是事务的传播过程不仅仅是 TM -> RM, RM -> RM 也会进行。接下来, 我们看一看 Seata 两种分支事务类型 AT 和 TCC 的实现方案。

                                                          参考内容

                                                          [1] https://www.jianshu.com/p/4cb127b737cf

                                                          [2] https://mp.weixin.qq.com/s/EzmZ-DAi-hxJhRkFvFhlJQ

                                                          [3] https://my.oschina.net/keking/blog/3011509

                                                          [4] https://zhuanlan.zhihu.com/p/64484721

                                                          [5] https://zhuanlan.zhihu.com/p/61981170

                                                          [6] https://zhuanlan.zhihu.com/p/78269431

                                                          [7] https://zhuanlan.zhihu.com/p/72060852

                                                          [8] https://zhuanlan.zhihu.com/p/63552935

                                                          [9] https://www.bookstack.cn/read/fescar-0.4.0/spilt.1.0.md

                                                          [10] https://github.com/seata/seata/wiki

                                                          [11] https://juejin.im/post/5caabe29e51d452b1e719265


                                                          引用链接

                                                          [1]

                                                          issue: https://github.com/seata/seata/issues/1623

                                                          [2]

                                                          PR: https://github.com/seata/seata/pull/1629

                                                          [3]

                                                          issue: https://github.com/seata/seata/issues/1603

                                                          [4]

                                                          pr: https://github.com/seata/seata/pull/1605

                                                          [5]

                                                          Dubbo: /Dubbo

                                                          文章转载自贝贝猫技术分享,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                          评论