Spring Cloud Eureka源码分析之三级缓存的设计原理及源码分析
Eureka Server 为了提供响应效率,提供了两层的缓存结构,将 Eureka Client 所需要的注册信息,直接存储在缓存结构中,实现原理如下图所示。
第一层缓存:readOnlyCacheMap,本质上是 ConcurrentHashMap,依赖定时从 readWriteCacheMap 同步数据,默认时间为 30 秒。
readOnlyCacheMap : 是一个 CurrentHashMap 只读缓存,这个主要是为了供客户端获取注册信息时使用,其缓存更新,依赖于定时器的更新,通过和 readWriteCacheMap 的值做对比,如果数据不一致,则以 readWriteCacheMap 的数据为准。
第二层缓存:readWriteCacheMap,本质上是 Guava 缓存。
readWriteCacheMap:readWriteCacheMap 的数据主要同步于存储层。当获取缓存时判断缓存中是否没有数据,如果不存在此数据,则通过 CacheLoader 的 load 方法去加载,加载成功之后将数据放入缓存,同时返回数据。
readWriteCacheMap 缓存过期时间,默认为 180 秒,当服务下线、过期、注册、状态变更,都会来清除此缓存中的数据。
Eureka Client 获取全量或者增量的数据时,会先从一级缓存中获取;如果一级缓存中不存在,再从二级缓存中获取;如果二级缓存也不存在,这时候先将存储层的数据同步到缓存中,再从缓存中获取。
通过 Eureka Server 的二层缓存机制,可以非常有效地提升 Eureka Server 的响应时间,通过数据存储层和缓存层的数据切割,根据使用场景来提供不同的数据支持。
多级缓存的意义#
这里为什么要设计多级缓存呢?原因很简单,就是当存在大规模的服务注册和更新时,如果只是修改一个ConcurrentHashMap数据,那么势必因为锁的存在导致竞争,影响性能。
而Eureka又是AP模型,只需要满足最终可用就行。所以它在这里用到多级缓存来实现读写分离。注册方法写的时候直接写内存注册表,写完表之后主动失效读写缓存。
获取注册信息接口先从只读缓存取,只读缓存没有再去读写缓存取,读写缓存没有再去内存注册表里取(不只是取,此处较复杂)。并且,读写缓存会更新回写只读缓存
- responseCacheUpdateIntervalMs : readOnlyCacheMap 缓存更新的定时器时间间隔,默认为30秒
- responseCacheAutoExpirationInSeconds : readWriteCacheMap 缓存过期时间,默认为 180 秒 。
缓存初始化#
readWriteCacheMap使用的是LoadingCache对象,它是guava中提供的用来实现内存缓存的一个api。创建方式如下
LoadingCache
re>cache = CacheBuilder.newBuilder() //缓存池大小,在缓存项接近该大小时, Guava开始回收旧的缓存项 .maximumSize(10000) //设置时间对象没有被读/写访问则对象从内存中删除(在另外的线程里面不定期维护) .expireAfterAccess(10, TimeUnit.MINUTES) //移除监听器,缓存项被移除时会触发 .removalListener(new RemovalListener () { @Override public void onRemoval(RemovalNotification rn) { //执行逻辑操作 } }) .recordStats()//开启Guava Cache的统计功能 .build(new CacheLoader() { @Override public Object load(String key) { //从 SQL或者NoSql 获取对象 } });//CacheLoader类 实现自动加载 其中,CacheLoader是用来实现缓存自动加载的功能,当触发
readWriteCacheMap.get(key)
方法时,就会回调CacheLoader.load
方法,根据key去服务注册信息中去查找实例数据进行缓存ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { this.serverConfig = serverConfig; this.serverCodecs = serverCodecs; this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache(); this.registry = registry; long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs(); this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache()) .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) .removalListener(new RemovalListener
re>() { @Override public void onRemoval(RemovalNotification notification) { Key removedKey = notification.getKey(); if (removedKey.hasRegions()) { Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); regionSpecificKeys.remove(cloneWithNoRegions, removedKey); } } }) .build(new CacheLoader() { @Override public Value load(Key key) throws Exception { if (key.hasRegions()) { Key cloneWithNoRegions = key.cloneWithoutRegions(); regionSpecificKeys.put(cloneWithNoRegions, key); } Value value = generatePayload(key); //注意这里 return value; } }); 而缓存的加载,是基于
generatePayload
方法完成的,代码如下。private Value generatePayload(Key key) { Stopwatch tracer = null; try { String payload; switch (key.getEntityType()) { case Application: boolean isRemoteRegionRequested = key.hasRegions(); if (ALL_APPS.equals(key.getName())) { if (isRemoteRegionRequested) { tracer = serializeAllAppsWithRemoteRegionTimer.start(); payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions())); } else { tracer = serializeAllAppsTimer.start(); payload = getPayLoad(key, registry.getApplications()); } } else if (ALL_APPS_DELTA.equals(key.getName())) { if (isRemoteRegionRequested) { tracer = serializeDeltaAppsWithRemoteRegionTimer.start(); versionDeltaWithRegions.incrementAndGet(); versionDeltaWithRegionsLegacy.incrementAndGet(); payload = getPayLoad(key, registry.getApplicationDeltasFromMultipleRegions(key.getRegions())); } else { tracer = serializeDeltaAppsTimer.start(); versionDelta.incrementAndGet(); versionDeltaLegacy.incrementAndGet(); payload = getPayLoad(key, registry.getApplicationDeltas()); } } else { tracer = serializeOneApptimer.start(); payload = getPayLoad(key, registry.getApplication(key.getName())); } break; case VIP: case SVIP: tracer = serializeViptimer.start(); payload = getPayLoad(key, getApplicationsForVip(key, registry)); break; default: logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType()); payload = ""; break; } return new Value(payload); } finally { if (tracer != null) { tracer.stop(); } } }
re>此方法接受一个
Key
类型的参数,返回一个Value
类型。 其中Key
中重要的字段有:
KeyType
,表示payload文本格式,有 JSON 和 XML 两种值。EntityType
,表示缓存的类型,有Application
,VIP
,SVIP
三种值。entityName
,表示缓存的名称,可能是单个应用名,也可能是ALL_APPS
或ALL_APPS_DELTA
。
Value
则有一个 String
类型的payload和一个 byte
数组,表示gzip压缩后的字节。
缓存同步#
在ResponseCacheImpl
这个类的构造实现中,初始化了一个定时任务,这个定时任务每个
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { //省略... if (shouldUseReadOnlyResponseCache) { timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); } }
re>默认每30s从readWriteCacheMap更新有差异的数据同步到readOnlyCacheMap中
private TimerTask getCacheUpdateTask() { return new TimerTask() { @Override public void run() { logger.debug("Updating the client cache from response cache"); for (Key key : readOnlyCacheMap.keySet()) { //遍历只读集合 if (logger.isDebugEnabled()) { logger.debug("Updating the client cache from response cache for key : {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType()); } try { CurrentRequestVersion.set(key.getVersion()); Value cacheValue = readWriteCacheMap.get(key); Value currentCacheValue = readOnlyCacheMap.get(key); if (cacheValue != currentCacheValue) { //判断差异信息,如果有差异,则更新 readOnlyCacheMap.put(key, cacheValue); } } catch (Throwable th) { logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th); } finally { CurrentRequestVersion.remove(); } } } }; }
re>缓存失效#
在AbstractInstanceRegistry.register这个方法中,当完成服务信息保存后,会调用
invalidateCache
失效缓存public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { //.... invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); //.... }
re>最终调用ResponseCacheImpl.invalidate方法,完成缓存的失效机制
public void invalidate(Key... keys) { for (Key key : keys) { logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(key); Collection
re>keysWithRegions = regionSpecificKeys.get(key); if (null != keysWithRegions && !keysWithRegions.isEmpty()) { for (Key keysWithRegion : keysWithRegions) { logger.debug("Invalidating the response cache key : {} {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(keysWithRegion); } } } } 版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自
Mic带你学架构
!
如果本篇文章对您有帮助,还请帮忙点个关注和赞,您的坚持是我不断创作的动力。欢迎关注「跟着Mic学架构」公众号公众号获取更多技术干货!