dubbo服务暴露

释放双眼,带上耳机,听听看~!

ServiceBean#onApplicationEvent(ContextRefreshedEvent)

ServiceConfig#export()

ServiceConfig#doExport()
首先校验该service的配置是否为空,则加载dubbo:provider、dubbo:module、dubbo:application缺省配置,若还为空则加载dubbo.properties的配置。
配置覆盖策略

ServiceConfig#doExportUrls()


1
2
3
4
5
6
7
8
9
1private void doExportUrls() {
2       //加载注册中心url
3        List<URL> registryURLs = loadRegistries(true);
4        for (ProtocolConfig protocolConfig : protocols) {
5            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
6        }
7    }
8
9

首先看AbstractInterfaceConfig#loadRegistries(boolean)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
1protected List<URL> loadRegistries(boolean provider) {
2       //检验registry是否为空
3        checkRegistry();
4        //遍历所有registry
5        List<URL> registryList = new ArrayList<URL>();
6        if (registries != null && !registries.isEmpty()) {
7            for (RegistryConfig config : registries) {
8                /*省略代码,参数校验*/
9                if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
10                  //构造kv属性
11                    Map<String, String> map = new HashMap<String, String>();
12                    appendParameters(map, application);
13                    appendParameters(map, config);
14                    map.put("path", RegistryService.class.getName());
15                    map.put("dubbo", Version.getProtocolVersion());
16                    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
17                    if (ConfigUtils.getPid() > 0) {
18                        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
19                    }
20                    if (!map.containsKey("protocol")) {
21                        if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
22                            map.put("protocol", "remote");
23                        } else {
24                            map.put("protocol", "dubbo");
25                        }
26                    }
27                    //生成url对象
28                    List<URL> urls = UrlUtils.parseURLs(address, map);
29                    for (URL url : urls) {
30                        url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
31                        url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
32                        if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
33                                || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
34                            registryList.add(url);
35                        }
36                    }
37                }
38            }
39        }
40        return registryList;
41    }
42
43

生成的registry url如下图
dubbo服务暴露
接着回去看doExportUrls,循环调用ServiceConfig#doExportUrlsFor1Protocol(ProtocolConfig , List)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
1private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
2        String name = protocolConfig.getName();
3        if (name == null || name.length() == 0) {
4            name = "dubbo";
5        }
6        //把application、provider、protocol等配置读取到map中
7        Map<String, String> map = new HashMap<String, String>();
8        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
9        map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
10        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
11        if (ConfigUtils.getPid() > 0) {
12            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
13        }
14        appendParameters(map, application);
15        appendParameters(map, module);
16        appendParameters(map, provider, Constants.DEFAULT_KEY);
17        appendParameters(map, protocolConfig);
18        appendParameters(map, this);
19        if (methods != null && !methods.isEmpty()) {
20            for (MethodConfig method : methods) {
21            /*省略代码,循环方法级别配置,一般不会配置方法的配置*/
22            } // end of methods for
23        }
24        //是否泛化实现http://dubbo.apache.org/zh-cn/docs/user/demos/generic-service.html
25        //普通service走else
26        if (ProtocolUtils.isGeneric(generic)) {
27            map.put(Constants.GENERIC_KEY, generic);
28            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
29        } else {
30            String revision = Version.getVersion(interfaceClass, version);
31            if (revision != null && revision.length() > 0) {
32                map.put("revision", revision);
33            }
34            //getWrapper通过javassist生成Wrapper类,保存到一个WRAPPER_MAP中
35            //最后拿到所有方法的名称,拼接成method1,method2...,放到最初的那个map中
36            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
37            if (methods.length == 0) {
38                logger.warn("NO method found in service interface " + interfaceClass.getName());
39                map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
40            } else {
41                map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
42            }
43        }
44        //令牌验证,http://dubbo.apache.org/zh-cn/docs/user/demos/token-authorization.html
45        if (!ConfigUtils.isEmpty(token)) {
46            if (ConfigUtils.isDefault(token)) {
47                map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
48            } else {
49                map.put(Constants.TOKEN_KEY, token);
50            }
51        }
52        //是否本地方法,http://dubbo.apache.org/zh-cn/docs/user/demos/local-call.html
53        if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
54            protocolConfig.setRegister(false);
55            map.put("notify", "false");
56        }
57        //开始了,服务暴露
58        String contextPath = protocolConfig.getContextpath();
59        //http协议的配置?一般为空串
60        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
61            contextPath = provider.getContextpath();
62        }
63
64        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
65        Integer port = this.findConfigedPorts(protocolConfig, name, map);、
66        //构造URL
67        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
68
69        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
70                .hasExtension(url.getProtocol())) {
71            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
72                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
73        }
74
75        String scope = url.getParameter(Constants.SCOPE_KEY);
76        // scope在dubbo.apache.org dubbo:service没有了?以前老的网站好像写了
77        // scope配置成none不暴露
78        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
79
80            // 不是配置remote就本地暴露(配置remote,表示只远程暴露)
81            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
82                exportLocal(url);
83            }
84            // 不是配置local就远程暴露(配置local,表示只本地暴露)
85            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
86                /*省略代码,logger*/
87                if (registryURLs != null && !registryURLs.isEmpty()) {
88                  //遍历所有注册中心
89                    for (URL registryURL : registryURLs) {
90                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
91                        //监控中心,http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-monitor.html
92                        URL monitorUrl = loadMonitor(registryURL);
93                        if (monitorUrl != null) {
94                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
95                        }
96                        /*省略代码,logger*/
97                        // For providers, this is used to enable custom proxy to generate invoker
98                        String proxy = url.getParameter(Constants.PROXY_KEY);
99                        if (StringUtils.isNotEmpty(proxy)) {
100                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
101                        }
102                     //通过代理工厂获得invoker
103                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
104                        //生成invoker的委托
105                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
106                        //远程暴露
107                        Exporter<?> exporter = protocol.export(wrapperInvoker);
108                        exporters.add(exporter);
109                    }
110                } else {
111                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
112                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
113
114                    Exporter<?> exporter = protocol.export(wrapperInvoker);
115                    exporters.add(exporter);
116                }
117            }
118        }
119        this.urls.add(url);
120    }
121
122

