暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Dubbo 服务暴露

光明和黑暗互相吸引 2019-01-25
342

Dubbo 服务暴露

服务暴露


  • 大家都知道Dubbo是由consumer,provider,registry这三大部分组


  • 那么provider的如果将服务提供给consumer调用呢,就是通过服务暴露来实现的,也就是把我们原来单机架构中的接口,对外部暴露


服务暴露的流程


  • dubbo服务暴露的流程大概如上图,在dubbo中,所有的服务都会被包装成一个invoker,这一点也将贯穿今后整个学习

  • dubbo服务暴露可以理解为两部分:本地暴露,远程暴露

  • 本地暴露的接口通常用于我们直接invoke dubbo的接口,以及有些时候我们的服务既是provider又是consumer,避免远程调用造成的资源浪费

  • 远程暴露则是将服务信息注册到registry,并且将服务通过网络提供给其他应用调用


源码解析

初始化

  1. public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {

  2. private static final long serialVersionUID = 213195494150089726L;

  3.    /**

  4.     *此处省略其他代码

  5.     **/


  6.    @Override

  7.    public void onApplicationEvent(ContextRefreshedEvent event) {

  8.        if (!isExported() && !isUnexported()) {

  9.            if (logger.isInfoEnabled()) {

  10.                logger.info("The service ready on spring started. service: " + getInterface());

  11.            }

  12.            //服务暴露

  13.            export();

  14.        }

  15.    }


  16.    @Override

  17.    @SuppressWarnings({"unchecked", "deprecation"})

  18.    public void afterPropertiesSet() throws Exception {

  19.         //此处省略....

  20.        if (!supportedApplicationListener) {

  21.            //服务暴露

  22.            export();

  23.        }

  24.    }

  25. }

  • 首先我们来看一下ServiceBean,ServiceBean实现了InitializingBean, ApplicationContextAware, ApplicationListener有没有觉得很熟悉,实现这几个类就能在spring初始化的时候do something

暴露服务

  • 我们看到这里都调用了export()方法

  1.    public synchronized void export() {

  2.        if (provider != null) {

  3.            if (export == null) {

  4.                export = provider.getExport();

  5.            }

  6.            if (delay == null) {

  7.                delay = provider.getDelay();

  8.            }

  9.        }

  10.        if (export != null && !export) {

  11.            return;

  12.        }


  13.        if (delay != null && delay > 0) {

  14.            delayExportExecutor.schedule(new Runnable() {

  15.                @Override

  16.                public void run() {

  17.                    doExport();

  18.                }

  19.            }, delay, TimeUnit.MILLISECONDS);

  20.        } else {

  21.            doExport();

  22.        }

  23.    }

  • 这里看到ServiceConfig.export方法上加了一个锁,用来保证不会重复暴露服务,抛开上面的逻辑判断,在第一次初始化的时候,是直接走到了doExport()方法

  1.    protected synchronized void doExport() {

  2.        //省略判断代码

  3.        doExportUrls();

  4.    }


  5.       private void doExportUrls() {

  6.        //加载注册中心的配置

  7.        List<URL> registryURLs = loadRegistries(true);

  8.        //把使用的协议注册到注册中心

  9.        for (ProtocolConfig protocolConfig : protocols) {

  10.            doExportUrlsFor1Protocol(protocolConfig, registryURLs);

  11.        }

  12.    }

  • 这里dubbo支持多协议,可以看到通过for循环可以把配置的多种协议都导出,进行暴露

  1.    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {

  2.         //省略判断代码

  3.        // 导出服务

  4.        String contextPath = protocolConfig.getContextpath();

  5.        if ((contextPath == null || contextPath.length() == 0) && provider != null) {

  6.            contextPath = provider.getContextpath();

  7.        }


  8.        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);

  9.        Integer port = this.findConfigedPorts(protocolConfig, name, map);

  10.        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);


  11.        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)

  12.                .hasExtension(url.getProtocol())) {

  13.            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)

  14.                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);

  15.        }


  16.        String scope = url.getParameter(Constants.SCOPE_KEY);

  17.        // 没有配置时不导出

  18.        if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {

  19.            // 导出本地服务

  20.            if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {

  21.                exportLocal(url);

  22.            }

  23.            // 导出远程服务

  24.            if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {

  25.                if (logger.isInfoEnabled()) {

  26.                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);

  27.                }

  28.                if (registryURLs != null && !registryURLs.isEmpty()) {

  29.                    //将服务都注册到当前已有的注册中心上去

  30.                    for (URL registryURL : registryURLs) {

  31.                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));

  32.                        //判断是否有监控中心

  33.                        URL monitorUrl = loadMonitor(registryURL);

  34.                        if (monitorUrl != null) {

  35.                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());

  36.                        }

  37.                        if (logger.isInfoEnabled()) {

  38.                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);

  39.                        }

  40.                            //对于providers,这用于启用自定义代理以生成invoker

  41.                        String proxy = url.getParameter(Constants.PROXY_KEY);

  42.                        if (StringUtils.isNotEmpty(proxy)) {

  43.                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);

  44.                        }


  45.                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

  46.                        //包装调用者和所有元数据的Invoker包装器

  47.                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);


  48.                        Exporter<?> exporter = protocol.export(wrapperInvoker);

  49.                        exporters.add(exporter);

  50.                    }

  51.                } else {

  52.                    //没有注册中心直接暴露

  53.                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

  54.                    //包装调用者和所有元数据的Invoker包装器

  55.                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

  56.                    Exporter<?> exporter = protocol.export(wrapperInvoker);

  57.                    exporters.add(exporter);

  58.                }

  59.            }

  60.        }

  61.        this.urls.add(url);

  62.    }


  63.    //暴露本地服务

  64.    private void exportLocal(URL url) {

  65.        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {

  66.            //手动暴露一个本地服务

  67.            URL local = URL.valueOf(url.toFullString())

  68.                    .setProtocol(Constants.LOCAL_PROTOCOL)

  69.                    .setHost(LOCALHOST)

  70.                    .setPort(0);

  71.            Exporter<?> exporter = protocol.export(

  72.                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));

  73.            exporters.add(exporter);

  74.            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");

  75.        }

  76.    }

  • 这你scope配置默认值是null,则本地服务和远程服务都导出,另外如果没有配置注册中心,将直接将接口暴露出去,我们可以根据自己所在的场景,选择都暴露还是指定暴露

  • 由于dubbo也是支持多注册中心的,所以可以通过for循环,将多个服务都注册到当前已有的注册中心上去

  • 在exportLocal方法这是将配置中解析好的url参数手动修改成本地协议进行服务暴露

  • ProxyFactory是通过SPI获取JavassistProxyFactory靠Javassist字节码技术动态的生成Invoker类,大家有兴趣的可以下去了解一下

