上一篇文章《Spring Cloud微服务技术栈(三):服务治理Spring Cloud Eureka核心元素分析》主要对Spring Cloud Eureka的三个核心元素(服务注册中心、服务提供者、服务消费者)进行了分析,熟悉了三者之间的通信关系,本篇文章将主要分析Spring Cloud Eureka的部分源码。
当我们搭建好Eureka Server服务注册中心并启动后,就可以继续启动服务提供者和服务消费者了。大家都知道,当服务提供者成功启动后,就会向服务注册中心注册自己的服务,服务消费者成功启动后,就会向服务注册中心获取服务实例列表,根据实例列表来调用具体服务。那么,这整个过程是如何运转的呢?我们一起来根据源码的思路来探索。
Eureka Server服务注册中心源码分析
回忆之前我们一起搭建的服务注册中心的项目,我们在服务注册中心的项目中的application.properties文件中配置好服务注册中心需要的相关配置,然后在Spring Boot的启动类中加了一个注解@EnableEurekaServer,然后启动项目就成功启动了服务注册中心,那么到底是如何启动的呢?
在配置文件中(单节点),我们是如下配置的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1# 配置端口
2server.port=1111
3# 配置服务注册中心地址
4eureka.instance.hostname=localhost
5# 作为服务注册中心,禁止本应用向自己注册服务
6eureka.client.register-with-eureka=false
7# 作为服务注册中心,禁止本应用向自己检索服务
8eureka.client.fetch-registry=false
9# 设置服务注册中心服务注册地址
10eureka.client.service-url.defaultZone=http://${eureka.instance.hostname}:${server.port}/eureka/
11# 关闭自我保护机制,及时剔除无效服务
12eureka.server.enable-self-preservation=false
13
14
这个配置在工程启动的时候,会被Spring容器读取,配置到EurekaClientConfigBean中,而这个配置类会被注册成Spring的Bean以供其他的Bean来使用。
我们再进入注解@EnableEurekaServer一探究竟,@EnableEurekaServer的源码如下:
1
2
3
4
5
6
7
8
9 1@Target(ElementType.TYPE)
2@Retention(RetentionPolicy.RUNTIME)
3@Documented
4@Import(EurekaServerMarkerConfiguration.class)
5public @interface EnableEurekaServer {
6
7}
8
9
从上述注解可以看出,该注解导入了配置类EurekaServerMarkerConfiguration,我们在进一步进入到EurekaServerMarkerConfiguration中,代码如下所示:
1
2
3
4
5
6
7
8
9
10
11 1@Configuration
2public class EurekaServerMarkerConfiguration {
3 @Bean
4 public Marker eurekaServerMarkerBean() {
5 return new Marker();
6 }
7 class Marker {
8 }
9}
10
11
从这个配置类中暂时无法看到什么具体的内容,我们可以进一步查看类Marker在哪些地方被使用了,通过搜索Marker,可以发现在类EurekaServerAutoConfiguration上的注解中被引用了,具体代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247 1@Configuration
2@Import(EurekaServerInitializerConfiguration.class)
3@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
4@EnableConfigurationProperties({ EurekaDashboardProperties.class,
5 InstanceRegistryProperties.class })
6@PropertySource("classpath:/eureka/server.properties")
7public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
8 /**
9 * List of packages containing Jersey resources required by the Eureka server
10 */
11 private static final String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery",
12 "com.netflix.eureka" };
13
14 @Autowired
15 private ApplicationInfoManager applicationInfoManager;
16
17 @Autowired
18 private EurekaServerConfig eurekaServerConfig;
19
20 @Autowired
21 private EurekaClientConfig eurekaClientConfig;
22
23 @Autowired
24 private EurekaClient eurekaClient;
25
26 @Autowired
27 private InstanceRegistryProperties instanceRegistryProperties;
28
29 public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson();
30
31 @Bean
32 public HasFeatures eurekaServerFeature() {
33 return HasFeatures.namedFeature("Eureka Server",
34 EurekaServerAutoConfiguration.class);
35 }
36
37 @Configuration
38 protected static class EurekaServerConfigBeanConfiguration {
39 @Bean
40 @ConditionalOnMissingBean
41 public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
42 EurekaServerConfigBean server = new EurekaServerConfigBean();
43 if (clientConfig.shouldRegisterWithEureka()) {
44 // Set a sensible default if we are supposed to replicate
45 server.setRegistrySyncRetries(5);
46 }
47 return server;
48 }
49 }
50
51 @Bean
52 @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
53 public EurekaController eurekaController() {
54 return new EurekaController(this.applicationInfoManager);
55 }
56
57 static {
58 CodecWrappers.registerWrapper(JACKSON_JSON);
59 EurekaJacksonCodec.setInstance(JACKSON_JSON.getCodec());
60 }
61
62 @Bean
63 public ServerCodecs serverCodecs() {
64 return new CloudServerCodecs(this.eurekaServerConfig);
65 }
66
67 private static CodecWrapper getFullJson(EurekaServerConfig serverConfig) {
68 CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getJsonCodecName());
69 return codec == null ? CodecWrappers.getCodec(JACKSON_JSON.codecName()) : codec;
70 }
71
72 private static CodecWrapper getFullXml(EurekaServerConfig serverConfig) {
73 CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getXmlCodecName());
74 return codec == null ? CodecWrappers.getCodec(CodecWrappers.XStreamXml.class)
75 : codec;
76 }
77
78 class CloudServerCodecs extends DefaultServerCodecs {
79
80 public CloudServerCodecs(EurekaServerConfig serverConfig) {
81 super(getFullJson(serverConfig),
82 CodecWrappers.getCodec(CodecWrappers.JacksonJsonMini.class),
83 getFullXml(serverConfig),
84 CodecWrappers.getCodec(CodecWrappers.JacksonXmlMini.class));
85 }
86 }
87
88 @Bean
89 public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
90 ServerCodecs serverCodecs) {
91 this.eurekaClient.getApplications(); // force initialization
92 return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
93 serverCodecs, this.eurekaClient,
94 this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
95 this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
96 }
97
98 @Bean
99 @ConditionalOnMissingBean
100 public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
101 ServerCodecs serverCodecs) {
102 return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
103 this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
104 }
105
106 /**
107 * {@link PeerEurekaNodes} which updates peers when /refresh is invoked.
108 * Peers are updated only if
109 * <code>eureka.client.use-dns-for-fetching-service-urls</code> is
110 * <code>false</code> and one of following properties have changed.
111 * </p>
112 * <ul>
113 * <li><code>eureka.client.availability-zones</code></li>
114 * <li><code>eureka.client.region</code></li>
115 * <li><code>eureka.client.service-url.<zone></code></li>
116 * </ul>
117 */
118 static class RefreshablePeerEurekaNodes extends PeerEurekaNodes
119 implements ApplicationListener<EnvironmentChangeEvent> {
120
121 public RefreshablePeerEurekaNodes(
122 final PeerAwareInstanceRegistry registry,
123 final EurekaServerConfig serverConfig,
124 final EurekaClientConfig clientConfig,
125 final ServerCodecs serverCodecs,
126 final ApplicationInfoManager applicationInfoManager) {
127 super(registry, serverConfig, clientConfig, serverCodecs, applicationInfoManager);
128 }
129
130 @Override
131 public void onApplicationEvent(final EnvironmentChangeEvent event) {
132 if (shouldUpdate(event.getKeys())) {
133 updatePeerEurekaNodes(resolvePeerUrls());
134 }
135 }
136
137 /*
138 * Check whether specific properties have changed.
139 */
140 protected boolean shouldUpdate(final Set<String> changedKeys) {
141 assert changedKeys != null;
142
143 // if eureka.client.use-dns-for-fetching-service-urls is true, then
144 // service-url will not be fetched from environment.
145 if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
146 return false;
147 }
148
149 if (changedKeys.contains("eureka.client.region")) {
150 return true;
151 }
152
153 for (final String key : changedKeys) {
154 // property keys are not expected to be null.
155 if (key.startsWith("eureka.client.service-url.") ||
156 key.startsWith("eureka.client.availability-zones.")) {
157 return true;
158 }
159 }
160
161 return false;
162 }
163 }
164
165 @Bean
166 public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
167 PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
168 return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
169 registry, peerEurekaNodes, this.applicationInfoManager);
170 }
171
172 @Bean
173 public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
174 EurekaServerContext serverContext) {
175 return new EurekaServerBootstrap(this.applicationInfoManager,
176 this.eurekaClientConfig, this.eurekaServerConfig, registry,
177 serverContext);
178 }
179
180 /**
181 * Register the Jersey filter
182 */
183 @Bean
184 public FilterRegistrationBean jerseyFilterRegistration(
185 javax.ws.rs.core.Application eurekaJerseyApp) {
186 FilterRegistrationBean bean = new FilterRegistrationBean();
187 bean.setFilter(new ServletContainer(eurekaJerseyApp));
188 bean.setOrder(Ordered.LOWEST_PRECEDENCE);
189 bean.setUrlPatterns(
190 Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
191
192 return bean;
193 }
194
195 /**
196 * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources
197 * required by the Eureka server.
198 */
199 @Bean
200 public javax.ws.rs.core.Application jerseyApplication(Environment environment,
201 ResourceLoader resourceLoader) {
202
203 ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
204 false, environment);
205
206 // Filter to include only classes that have a particular annotation.
207 //
208 provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
209 provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
210
211 // Find classes in Eureka packages (or subpackages)
212 //
213 Set<Class<?>> classes = new HashSet<>();
214 for (String basePackage : EUREKA_PACKAGES) {
215 Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
216 for (BeanDefinition bd : beans) {
217 Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
218 resourceLoader.getClassLoader());
219 classes.add(cls);
220 }
221 }
222
223 // Construct the Jersey ResourceConfig
224 //
225 Map<String, Object> propsAndFeatures = new HashMap<>();
226 propsAndFeatures.put(
227 // Skip static content used by the webapp
228 ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
229 EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
230
231 DefaultResourceConfig rc = new DefaultResourceConfig(classes);
232 rc.setPropertiesAndFeatures(propsAndFeatures);
233
234 return rc;
235 }
236
237 @Bean
238 public FilterRegistrationBean traceFilterRegistration(
239 @Qualifier("httpTraceFilter") Filter filter) {
240 FilterRegistrationBean bean = new FilterRegistrationBean();
241 bean.setFilter(filter);
242 bean.setOrder(Ordered.LOWEST_PRECEDENCE - 10);
243 return bean;
244 }
245}
246
247
在这个配置类上面,加入了@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class),也就是说,类EurekaServerAutoConfiguration被注册为Spring Bean的前提是在Spring容器中存在EurekaServerMarkerConfiguration.Marker.class的对象,而这个对象存在的前提是我们在Spring Boot启动类中加入了@EnableEurekaServer注解。小总结一下就是,在Spring Boot启动类上加入了@EnableEurekaServer注解以后,就会触发EurekaServerMarkerConfiguration.Marker.class被Spring实例化为Spring Bean,有了这个Bean以后,Spring就会再实例化EurekaServerAutoConfiguration类,而这个类就是配置了Eureka Server的相关内容,列举如下:
1
2
3
4
5
6
7
8 1注入EurekaServerConfig—->用于注册中心相关配置信息
2注入EurekaController—->提供注册中心上相关服务信息的展示支持
3注入PeerAwareInstanceRegistry—->提供实例注册支持,例如实例获取、状态更新等相关支持
4注入PeerEurekaNodes—->提供注册中心对等服务间通信支持
5注入EurekaServerContext—->提供初始化注册init服务、初始化PeerEurekaNode节点信息
6注入EurekaServerBootstrap—->用于初始化initEurekaEnvironment/initEurekaServerContext
7
8
而且,在类EurekaServerAutoConfiguration上,我们看见@Import(EurekaServerInitializerConfiguration.class),说明实例化类EurekaServerAutoConfiguration之前,已经实例化了EurekaServerInitializerConfiguration类,从类名可以看出,该类是Eureka Server的初始化配置类,我们进入到EurekaServerInitializerConfiguration类中一探究竟,发现该类实现了Spring的生命周期接口SmartLifecycle,也就是说类EurekaServerInitializerConfiguration在被Spring实例化过程中的时候会执行一些生命周期方法,比如Lifecycle的start方法,那么看看EurekaServerInitializerConfiguration是如何重写start方法的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 1@Configuration
2public class EurekaServerInitializerConfiguration
3 implements ServletContextAware, SmartLifecycle, Ordered {
4
5 // 此处省略部分代码
6
7 @Override
8 public void start() {
9 new Thread(new Runnable() {
10 @Override
11 public void run() {
12 try {
13 //TODO: is this class even needed now?
14 eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
15 log.info("Started Eureka Server");
16
17 publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
18 EurekaServerInitializerConfiguration.this.running = true;
19 publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
20 }
21 catch (Exception ex) {
22 // Help!
23 log.error("Could not initialize Eureka servlet context", ex);
24 }
25 }
26 }).start();
27 }
28
29 // 此处省略部分代码
30}
31
32
这个start方法中开启了一个新的线程,然后进行一些Eureka Server的初始化工作,比如调用eurekaServerBootstrap的contextInitialized方法,进入该方法看看:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1public class EurekaServerBootstrap {
2 public void contextInitialized(ServletContext context) {
3 try {
4 // 初始化Eureka Server环境变量
5 initEurekaEnvironment();
6 // 初始化Eureka Server上下文
7 initEurekaServerContext();
8
9 context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
10 }
11 catch (Throwable e) {
12 log.error("Cannot bootstrap eureka server :", e);
13 throw new RuntimeException("Cannot bootstrap eureka server :", e);
14 }
15 }
16}
17
18
这个方法中主要进行了Eureka的环境初始化和服务初始化,我们进入到initEurekaServerContext方法中来看服务初始化是如何实现的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 1public class EurekaServerBootstrap {
2 protected void initEurekaServerContext() throws Exception {
3 // For backward compatibility
4 JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
5 XStream.PRIORITY_VERY_HIGH);
6 XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
7 XStream.PRIORITY_VERY_HIGH);
8
9 if (isAws(this.applicationInfoManager.getInfo())) {
10 this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
11 this.eurekaClientConfig, this.registry, this.applicationInfoManager);
12 this.awsBinder.start();
13 }
14 // 初始化Eureka Server上下文环境
15 EurekaServerContextHolder.initialize(this.serverContext);
16
17 log.info("Initialized server context");
18
19 // Copy registry from neighboring eureka node
20 int registryCount = this.registry.syncUp();
21 // 期望每30秒接收到一次心跳,1分钟就是2次
22 // 修改Instance Status状态为up
23 // 同时,这里面会开启一个定时任务,用于清理 60秒没有心跳的客户端。自动下线
24 this.registry.openForTraffic(this.applicationInfoManager, registryCount);
25
26 // Register all monitoring statistics.
27 EurekaMonitors.registerAllStats();
28 }
29}
30
31
在初始化Eureka Server上下文环境后,就会继续执行openForTraffic方法,这个方法主要是设置了期望每分钟接收到的心跳次数,并将服务实例的状态设置为UP,最后又通过方法postInit来开启一个定时任务,用于每隔一段时间(默认60秒)将没有续约的服务实例(默认90秒没有续约)清理掉。openForTraffic的方法代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 1@Override
2public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
3 // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
4 // 计算每分钟最大续约数
5 this.expectedNumberOfRenewsPerMin = count * 2;
6 // 计算每分钟最小续约数
7 this.numberOfRenewsPerMinThreshold =
8 (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
9 logger.info("Got {} instances from neighboring DS node", count);
10 logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
11 this.startupTime = System.currentTimeMillis();
12 if (count > 0) {
13 this.peerInstancesTransferEmptyOnStartup = false;
14 }
15 DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
16 boolean isAws = Name.Amazon == selfName;
17 if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
18 logger.info("Priming AWS connections for all replicas..");
19 primeAwsReplicas(applicationInfoManager);
20 }
21 logger.info("Changing status to UP");
22 // 修改服务实例的状态为UP
23 applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
24 // 开启定时任务,每隔一段时间(默认60秒)将没有续约的服务实例(默认90秒没有续约)清理掉
25 super.postInit();
26}
27
28
postInit方法开启了一个新的定时任务,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12 1protected void postInit() {
2 renewsLastMin.start();
3 if (evictionTaskRef.get() != null) {
4 evictionTaskRef.get().cancel();
5 }
6 evictionTaskRef.set(new EvictionTask());
7 evictionTimer.schedule(evictionTaskRef.get(),
8 serverConfig.getEvictionIntervalTimerInMs(),
9 serverConfig.getEvictionIntervalTimerInMs());
10}
11
12
这里的时间间隔都来自于EurekaServerConfigBean类,可以在配置文件中以eureka.server开头的配置来进行设置。
当然,服务注册中心启动的源码不仅仅只有这么多,其还有向其他集群中的服务注册中心复制服务实例列表的相关源码没有在这里进行分析,感兴趣的朋友可以自行断点分析。
Eureka Client服务注册行为分析
我们启动一个本地的服务注册中心,然后再启动一个单节点的服务提供者,我们都知道,在服务注册中心已经启动情况下,然后再启动服务提供者,服务提供者会将服务注册到服务注册中心,那么这个注册行为是如何运作的呢?我们都知道,服务注册行为是在服务提供者启动过程中完成的,那么我们完全可以从启动日志中揣摩出注册行为,请看下面服务提供者的启动日志:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 12018-12-01 15:37:17.832 INFO 31948 --- [ restartedMain] o.s.c.n.eureka.InstanceInfoFactory : Setting initial instance status as: STARTING
22018-12-01 15:37:17.868 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Initializing Eureka in region us-east-1
32018-12-01 15:37:18.031 INFO 31948 --- [ restartedMain] c.n.d.provider.DiscoveryJerseyProvider : Using JSON encoding codec LegacyJacksonJson
42018-12-01 15:37:18.031 INFO 31948 --- [ restartedMain] c.n.d.provider.DiscoveryJerseyProvider : Using JSON decoding codec LegacyJacksonJson
52018-12-01 15:37:18.168 INFO 31948 --- [ restartedMain] c.n.d.provider.DiscoveryJerseyProvider : Using XML encoding codec XStreamXml
62018-12-01 15:37:18.168 INFO 31948 --- [ restartedMain] c.n.d.provider.DiscoveryJerseyProvider : Using XML decoding codec XStreamXml
72018-12-01 15:37:18.370 INFO 31948 --- [ restartedMain] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
82018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Disable delta property : false
92018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Single vip registry refresh property : null
102018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Force full registry fetch : false
112018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Application is null : false
122018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Registered Applications size is zero : true
132018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Application version is -1: true
142018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Getting all instance registry info from the eureka server
152018-12-01 15:37:18.539 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : The response status is 200
162018-12-01 15:37:18.541 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Starting heartbeat executor: renew interval is: 30
172018-12-01 15:37:18.543 INFO 31948 --- [ restartedMain] c.n.discovery.InstanceInfoReplicator : InstanceInfoReplicator onDemand update allowed rate per min is 4
182018-12-01 15:37:18.548 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Discovery Client initialized at timestamp 1543649838546 with initial instances count: 1
192018-12-01 15:37:18.554 INFO 31948 --- [ restartedMain] o.s.c.n.e.s.EurekaServiceRegistry : Registering application PRODUCER-SERVICE with eureka with status UP
202018-12-01 15:37:18.556 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Saw local status change event StatusChangeEvent [timestamp=1543649838556, current=UP, previous=STARTING]
212018-12-01 15:37:18.557 INFO 31948 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient : DiscoveryClient_PRODUCER-SERVICE/producer-service:-1612568227: registering service...
222018-12-01 15:37:18.627 INFO 31948 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient : DiscoveryClient_PRODUCER-SERVICE/producer-service:-1612568227 - registration status: 204
232018-12-01 15:37:18.645 INFO 31948 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 13226 (http) with context path ''
242018-12-01 15:37:18.647 INFO 31948 --- [ restartedMain] .s.c.n.e.s.EurekaAutoServiceRegistration : Updating port to 13226
252018-12-01 15:37:18.654 INFO 31948 --- [ restartedMain] ringCloudEurekaProducerClientApplication : Started SpringCloudEurekaProducerClientApplication in 5.537 seconds (JVM running for 7.0)
262018-12-01 15:42:18.402 INFO 31948 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
27
28
从日志中我们可以读取到许多信息:
-
第一行日志告诉我们,服务提供者实例的状态被标注为“正在启动”。
-
第二行日志告诉我们,在默认的名为“us-east-1”的Region中初始化Eureka客户端,Region的名称是可以配置的,可以通过eureka.client.region来配置,如果没有配置它,那么默认的Region就是us-east-1。这里顺便多说一句,一个微服务应用只可以注册到一个Region中,也就是说一个微服务应用对应一个Region,一个Region对应多个Zone,是否还记得,我们在配置集群的Eureka Server服务注册中心的时候,都设置了eureka.client.service-url.defaultZone这个值,就是为了告诉服务提供者者或者集群内的其他Eureka Server,可以向这个Zone注册,并且defaultZone的值是使用逗号隔开的,也就是说我们的服务可以同时向多个Zone注册。由此可见,一个服务可以同时注册到一个Region中的多个Zone的。如果需要自己指定Zone,可以通过eureka.client.availability-zones来指定。关于Region和Zone请看下面的源码:
1
2
3
4
5
6
7
8
9
10 1public static String getRegion(EurekaClientConfig clientConfig) {
2 String region = clientConfig.getRegion();
3 if (region == null) {
4 region = DEFAULT_REGION;
5 }
6 region = region.trim().toLowerCase();
7 return region;
8}
9
10
1
2
3
4
5
6
7
8
9 1public String[] getAvailabilityZones(String region) {
2 String value = this.availabilityZones.get(region);
3 if (value == null) {
4 value = DEFAULT_ZONE;
5 }
6 return value.split(",");
7}
8
9
- 日志中getting all instance registry info from the eureka server表示服务在注册的过程中也会向服务注册中心获取其他服务实例的信息列表。
- 日志中Starting heartbeat executor: renew interval is: 30表示以默认的30秒作为间隔向服务注册中心发起心跳请求,告诉服务注册中心“我还活着”。
- 日志中Discovery Client initialized at timestamp 1543649838546 with initial instances count: 1表示在时间戳1543649838546的时候,服务成功初始化完成。
- 日志中DiscoveryClient_PRODUCER-SERVICE/producer-service:-1612568227: registering service…表示开始将服务注册到服务注册中心。
- 日志中DiscoveryClient_PRODUCER-SERVICE/producer-service:-1612568227 – registration status: 204表示服务注册完成,完成的状态标志为204。
接下来,我们进入到源码中,借助源代码来分析一下服务注册到服务注册中心的流程。在分析之前,我们有必要搞清楚Spring Cloud是如何集成Eureka的,我们都知道,在Eureka客户端,无论是服务提供者还是服务消费者,都需要加上@EnableDiscoveryClient注解,用来开启DiscoveryClient实例,我们通过搜索DiscoveryClient,可以发现,搜索的结果有一个接口还有一个类,接口在包org.springframework.cloud.client.discovery下,类在com.netflix.discovery包下,接口DiscoveryClient是Spring Cloud的接口,它定义了用来发现服务的常用方法,通过该接口可以有效地屏蔽服务治理中的实现细节,这就方便切换不同的服务服务治理框架,而无需改动从Spring Cloud层面调用的代码,该接口有一个实现类EurekaDiscoveryClient,从命名可以可以看出他是对Eureka服务发现的封装,进入到EurekaDiscoveryClient可以看到,它有一个成员变量为EurekaClient,这是包com.netflix.discovery下的一个接口,该接口继承了LookupService接口,且有一个实现类DiscoveryClient,接口EurekaClient和LookupService都在com.netflix.discovery包下,他们都定义了针对Eureka的服务发现的抽象方法,而EurekaClient的实现类DiscoveryClient则实现了这些抽象方法,所以说,类DiscoveryClient是真正实现发现服务的类。结合以上的文字,下面展示接口与类的关系图如下所示:
我们进入到DiscoveryClient类中查看源码,首先看到的是它的类注释如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 1/**
2 * The class that is instrumental for interactions with <tt>Eureka Server</tt>.
3 *
4 * <p>
5 * <tt>Eureka Client</tt> is responsible for a) <em>Registering</em> the
6 * instance with <tt>Eureka Server</tt> b) <em>Renewal</em>of the lease with
7 * <tt>Eureka Server</tt> c) <em>Cancellation</em> of the lease from
8 * <tt>Eureka Server</tt> during shutdown
9 * <p>
10 * d) <em>Querying</em> the list of services/instances registered with
11 * <tt>Eureka Server</tt>
12 * <p>
13 *
14 * <p>
15 * <tt>Eureka Client</tt> needs a configured list of <tt>Eureka Server</tt>
16 * {@link java.net.URL}s to talk to.These {@link java.net.URL}s are typically amazon elastic eips
17 * which do not change. All of the functions defined above fail-over to other
18 * {@link java.net.URL}s specified in the list in the case of failure.
19 * </p>
20 *
21 * @author Karthik Ranganathan, Greg Kim
22 * @author Spencer Gibb
23 *
24 */
25
26
大致翻译如下:
1
2
3
4
5
6
7
8
9
10
11 1这个类用于帮助与Eureka Server相互协作。
2
3Eureka Client负责下面的任务:
4- 向Eureka Server注册服务实例
5- 向Eureka Server服务续约
6- 当服务关闭期间,向Eureka Server取消租约
7- 查询Eureka Server中的服务实例列表
8
9Eureka Client还需要配置一个Eureka Server的URL列表
10
11
在分析类DiscoveryClient完成的具体任务之前,我们首先来回忆一下,我们在配置服务提供者的时候,在配置文件中都配置了eureka.client.service-url.defaultZone属性,而这个属性的值就是告诉服务提供者,该向哪里注册服务,也就是服务注册的地址,该地址比如是http://peer1:1111/eureka/,http://peer2:1112/eureka/,http://peer3:1113/eureka/,各个地址之间使用逗号隔开,我们在类EndpointUtils中可以找到一个方法名为getServiceUrlsMapFromConfig的方法,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 1public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
2 Map<String, List<String>> orderedUrls = new LinkedHashMap<>();
3 String region = getRegion(clientConfig);
4 String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
5 if (availZones == null || availZones.length == 0) {
6 availZones = new String[1];
7 availZones[0] = DEFAULT_ZONE;
8 }
9 logger.debug("The availability zone for the given region {} are {}", region, availZones);
10 int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
11
12 String zone = availZones[myZoneOffset];
13 List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
14 if (serviceUrls != null) {
15 orderedUrls.put(zone, serviceUrls);
16 }
17 int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);
18 while (currentOffset != myZoneOffset) {
19 zone = availZones[currentOffset];
20 serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
21 if (serviceUrls != null) {
22 orderedUrls.put(zone, serviceUrls);
23 }
24 if (currentOffset == (availZones.length - 1)) {
25 currentOffset = 0;
26 } else {
27 currentOffset++;
28 }
29 }
30
31 if (orderedUrls.size() < 1) {
32 throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
33 }
34 return orderedUrls;
35}
36
37
该方法就是从我们配置的Zone中读取注册地址,并组成一个List,最后将这个List存储到Map集合中,在读取过程中,它首先加载的是getRegion方法,这个方法读取了一个Region返回,进入到getRegion方法中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 1/**
2 * Get the region that this particular instance is in.
3 *
4 * @return - The region in which the particular instance belongs to.
5 */
6public static String getRegion(EurekaClientConfig clientConfig) {
7 String region = clientConfig.getRegion();
8 if (region == null) {
9 region = DEFAULT_REGION;
10 }
11 region = region.trim().toLowerCase();
12 return region;
13 }
14
15
从方法的注释上可以知道,一个微服务应用只可以属于一个Region,方法体中的第一行代码就是从EurekaClientConfigBean类中来读取Region,而EurekaClientConfigBean中getRegion方法返回的值就是需要我们配置的,在配置文件中,对应的属性是eureka.client.region,如果我们每月配置,那么将使用默认的Region,默认DEFAULT_REGION为default。在方法getServiceUrlsMapFromConfig中,还加载了getAvailabilityZones方法,方法代码如下所示:
1
2
3
4
5
6
7
8
9 1public String[] getAvailabilityZones(String region) {
2 String value = this.availabilityZones.get(region);
3 if (value == null) {
4 value = DEFAULT_ZONE;
5 }
6 return value.split(",");
7}
8
9
上述方法中第一行代码是从Region中获取Zone,availabilityZones是EurekaClientConfigBean的一个Map成员变量,如果我们没有为Region特别配置eureka.client.availablity-zones属性,那么zone将采用默认值,默认值是defaultZone,这就是我们一开始配置eureka.client.service-url.defaultZone的由来,由此可见,一个Region对应多个Zone,也就是说一个微服务应用可以向多个服务注册地址注册。在获取了Region和Zone的信息之后,才开始真正地加载Eureka Server的具体地址,它根据传入的参数按照一定的算法确定加载位于哪一个Zone配置的serviceUrls,代码如下:
1
2
3
4
5 1int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
2String zone = availZones[myZoneOffset];
3List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
4
5
当我们在微服务应用中使用Ribbon来实现服务调用时,对于Zone的设置可以在负载均衡时实现区域亲和特性:Ribbon的默认策略会优先访问同客户端处于一个Zone中的服务实例,只有当同一个Zone中没有可用的服务实例的时候才会去访问其他Zone中的实例。利用亲和性这一特性,我们就可以有效地设计出针对区域性故障的容错集群。
从本小节一开始,我们就分析了Spring Cloud Eureka是对Netflix Eureka的封装,com.netflix.discovery包下的DiscoveryClient才是真正实现服务的注册与发现。我们一起来看看它的构造方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155 1@Inject
2DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
3 Provider<BackupRegistry> backupRegistryProvider) {
4 if (args != null) {
5 this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
6 this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
7 this.eventListeners.addAll(args.getEventListeners());
8 this.preRegistrationHandler = args.preRegistrationHandler;
9 } else {
10 this.healthCheckCallbackProvider = null;
11 this.healthCheckHandlerProvider = null;
12 this.preRegistrationHandler = null;
13 }
14
15 this.applicationInfoManager = applicationInfoManager;
16 InstanceInfo myInfo = applicationInfoManager.getInfo();
17
18 clientConfig = config;
19 staticClientConfig = clientConfig;
20 transportConfig = config.getTransportConfig();
21 instanceInfo = myInfo;
22 if (myInfo != null) {
23 appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
24 } else {
25 logger.warn("Setting instanceInfo to a passed in null value");
26 }
27
28 this.backupRegistryProvider = backupRegistryProvider;
29
30 this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
31 localRegionApps.set(new Applications());
32
33 fetchRegistryGeneration = new AtomicLong(0);
34
35 remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
36 remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
37
38 if (config.shouldFetchRegistry()) {
39 this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
40 } else {
41 this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
42 }
43
44 if (config.shouldRegisterWithEureka()) {
45 this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
46 } else {
47 this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
48 }
49
50 logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
51
52 if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
53 logger.info("Client configured to neither register nor query for data.");
54 scheduler = null;
55 heartbeatExecutor = null;
56 cacheRefreshExecutor = null;
57 eurekaTransport = null;
58 instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
59
60 // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
61 // to work with DI'd DiscoveryClient
62 DiscoveryManager.getInstance().setDiscoveryClient(this);
63 DiscoveryManager.getInstance().setEurekaClientConfig(config);
64
65 initTimestampMs = System.currentTimeMillis();
66 logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
67 initTimestampMs, this.getApplications().size());
68
69 return; // no need to setup up an network tasks and we are done
70 }
71
72 try {
73 // default size of 2 - 1 each for heartbeat and cacheRefresh
74 scheduler = Executors.newScheduledThreadPool(2,
75 new ThreadFactoryBuilder()
76 .setNameFormat("DiscoveryClient-%d")
77 .setDaemon(true)
78 .build());
79
80 heartbeatExecutor = new ThreadPoolExecutor(
81 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
82 new SynchronousQueue<Runnable>(),
83 new ThreadFactoryBuilder()
84 .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
85 .setDaemon(true)
86 .build()
87 ); // use direct handoff
88
89 cacheRefreshExecutor = new ThreadPoolExecutor(
90 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
91 new SynchronousQueue<Runnable>(),
92 new ThreadFactoryBuilder()
93 .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
94 .setDaemon(true)
95 .build()
96 ); // use direct handoff
97
98 eurekaTransport = new EurekaTransport();
99 scheduleServerEndpointTask(eurekaTransport, args);
100
101 AzToRegionMapper azToRegionMapper;
102 if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
103 azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
104 } else {
105 azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
106 }
107 if (null != remoteRegionsToFetch.get()) {
108 azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
109 }
110 instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
111 } catch (Throwable e) {
112 throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
113 }
114
115 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
116 fetchRegistryFromBackup();
117 }
118
119 // call and execute the pre registration handler before all background tasks (inc registration) is started
120 if (this.preRegistrationHandler != null) {
121 this.preRegistrationHandler.beforeRegistration();
122 }
123
124 if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
125 try {
126 if (!register() ) {
127 throw new IllegalStateException("Registration error at startup. Invalid server response.");
128 }
129 } catch (Throwable th) {
130 logger.error("Registration error at startup: {}", th.getMessage());
131 throw new IllegalStateException(th);
132 }
133 }
134
135 // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
136 // 这个方法里实现了服务向服务注册中心注册的行为
137 initScheduledTasks();
138
139 try {
140 Monitors.registerObject(this);
141 } catch (Throwable e) {
142 logger.warn("Cannot register timers", e);
143 }
144
145 // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
146 // to work with DI'd DiscoveryClient
147 DiscoveryManager.getInstance().setDiscoveryClient(this);
148 DiscoveryManager.getInstance().setEurekaClientConfig(config);
149
150 initTimestampMs = System.currentTimeMillis();
151 logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
152 initTimestampMs, this.getApplications().size());
153}
154
155
整个构造方法里,一开始进行了各种参数的设置,而真正地注册行为是在initScheduledTasks方法里实现的,我们一起来看看注册行为是如何实现的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 1private void initScheduledTasks() {
2 if (clientConfig.shouldFetchRegistry()) {
3 // registry cache refresh timer
4 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
5 int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
6 scheduler.schedule(
7 new TimedSupervisorTask(
8 "cacheRefresh",
9 scheduler,
10 cacheRefreshExecutor,
11 registryFetchIntervalSeconds,
12 TimeUnit.SECONDS,
13 expBackOffBound,
14 new CacheRefreshThread()
15 ),
16 registryFetchIntervalSeconds, TimeUnit.SECONDS);
17 }
18
19 if (clientConfig.shouldRegisterWithEureka()) {
20 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
21 int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
22 logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
23
24 // Heartbeat timer
25 scheduler.schedule(
26 new TimedSupervisorTask(
27 "heartbeat",
28 scheduler,
29 heartbeatExecutor,
30 renewalIntervalInSecs,
31 TimeUnit.SECONDS,
32 expBackOffBound,
33 new HeartbeatThread()
34 ),
35 renewalIntervalInSecs, TimeUnit.SECONDS);
36
37 // InstanceInfo replicator
38 instanceInfoReplicator = new InstanceInfoReplicator(
39 this,
40 instanceInfo,
41 clientConfig.getInstanceInfoReplicationIntervalSeconds(),
42 2); // burstSize
43
44 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
45 @Override
46 public String getId() {
47 return "statusChangeListener";
48 }
49
50 @Override
51 public void notify(StatusChangeEvent statusChangeEvent) {
52 if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
53 InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
54 // log at warn level if DOWN was involved
55 logger.warn("Saw local status change event {}", statusChangeEvent);
56 } else {
57 logger.info("Saw local status change event {}", statusChangeEvent);
58 }
59 instanceInfoReplicator.onDemandUpdate();
60 }
61 };
62
63 if (clientConfig.shouldOnDemandUpdateStatusChange()) {
64 applicationInfoManager.registerStatusChangeListener(statusChangeListener);
65 }
66
67 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
68 } else {
69 logger.info("Not registering with Eureka server per configuration");
70 }
71}
72
73
这段代码中有两个主要的if代码块,第一个if代码块是决定是否从Eureka Server来获取注册信息,判断条件clientConfig.shouldFetchRegistry()是需要我们自己的在配置文件中通过属性eureka.client.fetch-registry=true进行配置的,默认为true,也就是说服务会从Eureka Server拉取注册信息,且默认间隔为30秒,每30秒执行一次定时任务,用于刷新所获取的注册信息。
第二个if代码块是决定是否将服务注册到服务注册中心的,也是我们本次要探讨的主要内容。判断条件clientConfig.shouldRegisterWithEureka()表示是否向Eureka Server注册自己,你是否还记得,我们在搭建单节点服务注册中心的时候,我们搭建的那个Eureka Server设置了属性eureka.client.register-with-eureka=false,意思就是说禁止Eureka Server把自己当做一个普通服务注册到自身,而这个属性默认值也是为true,也就是说我们在注册的服务的时候,无需配置这个属性,就可以将服务注册到服务注册中心。分析第二个if代码块,代码块中一开始就设置了一个定时任务,这个定时任务就是按照指定的时间间隔向Eureka Server发送心跳,告诉服务注册中心“我还活着”,对于发送心跳的时间间隔,我们一开始就讨论过,默认是30秒,这也就是为什么按照默认来说,一分钟理应发送两次心跳了,这个心跳间隔我们可以在配置文件中进行配置,配置属性为eureka.instance.lease-renewal-interval-in-seconds=30,对于默认90秒内没有发送心跳的服务,将会被服务在服务注册中心剔除,剔除时间间隔可以通过属性eureka.instance.lease-expiration-duration-in-seconds=90来进行配置。而整个服务的续约逻辑也很简单,在定时任务中有一个代码片段new HeartbeatThread(),然后开启了一个新的线程实现续约服务,就是通过发送REST请求来实现的,具体代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 1/**
2 * Renew with the eureka service by making the appropriate REST call
3 */
4boolean renew() {
5 EurekaHttpResponse<InstanceInfo> httpResponse;
6 try {
7 httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
8 logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
9 if (httpResponse.getStatusCode() == 404) {
10 REREGISTER_COUNTER.increment();
11 logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
12 long timestamp = instanceInfo.setIsDirtyWithTime();
13 boolean success = register();
14 if (success) {
15 instanceInfo.unsetIsDirty(timestamp);
16 }
17 return success;
18 }
19 return httpResponse.getStatusCode() == 200;
20 } catch (Throwable e) {
21 logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
22 return false;
23 }
24}
25
26
服务提供者向服务注册中心发送心跳,并检查返回码,如果是404,那么服务将重新调用register方法,实现将服务注册到服务注册中心,否则直接检测返回码是否是200,返回布尔类型来告诉定时器是否续约成功。
续约的操作完成之后,就开始了服务实例的复制工作,紧接着通过服务实例管理器ApplicationInfoManager来创建一个服务实例状态监听器,用于监听服务实例的状态,并进入到onDemandUpdate中进行注册,方法onDemandUpdate的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 1public boolean onDemandUpdate() {
2 if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
3 if (!scheduler.isShutdown()) {
4 scheduler.submit(new Runnable() {
5 @Override
6 public void run() {
7 logger.debug("Executing on-demand update of local InstanceInfo");
8
9 Future latestPeriodic = scheduledPeriodicRef.get();
10 if (latestPeriodic != null && !latestPeriodic.isDone()) {
11 logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
12 latestPeriodic.cancel(false);
13 }
14
15 InstanceInfoReplicator.this.run();
16 }
17 });
18 return true;
19 } else {
20 logger.warn("Ignoring onDemand update due to stopped scheduler");
21 return false;
22 }
23 } else {
24 logger.warn("Ignoring onDemand update due to rate limiter");
25 return false;
26 }
27}
28
29public void run() {
30 try {
31 discoveryClient.refreshInstanceInfo();
32
33 Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
34 if (dirtyTimestamp != null) {
35 discoveryClient.register();
36 instanceInfo.unsetIsDirty(dirtyTimestamp);
37 }
38 } catch (Throwable t) {
39 logger.warn("There was a problem with the instance info replicator", t);
40 } finally {
41 Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
42 scheduledPeriodicRef.set(next);
43 }
44}
45
46
服务实例的注册行为是在方法register中执行的,进入到register方法中,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1/**
2 * Register with the eureka service by making the appropriate REST call.
3 */
4boolean register() throws Throwable {
5 logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
6 EurekaHttpResponse<Void> httpResponse;
7 try {
8 httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
9 } catch (Exception e) {
10 logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
11 throw e;
12 }
13 if (logger.isInfoEnabled()) {
14 logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
15 }
16 return httpResponse.getStatusCode() == 204;
17}
18
19
注册行为其实就是将服务实例的信息通过HTTP请求传递给Eureka Server服务注册中心,当注册中心接收到注册信息之后将返回204状态码给客户端,表示注册成功。这就是客户端向服务注册中心注册的行为源码分析,那么服务注册中心是如何接收这些实例的注册信息,且如何保存的呢?请接着往下看。
Eureka Server服务注册中心接收注册行为分析
客户端向服务端发起注册请求之后,服务端是如何处理的呢?通过源码的分析,可以发现,客户端和服务端的交互都是通过REST请求发起的,而在服务端,包com.netflix.eureka.resources下定义了许多处理REST请求的类,对于接收客户端的注册信息,可以发现在类ApplicationResource下有一个addInstance方法,专门用来处理注册请求的,我们一起来分析这个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 1/**
2 * Registers information about a particular instance for an
3 * {@link com.netflix.discovery.shared.Application}.
4 *
5 * @param info
6 * {@link InstanceInfo} information of the instance.
7 * @param isReplication
8 * a header parameter containing information whether this is
9 * replicated from other nodes.
10 */
11@POST
12@Consumes({"application/json", "application/xml"})
13public Response addInstance(InstanceInfo info,
14 @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
15 logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
16 // validate that the instanceinfo contains all the necessary required fields
17 if (isBlank(info.getId())) {
18 return Response.status(400).entity("Missing instanceId").build();
19 } else if (isBlank(info.getHostName())) {
20 return Response.status(400).entity("Missing hostname").build();
21 } else if (isBlank(info.getIPAddr())) {
22 return Response.status(400).entity("Missing ip address").build();
23 } else if (isBlank(info.getAppName())) {
24 return Response.status(400).entity("Missing appName").build();
25 } else if (!appName.equals(info.getAppName())) {
26 return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
27 } else if (info.getDataCenterInfo() == null) {
28 return Response.status(400).entity("Missing dataCenterInfo").build();
29 } else if (info.getDataCenterInfo().getName() == null) {
30 return Response.status(400).entity("Missing dataCenterInfo Name").build();
31 }
32
33 // handle cases where clients may be registering with bad DataCenterInfo with missing data
34 DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
35 if (dataCenterInfo instanceof UniqueIdentifier) {
36 String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
37 if (isBlank(dataCenterInfoId)) {
38 boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
39 if (experimental) {
40 String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
41 return Response.status(400).entity(entity).build();
42 } else if (dataCenterInfo instanceof AmazonInfo) {
43 AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
44 String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
45 if (effectiveId == null) {
46 amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
47 }
48 } else {
49 logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
50 }
51 }
52 }
53
54 registry.register(info, "true".equals(isReplication));
55 return Response.status(204).build(); // 204 to be backwards compatible
56}
57
58
在接收服务实例注册的时候,首先要经过一系列的数据校验,通过校验以后调用PeerAwareInstanceRegistry的实现类对象的register方法对服务进行注册,进入到register方法继续分析:
1
2
3
4
5
6
7 1@Override
2public void register(final InstanceInfo info, final boolean isReplication) {
3 handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
4 super.register(info, isReplication);
5}
6
7
方法体中第一行代码中调用了publishEvent方法,将注册事件传播出去,然后继续调用com.netflix.eureka.registry包下的AbstractInstanceRegistry抽象类的register方法进行注册:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 1/**
2 * Registers a new instance with a given duration.
3 *
4 * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
5 */
6public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
7 try {
8 read.lock();
9 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
10 REGISTER.increment(isReplication);
11 if (gMap == null) {
12 final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
13 gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
14 if (gMap == null) {
15 gMap = gNewMap;
16 }
17 }
18 Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
19 // Retain the last dirty timestamp without overwriting it, if there is already a lease
20 if (existingLease != null && (existingLease.getHolder() != null)) {
21 Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
22 Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
23 logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
24
25 // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
26 // InstanceInfo instead of the server local copy.
27 if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
28 logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
29 " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
30 logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
31 registrant = existingLease.getHolder();
32 }
33 } else {
34 // The lease does not exist and hence it is a new registration
35 synchronized (lock) {
36 if (this.expectedNumberOfRenewsPerMin > 0) {
37 // Since the client wants to cancel it, reduce the threshold
38 // (1
39 // for 30 seconds, 2 for a minute)
40 this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
41 this.numberOfRenewsPerMinThreshold =
42 (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
43 }
44 }
45 logger.debug("No previous lease information found; it is new registration");
46 }
47 Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
48 if (existingLease != null) {
49 lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
50 }
51 gMap.put(registrant.getId(), lease);
52 synchronized (recentRegisteredQueue) {
53 recentRegisteredQueue.add(new Pair<Long, String>(
54 System.currentTimeMillis(),
55 registrant.getAppName() + "(" + registrant.getId() + ")"));
56 }
57 // This is where the initial state transfer of overridden status happens
58 if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
59 logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
60 + "overrides", registrant.getOverriddenStatus(), registrant.getId());
61 if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
62 logger.info("Not found overridden id {} and hence adding it", registrant.getId());
63 overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
64 }
65 }
66 InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
67 if (overriddenStatusFromMap != null) {
68 logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
69 registrant.setOverriddenStatus(overriddenStatusFromMap);
70 }
71
72 // Set the status based on the overridden status rules
73 InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
74 registrant.setStatusWithoutDirty(overriddenInstanceStatus);
75
76 // If the lease is registered with UP status, set lease service up timestamp
77 if (InstanceStatus.UP.equals(registrant.getStatus())) {
78 lease.serviceUp();
79 }
80 registrant.setActionType(ActionType.ADDED);
81 recentlyChangedQueue.add(new RecentlyChangedItem(lease));
82 registrant.setLastUpdatedTimestamp();
83 invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
84 logger.info("Registered instance {}/{} with status {} (replication={})",
85 registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
86 } finally {
87 read.unlock();
88 }
89
90
在代码中我们看到,实例信息InstanceInfo对象被存储在以instanceId为key的ConcurrentHashMap中,然后又将这个ConcurrentHashMap对象存储到了以服务名为key的Map中,这就形成了双层Map结构,这也就对应了一开始我们所说的服务的元信息存储在一个双层Map结构中。
小结
这就基本完成了对Spring Cloud Eureka的简单源码分析,这里仅仅是对Eureka Server初始化的源码、服务注册、服务端接收注册的源码进行了简单分析,感兴趣的朋友可以通过DEBUG方式深入了解源码的运行机制。