暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Dubbo源码分析(八)集群容错机制

清幽之地的博客 2021-04-20
455

前言

在上一章节,我们曾提到这样一个问题: 当调用服务失败后,我们怎么处理当前的请求?抛出异常亦或是重试?

为了解决这个问题,Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker。集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,至于具体调用哪个服务提供者,以及调用失败后如何处理等问题,现在都交给集群模块去处理。

一、合并

在服务引用的过程中,我们最终会将一个或多个服务提供者Invoker封装成服务目录对象,但最后还要将它合并转换成Cluster Invoker对象。 Invokerinvoker=cluster.join(directory);

这里的cluster就是扩展点自适应类,在Dubbo中默认是Failover,所以上面代码会调用到:

  1. public class FailoverCluster implements Cluster {


  2.    public final static String NAME = "failover";

  3.    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {

  4.        return new FailoverClusterInvoker<T>(directory);

  5.    }

  6. }

上面的代码很简单,所以最后的Invoker对象指向的是 FailoverClusterInvoker
实例。它也是一个Invoker,它继承了抽象的 AbstractClusterInvoker

我们看下 AbstractClusterInvoker
类中的invoke方法。

  1. public abstract class AbstractClusterInvoker<T> implements Invoker<T> {



  2.    public Result invoke(final Invocation invocation) throws RpcException {


  3.        LoadBalance loadbalance = null;

  4.        //调用服务目录,获取所有的服务提供者Invoker对象

  5.        List<Invoker<T>> invokers = directory.list(invocation);

  6.        if (invokers != null && !invokers.isEmpty()) {

  7.            //加载负载均衡组件

  8.            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).

  9.                getExtension(invokers.get(0).getUrl().

  10.                getMethodParameter(invocation.getMethodName(), "loadbalance", "random"));

  11.        }

  12.        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

  13.        //调用子类实现 ,不同的集群容错机制

  14.        return doInvoke(invocation, invokers, loadbalance);

  15.    }

  16. }

以上代码也很简单,我们分为三个步骤来看

  • 调用服务目录,获取所有的服务提供者列表

  • 加载负载均衡组件

  • 调用子类实现,转发请求

关于负载均衡我们后续再深入了解,这是只知道它负责从多个Invoker中选取一个返回就行。

二、集群容错策略

Dubbo为我们提供了多种集群容错机制。主要如下:

  • Failover Cluster - 失败自动切换

FailoverClusterInvoker在调用失败时,会自动切换 Invoker 进行重试。默认配置下,Dubbo 会使用这个类作为缺省 Cluster Invoker。

  • Failfast Cluster - 快速失败

FailfastClusterInvoker 只会进行一次调用,失败后立即抛出异常。

  • Failsafe Cluster - 失败安全

FailsafeClusterInvoker 当调用过程中出现异常时,仅会打印异常,而不会抛出异常。

  • Failback Cluster - 失败自动恢复

FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务提供者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。

  • Forking Cluster - 并行调用多个服务提供者

ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源。

  • BroadcastClusterInvoker - 广播

BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。该类通常用于通知所有提供者更新缓存或日志等本地资源信息。

三、自动切换

FailoverClusterInvoker 在调用失败时,会自动切换 Invoker 进行重试。我们重点看它的 doInvoke
方法。

  1. public Result doInvoke(Invocation invocation,

  2.        final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {


  3.    List<Invoker<T>> copyinvokers = invokers;

  4.    //检查invokers是否为空

  5.    checkInvokers(copyinvokers, invocation);

  6.    //获取重试次数 这里默认是3次

  7.    int len = getUrl().getMethodParameter(invocation.getMethodName(), "retries",2) + 1;

  8.    if (len <= 0) {

  9.        len = 1;

  10.    }

  11.    //异常信息对象

  12.    RpcException le = null; // last exception.

  13.    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());

  14.    Set<String> providers = new HashSet<String>(len);


  15.    //循环调用 失败重试len次

  16.    for (int i = 0; i < len; i++) {

  17.        if (i > 0) {

  18.            checkWhetherDestroyed();

  19.            //重新获取服务提供者列表

  20.            copyinvokers = list(invocation);

  21.            //再次检查

  22.            checkInvokers(copyinvokers, invocation);

  23.        }

  24.        //通过loadbalance选取一个Invoker

  25.        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);

  26.        invoked.add(invoker);

  27.        RpcContext.getContext().setInvokers((List) invoked);

  28.        try {

  29.            //调用服务

  30.            Result result = invoker.invoke(invocation);

  31.            if (le != null && logger.isWarnEnabled()) {

  32.                logger.warn("");

  33.            }

  34.            return result;

  35.        } catch (RpcException e) {

  36.            if (e.isBiz()) {

  37.                throw e;

  38.            }

  39.            le = e;

  40.        } catch (Throwable e) {

  41.            le = new RpcException(e.getMessage(), e);

  42.        } finally {

  43.            providers.add(invoker.getUrl().getAddress());

  44.        }

  45.    }

  46.    //重试失败

  47.    throw new RpcException("");

  48. }