小结

  1. 读取各种配置,生成注册中心URL;
  2. 读取各种配置,生成服务的URL;
  3. 通过代理工厂将服务ref对象转化成invoker对象,并且生成委托;
  4. 远程暴露,将多个提供者保存到exporters。

接下来,分别讲一下本地暴露和远程暴露的细节。
这边涉及到dubbo的Adaptive,其实就是通过传入的参数获得一个具体实现。

本地暴露

ServiceConfig#exportLocal(URL)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1private void exportLocal(URL url) {
2        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
3           //转换成local url
4            URL local = URL.valueOf(url.toFullString())
5                    .setProtocol(Constants.LOCAL_PROTOCOL)
6                    .setHost(LOCALHOST)
7                    .setPort(0);
8            //将class放到ThreadLocal
9            ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
10            //获得ProxyFactory生成invoker,先往下看
11            //继续看protocol.export
12            Exporter<?> exporter = protocol.export(
13                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
14            //最终会将生成的exporter加入到ServiceConfig的实例对象exporters中
15            exporters.add(exporter);
16            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
17        }
18    }
19
20

ProxyFactory$Adaptive#getInvoker(java.lang.Object , java.lang.Class , com.alibaba.dubbo.common.URL )


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
2        if (arg2 == null) throw new IllegalArgumentException("url == null");
3        com.alibaba.dubbo.common.URL url = arg2;
4        //获得url中的协议
5        String extName = url.getParameter("proxy", "javassist");
6        if (extName == null)
7            /*省略代码,throw异常*/
8        //根据协议名称获得具体的ProxyFactory实现类,类似反射,ExtensionLoader为dubbo的扩展机制,这边不分析
9        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
10        //获得invoker
11        return extension.getInvoker(arg0, arg1, arg2);
12    }
13
14

包装类,StubProxyFactoryWrapper#getInvoker(T proxy, Class type, URL url)


1
2
3
4
5
1public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
2        return proxyFactory.getInvoker(proxy, type, url);
3    }
4
5

