暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Dubbo 服务引用

光明和黑暗互相吸引 2019-03-16
208

Dubbo 服务引用

服务引用


  • 大家都知道Dubbo是由consumer,provider,registry这三大部分组成



  • 那么consumer是如何发现provider并调用的呢,就是通过服务引用来实现的,也就是通过发现服务,然后进行调用


服务引用的流程 

  • dubbo服务引用的流程大概如上图,不难发现其流程跟dubbo服务暴露互逆,(关于Dubbo服务暴露Dubbo服务暴露)但最终也是通过invoker来完成我们服务引用

  • dubbo服务引用最终通过ProxyFactory将Invoker转化为调用的Service

  • dubbo服务引用过程与dubbo服务暴露相似,都是通过SPI,适配相应的协议,并将服务注册到注册中心,并最终完成服务引用

源码解析

初始化

  1. public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {


  2. //省略一部分代码


  3. //获取服务接口

  4. @Override

  5. public Object getObject() {

  6. return get();

  7. }


  8. @Override

  9. @SuppressWarnings({"unchecked"})

  10. public void afterPropertiesSet() throws Exception {

  11. //此处省略 配置校验代码

  12. Boolean b = isInit();

  13. if (b == null && getConsumer() != null) {

  14. b = getConsumer().isInit();

  15. }

  16. if (b != null && b) {

  17. //发现服务

  18. getObject();

  19. }

  20. }

  21. }

复制
  • 首先我们来看一下ReferenceBean, ReferenceBean实现了InitializingBean, ApplicationContextAware, ApplicationListener这里同服务暴露一样,通过spring在初始化的时候进行服务引用

服务引用

  • 我们看到这里都调用了getObject()方法,其实是调用了ReferenceConfig中的get()方法,接下来我们一起看下ReferenceConfig中的get()方法

  1. public synchronized T get() {

  2. //配置校验

  3. checkAndUpdateSubConfigs();

  4. //如果该服务已被销毁,则抛出异常

  5. if (destroyed) {

  6. throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");

  7. }

  8. //如果服务为空,则进行初始化,否则直接返回

  9. if (ref == null) {

  10. init();

  11. }

  12. return ref;

  13. }

复制
  • 这里看到ReferenceConfig.get方法上加了一个锁,用来保证不会重复发现服务,而该方法的核心在于init()方法

  1. private void init() {

  2. if (initialized) {

  3. return;

  4. }

  5. initialized = true;

  6. checkStubAndLocal(interfaceClass);

  7. //校验mock

  8. checkMock(interfaceClass);

  9. Map<String, String> map = new HashMap<String, String>();


  10. //省略对参数解析设置 ...


  11. //创建代理对象

  12. ref = createProxy(map);


  13. ApplicationModel.initConsumerModel(getUniqueServiceName(), buildConsumerModel(attributes));

  14. }

