1. Startup principle
2. Data synchronization
Start by running EurekaApplication from microservice-discovery-eureka. The startup log looks like this:

    [Thread-22] o.s.c.n.e.server.EurekaServerBootstrap : Setting the eureka configuration..
    [ main] c.n.e.EurekaDiscoveryClientConfiguration : Registering application unknown with eureka with status UP
    [Thread-22] o.s.c.n.e.server.EurekaServerBootstrap : Eureka data center value eureka.datacenter is not set, defaulting to default
    [Thread-22] o.s.c.n.e.server.EurekaServerBootstrap : Eureka environment value eureka.environment is not set, defaulting to test
    [Thread-22] o.s.c.n.e.server.EurekaServerBootstrap : isAws returned false
    [Thread-22] o.s.c.n.e.server.EurekaServerBootstrap : Initialized server context
    [Thread-22] c.n.e.r.PeerAwareInstanceRegistryImpl : Got 1 instances from neighboring DS node
    [Thread-22] c.n.e.r.PeerAwareInstanceRegistryImpl : Renew threshold is: 1
    [Thread-22] c.n.e.r.PeerAwareInstanceRegistryImpl : Changing status to UP
    [Thread-22] e.s.EurekaServerInitializerConfiguration : Started Eureka Server
    [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8761 (http)
    [ main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 8761
    [ main] c.itmuch.cloud.study.EurekaApplication : Started EurekaApplication in 6.011 seconds (JVM running for 8.163)

[Analysis] Note that EurekaServerBootstrap logs "Setting the eureka configuration..". That message is the entry point for tracing the startup.
[Analysis 2] contextInitialized performs two initialization steps: the Eureka environment and the Eureka server context.

    public void contextInitialized(ServletContext context) {
        try {
            this.initEurekaEnvironment();
            this.initEurekaServerContext();
            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        } catch (Throwable var3) {
            log.error("Cannot bootstrap eureka server :", var3);
            throw new RuntimeException("Cannot bootstrap eureka server :", var3);
        }
    }

[Analysis 1] initEurekaEnvironment is where "Setting the eureka configuration.." is logged.

    protected void initEurekaEnvironment() throws Exception {
        log.info("Setting the eureka configuration..");
        // Data center: fall back to "default" when eureka.datacenter is not set.
        String dataCenter = ConfigurationManager.getConfigInstance().getString("eureka.datacenter");
        if (dataCenter == null) {
            log.info("Eureka data center value eureka.datacenter is not set, defaulting to default");
            ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.datacenter", "default");
        } else {
            ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.datacenter", dataCenter);
        }
        // Environment: fall back to "test" when eureka.environment is not set.
        String environment = ConfigurationManager.getConfigInstance().getString("eureka.environment");
        if (environment == null) {
            ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.environment", "test");
            log.info("Eureka environment value eureka.environment is not set, defaulting to test");
        } else {
            ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.environment", environment);
        }
    }

    protected void initEurekaServerContext() throws Exception {
        // Register converters for backward compatibility with the V1 payload format.
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
        if (this.isAws(this.applicationInfoManager.getInfo())) {
            this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig,
                    this.registry, this.applicationInfoManager);
            this.awsBinder.start();
        }
        EurekaServerContextHolder.initialize(this.serverContext);
        log.info("Initialized server context");
        // Copy the registry from a neighboring Eureka node, then start accepting traffic.
        int registryCount = this.registry.syncUp();
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);
        EurekaMonitors.registerAllStats();
    }
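The defaulting rule in initEurekaEnvironment can be illustrated without Archaius. The following is a minimal sketch that uses a plain `Map` in place of `ConfigurationManager`; the class name `EnvironmentDefaultsSketch` and the method `resolve` are hypothetical and exist only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class EnvironmentDefaultsSketch {

    /**
     * Mirrors the fallback logic: copy eureka.datacenter / eureka.environment
     * into the archaius.deployment.* keys, using "default" and "test" when unset.
     * A Map stands in for the real configuration object (assumption).
     */
    public static Map<String, String> resolve(Map<String, String> config) {
        Map<String, String> resolved = new HashMap<>(config);
        resolved.put("archaius.deployment.datacenter",
                config.getOrDefault("eureka.datacenter", "default"));
        resolved.put("archaius.deployment.environment",
                config.getOrDefault("eureka.environment", "test"));
        return resolved;
    }

    public static void main(String[] args) {
        // Nothing configured: both keys fall back, as in the startup log above.
        Map<String, String> fallback = resolve(new HashMap<>());
        System.out.println(fallback.get("archaius.deployment.datacenter"));
        System.out.println(fallback.get("archaius.deployment.environment"));

        // Explicit environment: the configured value is passed through.
        Map<String, String> explicit = new HashMap<>();
        explicit.put("eureka.environment", "prod");
        System.out.println(resolve(explicit).get("archaius.deployment.environment"));
    }
}
```

With an empty configuration both values fall back, which is why the log shows "defaulting to default" and "defaulting to test".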
The class diagram shows that EurekaServerInitializerConfiguration holds (aggregates) an EurekaServerBootstrap.
[Analysis 1] The class implements ServletContextAware and SmartLifecycle, which is how Eureka is integrated with Spring Cloud.

    public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {

[Analysis 2] Through ServletContextAware, Tomcat's ServletContext object is injected.

    public void setServletContext(ServletContext servletContext) {
        this.servletContext = servletContext;
    }

[Analysis 3] Through SmartLifecycle, the Spring container calls the lifecycle method start() once the bean is initialized, and that triggers the Eureka startup.

    public void start() {
        new Thread(new Runnable() {
            public void run() {
                try {
                    // [Analysis 4] Calls EurekaServerBootstrap.contextInitialized.
                    eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                    log.info("Started Eureka Server");
                    publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                    EurekaServerInitializerConfiguration.this.running = true;
                    publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
                } catch (Exception var2) {
                    log.error("Could not initialize Eureka servlet context", var2);
                }
            }
        }).start();
    }

[Analysis 5] publish is called twice, with EurekaRegistryAvailableEvent and EurekaServerStartedEvent. I found no listeners for these events in the framework, so they are presumably extension points left open for developers. They can be consumed like this:

    import com.netflix.appinfo.InstanceInfo;
    import org.springframework.cloud.netflix.eureka.server.event.*;
    import org.springframework.context.event.EventListener;
    import org.springframework.stereotype.Component;

    @Component
    public class EurekaStateChangeListener {

        @EventListener
        public void listen(EurekaInstanceCanceledEvent eurekaInstanceCanceledEvent) {
            // An instance went offline.
            String appName = eurekaInstanceCanceledEvent.getAppName();
            String serverId = eurekaInstanceCanceledEvent.getServerId();
            System.out.println(appName);
            System.out.println(serverId);
        }

        @EventListener
        public void listen(EurekaInstanceRegisteredEvent event) {
            // An instance registered.
            InstanceInfo instanceInfo = event.getInstanceInfo();
            System.out.println(instanceInfo);
        }

        @EventListener
        public void listen(EurekaInstanceRenewedEvent event) {
            // An instance renewed its lease (heartbeat).
            event.getAppName();
            event.getServerId();
        }

        @EventListener
        public void listen(EurekaRegistryAvailableEvent event) {
            // The registry is available.
        }

        @EventListener
        public void listen(EurekaServerStartedEvent event) {
            // The server has started.
        }
    }
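The pattern in start() is worth isolating: the bootstrap runs on its own thread, so a failure is only logged and never propagates to the caller, and the two events bracket the moment `running` flips to true. Below is a minimal sketch of that pattern using only the JDK. The class `BackgroundStartSketch`, the string event names, and the `Consumer` listeners are stand-ins invented for this illustration, not Spring or Eureka types.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class BackgroundStartSketch {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private volatile boolean running;

    public void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    public boolean isRunning() {
        return running;
    }

    private void publish(String event) {
        for (Consumer<String> listener : listeners) {
            listener.accept(event);
        }
    }

    /** Runs the bootstrap on a new thread; returns the thread so callers can join it. */
    public Thread start(Runnable bootstrap) {
        Thread thread = new Thread(() -> {
            try {
                bootstrap.run();
                publish("RegistryAvailable");   // published before running is set
                running = true;
                publish("ServerStarted");       // published after running is set
            } catch (RuntimeException e) {
                // As in the original, a failure is logged and running stays false.
                System.err.println("Could not initialize: " + e);
            }
        });
        thread.start();
        return thread;
    }

    public static void main(String[] args) throws InterruptedException {
        BackgroundStartSketch server = new BackgroundStartSketch();
        server.addListener(event -> System.out.println("event: " + event));
        server.start(() -> System.out.println("bootstrapping")).join();
        System.out.println("running = " + server.isRunning());
    }
}
```

One consequence of this design is that the calling thread (main, in the log above) continues while initialization proceeds on the other thread, which matches the interleaved `[Thread-22]` and `[ main]` log lines.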
    public class DefaultLifecycleProcessor implements LifecycleProcessor, BeanFactoryAware {

        public void setBeanFactory(BeanFactory beanFactory) {
            Assert.isInstanceOf(ConfigurableListableBeanFactory.class, beanFactory);
            this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
        }

        public void start() {
            this.startBeans(false);
            this.running = true;
        }

        private void startBeans(boolean autoStartupOnly) {
            Map<String, Lifecycle> lifecycleBeans = this.getLifecycleBeans();
            Map<Integer, LifecycleGroup> phases = new HashMap<Integer, LifecycleGroup>();
            // Put every eligible bean into the group for its phase.
            for (Map.Entry<String, ? extends Lifecycle> entry : lifecycleBeans.entrySet()) {
                Lifecycle bean = entry.getValue();
                if (!autoStartupOnly
                        || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
                    int phase = this.getPhase(bean);
                    LifecycleGroup group = phases.get(phase);
                    if (group == null) {
                        group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
                        phases.put(phase, group);
                    }
                    group.add(entry.getKey(), bean);
                }
            }
            if (!phases.isEmpty()) {
                List<Integer> keys = new ArrayList<Integer>(phases.keySet());
                Collections.sort(keys);
                for (Integer key : keys) {
                    // [Analysis] The core step: start each LifecycleGroup in ascending phase order.
                    phases.get(key).start();
                }
            }
        }

        private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
            Lifecycle bean = lifecycleBeans.remove(beanName);
            if (bean != null && !this.equals(bean)) {
                // Start the beans this bean depends on first.
                String[] dependenciesForBean = this.beanFactory.getDependenciesForBean(beanName);
                for (String dependency : dependenciesForBean) {
                    this.doStart(lifecycleBeans, dependency, autoStartupOnly);
                }
                if (!bean.isRunning()
                        && (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Starting bean '" + beanName + "' of type [" + bean.getClass() + "]");
                    }
                    try {
                        bean.start();
                    } catch (Throwable var10) {
                        throw new ApplicationContextException("Failed to start bean '" + beanName + "'", var10);
                    }
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Successfully started bean '" + beanName + "'");
                    }
                }
            }
        }

        // Returns all Lifecycle beans (body abridged in this excerpt).
        protected Map<String, Lifecycle> getLifecycleBeans() {
            return beans;
        }

        private class LifecycleGroupMember implements Comparable<LifecycleGroupMember> {
            private final String name;
            private final Lifecycle bean;

            LifecycleGroupMember(String name, Lifecycle bean) {
                this.name = name;
                this.bean = bean;
            }

            // Members are ordered by each bean's getPhase(); a lower phase starts first.
            public int compareTo(LifecycleGroupMember other) {
                int thisOrder = getPhase(this.bean);
                int otherOrder = getPhase(other.bean);
                return thisOrder == otherOrder ? 0 : (thisOrder < otherOrder ? -1 : 1);
            }
        }

        private class LifecycleGroup {
            private final List<LifecycleGroupMember> members = new ArrayList<LifecycleGroupMember>();
            private final int phase;
            private final long timeout;
            private final Map<String, ? extends Lifecycle> lifecycleBeans;
            private final boolean autoStartupOnly;
            private volatile int smartMemberCount;

            public LifecycleGroup(int phase, long timeout, Map<String, ? extends Lifecycle> lifecycleBeans, boolean autoStartupOnly) {
                this.phase = phase;
                this.timeout = timeout;
                this.lifecycleBeans = lifecycleBeans;
                this.autoStartupOnly = autoStartupOnly;
            }

            public void add(String name, Lifecycle bean) {
                if (bean instanceof SmartLifecycle) {
                    ++this.smartMemberCount;
                }
                this.members.add(new LifecycleGroupMember(name, bean));
            }

            public void start() {
                if (!this.members.isEmpty()) {
                    if (logger.isInfoEnabled()) {
                        logger.info("Starting beans in phase " + this.phase);
                    }
                    Collections.sort(this.members);
                    for (LifecycleGroupMember member : this.members) {
                        if (this.lifecycleBeans.containsKey(member.name)) {
                            doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
                        }
                    }
                }
            }
        }
    }
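This is Spring's default processor for the Lifecycle interface. It places every Lifecycle bean into a LifecycleGroup keyed by the bean's phase, sorts the phases, and starts the groups in ascending order.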
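The grouping and ordering can be reduced to a few lines. The sketch below is not Spring's implementation; it only illustrates the idea of "group by phase, then start in ascending phase order" with JDK collections. The class `PhaseOrderSketch`, the method `startOrder`, and the bean names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PhaseOrderSketch {

    /** Groups bean names by phase and returns them in ascending phase order. */
    public static List<String> startOrder(Map<String, Integer> phaseByBean) {
        // TreeMap keeps the phases sorted, playing the role of Collections.sort(keys).
        TreeMap<Integer, List<String>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : phaseByBean.entrySet()) {
            groups.computeIfAbsent(entry.getValue(), phase -> new ArrayList<>())
                  .add(entry.getKey());
        }
        List<String> order = new ArrayList<>();
        for (List<String> group : groups.values()) {
            order.addAll(group);
        }
        return order;
    }

    public static void main(String[] args) {
        // Hypothetical beans and phases, in registration order.
        Map<String, Integer> beans = new LinkedHashMap<>();
        beans.put("late", 10);
        beans.put("early", -5);
        beans.put("middle", 0);
        beans.put("alsoMiddle", 0);
        System.out.println(startOrder(beans)); // [early, middle, alsoMiddle, late]
    }
}
```

Beans that share a phase end up in the same group, which is the role LifecycleGroup plays in the real processor.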
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(EurekaServerConfiguration.class)
    public @interface EnableEurekaServer {
    }
[Analysis 1] The EnableEurekaServer annotation carries an @Import annotation that pulls in EurekaServerConfiguration, so that class is the next one to look at.
It registers many global beans, including PeerEurekaNodes, DefaultEurekaServerContext, and PeerAwareInstanceRegistry.
    public class EurekaServerConfiguration extends WebMvcConfigurerAdapter {
        // Packages scanned for Jersey resources and providers.
        private static String[] EUREKA_PACKAGES = new String[]{"com.netflix.discovery", "com.netflix.eureka"};

        @Autowired
        private ApplicationInfoManager applicationInfoManager;
        @Autowired
        private EurekaServerConfig eurekaServerConfig;
        @Autowired
        private EurekaClientConfig eurekaClientConfig;
        @Autowired
        private EurekaClient eurekaClient;
        @Autowired
        private InstanceRegistryProperties instanceRegistryProperties;

        public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson();

        public ServerCodecs serverCodecs() {
            return new EurekaServerConfiguration.CloudServerCodecs(this.eurekaServerConfig);
        }

        private static CodecWrapper getFullJson(EurekaServerConfig serverConfig) {
            CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getJsonCodecName());
            return codec == null ? CodecWrappers.getCodec(JACKSON_JSON.codecName()) : codec;
        }

        private static CodecWrapper getFullXml(EurekaServerConfig serverConfig) {
            CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getXmlCodecName());
            return codec == null ? CodecWrappers.getCodec(XStreamXml.class) : codec;
        }

        // The instance registry that is aware of its peers.
        @Bean
        public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
            this.eurekaClient.getApplications();
            return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs,
                    this.eurekaClient,
                    this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
                    this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
        }

        // The set of peer Eureka nodes in the cluster.
        @Bean
        public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs) {
            return new PeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig,
                    serverCodecs, this.applicationInfoManager);
        }

        // The server context, which ties the registry and the peer nodes together.
        @Bean
        public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
                PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
            return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry,
                    peerEurekaNodes, this.applicationInfoManager);
        }

        // The bootstrap whose contextInitialized method was analyzed earlier.
        @Bean
        public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
                EurekaServerContext serverContext) {
            return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig,
                    this.eurekaServerConfig, registry, serverContext);
        }

        // Routes /eureka/* requests to the Jersey application.
        @Bean
        public FilterRegistrationBean jerseyFilterRegistration(Application eurekaJerseyApp) {
            FilterRegistrationBean bean = new FilterRegistrationBean();
            bean.setFilter(new ServletContainer(eurekaJerseyApp));
            bean.setOrder(2147483647); // Integer.MAX_VALUE
            bean.setUrlPatterns(Collections.singletonList("/eureka/*"));
            return bean;
        }

        // Builds the Jersey application by scanning the Eureka packages for @Path and @Provider classes.
        @Bean
        public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
            ClassPathScanningCandidateComponentProvider provider =
                    new ClassPathScanningCandidateComponentProvider(false, environment);
            provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
            provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
            Set<Class<?>> classes = new HashSet<Class<?>>();
            for (String basePackage : EUREKA_PACKAGES) {
                Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
                for (BeanDefinition bd : beans) {
                    Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());
                    classes.add(cls);
                }
            }
            Map<String, Object> propsAndFeatures = new HashMap<String, Object>();
            propsAndFeatures.put("com.sun.jersey.config.property.WebPageContentRegex",
                    "/eureka/(fonts|images|css|js)/.*");
            DefaultResourceConfig rc = new DefaultResourceConfig(classes);
            rc.setPropertiesAndFeatures(propsAndFeatures);
            return rc;
        }

        @Bean
        public FilterRegistrationBean traceFilterRegistration(@Qualifier("webRequestLoggingFilter") Filter filter) {
            FilterRegistrationBean bean = new FilterRegistrationBean();
            bean.setFilter(filter);
            bean.setOrder(2147483637); // Integer.MAX_VALUE - 10
            return bean;
        }

        @Configuration
        protected static class EurekaServerConfigBeanConfiguration {
            protected EurekaServerConfigBeanConfiguration() {
            }

            @Bean
            @ConditionalOnMissingBean
            public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
                EurekaServerConfigBean server = new EurekaServerConfigBean();
                if (clientConfig.shouldRegisterWithEureka()) {
                    // When this server also registers as a client, retry the registry sync up to 5 times.
                    server.setRegistrySyncRetries(5);
                }
                return server;
            }
        }
    }
Of these, DefaultEurekaServerContext is involved next. Its initialize method runs after construction:
    @PostConstruct
    @Override
    public void initialize() throws Exception {
        logger.info("Initializing ...");
        peerEurekaNodes.start();
        registry.init(peerEurekaNodes);
        logger.info("Initialized");
    }
peerEurekaNodes.start() looks like this:

    public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            // Resolve and update the peer node list once at startup.
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }
                }
            };
            // Note: the cluster peer node information is refreshed every 600000 ms, i.e. every 10 minutes.
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL: " + node.getServiceUrl());
        }
    }

[Analysis 4] Let the breakpoint continue and step into registry.init(peerEurekaNodes) inside the initialize method:

    @Override
    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;
        // Note: initialize the Eureka Server response cache; the default cache time is 30 s.
        initializedResponseCache();
        // Note: scheduled task that resets the renewal (heartbeat) threshold every 900000 ms, i.e. every 15 minutes.
        scheduleRenewalThresholdUpdateTask();
        // Note: initialize the remote region registry.
        initRemoteRegionRegistry();
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
        }
    }

registry.syncUp(), called from initEurekaServerContext, copies the registry from a neighboring node and retries while nothing has been copied:

    @Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

registry.openForTraffic(applicationInfoManager, registryCount) then opens the server for traffic:

    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        // Note: one renewal every 30 seconds means 2 renewals per minute per instance, hence count * 2.
        this.expectedNumberOfRenewsPerMin = count * 2;
        this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
        logger.info("Got " + count + " instances from neighboring DS node");
        logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        // Note: set the Eureka Server status to UP, meaning the server is active and essentially ready for normal use.
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        // Note: scheduled task that evicts expired instances every 60000 ms, i.e. every minute.
        super.postInit();
    }

The last log message to explain is "Updating port to 8761".
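The threshold arithmetic in openForTraffic explains the "Renew threshold is: 1" line in the startup log. The sketch below reproduces only that calculation. The class and method names are hypothetical, and the value 0.85 passed as the renewal percent threshold is an assumption about the configured default; the real value comes from serverConfig.getRenewalPercentThreshold().

```java
public class RenewThresholdSketch {

    /** Each instance is expected to renew every 30 s, i.e. twice per minute. */
    public static int expectedRenewsPerMin(int instanceCount) {
        return instanceCount * 2;
    }

    /** Truncating cast, exactly as in openForTraffic. */
    public static int renewThreshold(int instanceCount, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin(instanceCount) * renewalPercentThreshold);
    }

    public static void main(String[] args) {
        double percent = 0.85; // assumed default, see the note above

        // One instance synced from the neighbor, as in the log:
        // expected = 2, threshold = (int) 1.7 = 1
        System.out.println("Renew threshold is: " + renewThreshold(1, percent));

        // Nothing synced: the threshold is 0.
        System.out.println("Renew threshold is: " + renewThreshold(0, percent));
    }
}
```

Because the cast truncates, a single registered instance yields a threshold of 1 renewal per minute rather than 2.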
EurekaDiscoveryClientConfiguration also implements the SmartLifecycle interface:
    public void start() {
        // Use the detected port if no non-secure port has been configured.
        if (this.port.get() != 0 && this.instanceConfig.getNonSecurePort() == 0) {
            this.instanceConfig.setNonSecurePort(this.port.get());
        }
        if (!this.running.get() && this.instanceConfig.getNonSecurePort() > 0) {
            this.maybeInitializeClient();
            if (log.isInfoEnabled()) {
                log.info("Registering application " + this.instanceConfig.getAppname()
                        + " with eureka with status " + this.instanceConfig.getInitialStatus());
            }
            this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus());
            if (this.healthCheckHandler != null) {
                this.eurekaClient.registerHealthCheck(this.healthCheckHandler);
            }
            // Publish an InstanceRegisteredEvent.
            this.context.publishEvent(new InstanceRegisteredEvent(this, this.instanceConfig));
            this.running.set(true);
        }
    }

    // Note: handles the event fired once the embedded servlet container has started.
    @EventListener({EmbeddedServletContainerInitializedEvent.class})
    public void onApplicationEvent(EmbeddedServletContainerInitializedEvent event) {
        int localPort = event.getEmbeddedServletContainer().getPort();
        if (this.port.get() == 0) {
            log.info("Updating port to " + localPort);
            this.port.compareAndSet(0, localPort);
            this.start();
        }
    }
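The interplay between the port, the non-secure port, and the running flag is easier to follow in isolation. The following sketch models only those three fields; it is an illustration under simplifying assumptions, not the Spring Cloud class. `PortUpdateSketch`, `onContainerInitialized`, and the `registrations` counter are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class PortUpdateSketch {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private int nonSecurePort;
    private int registrations; // counts how often the "register" branch ran

    public PortUpdateSketch(int configuredNonSecurePort) {
        this.nonSecurePort = configuredNonSecurePort;
    }

    /** Same two guards as the start() method above, with registration reduced to a counter. */
    public void start() {
        if (port.get() != 0 && nonSecurePort == 0) {
            nonSecurePort = port.get();
        }
        if (!running.get() && nonSecurePort > 0) {
            registrations++;
            running.set(true);
        }
    }

    /** Stand-in for the container-initialized event handler. */
    public void onContainerInitialized(int localPort) {
        if (port.get() == 0) {
            port.compareAndSet(0, localPort); // only the first detected port wins
            start();
        }
    }

    public int getPort() { return port.get(); }
    public int getNonSecurePort() { return nonSecurePort; }
    public int getRegistrations() { return registrations; }
    public boolean isRunning() { return running.get(); }

    public static void main(String[] args) {
        PortUpdateSketch sketch = new PortUpdateSketch(0);
        sketch.start();                      // no port known yet: nothing happens
        sketch.onContainerInitialized(8761); // port detected: registration runs once
        sketch.onContainerInitialized(9000); // ignored: port is already set
        System.out.println(sketch.getPort() + " " + sketch.getRegistrations());
    }
}
```

If the non-secure port is already configured, the first start() call registers immediately and the later event only records the port. That ordering would be consistent with the log above, where "Registering application" appears before "Updating port to 8761", though the log alone does not confirm it.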
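To summarize, the startup does the following:
1. Initializes the Eureka environment and the Eureka server context.
2. Initializes the Eureka Server response cache.
3. Starts several scheduled tasks, such as the task that resets the renewal (heartbeat) threshold and the task that evicts expired instances.
4. Registers instances and synchronizes with peer nodes.
5. Sets the Eureka Server status to UP and updates the Eureka Server port.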