实际调用类,JavassistProxyFactory#getInvoker(T proxy, Class type, URL url),默认协议Javassist


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
2        // TODO Wrapper类不能正确处理带$的类名
3        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
4        return new AbstractProxyInvoker<T>(proxy, type, url) {
5            @Override
6            protected Object doInvoke(T proxy, String methodName,
7                                      Class<?>[] parameterTypes,
8                                      Object[] arguments) throws Throwable {
9                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
10            }
11        };
12    }
13
14

生成的Wrapper中的invokeMethod方法如下所示,可以看一下AbstractProxyInvoker的调用过程就明白了:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException
2{
3   org.apache.dubbo.demo.provider.DemoServiceImpl w;
4   try{
5       w = ((org.apache.dubbo.demo.provider.DemoServiceImpl)$1);
6   }catch(Throwable e){
7       throw new IllegalArgumentException(e);
8   }
9   try{
10      if( "sayHello".equals( $2 )  &&  $3.length == 1 ) {
11          return ($w)w.sayHello((java.lang.String)$4[0]);
12      }
13      } catch(Throwable e) {      
14          throw new java.lang.reflect.InvocationTargetException(e);  
15  } throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.");
16}
17
18

回来看protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local)),和ProxyFactory一样动态生成了Protocol$Adaptive,Protocol$Adaptive#export(com.alibaba.dubbo.rpc.Invoker arg0)方法代码如下:


1
2
3
4
5
6
7
8
9
10
11
1public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
2        /*省略代码,参数null判断,throw异常*/
3        com.alibaba.dubbo.common.URL url = arg0.getUrl();
4        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
5        if (extName == null)
6            /*省略代码,throw异常*/
7        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
8        return extension.export(arg0);
9    }
10
11

上述代码中,Protocol为包装类,如下图所示:
dubbo服务暴露

调用了ProtocolFilterWrapper#export(Invoker invoker)


1
2
3
4
5
6
7
8
9
10
1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2       //判断是不是registry协议
3        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
4            return protocol.export(invoker);
5        }
6        //本地暴露走这
7        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
8    }
9
10

ProtocolFilterWrapper#buildInvokerChain(final Invoker invoker, String key, String group),生成责任链


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
2        Invoker<T> last = invoker;
3        //根据URL的属性获得相应Activate的过滤器
4        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
5        //生成调用的责任链
6        if (!filters.isEmpty()) {
7            for (int i = filters.size() - 1; i >= 0; i--) {
8                final Filter filter = filters.get(i);
9                final Invoker<T> next = last;
10                last = new Invoker<T>() {
11                    @Override
12                    public Class<T> getInterface() {
13                        return invoker.getInterface();
14                    }
15                    @Override
16                    public URL getUrl() {
17                        return invoker.getUrl();
18                    }
19                    @Override
20                    public boolean isAvailable() {
21                        return invoker.isAvailable();
22                    }
23                    @Override
24                    public Result invoke(Invocation invocation) throws RpcException {
25                        return filter.invoke(next, invocation);
26                    }
27                    @Override
28                    public void destroy() {
29                        invoker.destroy();
30                    }
31                    @Override
32                    public String toString() {
33                        return invoker.toString();
34                    }
35                };
36            }
37        }
38        return last;
39    }
40
41

回到ProtocolFilterWrapper#export(Invoker invoker),protocol.export调用了QosProtocolWrapper#export(Invoker),2.5.8 新版本增加了 QOS 模块,本地服务不涉及。


1
2
3
4
5
6
7
8
9
10
11
1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2       //判断是不是registry协议
3        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
4            startQosServer(invoker.getUrl());
5            return protocol.export(invoker);
6        }
7        //本地暴露走这
8        return protocol.export(invoker);
9    }
10
11

ProtocolListenerWrapper#export(Invoker),由于本地暴露protocol为InjvmProtocol。


1
2
3
4
5
6
7
8
9
10
11
12
13
1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2       //判断是不是registry协议
3        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
4            return protocol.export(invoker);
5        }
6        //本地暴露走这,先往下看
7        //然后构造ListenerExporterWrapper,也是把参数赋值
8        return new ListenerExporterWrapper<T>(protocol.export(invoker),
9                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
10                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
11    }
12
13