复制
  • 这里通过对参数的解析来创建服务代理, createProxy()方法是整个服务引用初始化的关键

  1. private T createProxy(Map<String, String> map) {

  2. URL tmpUrl = new URL("temp", "localhost", 0, map);

  3. final boolean isJvmRefer;

  4. if (isInjvm() == null) {

  5. if (url != null && url.length() > 0) { // 如果指定了url,则不要进行本地引用

  6. isJvmRefer = false;

  7. } else {

  8. // 默认情况下,引用本地服务(如果有)

  9. isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);

  10. }

  11. } else {

  12. isJvmRefer = isInjvm();

  13. }


  14. if (isJvmRefer) {

  15. URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);

  16. invoker = refprotocol.refer(interfaceClass, url);

  17. if (logger.isInfoEnabled()) {

  18. logger.info("Using injvm service " + interfaceClass.getName());

  19. }

  20. } else {

  21. if (url != null && url.length() > 0) { // 用户指定的URL,可以是对等地址,也可以是注册中心的地址.

  22. String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);

  23. if (us != null && us.length > 0) {

  24. for (String u : us) {

  25. URL url = URL.valueOf(u);

  26. if (StringUtils.isEmpty(url.getPath())) {

  27. url = url.setPath(interfaceName);

  28. }

  29. if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {

  30. urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));

  31. } else {

  32. urls.add(ClusterUtils.mergeUrl(url, map));

  33. }

  34. }

  35. }

  36. } else { // x来自注册中心配置的URL

  37. checkRegistry();

  38. List<URL> us = loadRegistries(false);

  39. if (CollectionUtils.isNotEmpty(us)) {

  40. for (URL u : us) {

  41. URL monitorUrl = loadMonitor(u);

  42. if (monitorUrl != null) {

  43. map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));

  44. }

  45. urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));

  46. }

  47. }

  48. if (urls.isEmpty()) {

  49. throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" > to your spring config.");

  50. }

  51. }

  52. //这里的refprotocol.refer即通过registryProtocol来进行发现

  53. if (urls.size() == 1) {

  54. invoker = refprotocol.refer(interfaceClass, urls.get(0));

  55. } else {

  56. List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();

  57. URL registryURL = null;

  58. for (URL url : urls) {

  59. invokers.add(refprotocol.refer(interfaceClass, url));

  60. if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {

  61. registryURL = url; // 使用最后一个注册表网址

  62. }

  63. }

  64. if (registryURL != null) { // 注册表网址可用

  65. // 仅在寄存器的群集可用时才使用RegistryAwareCluster

  66. URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);

  67. //调用者包装关系将是:RegistryAwareClusterInvoker(StaticDirectory) - > FailoverClusterInvoker(RegistryDirectory,将执行路由) - > Invoker invoker = cluster.join(new StaticDirectory(u, invokers));

  68. } else { // 不是注册表网址,必须直接调用。

  69. //这里要注意 cluster 最终都会被包装成 MockClusterWrapper(SPI的依赖注入)

  70. invoker = cluster.join(new StaticDirectory(invokers));

  71. }

  72. }

  73. }


  74. Boolean c = check;

  75. if (c == null && consumer != null) {

  76. c = consumer.isCheck();

  77. }

  78. if (c == null) {

  79. c = true; // default true

  80. }

  81. if (c && !invoker.isAvailable()) {

  82. // 如果提供者暂时不可用,则允许消费者稍后重试

  83. initialized = false;

  84. throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());

  85. }

  86. if (logger.isInfoEnabled()) {

  87. logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());

  88. }

  89. /**

  90. * @since 2.7.0

  91. * ServiceData Store

  92. */

  93. MetadataReportService metadataReportService = null;

  94. if ((metadataReportService = getMetadataReportService()) != null) {

  95. URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map);

  96. metadataReportService.publishConsumer(consumerURL);

  97. }

  98. // create service proxy

  99. return (T) proxyFactory.getProxy(invoker);

  100. }

复制
  • 这里可以看到dubbo在服务引用中也可以使用本地服务的发现,但是可看到这一块已经被标记为过时,我的理解是dubbo作为一个RPC框架,本地服务还通过dubbo去调用,肯定与dubbo本身的意义不相匹配,所以便不推荐使用

  • 这块代码我们可以发现同服务暴露一样,会将consumer注册到所有配置的注册中心上去,而refprotocol.refer则是服务引用的核心代码

  • cluster对invoker进行了一层包装,以便应对后续服务调用中出现的异常情况进行处理

  • 最后我们的invoker将通过代理工厂转换为可以调用的代理服务

RegistryProtocal中的refer

  1. @Override

  2. @SuppressWarnings("unchecked")

  3. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {

  4. url = url.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)).removeParameter(REGISTRY_KEY);

  5. //获取注册中心

  6. Registry registry = registryFactory.getRegistry(url);

  7. //如果是注册中心的服务,直接返回注册中心类型的invoker

  8. if (RegistryService.class.equals(type)) {

  9. return proxyFactory.getInvoker((T) registry, type, url);

  10. }


  11. // group="a,b" or group="*"

  12. Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));

  13. String group = qs.get(Constants.GROUP_KEY);

  14. if (group != null && group.length() > 0) {

  15. if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {

  16. return doRefer(getMergeableCluster(), registry, type, url);

  17. }

  18. }

  19. //发现服务

  20. return doRefer(cluster, registry, type, url);

  21. }


  22. private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {

  23. //创建并设置注册目录对象

  24. RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);

  25. directory.setRegistry(registry);

  26. directory.setProtocol(protocol);

  27. // all attributes of REFER_KEY

  28. Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());

  29. URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);

  30. if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {

  31. directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));

  32. //注册服务

  33. registry.register(directory.getRegisteredConsumerUrl());

  34. }

  35. directory.buildRouterChain(subscribeUrl);

  36. //订阅服务

  37. directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,

  38. PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

  39. //装饰Invoker

  40. Invoker invoker = cluster.join(directory);

  41. ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);

  42. return invoker;

  43. }