暴露的细节

  1. public class ProtocolFilterWrapper implements Protocol {


  2.    private final Protocol protocol;

  3.    //装饰者模式

  4.    public ProtocolFilterWrapper(Protocol protocol) {

  5.        if (protocol == null) {

  6.            throw new IllegalArgumentException("protocol == null");

  7.        }

  8.        this.protocol = protocol;

  9.    }

  10. }

  • 在ServiceConfig中,通过SPI获取相应的Protocol,SPI中会对实现类进行装饰,每次执行protocol.exprot()方法的时候,其实都是执行的ProtocolFilterWrapper的protocol.exprot方法

  1.    @Override

  2.    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {

  3.        //如果是注册中心协议直接导出

  4.        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {

  5.            return protocol.export(invoker);

  6.        }

  7.        //如果不是则执行整个filter的责任链

  8.        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));

  9.    }


  10.    //责任链模式,对filter进行逐个执行

  11.    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {

  12.        Invoker<T> last = invoker;

  13.        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

  14.        if (!filters.isEmpty()) {

  15.            for (int i = filters.size() - 1; i >= 0; i--) {

  16.                final Filter filter = filters.get(i);

  17.                final Invoker<T> next = last;

  18.                last = new Invoker<T>() {


  19.                    @Override

  20.                    public Class<T> getInterface() {

  21.                        return invoker.getInterface();

  22.                    }


  23.                    @Override

  24.                    public URL getUrl() {

  25.                        return invoker.getUrl();

  26.                    }


  27.                    @Override

  28.                    public boolean isAvailable() {

  29.                        return invoker.isAvailable();

  30.                    }


  31.                    @Override

  32.                    public Result invoke(Invocation invocation) throws RpcException {

  33.                        Result result = filter.invoke(next, invocation);

  34.                        if (result instanceof AsyncRpcResult) {

  35.                            AsyncRpcResult asyncResult = (AsyncRpcResult) result;

  36.                            asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));

  37.                            return asyncResult;

  38.                        } else {

  39.                            return filter.onResponse(result, invoker, invocation);

  40.                        }

  41.                    }


  42.                    @Override

  43.                    public void destroy() {

  44.                        invoker.destroy();

  45.                    }


  46.                    @Override

  47.                    public String toString() {

  48.                        return invoker.toString();

  49.                    }

  50.                };

  51.            }

  52.        }

  53.        return last;

  54.    }

  • 由上述代码我们可以看到,dubbo中运用装饰者模式和责任链模式,对我们提供的服务做了一次封装,最终转换成我们需要的invoker对外暴露

  • 要注意到的是,当我们的协议是registry也就是注册协议的时候,是不需要进行构建责任链的

