博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[Curator] Path Cache 的使用与分析
阅读量:5983 次
发布时间:2019-06-20

本文共 16306 字,大约阅读时间需要 54 分钟。

  hot3.png

Path Cache

Path Cache其实就是用于对zk节点的监听。不论是子节点的新增、更新或者移除的时候,Path Cache都能对子节点集合的状态和数据变化做出响应。

1. 关键 API

org.apache.curator.framework.recipes.cache.PathChildrenCache

org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent

org.apache.curator.framework.recipes.cache.PathChildrenCacheListener

org.apache.curator.framework.recipes.cache.ChildData

2. 机制说明

PathChildrenCache内部使用一个命令模式来封装各种操作:

  • 操作接口:org.apache.curator.framework.recipes.cache.Operation
    • 刷新操作:org.apache.curator.framework.recipes.cache.RefreshOperation
    • 触发事件操作:org.apache.curator.framework.recipes.cache.EventOperation
    • 获取数据操作:org.apache.curator.framework.recipes.cache.GetDataOperation

而这些操作对象,都在构造器中接受PathChildrenCache引用,这样可以在操作中,处理cache(回调):

EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event){    this.cache = cache;    this.event = event;}GetDataOperation(PathChildrenCache cache, String fullPath){    this.cache = cache;    this.fullPath = PathUtils.validatePath(fullPath);}RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode){    this.cache = cache;    this.mode = mode;}

而这些操作,还使用了一个单线程的线程池来调用,从而形成了异步调用。

  • 使用了一个private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());来作为线程池的任务接收队列
    • 使用set,避免了并发情况下重复操作
    • 由于单线程,使得各种操作都是按序执行的
  • 所以为了避免curator的监听机制阻塞
    • childrenWatcher以及dataWatcher中,都使用异步执行命令的方式

触发操作:

void offerOperation(final Operation operation){    if ( operationsQuantizer.add(operation) )    {        submitToExecutor        (            new Runnable()            {                @Override                public void run()                {                    try                    {                        operationsQuantizer.remove(operation);                        operation.invoke();                    }                    catch ( InterruptedException e )                    {                        //We expect to get interrupted during shutdown,                        //so just ignore these events                        if ( state.get() != State.CLOSED )                        {                            handleException(e);                        }                        Thread.currentThread().interrupt();                    }                    catch ( Exception e )                    {                        ThreadUtils.checkInterrupted(e);                        handleException(e);                    }                }            }        );    }}private synchronized void submitToExecutor(final Runnable command){    if ( state.get() == State.STARTED )    {        executorService.submit(command);    }}
  • 考虑到了各种操作的中断
  • 考虑到了状态
  • 统一操作的异常处理
  • 投递方法submitToExecutor使用了synchronized
    • 因为可能监听器触发,所以需要对状态进行检查
      • 如先关闭,然后再被某个监听器回掉,导致不必要的操作
    • 而检查动作不是原子的,所以需要同步锁

3. 用法

3.1 创建

public PathChildrenCache(CuratorFramework client,                         String path,                         boolean cacheData)
  • cacheData
    • 如果设置true,是否需要缓存数据

3.2 使用

  • Cache必须在使用前调用start()方法
    • 有两个start()方法
      1. void start()
        • 无参
      2. void start(PathChildrenCache.StartMode mode)
        • 可以通过参数,选择如何初始化
        • StartMode
          • NORMAL
          • BUILD_INITIAL_CACHE
          • POST_INITIALIZED_EVENT
  • 使用完成后需要调用close()方法
  • 任何时候,调用getCurrentData()都可以得到状态信息
  • 可以添加监听器,当数据发生变动时回调执行
    • public void addListener(PathChildrenCacheListener listener)

4. 错误处理

PathChildrenCache实例会通过ConnectionStateListener监听链接状态。 如果链接状态发生变化,缓存会被重置(PathChildrenCacheListener会受到一个RESET事件)

5. 源码分析

5.1 类定义

public class PathChildrenCache implements Closeable{}
  • 实现了java.io.Closeable接口

5.2 成员变量

public class PathChildrenCache implements Closeable{    private final Logger log = LoggerFactory.getLogger(getClass());    private final CuratorFramework client;    private final String path;    private final CloseableExecutorService executorService;    private final boolean cacheData;    private final boolean dataIsCompressed;    private final ListenerContainer
listeners = new ListenerContainer
(); private final ConcurrentMap
currentData = Maps.newConcurrentMap(); private final AtomicReference
> initialSet = new AtomicReference
>(); private final Set
operationsQuantizer = Sets.newSetFromMap(Maps.
newConcurrentMap()); private final AtomicReference
state = new AtomicReference
(State.LATENT); private final EnsureContainers ensureContainers; private enum State { LATENT, STARTED, CLOSED } private static final ChildData NULL_CHILD_DATA = new ChildData("/", null, null); private static final boolean USE_EXISTS = Boolean.getBoolean("curator-path-children-cache-use-exists"); private volatile Watcher childrenWatcher = new Watcher() { @Override public void process(WatchedEvent event) { offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD)); } }; private volatile Watcher dataWatcher = new Watcher() { @Override public void process(WatchedEvent event) { try { if ( event.getType() == Event.EventType.NodeDeleted ) { remove(event.getPath()); } else if ( event.getType() == Event.EventType.NodeDataChanged ) { offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath())); } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); handleException(e); } } }; @VisibleForTesting volatile Exchanger
rebuildTestExchanger; private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { handleStateChange(newState); } }; private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");}
  • log
  • client
  • path
    • 缓存对应的zk节点路径
  • executorService
    • org.apache.curator.utils.CloseableExecutorService
    • 线程池
    • 用以执行各种操作
    • 参见第2章节
  • cacheData
    • 是否需要缓存数据
  • dataIsCompressed
    • 数据是否已压缩
  • listeners
    • org.apache.curator.framework.listen.ListenerContainer
    • 监听器容器(管理多个监听器)
    • 业务监听器
    • 可以添加自己的监听器
  • currentData
    • java.util.concurrent.ConcurrentMap
    • 当前数据
    • <String, ChildData>
    • 存放着多个org.apache.curator.framework.recipes.cache.ChildData
  • initialSet
    • AtomicReference
    • 初始化集合
    • 放置节点,以此来跟踪各个节点是否初始化
      • 如果全部节点都初始化完成,则会触发PathChildrenCacheEvent.Type.INITIALIZED事件
  • operationsQuantizer
    • 相当于线程池的任务接收队列
  • state
    • 状态
    • AtomicReference
  • ensureContainers
    • org.apache.curator.framework.EnsureContainers
    • 可以线程安全的创建path节点
  • State
    • 内部枚举
      • LATENT
      • STARTED
      • CLOSED
  • NULL_CHILD_DATA
    • 私有常量
    • 空数据节点
  • USE_EXISTS
    • 私有常量
    • 使用系统配置中curator-path-children-cache-use-exists的值
  • childrenWatcher
    • volatile
    • 子节点变动的监听器
  • dataWatcher
    • volatile
    • 数据变动监听器
  • rebuildTestExchanger
    • java.util.concurrent.Exchanger
    • 用于并发线程间传值
    • 在重建缓存时通过此对象传递一个信号对象
    • 用于测试
  • connectionStateListener
    • 链接状态监听器
  • defaultThreadFactory
    • 线程工厂

5.3 构造器

public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode){    this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));}public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory){    this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));}public PathChildrenCache(CuratorFramework client, String path, boolean cacheData){    this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));}public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory){    this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));}public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory){    this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));}public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService){    this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));}public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService){    this.client = client;    this.path = PathUtils.validatePath(path);    this.cacheData = cacheData;    this.dataIsCompressed = dataIsCompressed;    this.executorService = executorService;    ensureContainers = new EnsureContainers(client, path);}

有7个构造器,最终都是调用最后一个。不过从中也可以看出:

  • 默认使用newSingleThreadExecutor单线程线程池
  • 默认不对数据进行压缩处理

5.4 启动

缓存在使用前需要调用start()

public enum StartMode    {        NORMAL,        BUILD_INITIAL_CACHE,        POST_INITIALIZED_EVENT    }public void start() throws Exception{    start(StartMode.NORMAL);}@Deprecatedpublic void start(boolean buildInitial) throws Exception{    start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL);}public void start(StartMode mode) throws Exception{    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");    mode = Preconditions.checkNotNull(mode, "mode cannot be null");    client.getConnectionStateListenable().addListener(connectionStateListener);    switch ( mode )    {        case NORMAL:        {            offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));            break;        }        case BUILD_INITIAL_CACHE:        {            rebuild();            break;        }        case POST_INITIALIZED_EVENT:        {            initialSet.set(Maps.
newConcurrentMap()); offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED)); break; } }}private void processChildren(List
children, RefreshMode mode) throws Exception{ Set
removedNodes = Sets.newHashSet(currentData.keySet()); for ( String child : children ) { removedNodes.remove(ZKPaths.makePath(path, child)); } for ( String fullPath : removedNodes ) { remove(fullPath); } for ( String name : children ) { String fullPath = ZKPaths.makePath(path, name); if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) ) { getDataAndStat(fullPath); } updateInitialSet(name, NULL_CHILD_DATA); } maybeOfferInitializedEvent(initialSet.get());}
  • 无参的start()
    • 默认使用StartMode.NORMAL策略
  • 不建议使用start(boolean buildInitial)
    • true
      • 使用StartMode.BUILD_INITIAL_CACHE策略
    • false
      • 使用StartMode.NORMAL策略
  • 启动时添加了链接状态的监听器