InjvmProtocol#export(Invoker),终于到最后了!!!


1
2
3
4
5
6
7
8
9
1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2       //这边的ServiceKey为“接口名称:版本号”,如果没有版本号就为“接口名称”
3       //exporterMap是InjvmProtocol抽象父类的对象变量,protected final Map<String, Exporter<?>> exporterMap
4       //构造方法,把参数赋值到对象变量,并且exporterMap.put(key, this),this为InjvmExporter
5       //所以所有本地暴露服务都放在exporterMap中
6        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
7    }
8
9

回到ProtocolListenerWrapper#export(Invoker),构造ListenerExporterWrapper,把参数赋值。默认ExporterListener是空的。

最终回到本地暴露小节的开始,生成的Exporter如下图:
dubbo服务暴露

远程暴露

远程暴露是服务暴露的重点,涉及的内容比较多。
下面这段代码在上面提到过,远程暴露生成Invoker和本地暴露是类似的,只是URL不同。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
1private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
2        /*省略无关代码,具体代码在上面提到过*/
3        
4        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
5
6            // 不是配置remote就本地暴露(配置remote,表示只远程暴露)
7            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
8                exportLocal(url);
9            }
10            // 不是配置local就远程暴露(配置local,表示只本地暴露)
11            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
12                /*省略代码,logger*/
13                if (registryURLs != null && !registryURLs.isEmpty()) {
14                  //遍历所有注册中心
15                    for (URL registryURL : registryURLs) {
16                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
17                        //监控中心,http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-monitor.html
18                        URL monitorUrl = loadMonitor(registryURL);
19                        if (monitorUrl != null) {
20                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
21                        }
22                        /*省略代码,logger*/
23                        // For providers, this is used to enable custom proxy to generate invoker
24                        String proxy = url.getParameter(Constants.PROXY_KEY);
25                        if (StringUtils.isNotEmpty(proxy)) {
26                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
27                        }
28                      //通过代理工厂获得invoker
29                      //registryURL.addParameterAndEncoded会将dubboURL以export为key加入到registryURL
30                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
31                        //生成invoker的委托,将invoker和ServiceConfig绑定
32                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
33                        //远程暴露
34                        Exporter<?> exporter = protocol.export(wrapperInvoker);
35                        exporters.add(exporter);
36                    }
37                } else {
38                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
39                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
40
41                    Exporter<?> exporter = protocol.export(wrapperInvoker);
42                    exporters.add(exporter);
43                }
44            }
45        }
46        this.urls.add(url);
47    }
48
49

远程暴露的核心类(这个流程会反复出现)为RegisterProtocol所以Protocol$Adaptive动态获得了RegisterProtocol,ProtocolFilterWrapper和本地暴露一致,只是协议不同,本地暴露为InjvmProtocol,所以直接看到RegistryProtocol#export(final Invoker)方法。


1
2
3
4
5
6
7
8
9
10
11
1public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
2        //导出服务部分,先往下看
3        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
4        
5        /*省略代码,下面会有写*/
6        
7        //保证每次export都返回一个新的exporter实例
8        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
9    }
10
11

RegistryProtocol#doLocalExport(final Invoker originInvoker)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
2       //从invoker中获得服务的URL,默认是dubbo://开头的URL
3        String key = getCacheKey(originInvoker);
4        //从缓存中获得exporter,如果已经暴露就不再暴露
5        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
6        //双重检查锁
7        if (exporter == null) {
8            synchronized (bounds) {
9                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
10                if (exporter == null) {
11                  //将originInvoker和服务的URL(默认dubbo://开头的URL)封装到一个委托中
12                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
13                    //生成Exporter流程与本地暴露相同
14                    //由于默认dubbo协议,所以protocol包装类最终的protocol为DubboProtocol,只是最后会调用DubboProtocol#export(Invoker<T> invoker)
15                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
16                    //写缓存
17                    bounds.put(key, exporter);
18                }
19            }
20        }
21        return exporter;
22    }
23
24