我们可以看到,它的重点是invoker的调用是在一个循环方法中。只要不return,就会一直调用,重试 len 次。我们总结下它的过程:

  • 检查invokers是否为空

  • 获取重试次数,默认为3

  • 进入循环

  • 如果是重试,再次获取服务提供者列表,并校验

  • 选取Invoker,并调用

  • 无异常,返回结果,循环结束

  • 捕获到异常,继续循环调用直至重试最大次数

四、快速失败

FailfastClusterInvoker就很简单了,它只会进行一次调用,失败后立即抛出异常。

  1. public Result doInvoke(Invocation invocation,

  2.        List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {


  3.    checkInvokers(invokers, invocation);

  4.    Invoker<T> invoker = select(loadbalance, invocation, invokers, null);

  5.    try {

  6.        return invoker.invoke(invocation);

  7.    } catch (Throwable e) {

  8.        if (e instanceof RpcException && ((RpcException) e).isBiz()) {

  9.            throw (RpcException) e;

  10.        }

  11.        throw new RpcException("....");

  12.    }

  13. }

五、失败安全

FailsafeClusterInvoker跟上面这个差异不大,它调用失败后并不抛出异常。而是打印异常信息并返回一个空的结果对象。

  1. public Result doInvoke(Invocation invocation,

  2.    List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {


  3.    try {

  4.        checkInvokers(invokers, invocation);

  5.        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);

  6.        return invoker.invoke(invocation);

  7.    } catch (Throwable e) {

  8.        logger.error("Failsafe ignore exception: " + e.getMessage(), e);

  9.        return new RpcResult();

  10.    }

  11. }

六、自动恢复

FailbackClusterInvoker 会在调用失败后,也是打印异常信息并返回一个空的结果对象,但是还没结束,它还会偷偷开启一个定时任务,再次去调用。

  1. protected Result doInvoke(Invocation invocation,

  2.        List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {


  3.    try {

  4.        checkInvokers(invokers, invocation);

  5.        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);

  6.        return invoker.invoke(invocation);

  7.    } catch (Throwable e) {

  8.        logger.error("Failback to invoke method " + invocation.getMethodName() + ",

  9.            wait for retry in background. Ignored exception: "

  10.                + e.getMessage() + ", ", e);

  11.        //添加失败信息

  12.        addFailed(invocation, this);

  13.        return new RpcResult();

  14.    }

  15. }

我们可以看到,调用失败后,除了打印异常信息和返回空结果对象之外,还有一个方法 addFailed
 它就是开启定时任务的地方。

1、开启定时任务

首先,定义一个包含2个线程的线程池对象。

Executors.newScheduledThreadPool(2,newNamedThreadFactory("failback-cluster-timer",true));

然后,延迟5秒后,每隔5秒调用 retryFailed
方法,直到调用成功。

  1. private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {

  2.    if (retryFuture == null) {

  3.        synchronized (this) {

  4.            if (retryFuture == null) {

  5.                retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

  6.                    public void run() {

  7.                        try {

  8.                            //重试方法

  9.                            retryFailed();

  10.                        } catch (Throwable t) {

  11.                            logger.error("Unexpected error occur at collect statistic", t);

  12.                        }

  13.                    }

  14.                }, 5000, 5000, TimeUnit.MILLISECONDS);

  15.            }

  16.        }

  17.    }

  18.    //ConcurrentHashMap 添加失败任务

  19.    failed.put(invocation, router);

  20. }

最后,我们需要注意 failed.put(invocation,router);
 它将当前失败的任务添加到failed,它是一个ConcurrentHashMap对象。

2、重试

重试的逻辑也不复杂,从failed对象中获取失败的记录,调用即可。

  1. void retryFailed() {


  2.    //如果为空,说明已经没有了失败的任务

  3.    if (failed.size() == 0) {

  4.        return;

  5.    }

  6.    //遍历failed,对失败的调用进行重试

  7.    Set<Entry<Invocation, AbstractClusterInvoker<?>>> failedSet = failed.entrySet();    

  8.    for (Entry<Invocation, AbstractClusterInvoker<?>> entry : failedSet) {

  9.        Invocation invocation = entry.getKey();

  10.        Invoker<?> invoker = entry.getValue();

  11.        try {

  12.            // 再次进行调用

  13.            invoker.invoke(invocation);

  14.            // 调用成功后,从 failed 中移除 invoker

  15.            failed.remove(invocation);

  16.        } catch (Throwable e) {

  17.            logger.error("......", e);

  18.        }

  19.    }

  20. }