可以看到启动过程有三种策略:

  1. NORMAL模式
    1. 执行刷新命令org.apache.curator.framework.recipes.cache.RefreshOperation命令模式
      • 使用RefreshMode.STANDARD刷新模式
      • 调用org.apache.curator.framework.recipes.cache.PathChildrenCache#refresh方法
        1. 调用org.apache.curator.framework.EnsureContainers#ensure创建节点
        2. 在节点上添加childrenWatcher监听器
        3. 回调触发org.apache.curator.framework.recipes.cache.PathChildrenCache#processChildren进行刷新
          1. 清理掉已缓存在本地的数据中的其他节点
            1. 筛选出不是本cache的数据节点
            2. 从本地初始集合中清理掉
          2. 如果缓存节点还没用同步到本地,或者指定为RefreshMode.FORCE_GET_DATA_AND_STAT模式
            1. 则立即同步节点数据与状态
              1. 如果不需要缓存数据,则只检查节点是否存在(只缓存节点以及状态,不含数据)
              2. 否则读取数据(如果需要解压则解压数据)并构建ChildData缓存
                1. 新数据放入currentData
                2. 根据情况触发事件(唤起监听器)
                  • PathChildrenCacheEvent.Type.CHILD_ADDED事件
                  • PathChildrenCacheEvent.Type.CHILD_UPDATED事件
                3. 更新initialSet数据(将未同步的NULL_CHILD_DATA数据替换成读取的数据)
          3. 更新initialSet
            1. 如果initialSet的Map不为空
              • NORMAL模式下,这里为空
              • 可以参见POST_INITIALIZED_EVENT模式
  2. BUILD_INITIAL_CACHE模式
    1. 调用rebuild方法(此方法会阻塞执行)
      • 重新查询所有需要的数据
      • 不会触发任何事件
      1. 安全创建path
      2. 清空currentData缓存
      3. 重新加载path下子节点,逐个结点重构缓存
        • 逐个读取节点数据和状态
        • 构建ChildData放入currentData
      4. 通过rebuildTestExchanger发送要给信号对象
  3. POST_INITIALIZED_EVENT模式
    1. 初始化initialSet
    2. RefreshMode.POST_INITIALIZED模式刷新缓存
      • 参见NORMAL模式,但不同的是
        • 更新initialSet
          1. 如果initialSet的Map不为空
            • POST_INITIALIZED_EVENT模式下,这里已经初始化了Map
          2. 如果initialSet中的数据都已经同步完成(都不等于NULL_CHILD_DATA
            1. initialSet制空
            2. 触发PathChildrenCacheEvent.Type.INITIALIZED事件

5.5 节点发生变化

在启动start()已经给path上增加了一个监听器childrenWatcher

private volatile Watcher childrenWatcher = new Watcher(){    @Override    public void process(WatchedEvent event)    {        offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));    }};
  • RefreshMode.STANDARD模式刷新缓存
    • 会对本地的缓存数据和zk节点做比较
    • 只是处理新的缓存数据
  • 注意操作的参数PathChildrenCache.this
    • this不同了

5.6 数据发生变化

在每次获取缓存数据时(getDataAndStat方法),在每个缓存上添加了监听器dataWatcher

private volatile Watcher dataWatcher = new Watcher(){    @Override    public void process(WatchedEvent event)    {        try        {            if ( event.getType() == Event.EventType.NodeDeleted )            {                remove(event.getPath());            }            else if ( event.getType() == Event.EventType.NodeDataChanged )            {                offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));            }        }        catch ( Exception e )        {            ThreadUtils.checkInterrupted(e);            handleException(e);        }    }};
  • 节点删除时
    • 清理缓存
    • 触发PathChildrenCacheEvent.Type.CHILD_REMOVED事件
  • 数据发生变化时
    • 执行GetDataOperation操作
      • 也就是再次执行getDataAndStat方法
  • 注意操作的参数PathChildrenCache.this
    • this不同了

5.7 获取当前数据