DubboProtocol#export(Invoker invoker)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2        URL url = invoker.getUrl();
3        // 和本地暴露一样,远程暴露DubboProtocol中也有一个exporterMap,记录了暴露的服务
4       // key由服务组名,服务名,服务版本号以及端口组成
5       //没有设置group和版本号,key为pers.congzhou.service.DemoService:20880
6       //如全设置为demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
7        String key = serviceKey(url);
8        //构造DubboExporter
9        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
10        //加入缓存
11        exporterMap.put(key, exporter);
12      //本地存根,http://dubbo.apache.org/zh-cn/docs/user/demos/local-stub.html
13        //跳过,消费者端为调度事件导出存根服务
14        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
15        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
16        if (isStubSupportEvent && !isCallbackservice) {
17            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
18            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
19                if (logger.isWarnEnabled()) {
20                    /*省略代码,logger*/
21                }
22            } else {
23                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
24            }
25        }
26      //启动服务器
27        openServer(url);
28        //优化序列化
29        optimizeSerialization(url);
30        return exporter;
31    }
32
33

启动服务器实例

DubboProtocol#openServer(URL url)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1private void openServer(URL url) {
2        // 获取地址(host:port),并将其作为服务器实例的 key,用于标识当前的服务器实例
3        String key = url.getAddress();
4        // 官方注释:client 也可以暴露一个只有server可以调用的服务。
5        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
6        if (isServer) {
7           //缓存,不存在就创建一个,默认只启动一个
8            ExchangeServer server = serverMap.get(key);
9            if (server == null) {
10                serverMap.put(key, createServer(url));
11            } else {
12                // 服务器已存在,则根据URL中的配置重置服务器
13                server.reset(url);
14            }
15        }
16    }
17
18

DubboProtocol#createServer(URL url)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1private ExchangeServer createServer(URL url) {
2       //往URL加配置
3        // 默认开启server关闭时发送readonly事件
4        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
5        // 默认开启heartbeat
6        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
7        //获得server参数,http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-protocol.html
8        //dubbo协议缺省为netty,http协议缺省为servlet
9        //这边我配置了netty4
10        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
11
12        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
13            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
14        //配置协议编码方式
15        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
16        ExchangeServer server;
17        try {
18          //启动服务器
19          //Exchangers通过Adaptive机制获得Exchanger,默认为HeaderExchanger
20            server = Exchangers.bind(url, requestHandler);
21        } catch (RemotingException e) {
22            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
23        }
24        str = url.getParameter(Constants.CLIENT_KEY);
25        if (str != null && str.length() > 0) {
26            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
27            if (!supportedTypes.contains(str)) {
28                throw new RpcException("Unsupported client type: " + str);
29            }
30        }
31        return server;
32    }
33
34

HeaderExchanger#bind(URL url, ExchangeHandler handler)方法的三个逻辑:

  1. HeaderExchangeHandler代理了DubboProtocol#requestHandler

  2. DecodeHandler代理了HeaderExchangeHandler

  3. bind,创建 NettyServer


1
2
3
4
5
1public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
2        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
3    }
4
5

Transporters#bind(URL url, ChannelHandler… handlers)


1
2
3
4
5
6
7
8
9
10
11
12
13
1public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
2        /*省略代码,参数校验*/
3        ChannelHandler handler;
4        if (handlers.length == 1) {
5            handler = handlers[0];
6        } else {
7            handler = new ChannelHandlerDispatcher(handlers);
8        }
9        //getTransporter()获得Adaptive
10        return getTransporter().bind(url, handler);
11    }
12
13

Transporter$Adaptive#bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1)


1
2
3
4
5
6
7
8
9
10
11
12
1public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException {
2        if (arg0 == null) throw new IllegalArgumentException("url == null");
3        com.alibaba.dubbo.common.URL url = arg0;
4        String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
5        if (extName == null)
6            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
7        //根据server参数获得具体Transporter实例,我使用的server参数为netty4
8        com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
9        return extension.bind(arg0, arg1);
10    }
11
12

com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter#bind(URL url, ChannelHandler listener)
netty启动的代码不写了,其中listener会被包装,当netty触发事件时会调用listener处理。