如上代码,其中的重点是调用成功后,要将invocation移除。当再次调用到这个方法,开头的条件判断成立,就直接返回,不再继续调用。

3、问题

实际上,这套自动恢复的机制是有点小问题的。只要有一次调用失败,就会开启定时任务不断重试调用,直至成功。但问题是,即便重试调用成功后,定时任务并不会关闭,会持续的调用 retryFailed
方法。虽然这个方法有个判断,会直接返回。

如果服务调用失败次数多了之后,就会有大量的线程以5s的间隔,不断调用这个方法。笔者建议,如果有此类需求,不要直接用Dubbo中的这个Cluster。最好利用SPI机制重写一个方法来实现。

七、并行调用

ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。

  1. public Result doInvoke(final Invocation invocation,

  2.        List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {


  3.    final List<Invoker<T>> selected;

  4.    //获取最大并行数 默认为2

  5.    final int forks = getUrl().getParameter("forks", 2);

  6.    //超时时间

  7.    final int timeout = getUrl().getParameter("timeout", 1000);

  8.    if (forks <= 0 || forks >= invokers.size()) {

  9.        selected = invokers;

  10.    } else {

  11.        selected = new ArrayList<Invoker<T>>();

  12.        //选择Invoker 并添加到selected

  13.        for (int i = 0; i < forks; i++) {

  14.            Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);

  15.            if (!selected.contains(invoker)) {//Avoid add the same invoker several times.

  16.                selected.add(invoker);

  17.            }

  18.        }

  19.    }

  20.    RpcContext.getContext().setInvokers((List) selected);

  21.    final AtomicInteger count = new AtomicInteger();

  22.    //阻塞队列 先进先出

  23.    final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();

  24.    for (final Invoker<T> invoker : selected) {

  25.        executor.execute(new Runnable() {

  26.            @Override

  27.            public void run() {

  28.                try {

  29.                    //调用服务 将结果放入队列

  30.                    Result result = invoker.invoke(invocation);

  31.                    ref.offer(result);

  32.                } catch (Throwable e) {

  33.                    //如果异常调用次数大于等于最大并行数

  34.                    int value = count.incrementAndGet();

  35.                    if (value >= selected.size()) {

  36.                        ref.offer(e);

  37.                    }

  38.                }

  39.            }

  40.        });

  41.    }

  42.    try {

  43.        //从队列中获取结果

  44.        Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);

  45.        if (ret instanceof Throwable) {

  46.            Throwable e = (Throwable) ret;

  47.            throw new RpcException("....");

  48.        }

  49.        return (Result) ret;

  50.    } catch (InterruptedException e) {

  51.        throw new RpcException(e.getMessage(), e);

  52.    }

  53. }

以上代码的重点就是阻塞队列LinkedBlockingQueue。如果有结果放入,poll方法会立即返回,完成整个调用。我们再总结下整体流程:

  • 获取最大并行数,默认为2;获取超时时间

  • 选择Invoker,并添加到selected

  • 通过newCachedThreadPool创建多个线程,调用服务。

  • 正常返回后,将结果offer到队列。此时调用流程结束,返回正常信息。

  • 调用服务异常后,判断异常次数是否大于等于最大并行数,条件成立则将异常信息offer到队列,此时调用流程结束,返回异常信息。

八、广播

BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。

  1. public Result doInvoke(final Invocation invocation,

  2.        List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {


  3.    checkInvokers(invokers, invocation);

  4.    RpcContext.getContext().setInvokers((List) invokers);

  5.    RpcException exception = null;

  6.    Result result = null;


  7.    //循环调用服务

  8.    for (Invoker<T> invoker : invokers) {

  9.        try {

  10.            result = invoker.invoke(invocation);

  11.        } catch (RpcException e) {

  12.            exception = e;

  13.            logger.warn(e.getMessage(), e);

  14.        } catch (Throwable e) {

  15.            exception = new RpcException(e.getMessage(), e);

  16.            logger.warn(e.getMessage(), e);

  17.        }

  18.    }

  19.    //异常

  20.    if (exception != null) {

  21.        throw exception;

  22.    }

  23.    return result;

  24. }


文章转载自清幽之地的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论