public List
getCurrentData(){ return ImmutableList.copyOf(Sets.
newTreeSet(currentData.values()));}public ChildData getCurrentData(String fullPath){ return currentData.get(fullPath);}

都是从本地数据中获取

5.8 清理

5.8.1 清理缓存

public void clear(){    currentData.clear();}public void clearAndRefresh() throws Exception{    currentData.clear();    offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));}

清空本地数据

如果需要则使用RefreshMode.STANDARD模式,刷新

5.8.2 清理缓存数据

public void clearDataBytes(String fullPath){    clearDataBytes(fullPath, -1);}public boolean clearDataBytes(String fullPath, int ifVersion){    ChildData data = currentData.get(fullPath);    if ( data != null )    {        if ( (ifVersion < 0) || (ifVersion == data.getStat().getVersion()) )        {            if ( data.getData() != null )            {                currentData.replace(fullPath, data, new ChildData(data.getPath(), data.getStat(), null));            }            return true;        }    }    return false;}

保留缓存信息,但是数据部分制空

5.9 链接状态变化

在启动时(start())中为链接添加了connectionStateListener监听器:

private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener(){    @Override    public void stateChanged(CuratorFramework client, ConnectionState newState)    {        handleStateChange(newState);    }};private void handleStateChange(ConnectionState newState){    switch ( newState )    {    case SUSPENDED:    {        offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null)));        break;    }    case LOST:    {        offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_LOST, null)));        break;    }    case CONNECTED:    case RECONNECTED:    {        try        {            offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));            offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null)));        }        catch ( Exception e )        {            ThreadUtils.checkInterrupted(e);            handleException(e);        }        break;    }    }}

主要都是根据链接状态,触发不同的操作,以及触发业务监听器来执行。

  • 由于数据都是缓存,所以在链接丢失,中断时,仅仅时触发事件,并没有将数据置为不可用
  • 当链接建立CONNECTED,以及恢复时RECONNECTED都触发了一次RefreshMode.FORCE_GET_DATA_AND_STAT模式的刷新操作。

5.10 关闭

在使用完之后,需要调用close()方法:

public void close() throws IOException{    if ( state.compareAndSet(State.STARTED, State.CLOSED) )    {        client.getConnectionStateListenable().removeListener(connectionStateListener);        listeners.clear();        executorService.close();        client.clearWatcherReferences(childrenWatcher);        client.clearWatcherReferences(dataWatcher);        // TODO        // This seems to enable even more GC - I'm not sure why yet - it        // has something to do with Guava's cache and circular references        connectionStateListener = null;        childrenWatcher = null;        dataWatcher = null;    }}
  • 原子操作,将状态更新为CLOSED
  • 移除链接状态监听器
  • 清空业务监听器
  • 关闭线程池
  • 清空节点监听器
  • 清空数据监听器

6. 小结

PathChildrenCache虽然名字带有Cache。 但其实并不是一个完整的缓存。

应该说,它仅仅是对path下诸多节点进行统一的管理。 当这些节点发生变动,或者数据发生变化时,都可以被PathChildrenCache发现,并同步到本地Map中。以此来达到一个缓存的概念。

从API中也能发现,它只能获取数据。至于放置缓存,则需要另外实现。

  • 其实也简单,直接向path下新建节点并写入数据就行

可以通过getListenable().addListener(listener);添加自定义监听器,从而实现对缓存进行更细致的控制。

7. 示例

这里可以参考

转载于:https://my.oschina.net/roccn/blog/918209

你可能感兴趣的文章
Entity Framework Core 修改映射主键名称
查看>>
SQL 经典面试题
查看>>
为知笔记发布博客地址
查看>>
java - Math、system、BigDecimal、Date、SimpleDateFormat、Calendar类概述和方法使用
查看>>
C# XML读写示例
查看>>
[leetcode-107-Binary Tree Level Order Traversal II]
查看>>
iptables
查看>>
MySQL数据库分表分区(一)(转)
查看>>
DEV CheckComboboxEdit、CheckedListBoxControl(转)
查看>>
MySQL跳过密码登录
查看>>
PLI 到 COBOL 的转换-数据类型 【不搞Mainframe的可能看不懂,冷门的语言】
查看>>
Tomcat学习总结(4)——基于Tomcat7、Java、WebSocket的服务器推送聊天室
查看>>
js_正则
查看>>
一些有用的技术文章
查看>>
Linux:shell登录过程
查看>>
linux 交叉编译出现的问题
查看>>
LruCache的缓存策略
查看>>
Android解析WindowManager(一)WindowManager体系
查看>>
MapReduce中的map个数
查看>>
开源框架:SDWebImage
查看>>