1
2
3
4
5
1public Server bind(URL url, ChannelHandler listener) throws RemotingException {
2        return new NettyServer(url, listener);
3    }
4
5

服务注册

回到远程暴露的核心类RegistryProtocol#export(final Invoker)
再贴一下代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
2        //导出服务部分
3        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
4        //获得注册中心URL,zookeeper的URL为zookeeper://开头
5        URL registryUrl = getRegistryUrl(originInvoker);
6        // 获取 Registry,先往下看
7        final Registry registry = getRegistry(originInvoker);
8        //获得注册的URL,过滤了一些不需要的参数
9        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
10
11        //判断是否延迟发布
12        boolean register = registeredProviderUrl.getParameter("register", true);
13
14        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
15
16      //register过程,就刚刚回来的地方继续
17        if (register) {
18            register(registryUrl, registeredProviderUrl);
19            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
20        }
21
22        // 订阅override数据
23        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
24        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
25        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
26        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
27        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
28        //保证每次export都返回一个新的exporter实例
29        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
30    }
31
32

RegistryProtocol#getRegistry(final Invoker<?> originInvoker)


1
2
3
4
5
6
7
8
1private Registry getRegistry(final Invoker&lt;?&gt; originInvoker) {
2       //获得注册中心URL
3        URL registryUrl = getRegistryUrl(originInvoker);
4        //通过RegistryFactory$Adaptive获得具体实例,和其他Adaptive一样,就不贴代码了
5        return registryFactory.getRegistry(registryUrl);
6    }
7
8

zookeeper会获得ZookeeperRegistryFactory
AbstractRegistryFactory#getRegistry(URL url)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1public Registry getRegistry(URL url) {
2        url = url.setPath(RegistryService.class.getName())
3                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
4                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
5        String key = url.toServiceString();
6        // 加锁
7        LOCK.lock();
8        try {
9           // 缓存
10            Registry registry = REGISTRIES.get(key);
11            if (registry != null) {
12                return registry;
13            }
14            // 缓存未命中,创建 Registry 实例
15            registry = createRegistry(url);
16            if (registry == null) {
17                throw new IllegalStateException(&quot;Can not create registry &quot; + url);
18            }
19            // 写缓存
20            REGISTRIES.put(key, registry);
21            return registry;
22        } finally {
23            LOCK.unlock();
24        }
25    }
26
27

ZookeeperRegistryFactory#createRegistry(URL url)


1
2
3
4
5
1public Registry createRegistry(URL url) {
2        return new ZookeeperRegistry(url, zookeeperTransporter);
3    }
4
5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
2        super(url);
3        if (url.isAnyHost()) {
4            throw new IllegalStateException(&quot;registry address == null&quot;);
5        }
6        //获取组名,默认dubbo
7        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
8        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
9           //加上路径,/group
10            group = Constants.PATH_SEPARATOR + group;
11        }
12        this.root = group;
13        // 创建 Zookeeper 客户端,先往下看
14        zkClient = zookeeperTransporter.connect(url);
15        // 状态监听器
16        zkClient.addStateListener(new StateListener() {
17            @Override
18            public void stateChanged(int state) {
19                if (state == RECONNECTED) {
20                    try {
21                        recover();
22                    } catch (Exception e) {
23                        logger.error(e.getMessage(), e);
24                    }
25                }
26            }
27        });
28    }
29
30

ZookeeperTransporter$Adaptive,默认为 CuratorZookeeperTransporter,Apache Curator一个Zookeeper客户端。
CuratorZookeeperTransporter#connect(URL url)


1
2
3
4
5
1public ZookeeperClient connect(URL url) {
2        return new CuratorZookeeperClient(url);
3    }
4
5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1public CuratorZookeeperClient(URL url) {
2        super(url);
3        try {
4           // CuratorFramework 工厂,各种参数加进去
5            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
6                    .connectString(url.getBackupAddress())
7                    .retryPolicy(new RetryNTimes(1, 1000))
8                    .connectionTimeoutMs(5000);
9            String authority = url.getAuthority();
10            if (authority != null &amp;&amp; authority.length() &gt; 0) {
11                builder = builder.authorization(&quot;digest&quot;, authority.getBytes());
12            }
13            // 构建 CuratorFramework 实例
14            client = builder.build();
15            // 添加监听器
16            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
17                @Override
18                public void stateChanged(CuratorFramework client, ConnectionState state) {
19                    if (state == ConnectionState.LOST) {
20                        CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
21                    } else if (state == ConnectionState.CONNECTED) {
22                        CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
23                    } else if (state == ConnectionState.RECONNECTED) {
24                        CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
25                    }
26                }
27            });
28            // 启动客户端
29            client.start();
30        } catch (Exception e) {
31            throw new IllegalStateException(e.getMessage(), e);
32        }
33    }
34
35