复制
  • 在RegistryProtocal中,我们看到了cluster.join(directory),在ReferenceConfig中也出现过,在ReferenceConfig中没有注册中心的时候将直接使用装饰invoker,以供我们接下来服务调用来做集群容错

  • 服务引用在RegistryProtocal中的核心方法即为doRefer方法

RegistryDirectory

  1. /**

  2. * 将网址转换为调用者,如果网址已被引用,则不会重新引用。

  3. *

  4. * @param urls

  5. * @return invokers

  6. */

  7. private Map<String, Invoker<T>> toInvokers(List<URL> urls) {

  8. Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();

  9. if (urls == null || urls.isEmpty()) {

  10. return newUrlInvokerMap;

  11. }

  12. Set<String> keys = new HashSet<>();

  13. String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);

  14. for (URL providerUrl : urls) {

  15. // 如果在参考侧配置协议,则仅选择匹配协议

  16. if (queryProtocols != null && queryProtocols.length() > 0) {

  17. boolean accept = false;

  18. String[] acceptProtocols = queryProtocols.split(",");

  19. for (String acceptProtocol : acceptProtocols) {

  20. if (providerUrl.getProtocol().equals(acceptProtocol)) {

  21. accept = true;

  22. break;

  23. }

  24. }

  25. if (!accept) {

  26. continue;

  27. }

  28. }

  29. if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {

  30. continue;

  31. }

  32. if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {

  33. logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +

  34. " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +

  35. " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +

  36. ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));

  37. continue;

  38. }

  39. URL url = mergeUrl(providerUrl);


  40. String key = url.toFullString(); // 参数URL已排序

  41. if (keys.contains(key)) { //重复的网址

  42. continue;

  43. }

  44. keys.add(key);

  45. // 缓存键是不与消费者方参数合并的URL,无论消费者如何组合参数,如果服务器URL更改,则再次引用

  46. Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // 本地发现

  47. Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);

  48. if (invoker == null) { // 不在缓存中,请再次发现

  49. try {

  50. boolean enabled = true;

  51. if (url.hasParameter(Constants.DISABLED_KEY)) {

  52. enabled = !url.getParameter(Constants.DISABLED_KEY, false);

  53. } else {

  54. enabled = url.getParameter(Constants.ENABLED_KEY, true);

  55. }

  56. if (enabled) {

  57. invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);

  58. }

  59. } catch (Throwable t) {

  60. logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);

  61. }

  62. if (invoker != null) { // Put new invoker in cache

  63. newUrlInvokerMap.put(key, invoker);

  64. }

  65. } else {

  66. newUrlInvokerMap.put(key, invoker);

  67. }

  68. }

  69. keys.clear();

  70. return newUrlInvokerMap;

  71. }

复制
  • 那我们的服务最后是如何通相应协议打开consumer和provider的链接呢,关键代码就在RegistryDirectory的toInvokers方法,将url转换成具体的invoker,这个方法在订阅服务的时候会被触发,并且这里做了一层缓存,防止服务被多次引用

DubboProtocal中的refer

  1. @Override

  2. public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {

  3. optimizeSerialization(url);


  4. // create rpc invoker.

  5. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);

  6. invokers.add(invoker);


  7. return invoker;

  8. }

复制
  • 这里我们以Dubbo协议为例,看到DubboProtocal中的refer很简单,就是创建一个netty客户端,与provider进行连接返回一个Invoker即完成了一次服务的引用

  • 最后通过ProxyFactory的字节码结束,生成代理的可供调用的服务,到这里dubbo服务引用的流程就结束了,可以看出服务引用与服务暴露的过程中有很多类似的地方,其中还有很多细节没有展开,这也将是后续学习的重点


文章转载自光明和黑暗互相吸引,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论