spring-cloud-eureka
Published: 2019-05-26


Overview

1. Startup mechanism
2. Data synchronization

1. Startup mechanism

1.1 Startup log
First, run EurekaApplication from the microservice-discovery-eureka project. The startup log looks like this:

[Thread-22] o.s.c.n.e.server.EurekaServerBootstrap   : Setting the eureka configuration..
[     main] c.n.e.EurekaDiscoveryClientConfiguration : Registering application unknown with eureka with status UP
[Thread-22] o.s.c.n.e.server.EurekaServerBootstrap   : Eureka data center value eureka.datacenter is not set, defaulting to default
[Thread-22] o.s.c.n.e.server.EurekaServerBootstrap   : Eureka environment value eureka.environment is not set, defaulting to test
[Thread-22] o.s.c.n.e.server.EurekaServerBootstrap   : isAws returned false
[Thread-22] o.s.c.n.e.server.EurekaServerBootstrap   : Initialized server context
[Thread-22] c.n.e.r.PeerAwareInstanceRegistryImpl    : Got 1 instances from neighboring DS node
[Thread-22] c.n.e.r.PeerAwareInstanceRegistryImpl    : Renew threshold is: 1
[Thread-22] c.n.e.r.PeerAwareInstanceRegistryImpl    : Changing status to UP
[Thread-22] e.s.EurekaServerInitializerConfiguration : Started Eureka Server
[     main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8761 (http)
[     main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 8761
[     main] c.itmuch.cloud.study.EurekaApplication   : Started EurekaApplication in 6.011 seconds (JVM running for 8.163)

[Analysis] Note that EurekaServerBootstrap logs "Setting the eureka configuration..". That message is the entry point for tracing the startup code.
1.2 EurekaServerBootstrap
[Analysis 2] contextInitialized performs two initialization steps: the Eureka environment and the Eureka server context.

public void contextInitialized(ServletContext context) {
    try {
        this.initEurekaEnvironment();
        this.initEurekaServerContext();
        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    } catch (Throwable var3) {
        log.error("Cannot bootstrap eureka server :", var3);
        throw new RuntimeException("Cannot bootstrap eureka server :", var3);
    }
}

[Analysis 1] This is where "Setting the eureka configuration.." is logged.

protected void initEurekaEnvironment() throws Exception {
    log.info("Setting the eureka configuration..");
    // Fall back to "default" when eureka.datacenter is not configured.
    String dataCenter = ConfigurationManager.getConfigInstance().getString("eureka.datacenter");
    if (dataCenter == null) {
        log.info("Eureka data center value eureka.datacenter is not set, defaulting to default");
        ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.datacenter", "default");
    } else {
        ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.datacenter", dataCenter);
    }
    // Fall back to "test" when eureka.environment is not configured.
    String environment = ConfigurationManager.getConfigInstance().getString("eureka.environment");
    if (environment == null) {
        ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.environment", "test");
        log.info("Eureka environment value eureka.environment is not set, defaulting to test");
    } else {
        ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.environment", environment);
    }
}

protected void initEurekaServerContext() throws Exception {
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
    if (this.isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }
    EurekaServerContextHolder.initialize(this.serverContext);
    log.info("Initialized server context");
    // Copy the registry from a neighboring node, then start accepting traffic.
    int registryCount = this.registry.syncUp();
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    EurekaMonitors.registerAllStats();
}
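The defaulting logic in initEurekaEnvironment can be isolated as a small sketch. This is not Eureka code: the class EurekaEnvDefaults and its resolve method are hypothetical names, and a plain Map stands in for the Archaius ConfigurationManager. Only the property keys and the fallback values ("default" and "test") come from the method above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Eureka: mirrors the fallback rules of initEurekaEnvironment.
final class EurekaEnvDefaults {

    // Returns the archaius.deployment.* values that would be set for the given configuration.
    static Map<String, String> resolve(Map<String, String> config) {
        Map<String, String> resolved = new HashMap<>();

        // eureka.datacenter falls back to "default" when absent.
        String dataCenter = config.get("eureka.datacenter");
        resolved.put("archaius.deployment.datacenter", dataCenter == null ? "default" : dataCenter);

        // eureka.environment falls back to "test" when absent.
        String environment = config.get("eureka.environment");
        resolved.put("archaius.deployment.environment", environment == null ? "test" : environment);

        return resolved;
    }
}
```

With an empty configuration, both fallbacks apply, which matches the two "is not set, defaulting to ..." lines in the startup log.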


The class diagram shows that EurekaServerInitializerConfiguration aggregates an EurekaServerBootstrap, that is, it holds a reference to one.

1.3 EurekaServerInitializerConfiguration


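[Analysis 1] The class hooks into Spring Cloud by implementing the ServletContextAware and SmartLifecycle interfaces.

public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {

[Analysis 2] Through ServletContextAware, Tomcat's ServletContext object is injected.

    public void setServletContext(ServletContext servletContext) {
        this.servletContext = servletContext;
    }

[Analysis 3] Through SmartLifecycle, the Spring container calls the bean's lifecycle method start() during context startup, which triggers the Eureka startup.

    public void start() {
        (new Thread(new Runnable() {
            public void run() {
                try {
                    // [Analysis 4] Calls EurekaServerBootstrap.contextInitialized
                    EurekaServerInitializerConfiguration.this.eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                    EurekaServerInitializerConfiguration.log.info("Started Eureka Server");
                    EurekaServerInitializerConfiguration.this.publish(new EurekaRegistryAvailableEvent(EurekaServerInitializerConfiguration.this.getEurekaServerConfig()));
                    EurekaServerInitializerConfiguration.this.running = true;
                    EurekaServerInitializerConfiguration.this.publish(new EurekaServerStartedEvent(EurekaServerInitializerConfiguration.this.getEurekaServerConfig()));
                } catch (Exception var2) {
                    EurekaServerInitializerConfiguration.log.error("Could not initialize Eureka servlet context", var2);
                }
            }
        })).start();
    }

[Analysis 5] publish is called twice, once with EurekaRegistryAvailableEvent and once with EurekaServerStartedEvent. I did not find any built-in listeners for them, so they are probably extension points for developers. They can be consumed like this:

import com.netflix.appinfo.InstanceInfo;
import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceCanceledEvent;
import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceRegisteredEvent;
import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceRenewedEvent;
import org.springframework.cloud.netflix.eureka.server.event.EurekaRegistryAvailableEvent;
import org.springframework.cloud.netflix.eureka.server.event.EurekaServerStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class EurekaStateChangeListener {

    @EventListener
    public void listen(EurekaInstanceCanceledEvent eurekaInstanceCanceledEvent) {
        // An instance was cancelled (the service went offline)
        String appName = eurekaInstanceCanceledEvent.getAppName();
        String serverId = eurekaInstanceCanceledEvent.getServerId();
        System.out.println(appName);
        System.out.println(serverId);
    }

    @EventListener
    public void listen(EurekaInstanceRegisteredEvent event) {
        // An instance registered
        InstanceInfo instanceInfo = event.getInstanceInfo();
        System.out.println(instanceInfo);
    }

    @EventListener
    public void listen(EurekaInstanceRenewedEvent event) {
        // An instance renewed its lease (heartbeat)
        event.getAppName();
        event.getServerId();
    }

    @EventListener
    public void listen(EurekaRegistryAvailableEvent event) {
        // The registry became available
    }

    @EventListener
    public void listen(EurekaServerStartedEvent event) {
        // The server started
    }
}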
1.4 DefaultLifecycleProcessor
public class DefaultLifecycleProcessor implements LifecycleProcessor, BeanFactoryAware {

    public void setBeanFactory(BeanFactory beanFactory) {
        Assert.isInstanceOf(ConfigurableListableBeanFactory.class, beanFactory);
        this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
    }

    public void start() {
        this.startBeans(false);
        this.running = true;
    }

    private void startBeans(boolean autoStartupOnly) {
        Map<String, Lifecycle> lifecycleBeans = this.getLifecycleBeans();
        Map<Integer, LifecycleGroup> phases = new HashMap<Integer, LifecycleGroup>();
        // Bucket every eligible bean into the LifecycleGroup for its phase.
        for (Map.Entry<String, ? extends Lifecycle> entry : lifecycleBeans.entrySet()) {
            Lifecycle bean = entry.getValue();
            if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
                int phase = this.getPhase(bean);
                LifecycleGroup group = phases.get(phase);
                if (group == null) {
                    group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
                    phases.put(phase, group);
                }
                group.add(entry.getKey(), bean);
            }
        }
        if (!phases.isEmpty()) {
            List<Integer> keys = new ArrayList<Integer>(phases.keySet());
            Collections.sort(keys);
            for (Integer key : keys) {
                // [Analysis] The core step: each LifecycleGroup's start() runs, in ascending phase order.
                phases.get(key).start();
            }
        }
    }

    private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
        Lifecycle bean = lifecycleBeans.remove(beanName);
        if (bean != null && !this.equals(bean)) {
            // Start the beans this bean depends on first.
            String[] dependenciesForBean = this.beanFactory.getDependenciesForBean(beanName);
            for (String dependency : dependenciesForBean) {
                this.doStart(lifecycleBeans, dependency, autoStartupOnly);
            }
            if (!bean.isRunning() && (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Starting bean '" + beanName + "' of type [" + bean.getClass() + "]");
                }
                try {
                    bean.start();
                } catch (Throwable var10) {
                    throw new ApplicationContextException("Failed to start bean '" + beanName + "'", var10);
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Successfully started bean '" + beanName + "'");
                }
            }
        }
    }

    // Collects all beans that implement Lifecycle (lookup logic omitted here).
    protected Map<String, Lifecycle> getLifecycleBeans() {
        return beans;
    }

    private class LifecycleGroupMember implements Comparable<LifecycleGroupMember> {
        private final String name;
        private final Lifecycle bean;

        LifecycleGroupMember(String name, Lifecycle bean) {
            this.name = name;
            this.bean = bean;
        }

        // Members are ordered by each bean's getPhase() value; lower phases start first.
        public int compareTo(LifecycleGroupMember other) {
            int thisOrder = DefaultLifecycleProcessor.this.getPhase(this.bean);
            int otherOrder = DefaultLifecycleProcessor.this.getPhase(other.bean);
            return thisOrder == otherOrder ? 0 : (thisOrder < otherOrder ? -1 : 1);
        }
    }

    private class LifecycleGroup {
        private final List<LifecycleGroupMember> members = new ArrayList<LifecycleGroupMember>();
        private final int phase;
        private final long timeout;
        private final Map<String, ? extends Lifecycle> lifecycleBeans;
        private final boolean autoStartupOnly;
        private volatile int smartMemberCount;

        public LifecycleGroup(int phase, long timeout, Map<String, ? extends Lifecycle> lifecycleBeans, boolean autoStartupOnly) {
            this.phase = phase;
            this.timeout = timeout;
            this.lifecycleBeans = lifecycleBeans;
            this.autoStartupOnly = autoStartupOnly;
        }

        public void add(String name, Lifecycle bean) {
            if (bean instanceof SmartLifecycle) {
                ++this.smartMemberCount;
            }
            this.members.add(new LifecycleGroupMember(name, bean));
        }

        public void start() {
            if (!this.members.isEmpty()) {
                if (DefaultLifecycleProcessor.this.logger.isInfoEnabled()) {
                    DefaultLifecycleProcessor.this.logger.info("Starting beans in phase " + this.phase);
                }
                Collections.sort(this.members);
                for (LifecycleGroupMember member : this.members) {
                    if (this.lifecycleBeans.containsKey(member.name)) {
                        DefaultLifecycleProcessor.this.doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
                    }
                }
            }
        }
    }
}
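This is Spring's default processor for the Lifecycle interface. It groups all Lifecycle beans into LifecycleGroup instances, one per phase value, sorts the groups by phase, and starts them in ascending order.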
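The grouping-and-sorting idea can be shown without Spring. The sketch below is an illustration only: PhaseOrderSketch, Bean, and startOrder are hypothetical names, and a TreeMap replaces the HashMap plus Collections.sort(keys) combination used by DefaultLifecycleProcessor. It ignores dependencies between beans, which the real doStart method also resolves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of phase-ordered startup; not Spring code.
final class PhaseOrderSketch {

    // Stand-in for a lifecycle bean: just a name and the phase it reports.
    static final class Bean {
        final String name;
        final int phase;

        Bean(String name, int phase) {
            this.name = name;
            this.phase = phase;
        }
    }

    // Groups beans by phase and returns their names in the order they would be started.
    static List<String> startOrder(List<Bean> beans) {
        // TreeMap keeps its keys sorted, so groups come out in ascending phase order.
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (Bean bean : beans) {
            groups.computeIfAbsent(bean.phase, phase -> new ArrayList<>()).add(bean.name);
        }
        List<String> order = new ArrayList<>();
        for (List<String> group : groups.values()) {
            order.addAll(group);
        }
        return order;
    }
}
```

A bean reporting a lower phase is therefore started before one reporting a higher phase, regardless of registration order.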
1.5 EnableEurekaServer
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerConfiguration.class)
public @interface EnableEurekaServer {
}

[Analysis 1] The EnableEurekaServer annotation carries an @Import that pulls in EurekaServerConfiguration, so that class is the next one to look at.

1.6 EurekaServerConfiguration
This class registers many global beans, including PeerEurekaNodes, DefaultEurekaServerContext, and PeerAwareInstanceRegistry.
public class EurekaServerConfiguration extends WebMvcConfigurerAdapter {
    private static String[] EUREKA_PACKAGES = new String[]{"com.netflix.discovery", "com.netflix.eureka"};
    @Autowired
    private ApplicationInfoManager applicationInfoManager;
    @Autowired
    private EurekaServerConfig eurekaServerConfig;
    @Autowired
    private EurekaClientConfig eurekaClientConfig;
    @Autowired
    private EurekaClient eurekaClient;
    @Autowired
    private InstanceRegistryProperties instanceRegistryProperties;
    public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson();

    public ServerCodecs serverCodecs() {
        return new EurekaServerConfiguration.CloudServerCodecs(this.eurekaServerConfig);
    }

    private static CodecWrapper getFullJson(EurekaServerConfig serverConfig) {
        CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getJsonCodecName());
        return codec == null ? CodecWrappers.getCodec(JACKSON_JSON.codecName()) : codec;
    }

    private static CodecWrapper getFullXml(EurekaServerConfig serverConfig) {
        CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getXmlCodecName());
        return codec == null ? CodecWrappers.getCodec(XStreamXml.class) : codec;
    }

    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications();
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }

    @Bean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs) {
        return new PeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
    }

    @Bean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager);
    }

    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext);
    }

    @Bean
    public FilterRegistrationBean jerseyFilterRegistration(Application eurekaJerseyApp) {
        FilterRegistrationBean bean = new FilterRegistrationBean();
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(2147483647); // Integer.MAX_VALUE, i.e. lowest precedence
        bean.setUrlPatterns(Collections.singletonList("/eureka/*"));
        return bean;
    }

    @Bean
    public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
        // Scan the Eureka packages for JAX-RS resources (@Path) and providers (@Provider).
        ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false, environment);
        provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
        provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
        Set<Class<?>> classes = new HashSet<Class<?>>();
        for (String basePackage : EUREKA_PACKAGES) {
            Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
            for (BeanDefinition bd : beans) {
                Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());
                classes.add(cls);
            }
        }
        Map<String, Object> propsAndFeatures = new HashMap<String, Object>();
        propsAndFeatures.put("com.sun.jersey.config.property.WebPageContentRegex", "/eureka/(fonts|images|css|js)/.*");
        DefaultResourceConfig rc = new DefaultResourceConfig(classes);
        rc.setPropertiesAndFeatures(propsAndFeatures);
        return rc;
    }

    @Bean
    public FilterRegistrationBean traceFilterRegistration(@Qualifier("webRequestLoggingFilter") Filter filter) {
        FilterRegistrationBean bean = new FilterRegistrationBean();
        bean.setFilter(filter);
        bean.setOrder(2147483637); // Integer.MAX_VALUE - 10
        return bean;
    }

    @Configuration
    protected static class EurekaServerConfigBeanConfiguration {
        protected EurekaServerConfigBeanConfiguration() {
        }

        @Bean
        @ConditionalOnMissingBean
        public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
            EurekaServerConfigBean server = new EurekaServerConfigBean();
            if (clientConfig.shouldRegisterWithEureka()) {
                server.setRegistrySyncRetries(5);
            }
            return server;
        }
    }
}
DefaultEurekaServerContext initialization
The relevant code:

@PostConstruct
@Override
public void initialize() throws Exception {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    registry.init(peerEurekaNodes);
    logger.info("Initialized");
}
peerEurekaNodes initialization

public void start() {
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        updatePeerEurekaNodes(resolvePeerUrls());
        // Refresh the peer node information
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }
            }
        };
        // Note: runs every 600000 ms (10 minutes) to re-resolve and update the cluster's peer nodes
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  " + node.getServiceUrl());
    }
}

[Analysis 4] Stepping on past the breakpoint, execution reaches registry.init(peerEurekaNodes) inside initialize():

@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    // Note: initialize the Eureka Server response cache; the default cache time is 30 s
    initializedResponseCache();
    // Note: scheduled task that resets the renewal (heartbeat) threshold every 900000 ms (15 minutes)
    scheduleRenewalThresholdUpdateTask();
    // Note: initialize the remote region registries
    initRemoteRegionRegistry();
    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}

registry.syncUp():

@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}

registry.openForTraffic(applicationInfoManager, registryCount):

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // Note: one renewal every 30 seconds means 2 renewals per minute per instance, hence count * 2
    this.expectedNumberOfRenewsPerMin = count * 2;
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got " + count + " instances from neighboring DS node");
    logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // Note: mark this Eureka Server as UP, meaning it is active and essentially ready for normal use
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // Note: scheduled task that runs every 60000 ms (1 minute) so the Eureka Server evicts expired instances
    super.postInit();
}
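The threshold arithmetic in openForTraffic explains the "Renew threshold is: 1" line in the startup log. The sketch below reproduces only that calculation. RenewThresholdSketch and its method names are hypothetical, and the 0.85 passed in the usage note is an assumption about the configured value of getRenewalPercentThreshold(), which the code above does not show.

```java
// Hypothetical illustration of the renewal-threshold arithmetic; not Eureka code.
final class RenewThresholdSketch {

    // Each instance renews every 30 seconds, so 2 renewals per minute per instance.
    static int expectedRenewsPerMin(int instanceCount) {
        return instanceCount * 2;
    }

    // Same expression as openForTraffic: truncate (expected renewals * percent) to an int.
    static int threshold(int instanceCount, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin(instanceCount) * renewalPercentThreshold);
    }
}
```

Under that assumption, one synced instance gives 1 * 2 * 0.85 = 1.7, and the int cast truncates it to 1, which agrees with "Got 1 instances from neighboring DS node" followed by "Renew threshold is: 1".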
Startup succeeded: "Updating port to 8761"
EurekaDiscoveryClientConfiguration implements the SmartLifecycle interface:
public void start() {
    if (this.port.get() != 0 && this.instanceConfig.getNonSecurePort() == 0) {
        this.instanceConfig.setNonSecurePort(this.port.get());
    }
    if (!this.running.get() && this.instanceConfig.getNonSecurePort() > 0) {
        this.maybeInitializeClient();
        if (log.isInfoEnabled()) {
            log.info("Registering application " + this.instanceConfig.getAppname() + " with eureka with status " + this.instanceConfig.getInitialStatus());
        }
        this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus());
        if (this.healthCheckHandler != null) {
            this.eurekaClient.registerHealthCheck(this.healthCheckHandler);
        }
        // Publish an InstanceRegisteredEvent.
        this.context.publishEvent(new InstanceRegisteredEvent(this, this.instanceConfig));
        this.running.set(true);
    }
}

// Note: handles the event fired once the embedded servlet container has started
@EventListener({EmbeddedServletContainerInitializedEvent.class})
public void onApplicationEvent(EmbeddedServletContainerInitializedEvent event) {
    int localPort = event.getEmbeddedServletContainer().getPort();
    if (this.port.get() == 0) {
        log.info("Updating port to " + localPort);
        this.port.compareAndSet(0, localPort);
        this.start();
    }
}
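The port handling above relies on AtomicInteger.compareAndSet so that the port is recorded only while it is still 0. A minimal sketch of that pattern follows. PortUpdateSketch and its methods are hypothetical names, and the Spring event plumbing is replaced by a direct method call.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the set-once port update; not Spring Cloud code.
final class PortUpdateSketch {

    // 0 means "port not known yet", as in EurekaDiscoveryClientConfiguration.
    private final AtomicInteger port = new AtomicInteger(0);

    // Records the port only if none has been recorded; returns true when the update happened.
    boolean onContainerInitialized(int localPort) {
        return port.compareAndSet(0, localPort);
    }

    int currentPort() {
        return port.get();
    }
}
```

The first call with 8761 succeeds and later calls leave the value unchanged, which is why "Updating port to 8761" appears once in the log.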
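Eureka Server startup summary
1. Initialize the Eureka environment and the Eureka server context;
2. Initialize the Eureka Server response cache;
3. Start several scheduled tasks, such as the task that resets the renewal (heartbeat) threshold and the task that evicts expired instances;
4. Register instances and sync the registry from peer nodes;
5. Set the Eureka Server status to UP and update the Eureka Server port.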