回到服务暴露的核心类,在贴一下代码就不需要往上看啦


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1public &lt;T&gt; Exporter&lt;T&gt; export(final Invoker&lt;T&gt; originInvoker) throws RpcException {
2        //导出服务部分
3        final ExporterChangeableWrapper&lt;T&gt; exporter = doLocalExport(originInvoker);
4        //获得注册中心URL,zookeeper的URL为zookeeper://开头
5        URL registryUrl = getRegistryUrl(originInvoker);
6        // 获取 Registry,先往下看
7        final Registry registry = getRegistry(originInvoker);
8        
9        //上面写到这边
10        //获得注册的URL,过滤了一些不需要的参数
11        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
12
13        //判断是否延迟发布
14        boolean register = registeredProviderUrl.getParameter(&quot;register&quot;, true);
15
16        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
17
18      //register过程
19        if (register) {
20          // 向注册中心注册服务
21            register(registryUrl, registeredProviderUrl);
22            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
23        }
24
25        // 订阅override数据
26        // 创建监听器,FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
27        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
28        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
29        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
30        // 向注册中心订阅
31        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
32        // 创建DestroyableExporter,保证每次export都返回一个新的exporter实例
33        return new DestroyableExporter&lt;T&gt;(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
34    }
35
36

RegistryProtocol#register(URL registryUrl, URL registedProviderUrl)


1
2
3
4
5
6
1 public void register(URL registryUrl, URL registedProviderUrl) {
2        Registry registry = registryFactory.getRegistry(registryUrl);
3        registry.register(registedProviderUrl);
4    }
5
6

ZookeeperRegistry的抽象父类FailbackRegistry#register(URL url)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1public void register(URL url) {
2       //加到一个hashset中
3        super.register(url);
4        failedRegistered.remove(url);
5        failedUnregistered.remove(url);
6        try {
7            // 向服务器端发送注册请求
8            doRegister(url);
9        } catch (Exception e) {
10            Throwable t = e;
11            // 如果打开启动检测,则直接抛出异常
12            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
13                    &amp;&amp; url.getParameter(Constants.CHECK_KEY, true)
14                    &amp;&amp; !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
15            boolean skipFailback = t instanceof SkipFailbackWrapperException;
16            if (check || skipFailback) {
17                if (skipFailback) {
18                    t = t.getCause();
19                }
20                throw new IllegalStateException(&quot;Failed to register &quot; + url + &quot; to registry &quot; + getUrl().getAddress() + &quot;, cause: &quot; + t.getMessage(), t);
21            } else {
22                logger.error(&quot;Failed to register &quot; + url + &quot;, waiting for retry, cause: &quot; + t.getMessage(), t);
23            }
24            // 将失败的注册请求记录到失败的列表中,定期重试
25            failedRegistered.add(url);
26        }
27    }
28
29

ZookeeperRegistry#doRegister(URL url)

  1. toUrlPath 方法生成节点路径,路径格式/${group}/${serviceInterface}/providers/${url}

  2. 通过Zookeeper客户端创建节点


1
2
3
4
5
6
7
8
9
1protected void doRegister(URL url) {
2        try {
3            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
4        } catch (Throwable e) {
5            throw new RpcException(&quot;Failed to register &quot; + url + &quot; to zookeeper &quot; + getUrl() + &quot;, cause: &quot; + e.getMessage(), e);
6        }
7    }
8
9

没有了!

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全资讯

广东省农村信用社联合社与亚信安全达成战略合作

2016-12-25 17:23:59

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索