Path Cache
Path Cache其实就是用于对zk节点的监听。不论是子节点的新增、更新或者移除的时候,Path Cache都能对子节点集合的状态和数据变化做出响应。
1. 关键 API
org.apache.curator.framework.recipes.cache.PathChildrenCache
org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent
org.apache.curator.framework.recipes.cache.PathChildrenCacheListener
org.apache.curator.framework.recipes.cache.ChildData
2. 机制说明
PathChildrenCache内部使用一个命令模式来封装各种操作:
- 操作接口:
org.apache.curator.framework.recipes.cache.Operation
- 刷新操作:
org.apache.curator.framework.recipes.cache.RefreshOperation
- 触发事件操作:
org.apache.curator.framework.recipes.cache.EventOperation
- 获取数据操作:
org.apache.curator.framework.recipes.cache.GetDataOperation
- 刷新操作:
而这些操作对象,都在构造器中接受PathChildrenCache
引用,这样可以在操作中,处理cache(回调):
EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event){ this.cache = cache; this.event = event;}GetDataOperation(PathChildrenCache cache, String fullPath){ this.cache = cache; this.fullPath = PathUtils.validatePath(fullPath);}RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode){ this.cache = cache; this.mode = mode;}
而这些操作,还使用了一个单线程的线程池来调用,从而形成了异步调用。
- 使用了一个
private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
来作为线程池的任务接收队列- 使用set,避免了并发情况下重复操作
- 由于单线程,使得各种操作都是按序执行的
- 所以为了避免curator的监听机制阻塞
- 在
childrenWatcher
以及dataWatcher
中,都使用异步执行命令的方式
- 在
触发操作:
void offerOperation(final Operation operation){ if ( operationsQuantizer.add(operation) ) { submitToExecutor ( new Runnable() { @Override public void run() { try { operationsQuantizer.remove(operation); operation.invoke(); } catch ( InterruptedException e ) { //We expect to get interrupted during shutdown, //so just ignore these events if ( state.get() != State.CLOSED ) { handleException(e); } Thread.currentThread().interrupt(); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); handleException(e); } } } ); }}private synchronized void submitToExecutor(final Runnable command){ if ( state.get() == State.STARTED ) { executorService.submit(command); }}
- 考虑到了各种操作的中断
- 考虑到了状态
- 统一操作的异常处理
- 投递方法
submitToExecutor
使用了synchronized
- 因为可能监听器触发,所以需要对状态进行检查
- 如先关闭,然后再被某个监听器回掉,导致不必要的操作
- 而检查动作不是原子的,所以需要同步锁
- 因为可能监听器触发,所以需要对状态进行检查
3. 用法
3.1 创建
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
- cacheData
- 如果设置true,是否需要缓存数据
3.2 使用
- Cache必须在使用前调用
start()
方法- 有两个
start()
方法void start()
- 无参
void start(PathChildrenCache.StartMode mode)
- 可以通过参数,选择如何初始化
- StartMode
- NORMAL
- BUILD_INITIAL_CACHE
- POST_INITIALIZED_EVENT
- 有两个
- 使用完成后需要调用
close()
方法 - 任何时候,调用
getCurrentData()
都可以得到状态信息 - 可以添加监听器,当数据发生变动时回调执行
public void addListener(PathChildrenCacheListener listener)
4. 错误处理
PathChildrenCache实例会通过ConnectionStateListener
监听链接状态。 如果链接状态发生变化,缓存会被重置(PathChildrenCacheListener
会受到一个RESET
事件)
5. 源码分析
5.1 类定义
public class PathChildrenCache implements Closeable{}
- 实现了
java.io.Closeable
接口
5.2 成员变量
public class PathChildrenCache implements Closeable{ private final Logger log = LoggerFactory.getLogger(getClass()); private final CuratorFramework client; private final String path; private final CloseableExecutorService executorService; private final boolean cacheData; private final boolean dataIsCompressed; private final ListenerContainerlisteners = new ListenerContainer (); private final ConcurrentMap currentData = Maps.newConcurrentMap(); private final AtomicReference
- log
- client
- path
- 缓存对应的zk节点路径
- executorService
org.apache.curator.utils.CloseableExecutorService
- 线程池
- 用以执行各种操作
- 参见第2章节
- cacheData
- 是否需要缓存数据
- dataIsCompressed
- 数据是否已压缩
- listeners
org.apache.curator.framework.listen.ListenerContainer
- 监听器容器(管理多个监听器)
- 业务监听器
- 可以添加自己的监听器
- currentData
java.util.concurrent.ConcurrentMap
- 当前数据
<String, ChildData>
- 存放着多个
org.apache.curator.framework.recipes.cache.ChildData
- initialSet
AtomicReference
- 初始化集合
- 放置节点,以此来跟踪各个节点是否初始化
- 如果全部节点都初始化完成,则会触发
PathChildrenCacheEvent.Type.INITIALIZED
事件
- 如果全部节点都初始化完成,则会触发
- operationsQuantizer
- 相当于线程池的任务接收队列
- state
- 状态
AtomicReference
- ensureContainers
org.apache.curator.framework.EnsureContainers
- 可以线程安全的创建path节点
- State
- 内部枚举
- LATENT
- STARTED
- CLOSED
- 内部枚举
- NULL_CHILD_DATA
- 私有常量
- 空数据节点
- USE_EXISTS
- 私有常量
- 使用系统配置中
curator-path-children-cache-use-exists
的值
- childrenWatcher
- volatile
- 子节点变动的监听器
- dataWatcher
- volatile
- 数据变动监听器
- rebuildTestExchanger
java.util.concurrent.Exchanger
- 用于并发线程间传值
- 在重建缓存时通过此对象传递一个信号对象
- 用于测试
- connectionStateListener
- 链接状态监听器
- defaultThreadFactory
- 线程工厂
5.3 构造器
public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode){ this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));}public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory){ this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));}public PathChildrenCache(CuratorFramework client, String path, boolean cacheData){ this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));}public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory){ this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));}public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory){ this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));}public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService){ this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));}public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService){ this.client = client; this.path = PathUtils.validatePath(path); this.cacheData = cacheData; this.dataIsCompressed = dataIsCompressed; this.executorService = executorService; ensureContainers = new EnsureContainers(client, path);}
有7个构造器,最终都是调用最后一个。不过从中也可以看出:
- 默认使用
newSingleThreadExecutor
单线程线程池 - 默认不对数据进行压缩处理
5.4 启动
缓存在使用前需要调用start()
public enum StartMode { NORMAL, BUILD_INITIAL_CACHE, POST_INITIALIZED_EVENT }public void start() throws Exception{ start(StartMode.NORMAL);}@Deprecatedpublic void start(boolean buildInitial) throws Exception{ start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL);}public void start(StartMode mode) throws Exception{ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started"); mode = Preconditions.checkNotNull(mode, "mode cannot be null"); client.getConnectionStateListenable().addListener(connectionStateListener); switch ( mode ) { case NORMAL: { offerOperation(new RefreshOperation(this, RefreshMode.STANDARD)); break; } case BUILD_INITIAL_CACHE: { rebuild(); break; } case POST_INITIALIZED_EVENT: { initialSet.set(Maps.newConcurrentMap()); offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED)); break; } }}private void processChildren(List children, RefreshMode mode) throws Exception{ Set removedNodes = Sets.newHashSet(currentData.keySet()); for ( String child : children ) { removedNodes.remove(ZKPaths.makePath(path, child)); } for ( String fullPath : removedNodes ) { remove(fullPath); } for ( String name : children ) { String fullPath = ZKPaths.makePath(path, name); if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) ) { getDataAndStat(fullPath); } updateInitialSet(name, NULL_CHILD_DATA); } maybeOfferInitializedEvent(initialSet.get());}
- 无参的
start()
- 默认使用
StartMode.NORMAL
策略
- 默认使用
- 不建议使用的
start(boolean buildInitial)
- true
- 使用
StartMode.BUILD_INITIAL_CACHE
策略
- 使用
- false
- 使用
StartMode.NORMAL
策略
- 使用
- true
- 启动时添加了链接状态的监听器
可以看到启动过程有三种策略:
NORMAL
模式- 执行刷新命令
org.apache.curator.framework.recipes.cache.RefreshOperation
(命令模式)- 使用
RefreshMode.STANDARD
刷新模式 - 调用
org.apache.curator.framework.recipes.cache.PathChildrenCache#refresh
方法- 调用
org.apache.curator.framework.EnsureContainers#ensure
创建节点 - 在节点上添加
childrenWatcher
监听器 - 回调触发
org.apache.curator.framework.recipes.cache.PathChildrenCache#processChildren
进行刷新- 清理掉已缓存在本地的数据中的其他节点
- 筛选出不是本cache的数据节点
- 从本地初始集合中清理掉
- 如果缓存节点还没用同步到本地,或者指定为
RefreshMode.FORCE_GET_DATA_AND_STAT
模式- 则立即同步节点数据与状态
- 如果不需要缓存数据,则只检查节点是否存在(只缓存节点以及状态,不含数据)
- 否则读取数据(如果需要解压则解压数据)并构建
ChildData
缓存- 新数据放入
currentData
- 根据情况触发事件(唤起监听器)
PathChildrenCacheEvent.Type.CHILD_ADDED
事件PathChildrenCacheEvent.Type.CHILD_UPDATED
事件
- 更新
initialSet
数据(将未同步的NULL_CHILD_DATA
数据替换成读取的数据)
- 新数据放入
- 则立即同步节点数据与状态
- 更新
initialSet
- 如果
initialSet
的Map不为空NORMAL
模式下,这里为空- 可以参见
POST_INITIALIZED_EVENT
模式
- 如果
- 清理掉已缓存在本地的数据中的其他节点
- 调用
- 使用
- 执行刷新命令
BUILD_INITIAL_CACHE
模式- 调用
rebuild
方法(此方法会阻塞执行)- 重新查询所有需要的数据
- 不会触发任何事件
- 安全创建path
- 清空
currentData
缓存 - 重新加载path下子节点,逐个结点重构缓存
- 逐个读取节点数据和状态
- 构建
ChildData
放入currentData
- 通过
rebuildTestExchanger
发送要给信号对象
- 调用
POST_INITIALIZED_EVENT
模式- 初始化
initialSet
- 以
RefreshMode.POST_INITIALIZED
模式刷新缓存- 参见
NORMAL
模式,但不同的是- 更新
initialSet
时- 如果
initialSet
的Map不为空POST_INITIALIZED_EVENT
模式下,这里已经初始化了Map
- 如果
initialSet
中的数据都已经同步完成(都不等于NULL_CHILD_DATA
)- 将
initialSet
制空 - 触发
PathChildrenCacheEvent.Type.INITIALIZED
事件
- 将
- 如果
- 更新
- 参见
- 初始化
5.5 节点发生变化
在启动start()
已经给path
上增加了一个监听器childrenWatcher
private volatile Watcher childrenWatcher = new Watcher(){ @Override public void process(WatchedEvent event) { offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD)); }};
- 以
RefreshMode.STANDARD
模式刷新缓存- 会对本地的缓存数据和zk节点做比较
- 只是处理新的缓存数据
- 注意操作的参数
PathChildrenCache.this
this
不同了
5.6 数据发生变化
在每次获取缓存数据时(getDataAndStat
方法),在每个缓存上添加了监听器dataWatcher
:
private volatile Watcher dataWatcher = new Watcher(){ @Override public void process(WatchedEvent event) { try { if ( event.getType() == Event.EventType.NodeDeleted ) { remove(event.getPath()); } else if ( event.getType() == Event.EventType.NodeDataChanged ) { offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath())); } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); handleException(e); } }};
- 节点删除时
- 清理缓存
- 触发
PathChildrenCacheEvent.Type.CHILD_REMOVED
事件
- 数据发生变化时
- 执行
GetDataOperation
操作- 也就是再次执行
getDataAndStat
方法
- 也就是再次执行
- 执行
- 注意操作的参数
PathChildrenCache.this
this
不同了
5.7 获取当前数据
public ListgetCurrentData(){ return ImmutableList.copyOf(Sets. newTreeSet(currentData.values()));}public ChildData getCurrentData(String fullPath){ return currentData.get(fullPath);}
都是从本地数据中获取
5.8 清理
5.8.1 清理缓存
public void clear(){ currentData.clear();}public void clearAndRefresh() throws Exception{ currentData.clear(); offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));}
清空本地数据
如果需要则使用RefreshMode.STANDARD
模式,刷新
5.8.2 清理缓存数据
public void clearDataBytes(String fullPath){ clearDataBytes(fullPath, -1);}public boolean clearDataBytes(String fullPath, int ifVersion){ ChildData data = currentData.get(fullPath); if ( data != null ) { if ( (ifVersion < 0) || (ifVersion == data.getStat().getVersion()) ) { if ( data.getData() != null ) { currentData.replace(fullPath, data, new ChildData(data.getPath(), data.getStat(), null)); } return true; } } return false;}
保留缓存信息,但是数据部分制空
5.9 链接状态变化
在启动时(start()
)中为链接添加了connectionStateListener
监听器:
private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener(){ @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { handleStateChange(newState); }};private void handleStateChange(ConnectionState newState){ switch ( newState ) { case SUSPENDED: { offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null))); break; } case LOST: { offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_LOST, null))); break; } case CONNECTED: case RECONNECTED: { try { offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT)); offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null))); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); handleException(e); } break; } }}
主要都是根据链接状态,触发不同的操作,以及触发业务监听器来执行。
- 由于数据都是缓存,所以在链接丢失,中断时,仅仅时触发事件,并没有将数据置为不可用
- 当链接建立
CONNECTED
,以及恢复时RECONNECTED
都触发了一次RefreshMode.FORCE_GET_DATA_AND_STAT
模式的刷新操作。
5.10 关闭
在使用完之后,需要调用close()
方法:
public void close() throws IOException{ if ( state.compareAndSet(State.STARTED, State.CLOSED) ) { client.getConnectionStateListenable().removeListener(connectionStateListener); listeners.clear(); executorService.close(); client.clearWatcherReferences(childrenWatcher); client.clearWatcherReferences(dataWatcher); // TODO // This seems to enable even more GC - I'm not sure why yet - it // has something to do with Guava's cache and circular references connectionStateListener = null; childrenWatcher = null; dataWatcher = null; }}
- 原子操作,将状态更新为
CLOSED
- 移除链接状态监听器
- 清空业务监听器
- 关闭线程池
- 清空节点监听器
- 清空数据监听器
6. 小结
PathChildrenCache虽然名字带有Cache。 但其实并不是一个完整的缓存。
应该说,它仅仅是对path下诸多节点进行统一的管理。 当这些节点发生变动,或者数据发生变化时,都可以被PathChildrenCache发现,并同步到本地Map中。以此来达到一个缓存的概念。
从API中也能发现,它只能获取数据。至于放置缓存,则需要另外实现。
- 其实也简单,直接向path下新建节点并写入数据就行
可以通过getListenable().addListener(listener);
添加自定义监听器,从而实现对缓存进行更细致的控制。
7. 示例
这里可以参考