ServiceBean#onApplicationEvent(ContextRefreshedEvent)
ServiceConfig#export()
ServiceConfig#doExport()
首先校验该service的配置是否为空,则加载dubbo:provider、dubbo:module、dubbo:application缺省配置,若还为空则加载dubbo.properties的配置。
配置覆盖策略
ServiceConfig#doExportUrls()
1
2
3
4
5
6
7
8
9 1private void doExportUrls() {
2 //加载注册中心url
3 List<URL> registryURLs = loadRegistries(true);
4 for (ProtocolConfig protocolConfig : protocols) {
5 doExportUrlsFor1Protocol(protocolConfig, registryURLs);
6 }
7 }
8
9
首先看AbstractInterfaceConfig#loadRegistries(boolean)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 1protected List<URL> loadRegistries(boolean provider) {
2 //检验registry是否为空
3 checkRegistry();
4 //遍历所有registry
5 List<URL> registryList = new ArrayList<URL>();
6 if (registries != null && !registries.isEmpty()) {
7 for (RegistryConfig config : registries) {
8 /*省略代码,参数校验*/
9 if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
10 //构造kv属性
11 Map<String, String> map = new HashMap<String, String>();
12 appendParameters(map, application);
13 appendParameters(map, config);
14 map.put("path", RegistryService.class.getName());
15 map.put("dubbo", Version.getProtocolVersion());
16 map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
17 if (ConfigUtils.getPid() > 0) {
18 map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
19 }
20 if (!map.containsKey("protocol")) {
21 if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
22 map.put("protocol", "remote");
23 } else {
24 map.put("protocol", "dubbo");
25 }
26 }
27 //生成url对象
28 List<URL> urls = UrlUtils.parseURLs(address, map);
29 for (URL url : urls) {
30 url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
31 url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
32 if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
33 || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
34 registryList.add(url);
35 }
36 }
37 }
38 }
39 }
40 return registryList;
41 }
42
43
生成的registry url如下图
接着回去看doExportUrls,循环调用ServiceConfig#doExportUrlsFor1Protocol(ProtocolConfig , List)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 1private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
2 String name = protocolConfig.getName();
3 if (name == null || name.length() == 0) {
4 name = "dubbo";
5 }
6 //把application、provider、protocol等配置读取到map中
7 Map<String, String> map = new HashMap<String, String>();
8 map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
9 map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
10 map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
11 if (ConfigUtils.getPid() > 0) {
12 map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
13 }
14 appendParameters(map, application);
15 appendParameters(map, module);
16 appendParameters(map, provider, Constants.DEFAULT_KEY);
17 appendParameters(map, protocolConfig);
18 appendParameters(map, this);
19 if (methods != null && !methods.isEmpty()) {
20 for (MethodConfig method : methods) {
21 /*省略代码,循环方法级别配置,一般不会配置方法的配置*/
22 } // end of methods for
23 }
24 //是否泛化实现http://dubbo.apache.org/zh-cn/docs/user/demos/generic-service.html
25 //普通service走else
26 if (ProtocolUtils.isGeneric(generic)) {
27 map.put(Constants.GENERIC_KEY, generic);
28 map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
29 } else {
30 String revision = Version.getVersion(interfaceClass, version);
31 if (revision != null && revision.length() > 0) {
32 map.put("revision", revision);
33 }
34 //getWrapper通过javassist生成Wrapper类,保存到一个WRAPPER_MAP中
35 //最后拿到所有方法的名称,拼接成method1,method2...,放到最初的那个map中
36 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
37 if (methods.length == 0) {
38 logger.warn("NO method found in service interface " + interfaceClass.getName());
39 map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
40 } else {
41 map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
42 }
43 }
44 //令牌验证,http://dubbo.apache.org/zh-cn/docs/user/demos/token-authorization.html
45 if (!ConfigUtils.isEmpty(token)) {
46 if (ConfigUtils.isDefault(token)) {
47 map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
48 } else {
49 map.put(Constants.TOKEN_KEY, token);
50 }
51 }
52 //是否本地方法,http://dubbo.apache.org/zh-cn/docs/user/demos/local-call.html
53 if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
54 protocolConfig.setRegister(false);
55 map.put("notify", "false");
56 }
57 //开始了,服务暴露
58 String contextPath = protocolConfig.getContextpath();
59 //http协议的配置?一般为空串
60 if ((contextPath == null || contextPath.length() == 0) && provider != null) {
61 contextPath = provider.getContextpath();
62 }
63
64 String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
65 Integer port = this.findConfigedPorts(protocolConfig, name, map);、
66 //构造URL
67 URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
68
69 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
70 .hasExtension(url.getProtocol())) {
71 url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
72 .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
73 }
74
75 String scope = url.getParameter(Constants.SCOPE_KEY);
76 // scope在dubbo.apache.org dubbo:service没有了?以前老的网站好像写了
77 // scope配置成none不暴露
78 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
79
80 // 不是配置remote就本地暴露(配置remote,表示只远程暴露)
81 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
82 exportLocal(url);
83 }
84 // 不是配置local就远程暴露(配置local,表示只本地暴露)
85 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
86 /*省略代码,logger*/
87 if (registryURLs != null && !registryURLs.isEmpty()) {
88 //遍历所有注册中心
89 for (URL registryURL : registryURLs) {
90 url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
91 //监控中心,http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-monitor.html
92 URL monitorUrl = loadMonitor(registryURL);
93 if (monitorUrl != null) {
94 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
95 }
96 /*省略代码,logger*/
97 // For providers, this is used to enable custom proxy to generate invoker
98 String proxy = url.getParameter(Constants.PROXY_KEY);
99 if (StringUtils.isNotEmpty(proxy)) {
100 registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
101 }
102 //通过代理工厂获得invoker
103 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
104 //生成invoker的委托
105 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
106 //远程暴露
107 Exporter<?> exporter = protocol.export(wrapperInvoker);
108 exporters.add(exporter);
109 }
110 } else {
111 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
112 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
113
114 Exporter<?> exporter = protocol.export(wrapperInvoker);
115 exporters.add(exporter);
116 }
117 }
118 }
119 this.urls.add(url);
120 }
121
122
小结
- 读取各种配置,生成注册中心URL;
- 读取各种配置,生成服务的URL;
- 通过代理工厂将服务ref对象转化成invoker对象,并且生成委托;
- 远程暴露,将多个提供者保存到exporters。
接下来,分别讲一下本地暴露和远程暴露的细节。
这边涉及到dubbo的Adaptive,其实就是通过传入的参数获得一个具体实现。
本地暴露
ServiceConfig#exportLocal(URL)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1private void exportLocal(URL url) {
2 if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
3 //转换成local url
4 URL local = URL.valueOf(url.toFullString())
5 .setProtocol(Constants.LOCAL_PROTOCOL)
6 .setHost(LOCALHOST)
7 .setPort(0);
8 //将class放到ThreadLocal
9 ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
10 //获得ProxyFactory生成invoker,先往下看
11 //继续看protocol.export
12 Exporter<?> exporter = protocol.export(
13 proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
14 //最终会将生成的exporter加入到ServiceConfig的实例对象exporters中
15 exporters.add(exporter);
16 logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
17 }
18 }
19
20
ProxyFactory$Adaptive#getInvoker(java.lang.Object , java.lang.Class , com.alibaba.dubbo.common.URL )
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
2 if (arg2 == null) throw new IllegalArgumentException("url == null");
3 com.alibaba.dubbo.common.URL url = arg2;
4 //获得url中的协议
5 String extName = url.getParameter("proxy", "javassist");
6 if (extName == null)
7 /*省略代码,throw异常*/
8 //根据协议名称获得具体的ProxyFactory实现类,类似反射,ExtensionLoader为dubbo的扩展机制,这边不分析
9 com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
10 //获得invoker
11 return extension.getInvoker(arg0, arg1, arg2);
12 }
13
14
包装类,StubProxyFactoryWrapper#getInvoker(T proxy, Class type, URL url)
1
2
3
4
5 1public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
2 return proxyFactory.getInvoker(proxy, type, url);
3 }
4
5
实际调用类,JavassistProxyFactory#getInvoker(T proxy, Class type, URL url),默认协议Javassist
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
2 // TODO Wrapper类不能正确处理带$的类名
3 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
4 return new AbstractProxyInvoker<T>(proxy, type, url) {
5 @Override
6 protected Object doInvoke(T proxy, String methodName,
7 Class<?>[] parameterTypes,
8 Object[] arguments) throws Throwable {
9 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
10 }
11 };
12 }
13
14
生成的Wrapper中的invokeMethod方法如下所示,可以看一下AbstractProxyInvoker的调用过程就明白了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException
2{
3 org.apache.dubbo.demo.provider.DemoServiceImpl w;
4 try{
5 w = ((org.apache.dubbo.demo.provider.DemoServiceImpl)$1);
6 }catch(Throwable e){
7 throw new IllegalArgumentException(e);
8 }
9 try{
10 if( "sayHello".equals( $2 ) && $3.length == 1 ) {
11 return ($w)w.sayHello((java.lang.String)$4[0]);
12 }
13 } catch(Throwable e) {
14 throw new java.lang.reflect.InvocationTargetException(e);
15 } throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.");
16}
17
18
回来看protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local)),和ProxyFactory一样动态生成了Protocol$Adaptive,Protocol$Adaptive#export(com.alibaba.dubbo.rpc.Invoker arg0)方法代码如下:
1
2
3
4
5
6
7
8
9
10
11 1public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
2 /*省略代码,参数null判断,throw异常*/
3 com.alibaba.dubbo.common.URL url = arg0.getUrl();
4 String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
5 if (extName == null)
6 /*省略代码,throw异常*/
7 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
8 return extension.export(arg0);
9 }
10
11
上述代码中,Protocol为包装类,如下图所示:
调用了ProtocolFilterWrapper#export(Invoker invoker)
1
2
3
4
5
6
7
8
9
10 1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2 //判断是不是registry协议
3 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
4 return protocol.export(invoker);
5 }
6 //本地暴露走这
7 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
8 }
9
10
ProtocolFilterWrapper#buildInvokerChain(final Invoker invoker, String key, String group),生成责任链
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41 1private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
2 Invoker<T> last = invoker;
3 //根据URL的属性获得相应Activate的过滤器
4 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
5 //生成调用的责任链
6 if (!filters.isEmpty()) {
7 for (int i = filters.size() - 1; i >= 0; i--) {
8 final Filter filter = filters.get(i);
9 final Invoker<T> next = last;
10 last = new Invoker<T>() {
11 @Override
12 public Class<T> getInterface() {
13 return invoker.getInterface();
14 }
15 @Override
16 public URL getUrl() {
17 return invoker.getUrl();
18 }
19 @Override
20 public boolean isAvailable() {
21 return invoker.isAvailable();
22 }
23 @Override
24 public Result invoke(Invocation invocation) throws RpcException {
25 return filter.invoke(next, invocation);
26 }
27 @Override
28 public void destroy() {
29 invoker.destroy();
30 }
31 @Override
32 public String toString() {
33 return invoker.toString();
34 }
35 };
36 }
37 }
38 return last;
39 }
40
41
回到ProtocolFilterWrapper#export(Invoker invoker),protocol.export调用了QosProtocolWrapper#export(Invoker),2.5.8 新版本增加了 QOS 模块,本地服务不涉及。
1
2
3
4
5
6
7
8
9
10
11 1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2 //判断是不是registry协议
3 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
4 startQosServer(invoker.getUrl());
5 return protocol.export(invoker);
6 }
7 //本地暴露走这
8 return protocol.export(invoker);
9 }
10
11
ProtocolListenerWrapper#export(Invoker),由于本地暴露protocol为InjvmProtocol。
1
2
3
4
5
6
7
8
9
10
11
12
13 1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2 //判断是不是registry协议
3 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
4 return protocol.export(invoker);
5 }
6 //本地暴露走这,先往下看
7 //然后构造ListenerExporterWrapper,也是把参数赋值
8 return new ListenerExporterWrapper<T>(protocol.export(invoker),
9 Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
10 .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
11 }
12
13
InjvmProtocol#export(Invoker),终于到最后了!!!
1
2
3
4
5
6
7
8
9 1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2 //这边的ServiceKey为“接口名称:版本号”,如果没有版本号就为“接口名称”
3 //exporterMap是InjvmProtocol抽象父类的对象变量,protected final Map<String, Exporter<?>> exporterMap
4 //构造方法,把参数赋值到对象变量,并且exporterMap.put(key, this),this为InjvmExporter
5 //所以所有本地暴露服务都放在exporterMap中
6 return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
7 }
8
9
回到ProtocolListenerWrapper#export(Invoker),构造ListenerExporterWrapper,把参数赋值。默认ExporterListener是空的。
最终回到本地暴露小节的开始,生成的Exporter如下图:
远程暴露
远程暴露是服务暴露的重点,涉及的内容比较多。
下面这段代码在上面提到过,远程暴露生成Invoker和本地暴露是类似的,只是URL不同。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 1private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
2 /*省略无关代码,具体代码在上面提到过*/
3
4 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
5
6 // 不是配置remote就本地暴露(配置remote,表示只远程暴露)
7 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
8 exportLocal(url);
9 }
10 // 不是配置local就远程暴露(配置local,表示只本地暴露)
11 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
12 /*省略代码,logger*/
13 if (registryURLs != null && !registryURLs.isEmpty()) {
14 //遍历所有注册中心
15 for (URL registryURL : registryURLs) {
16 url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
17 //监控中心,http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-monitor.html
18 URL monitorUrl = loadMonitor(registryURL);
19 if (monitorUrl != null) {
20 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
21 }
22 /*省略代码,logger*/
23 // For providers, this is used to enable custom proxy to generate invoker
24 String proxy = url.getParameter(Constants.PROXY_KEY);
25 if (StringUtils.isNotEmpty(proxy)) {
26 registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
27 }
28 //通过代理工厂获得invoker
29 //registryURL.addParameterAndEncoded会将dubboURL以export为key加入到registryURL
30 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
31 //生成invoker的委托,将invoker和ServiceConfig绑定
32 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
33 //远程暴露
34 Exporter<?> exporter = protocol.export(wrapperInvoker);
35 exporters.add(exporter);
36 }
37 } else {
38 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
39 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
40
41 Exporter<?> exporter = protocol.export(wrapperInvoker);
42 exporters.add(exporter);
43 }
44 }
45 }
46 this.urls.add(url);
47 }
48
49
远程暴露的核心类(这个流程会反复出现)为RegisterProtocol所以Protocol$Adaptive动态获得了RegisterProtocol,ProtocolFilterWrapper和本地暴露一致,只是协议不同,本地暴露为InjvmProtocol,所以直接看到RegistryProtocol#export(final Invoker)方法。
1
2
3
4
5
6
7
8
9
10
11 1public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
2 //导出服务部分,先往下看
3 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
4
5 /*省略代码,下面会有写*/
6
7 //保证每次export都返回一个新的exporter实例
8 return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
9 }
10
11
RegistryProtocol#doLocalExport(final Invoker originInvoker)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 1private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
2 //从invoker中获得服务的URL,默认是dubbo://开头的URL
3 String key = getCacheKey(originInvoker);
4 //从缓存中获得exporter,如果已经暴露就不再暴露
5 ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
6 //双重检查锁
7 if (exporter == null) {
8 synchronized (bounds) {
9 exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
10 if (exporter == null) {
11 //将originInvoker和服务的URL(默认dubbo://开头的URL)封装到一个委托中
12 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
13 //生成Exporter流程与本地暴露相同
14 //由于默认dubbo协议,所以protocol包装类最终的protocol为DubboProtocol,只是最后会调用DubboProtocol#export(Invoker<T> invoker)
15 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
16 //写缓存
17 bounds.put(key, exporter);
18 }
19 }
20 }
21 return exporter;
22 }
23
24
DubboProtocol#export(Invoker invoker)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2 URL url = invoker.getUrl();
3 // 和本地暴露一样,远程暴露DubboProtocol中也有一个exporterMap,记录了暴露的服务
4 // key由服务组名,服务名,服务版本号以及端口组成
5 //没有设置group和版本号,key为pers.congzhou.service.DemoService:20880
6 //如全设置为demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
7 String key = serviceKey(url);
8 //构造DubboExporter
9 DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
10 //加入缓存
11 exporterMap.put(key, exporter);
12 //本地存根,http://dubbo.apache.org/zh-cn/docs/user/demos/local-stub.html
13 //跳过,消费者端为调度事件导出存根服务
14 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
15 Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
16 if (isStubSupportEvent && !isCallbackservice) {
17 String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
18 if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
19 if (logger.isWarnEnabled()) {
20 /*省略代码,logger*/
21 }
22 } else {
23 stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
24 }
25 }
26 //启动服务器
27 openServer(url);
28 //优化序列化
29 optimizeSerialization(url);
30 return exporter;
31 }
32
33
启动服务器实例
DubboProtocol#openServer(URL url)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1private void openServer(URL url) {
2 // 获取地址(host:port),并将其作为服务器实例的 key,用于标识当前的服务器实例
3 String key = url.getAddress();
4 // 官方注释:client 也可以暴露一个只有server可以调用的服务。
5 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
6 if (isServer) {
7 //缓存,不存在就创建一个,默认只启动一个
8 ExchangeServer server = serverMap.get(key);
9 if (server == null) {
10 serverMap.put(key, createServer(url));
11 } else {
12 // 服务器已存在,则根据URL中的配置重置服务器
13 server.reset(url);
14 }
15 }
16 }
17
18
DubboProtocol#createServer(URL url)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 1private ExchangeServer createServer(URL url) {
2 //往URL加配置
3 // 默认开启server关闭时发送readonly事件
4 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
5 // 默认开启heartbeat
6 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
7 //获得server参数,http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-protocol.html
8 //dubbo协议缺省为netty,http协议缺省为servlet
9 //这边我配置了netty4
10 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
11
12 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
13 throw new RpcException("Unsupported server type: " + str + ", url: " + url);
14 //配置协议编码方式
15 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
16 ExchangeServer server;
17 try {
18 //启动服务器
19 //Exchangers通过Adaptive机制获得Exchanger,默认为HeaderExchanger
20 server = Exchangers.bind(url, requestHandler);
21 } catch (RemotingException e) {
22 throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
23 }
24 str = url.getParameter(Constants.CLIENT_KEY);
25 if (str != null && str.length() > 0) {
26 Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
27 if (!supportedTypes.contains(str)) {
28 throw new RpcException("Unsupported client type: " + str);
29 }
30 }
31 return server;
32 }
33
34
HeaderExchanger#bind(URL url, ExchangeHandler handler)方法的三个逻辑:
-
HeaderExchangeHandler代理了DubboProtocol#requestHandler
-
DecodeHandler代理了HeaderExchangeHandler
-
bind,创建 NettyServer
1
2
3
4
5 1public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
2 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
3 }
4
5
Transporters#bind(URL url, ChannelHandler… handlers)
1
2
3
4
5
6
7
8
9
10
11
12
13 1public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
2 /*省略代码,参数校验*/
3 ChannelHandler handler;
4 if (handlers.length == 1) {
5 handler = handlers[0];
6 } else {
7 handler = new ChannelHandlerDispatcher(handlers);
8 }
9 //getTransporter()获得Adaptive
10 return getTransporter().bind(url, handler);
11 }
12
13
Transporter$Adaptive#bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1)
1
2
3
4
5
6
7
8
9
10
11
12 1public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException {
2 if (arg0 == null) throw new IllegalArgumentException("url == null");
3 com.alibaba.dubbo.common.URL url = arg0;
4 String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
5 if (extName == null)
6 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
7 //根据server参数获得具体Transporter实例,我使用的server参数为netty4
8 com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
9 return extension.bind(arg0, arg1);
10 }
11
12
com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter#bind(URL url, ChannelHandler listener)
netty启动的代码不写了,其中listener会被包装,当netty触发事件时会调用listener处理。
1
2
3
4
5 1public Server bind(URL url, ChannelHandler listener) throws RemotingException {
2 return new NettyServer(url, listener);
3 }
4
5
服务注册
回到远程暴露的核心类RegistryProtocol#export(final Invoker)
再贴一下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 1public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
2 //导出服务部分
3 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
4 //获得注册中心URL,zookeeper的URL为zookeeper://开头
5 URL registryUrl = getRegistryUrl(originInvoker);
6 // 获取 Registry,先往下看
7 final Registry registry = getRegistry(originInvoker);
8 //获得注册的URL,过滤了一些不需要的参数
9 final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
10
11 //判断是否延迟发布
12 boolean register = registeredProviderUrl.getParameter("register", true);
13
14 ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
15
16 //register过程,就刚刚回来的地方继续
17 if (register) {
18 register(registryUrl, registeredProviderUrl);
19 ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
20 }
21
22 // 订阅override数据
23 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
24 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
25 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
26 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
27 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
28 //保证每次export都返回一个新的exporter实例
29 return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
30 }
31
32
RegistryProtocol#getRegistry(final Invoker<?> originInvoker)
1
2
3
4
5
6
7
8 1private Registry getRegistry(final Invoker<?> originInvoker) {
2 //获得注册中心URL
3 URL registryUrl = getRegistryUrl(originInvoker);
4 //通过RegistryFactory$Adaptive获得具体实例,和其他Adaptive一样,就不贴代码了
5 return registryFactory.getRegistry(registryUrl);
6 }
7
8
zookeeper会获得ZookeeperRegistryFactory
AbstractRegistryFactory#getRegistry(URL url)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 1public Registry getRegistry(URL url) {
2 url = url.setPath(RegistryService.class.getName())
3 .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
4 .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
5 String key = url.toServiceString();
6 // 加锁
7 LOCK.lock();
8 try {
9 // 缓存
10 Registry registry = REGISTRIES.get(key);
11 if (registry != null) {
12 return registry;
13 }
14 // 缓存未命中,创建 Registry 实例
15 registry = createRegistry(url);
16 if (registry == null) {
17 throw new IllegalStateException("Can not create registry " + url);
18 }
19 // 写缓存
20 REGISTRIES.put(key, registry);
21 return registry;
22 } finally {
23 LOCK.unlock();
24 }
25 }
26
27
ZookeeperRegistryFactory#createRegistry(URL url)
1
2
3
4
5 1public Registry createRegistry(URL url) {
2 return new ZookeeperRegistry(url, zookeeperTransporter);
3 }
4
5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 1public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
2 super(url);
3 if (url.isAnyHost()) {
4 throw new IllegalStateException("registry address == null");
5 }
6 //获取组名,默认dubbo
7 String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
8 if (!group.startsWith(Constants.PATH_SEPARATOR)) {
9 //加上路径,/group
10 group = Constants.PATH_SEPARATOR + group;
11 }
12 this.root = group;
13 // 创建 Zookeeper 客户端,先往下看
14 zkClient = zookeeperTransporter.connect(url);
15 // 状态监听器
16 zkClient.addStateListener(new StateListener() {
17 @Override
18 public void stateChanged(int state) {
19 if (state == RECONNECTED) {
20 try {
21 recover();
22 } catch (Exception e) {
23 logger.error(e.getMessage(), e);
24 }
25 }
26 }
27 });
28 }
29
30
ZookeeperTransporter$Adaptive,默认为 CuratorZookeeperTransporter,Apache Curator一个Zookeeper客户端。
CuratorZookeeperTransporter#connect(URL url)
1
2
3
4
5 1public ZookeeperClient connect(URL url) {
2 return new CuratorZookeeperClient(url);
3 }
4
5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 1public CuratorZookeeperClient(URL url) {
2 super(url);
3 try {
4 // CuratorFramework 工厂,各种参数加进去
5 CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
6 .connectString(url.getBackupAddress())
7 .retryPolicy(new RetryNTimes(1, 1000))
8 .connectionTimeoutMs(5000);
9 String authority = url.getAuthority();
10 if (authority != null && authority.length() > 0) {
11 builder = builder.authorization("digest", authority.getBytes());
12 }
13 // 构建 CuratorFramework 实例
14 client = builder.build();
15 // 添加监听器
16 client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
17 @Override
18 public void stateChanged(CuratorFramework client, ConnectionState state) {
19 if (state == ConnectionState.LOST) {
20 CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
21 } else if (state == ConnectionState.CONNECTED) {
22 CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
23 } else if (state == ConnectionState.RECONNECTED) {
24 CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
25 }
26 }
27 });
28 // 启动客户端
29 client.start();
30 } catch (Exception e) {
31 throw new IllegalStateException(e.getMessage(), e);
32 }
33 }
34
35
回到服务暴露的核心类,在贴一下代码就不需要往上看啦
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 1public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
2 //导出服务部分
3 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
4 //获得注册中心URL,zookeeper的URL为zookeeper://开头
5 URL registryUrl = getRegistryUrl(originInvoker);
6 // 获取 Registry,先往下看
7 final Registry registry = getRegistry(originInvoker);
8
9 //上面写到这边
10 //获得注册的URL,过滤了一些不需要的参数
11 final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
12
13 //判断是否延迟发布
14 boolean register = registeredProviderUrl.getParameter("register", true);
15
16 ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
17
18 //register过程
19 if (register) {
20 // 向注册中心注册服务
21 register(registryUrl, registeredProviderUrl);
22 ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
23 }
24
25 // 订阅override数据
26 // 创建监听器,FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
27 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
28 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
29 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
30 // 向注册中心订阅
31 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
32 // 创建DestroyableExporter,保证每次export都返回一个新的exporter实例
33 return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
34 }
35
36
RegistryProtocol#register(URL registryUrl, URL registedProviderUrl)
1
2
3
4
5
6 1 public void register(URL registryUrl, URL registedProviderUrl) {
2 Registry registry = registryFactory.getRegistry(registryUrl);
3 registry.register(registedProviderUrl);
4 }
5
6
ZookeeperRegistry的抽象父类FailbackRegistry#register(URL url)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 1public void register(URL url) {
2 //加到一个hashset中
3 super.register(url);
4 failedRegistered.remove(url);
5 failedUnregistered.remove(url);
6 try {
7 // 向服务器端发送注册请求
8 doRegister(url);
9 } catch (Exception e) {
10 Throwable t = e;
11 // 如果打开启动检测,则直接抛出异常
12 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
13 && url.getParameter(Constants.CHECK_KEY, true)
14 && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
15 boolean skipFailback = t instanceof SkipFailbackWrapperException;
16 if (check || skipFailback) {
17 if (skipFailback) {
18 t = t.getCause();
19 }
20 throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
21 } else {
22 logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
23 }
24 // 将失败的注册请求记录到失败的列表中,定期重试
25 failedRegistered.add(url);
26 }
27 }
28
29
ZookeeperRegistry#doRegister(URL url)
-
toUrlPath 方法生成节点路径,路径格式/${group}/${serviceInterface}/providers/${url}
-
通过Zookeeper客户端创建节点
1
2
3
4
5
6
7
8
9 1protected void doRegister(URL url) {
2 try {
3 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
4 } catch (Throwable e) {
5 throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
6 }
7 }
8
9
没有了!