暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

「DUBBO系列」集群容错之Failover

JAVA前线 2020-05-28
689

IT徐胖子原创本文未授权请勿转载


1 文章概述

假设服务提供者提供A服务,但是A服务并不稳定,如果服务消费者无法正常消费A服务就需要做降级处理,不再消费A服务而是返回一个mock值,这就是所谓服务降级。

但是在降级之前,消费者有可能重试消费服务A,或者直接返回空结果,或者延时重试调用,这就是所谓集群容错策略。集群容错策略有很多,本文我们分析故障转移策略。

Failover策略被称为故障转移策略,这是集群容错默认策略。当第一次消费服务失败后消费者会根据Failover策略选择其它生产者再次调用,默认重试请求2次。

    <beans>
    <dubbo:application name="xpz-consumer" >
    <dubbo:registry address="zookeeper://127.0.0.1:2181" >
    <dubbo:reference id="helloService" cluster="failover" retries="5" interface="com.itxpz.dubbo.demo.provider.HelloService" >
    </beans>
    复制
      public class Consumer {
      public static void main(String[] args) throws Exception {
      ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
      context.start();
      HelloService helloService = (HelloService) context.getBean("helloService");
      String result = helloService.sayHello("IT徐胖子");
      System.out.println("==========客户端收到结果==========" + result);
      }
      }
      复制
        // 通过动态代理机制生成代理对象Proxy
        // Proxy持有InvokerInvocationHandler
        // InvokerInvocationHandler持有invoker = MockClusterInvoker(FailoverClusterInvoker)
        HelloService helloService = (HelloService) context.getBean("helloService");
        复制


        2 源码分析

          public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

          public FailoverClusterInvoker(Directory<T> directory) {
          super(directory);
          }


          @Override
          public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

          // 所有生产者invokers
          List<Invoker<T>> copyInvokers = invokers;
          checkInvokers(copyInvokers, invocation);
          String methodName = RpcUtils.getMethodName(invocation);


          // 获取重试次数
          int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
          if (len <= 0) {
          len = 1;
          }
          RpcException le = null;


          // 已经调用过的生产者
          List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
          Set<String> providers = new HashSet<String>(len);


          // 重试直到达到最大次数
          for (int i = 0; i < len; i++) {
          if (i > 0) {


          // 如果当前实例被销毁则抛出异常
          checkWhetherDestroyed();


          // 根据路由策略选出可用生产者Invokers
          copyInvokers = list(invocation);


          // 重新检查
          checkInvokers(copyInvokers, invocation);
          }


          // 负载均衡选择一个生产者invoker
          Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
          invoked.add(invoker);
          RpcContext.getContext().setInvokers((List) invoked);
          try {
          // 服务消费发起远程调用
          Result result = invoker.invoke(invocation);
          if (le != null && logger.isWarnEnabled()) {
          logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
          }
          return result;
          } catch (RpcException e) {
          // 业务异常直接抛出
          if (e.isBiz()) {
          throw e;
          }
          le = e;
          } catch (Throwable e) {
          le = new RpcException(e.getMessage(), e);
          } finally {
          providers.add(invoker.getUrl().getAddress());
          }
          }
          throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
          }
          }
          复制


          3 文章总结

          Failover策略是默认策略,所以我们在幂等设计时需要更加注意。例如服务生产者已经收到调用请求并已经处理成功,但是由于网络原因返回结果时间被消费者认为超时,所以消费者会继续重试调用,如果服务生产者没有做好幂等就会出现重复处理问题,这个问题值得我们关注。


          长按二维码关注更多精彩文章

          文章转载自JAVA前线,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论