本地暴露

  1.    @Override

  2.    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {

  3.        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);

  4.    }    

  • 如果是本地暴露,则通过SPI拿到InjvmProtocol,最终通过injvm协议导出InjvmExporter

远程暴露

  1.    @Override

  2.    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {

  3.        URL url = invoker.getUrl();

  4.        // 导出服务.

  5.        String key = serviceKey(url);

  6.        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);

  7.        exporterMap.put(key, exporter);

  8.        //导出根服务以进行调度事件

  9.        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);

  10.        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);

  11.        if (isStubSupportEvent && !isCallbackservice) {

  12.            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);

  13.            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {

  14.                if (logger.isWarnEnabled()) {

  15.                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +

  16.                            "], has set stubproxy support event ,but no stub methods founded."));

  17.                }

  18.            } else {

  19.                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);

  20.            }

  21.        }

  22.        //打开服务

  23.        openServer(url);

  24.        optimizeSerialization(url);

  25.        return exporter;

  26.    }    


  27.    //打开服务

  28.    private void openServer(URL url) {

  29.        // find server.

  30.        String key = url.getAddress();

  31.        //客户端可以导出仅供服务器调用的服务

  32.        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);

  33.        if (isServer) {

  34.            ExchangeServer server = serverMap.get(key);

  35.            if (server == null) {

  36.                synchronized (this) {

  37.                    server = serverMap.get(key);

  38.                    if (server == null) {

  39.                        //如果服务不存在,创建服务

  40.                        serverMap.put(key, createServer(url));

  41.                    }

  42.                }

  43.            } else {

  44.                // 服务器支持重置,与override一起使用

  45.                server.reset(url);

  46.            }

  47.        }

  48.    }

  • 在dubbo协议中,我们看到在服务导出的时候会根据配置地址,打开netty服务,也就是通过这一步,开启了RPC端口,使consumer通过TCP协议进行服务调用

服务注册

  1.    @Override

  2.    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {

  3.        URL registryUrl = getRegistryUrl(originInvoker);

  4.        // url在本地导出

  5.        URL providerUrl = getProviderUrl(originInvoker);

  6.        // 订阅覆盖数据

  7.        // 同样的服务由于订阅是带有服务名称的缓存密钥,因此会导致订阅信息覆盖。

  8.        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);

  9.        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);

  10.        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);


  11.        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);

  12.        //导出invoker

  13.        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

  14.        //将url注册到注册中心

  15.        final Registry registry = getRegistry(originInvoker);

  16.        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);

  17.        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,

  18.                registryUrl, registeredProviderUrl);

  19.        //判断我们是否需要推迟发布

  20.        boolean register = registeredProviderUrl.getParameter("register", true);

  21.        if (register) {

  22.            register(registryUrl, registeredProviderUrl);

  23.            providerInvokerWrapper.setReg(true);

  24.        }

  25.        // Deprecated! Subscribe to override rules in 2.6.x or before.

  26.        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

  27.        exporter.setRegisterUrl(registeredProviderUrl);

  28.        exporter.setSubscribeUrl(overrideSubscribeUrl);

  29.        //确保每次导出时都返回一个新的导出器实例

  30.        return new DestroyableExporter<>(exporter);

  31.    }


  32.    //真正导出服务的地方

  33.     private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {

  34.        String key = getCacheKey(originInvoker);

  35.        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);

  36.        if (exporter == null) {

  37.            synchronized (bounds) {

  38.                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);

  39.                if (exporter == null) {


  40.                    final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);

  41.                    //以dubbo协议为例,这里才是真正调用DubboProtocol.exprot的地方

  42.                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);

  43.                    bounds.put(key, exporter);

  44.                }

  45.            }

  46.        }

  47.        return exporter;

  48.    }

  • 大家这里可以跟进源码,会发现,在ServiceConfig中通过proxyFactory生成的Invoker的url指向的协议其实是registry,所以在ServiceConfig中protocol.exprot调用的是RegistryProtocol的exprot方法

  • 在RegistryProtocol中调用了真正的远程服务暴露的方法,即DubboProtocol(以dubbo协议为例),在远程服务暴露成功后,将服务信息注册到registry上去,由此完成了一个服务的导出

  • 至此Dubbo服务暴露中的大致流程已经完成了,后面将会对Dubbo如何通过ProxyFactory生成Invoker,以及Registry是如何进行注册的进行更加深入的学习


文章转载自光明和黑暗互相吸引,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论