A look at Curator recipes' LeaderLatch

Preface
This article takes a look at the LeaderLatch in Curator recipes.
Example

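import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;

public class LeaderLatchTest {

    @Test
    public void testCuratorLeaderLatch() throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        String leaderLockPath = "/leader-lock2";
        // 10 latches competing on the same path; a mutable ArrayList so iterator.remove() is safe
        List<LeaderLatch> latchList = IntStream.rangeClosed(1, 10)
                .parallel()
                .mapToObj(i -> new LeaderLatch(client, leaderLockPath, "client" + i))
                .collect(Collectors.toCollection(ArrayList::new));
        latchList.parallelStream()
                .forEach(latch -> {
                    try {
                        latch.start();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
        TimeUnit.SECONDS.sleep(5);
        // close the current leader so that another latch takes over
        Iterator<LeaderLatch> iterator = latchList.iterator();
        while (iterator.hasNext()) {
            LeaderLatch latch = iterator.next();
            if (latch.hasLeadership()) {
                System.out.println(latch.getId() + " hasLeadership");
                try {
                    latch.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                iterator.remove();
            }
        }
        TimeUnit.SECONDS.sleep(5);
        latchList.stream()
                .filter(LeaderLatch::hasLeadership)
                .forEach(latch -> System.out.println(latch.getId() + " hasLeadership"));
        Participant participant = latchList.get(0).getLeader();
        System.out.println(participant);
        TimeUnit.MINUTES.sleep(15);
        // every latch must eventually be closed
        latchList.forEach(latch -> {
            try {
                latch.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        client.close();
    }
}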

  • Querying with zkCli
[zk: localhost:2181(CONNECTED) 17] ls /
[leader-lock1, leader-lock2, zookeeper, leader-lock]
[zk: localhost:2181(CONNECTED) 18] ls /leader-lock2
[_c_4e86edb9-075f-4e18-a00c-cbf4fbf11b23-latch-0000000048, _c_b53efe1b-39ba-48df-8edb-905ddcccf5c9-latch-0000000042, _c_5ea234cc-8350-47ef-beda-8795694b62f6-latch-0000000045, _c_5f3330d9-384c-4abf-8f3e-21623213a374-latch-0000000044, _c_3fdec032-b8a4-44b9-9a9f-20285553a23e-latch-0000000049, _c_97a53125-0ab1-48ea-85cc-cdba631ce20f-latch-0000000047, _c_2bb56be2-ba17-485e-bbd3-10aa1d6af57c-latch-0000000043, _c_93fb732d-541b-48c6-aca7-dd2cd9b6f93e-latch-0000000041, _c_e09f0307-344c-4041-ab71-d68e10a48d02-latch-0000000046, _c_754a4f90-b03c-4803-915b-0654ad35ec9f-latch-0000000040]

LeaderLatch.start
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
/**
 * Add this instance to the leadership election and attempt to acquire leadership.
 *
 * @throws Exception errors
 */
public void start() throws Exception {
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

    startTask.set(AfterConnectionEstablished.execute(client, new Runnable() {
        @Override
        public void run() {
            try {
                internalStart();
            } finally {
                startTask.set(null);
            }
        }
    }));
}

private synchronized void internalStart() {
    if ( state.get() == State.STARTED ) {
        client.getConnectionStateListenable().addListener(listener);
        try {
            reset();
        } catch ( Exception e ) {
            ThreadUtils.checkInterrupted(e);
            log.error("An error occurred checking resetting leadership.", e);
        }
    }
}

@VisibleForTesting
void reset() throws Exception {
    setLeadership(false);
    setNode(null);

    BackgroundCallback callback = new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            if ( debugResetWaitLatch != null ) {
                debugResetWaitLatch.await();
                debugResetWaitLatch = null;
            }

            if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {
                setNode(event.getName());
                if ( state.get() == State.CLOSED ) {
                    setNode(null);
                } else {
                    getChildren();
                }
            } else {
                log.error("getChildren() failed. rc = " + event.getResultCode());
            }
        }
    };
    client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}

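  • start() joins this instance to the election. It schedules internalStart() to run once the connection is established. internalStart() registers the ConnectionStateListener and calls reset(), and reset() creates the child node via forPath
  • ZKPaths.makePath(latchPath, LOCK_NAME) returns latchPath + "/latch-", e.g. /leader-lock2/latch-
  • The callback passed to create() records the new node via setNode(event.getName()) and then calls getChildren(), unless the latch was closed in the meantime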
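The start-once guard at the top of start() is a plain compareAndSet on an AtomicReference, and so is the matching check in close(). Below is a minimal stdlib-only sketch of that lifecycle. LatchLifecycle is a hypothetical class, and Preconditions.checkState is replaced by an explicit IllegalStateException, which is what Guava's checkState throws. No ZooKeeper calls are made.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of LeaderLatch's LATENT -> STARTED -> CLOSED lifecycle.
// Only the compareAndSet guards are modeled; node creation is left out.
class LatchLifecycle {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    void start() {
        // Same idea as Preconditions.checkState(state.compareAndSet(LATENT, STARTED), ...)
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        // In the real start(), internalStart() -> reset() would run here once connected.
    }

    void close() {
        // Mirrors the check at the top of close(CloseMode)
        if (!state.compareAndSet(State.STARTED, State.CLOSED)) {
            throw new IllegalStateException("Already closed or has not been started");
        }
    }

    State state() {
        return state.get();
    }

    public static void main(String[] args) {
        LatchLifecycle latch = new LatchLifecycle();
        latch.start();
        try {
            latch.start();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Cannot be started more than once
        }
        latch.close();
        System.out.println(latch.state()); // CLOSED
    }
}
```

Because the guard is atomic, two threads racing to start the same latch cannot both pass it. Exactly one of them registers a node.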
CreateBuilderImpl.forPath
curator-framework-4.0.1-sources.jar!/org/apache/curator/framework/imps/CreateBuilderImpl.java
@VisibleForTesting
static final String PROTECTED_PREFIX = "_c_";

@Override
public String forPath(final String givenPath, byte[] data) throws Exception {
    if ( compress ) {
        data = client.getCompressionProvider().compress(givenPath, data);
    }

    final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
    List<ACL> aclList = acling.getAclList(adjustedPath);
    client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);

    String returnPath = null;
    if ( backgrounding.inBackground() ) {
        pathInBackground(adjustedPath, data, givenPath);
    } else {
        String path = protectedPathInForeground(adjustedPath, data, aclList);
        returnPath = client.unfixForNamespace(path);
    }
    return returnPath;
}

@VisibleForTesting
String adjustPath(String path) throws Exception {
    if ( doProtected ) {
        ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
        String name = getProtectedPrefix(protectedId) + pathAndNode.getNode();
        path = ZKPaths.makePath(pathAndNode.getPath(), name);
    }
    return path;
}

private static String getProtectedPrefix(String protectedId) {
    return PROTECTED_PREFIX + protectedId + "-";
}

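  • If the CuratorFramework was created without a namespace, client.fixForNamespace returns the path unchanged
  • adjustPath handles the doProtected case, which withProtection() turns on. It prepends PROTECTED_PREFIX, the protectedId (a UUID) and a "-" to the node name, so latch- becomes e.g. _c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-
  • Because the node is created as EPHEMERAL_SEQUENTIAL, ZooKeeper then appends a sequence number, e.g. /leader-lock2/_c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-0000000045. The node's data is the id given to the LeaderLatch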
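The three path transformations above can be reproduced with plain string handling. This is a simplified, hypothetical sketch:

- ProtectedNodeName and its methods are not Curator APIs.
- The real ZKPaths.makePath also normalizes slashes and handles more edge cases.
- It assumes ZooKeeper's sequential suffix is a 10-digit, zero-padded counter.

```java
import java.util.UUID;

// Hypothetical helper that mimics how the final znode name is assembled:
// makePath(latchPath, LOCK_NAME) -> adjustPath() adds "_c_<protectedId>-"
// -> the ZooKeeper server appends a sequence number for EPHEMERAL_SEQUENTIAL nodes.
class ProtectedNodeName {
    static final String PROTECTED_PREFIX = "_c_";
    static final String LOCK_NAME = "latch-";

    // Step 1: parent path + lock name, e.g. "/leader-lock2/latch-" (simplified makePath)
    static String makePath(String parent, String child) {
        return parent.endsWith("/") ? parent + child : parent + "/" + child;
    }

    // Step 2: prefix the last path segment with "_c_<protectedId>-", as adjustPath does when doProtected is set
    static String adjustPath(String path, String protectedId) {
        int slash = path.lastIndexOf('/');
        String parent = path.substring(0, slash);
        String node = path.substring(slash + 1);
        return parent + "/" + PROTECTED_PREFIX + protectedId + "-" + node;
    }

    // Step 3: the server appends the sequence number, zero-padded to 10 digits
    static String withSequence(String path, long sequence) {
        return path + String.format("%010d", sequence);
    }

    public static void main(String[] args) {
        String protectedId = UUID.randomUUID().toString(); // withProtection() uses a random UUID
        String requested = makePath("/leader-lock2", LOCK_NAME);
        String created = withSequence(adjustPath(requested, protectedId), 45);
        System.out.println(created); // /leader-lock2/_c_<uuid>-latch-0000000045
    }
}
```

The point of the protected prefix is that the UUID identifies this client's node even when the create() response is lost. The client can therefore find its own node among the children after a retry.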
LeaderLatch.getChildren
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
private void getChildren() throws Exception {
    BackgroundCallback callback = new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {
                checkLeadership(event.getChildren());
            }
        }
    };
    client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
}

private void checkLeadership(List<String> children) throws Exception {
    final String localOurPath = ourPath.get();
    List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
    int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
    if ( ourIndex < 0 ) {
        log.error("Can't find our node. Resetting. Index: " + ourIndex);
        reset();
    } else if ( ourIndex == 0 ) {
        setLeadership(true);
    } else {
        String watchPath = sortedChildren.get(ourIndex - 1);
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) ) {
                    try {
                        getChildren();
                    } catch ( Exception ex ) {
                        ThreadUtils.checkInterrupted(ex);
                        log.error("An error occurred checking the leadership.", ex);
                    }
                }
            }
        };

        BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) {
                    // previous node is gone - reset
                    reset();
                }
            }
        };
        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
        client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
    }
}

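  • getChildren() mainly calls checkLeadership(). The latch whose node is at index 0 of the sorted children becomes leader. A latch at a higher index watches the node just before its own, and when that node is deleted, getChildren() runs again
  • A callback is also registered on the getData() call. If it returns NONODE, the previous node was already gone before the watch could be set, and reset() is called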
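The decision checkLeadership() makes can be tried offline against the children zkCli listed earlier. This sketch makes two assumptions:

- LeadershipCheck is hypothetical.
- Sorting by the text after the last latch- is my reading of what the sorter passed to LockInternals.getSortedChildren does. The zero padding makes string order match numeric order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Simplified re-implementation of the decision in checkLeadership(); not Curator code.
class LeadershipCheck {
    static final String LOCK_NAME = "latch-";

    // Text after the last "latch-", i.e. the zero-padded sequence number
    static String sequencePart(String child) {
        int i = child.lastIndexOf(LOCK_NAME);
        return i < 0 ? child : child.substring(i + LOCK_NAME.length());
    }

    static List<String> sorted(List<String> children) {
        List<String> copy = new ArrayList<>(children);
        copy.sort(Comparator.comparing(LeadershipCheck::sequencePart));
        return copy;
    }

    // Returns "LEADER", "RESET", or "WATCH <predecessor>"
    static String decide(String ourNode, List<String> children) {
        List<String> sortedChildren = sorted(children);
        int ourIndex = sortedChildren.indexOf(ourNode);
        if (ourIndex < 0) {
            return "RESET";  // our node is missing: create a new one
        } else if (ourIndex == 0) {
            return "LEADER"; // lowest sequence number wins
        } else {
            return "WATCH " + sortedChildren.get(ourIndex - 1); // watch only the predecessor
        }
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "_c_4e86edb9-075f-4e18-a00c-cbf4fbf11b23-latch-0000000048",
                "_c_754a4f90-b03c-4803-915b-0654ad35ec9f-latch-0000000040",
                "_c_5f3330d9-384c-4abf-8f3e-21623213a374-latch-0000000044");
        for (String child : children) {
            System.out.println(child + " -> " + decide(child, children));
        }
    }
}
```

With the zkCli listing above, the node ending in 0000000040 would be the leader. The node ending in 0000000045 would watch the one ending in 0000000044.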
LeaderLatch.close
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
/**
 * Remove this instance from the leadership election. If this instance is the leader, leadership
 * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
 * instances must eventually be closed.
 *
 * @throws IOException errors
 */
@Override
public void close() throws IOException {
    close(closeMode);
}

/**
 * Remove this instance from the leadership election. If this instance is the leader, leadership
 * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
 * instances must eventually be closed.
 *
 * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
 * @throws IOException errors
 */
public synchronized void close(CloseMode closeMode) throws IOException {
    Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
    Preconditions.checkNotNull(closeMode, "closeMode cannot be null");

    cancelStartTask();

    try {
        setNode(null);
        client.removeWatchers();
    } catch ( Exception e ) {
        ThreadUtils.checkInterrupted(e);
        throw new IOException(e);
    } finally {
        client.getConnectionStateListenable().removeListener(listener);

        switch ( closeMode ) {
            case NOTIFY_LEADER: {
                setLeadership(false);
                listeners.clear();
                break;
            }

            default: {
                listeners.clear();
                setLeadership(false);
                break;
            }
        }
    }
}

private synchronized void setLeadership(boolean newValue) {
    boolean oldValue = hasLeadership.getAndSet(newValue);

    if ( oldValue && !newValue ) { // Lost leadership, was true, now false
        listeners.forEach(new Function<LeaderLatchListener, Void>() {
            @Override
            public Void apply(LeaderLatchListener listener) {
                listener.notLeader();
                return null;
            }
        });
    } else if ( !oldValue && newValue ) { // Gained leadership, was false, now true
        listeners.forEach(new Function<LeaderLatchListener, Void>() {
            @Override
            public Void apply(LeaderLatchListener input) {
                input.isLeader();
                return null;
            }
        });
    }

    notifyAll();
}

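  • close() takes this LeaderLatch out of the election; if the latch is the leader, it releases leadership
  • close() runs these steps in order:
    - cancels the startTask
    - calls setNode(null), which clears ourPath and deletes the node this latch created
    - removes the watchers the latch registered
    - removes the ConnectionStateListener
    - sets leadership to false and clears the listeners
  • Note the ordering: with CloseMode.NOTIFY_LEADER, leadership is set to false first (firing the listeners) and only then are the listeners cleared; with any other mode, the listeners are cleared first and then leadership is set to false, so no notLeader() callback is made
  • setLeadership compares the old and new values and calls listener.notLeader() or input.isLeader() on a transition, then calls notifyAll() to wake threads blocked in await()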
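The effect of the close mode on listeners can be seen in a small model. LeadershipListener stands in for LeaderLatchListener, and LeadershipNotifier stands in for the latch. Both are hypothetical, and ZooKeeper node handling is left out entirely. Only the getAndSet transition check and the two orderings from close(CloseMode) are reproduced.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for LeaderLatchListener (isLeader / notLeader callbacks).
interface LeadershipListener {
    void isLeader();
    void notLeader();
}

// Models only setLeadership() and the closeMode ordering of LeaderLatch.close(CloseMode).
class LeadershipNotifier {
    enum CloseMode { SILENT, NOTIFY_LEADER }

    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
    private final List<LeadershipListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(LeadershipListener l) {
        listeners.add(l);
    }

    synchronized void setLeadership(boolean newValue) {
        boolean oldValue = hasLeadership.getAndSet(newValue);
        if (oldValue && !newValue) {
            listeners.forEach(LeadershipListener::notLeader); // lost leadership
        } else if (!oldValue && newValue) {
            listeners.forEach(LeadershipListener::isLeader);  // gained leadership
        }
        notifyAll(); // wakes threads waiting on this monitor, like await() in LeaderLatch
    }

    synchronized void close(CloseMode mode) {
        if (mode == CloseMode.NOTIFY_LEADER) {
            setLeadership(false); // listeners still registered: notLeader() fires
            listeners.clear();
        } else {
            listeners.clear();    // listeners removed first: no callback
            setLeadership(false);
        }
    }

    boolean hasLeadership() {
        return hasLeadership.get();
    }

    public static void main(String[] args) {
        LeadershipNotifier latch = new LeadershipNotifier();
        latch.addListener(new LeadershipListener() {
            public void isLeader() { System.out.println("isLeader"); }
            public void notLeader() { System.out.println("notLeader"); }
        });
        latch.setLeadership(true);            // prints isLeader
        latch.close(CloseMode.NOTIFY_LEADER); // prints notLeader
    }
}
```

With SILENT, a listener on the leader never receives notLeader() at close time. Code that must react to losing leadership on shutdown would therefore pass NOTIFY_LEADER. As far as I can tell, the constructors that take no CloseMode default to SILENT.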
ConnectionStateListener
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
private final ConnectionStateListener listener = new ConnectionStateListener() {
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        handleStateChange(newState);
    }
};

private void handleStateChange(ConnectionState newState) {
    switch ( newState ) {
        default: {
            // NOP
            break;
        }

        case RECONNECTED: {
            try {
                if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() ) {
                    reset();
                }
            } catch ( Exception e ) {
                ThreadUtils.checkInterrupted(e);
                log.error("Could not reset leader latch", e);
                setLeadership(false);
            }
            break;
        }

        case SUSPENDED: {
            if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) ) {
                setLeadership(false);
            }
            break;
        }

        case LOST: {
            setLeadership(false);
            break;
        }
    }
}

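  • LeaderLatch registers its own ConnectionStateListener that reacts to RECONNECTED, SUSPENDED and LOST. On LOST it always gives up leadership. On SUSPENDED it does so only if the connection error policy treats SUSPENDED as an error state
  • When setLeadership(false) is called, the listeners are notified according to the old and new values: if the latch was leader, listener.notLeader() is called
  • On RECONNECTED, reset() is called to re-run the start path and register a new node. This happens if the error policy treats SUSPENDED as an error state, or if the latch does not currently hold leadership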
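The three branches of handleStateChange() boil down to a small decision table, sketched below with a few assumptions:

- StateChangeDecision is hypothetical. Its ConnState enum only mirrors the names of Curator's ConnectionState values.
- suspendedIsError stands for client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED).
- That is true for StandardConnectionStateErrorPolicy and false for SessionConnectionStateErrorPolicy.

```java
// Hypothetical decision table mirroring LeaderLatch.handleStateChange(); not Curator code.
class StateChangeDecision {
    enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }
    enum Action { NOP, RESET, DROP_LEADERSHIP }

    // suspendedIsError: does the connection error policy treat SUSPENDED as an error state?
    static Action decide(ConnState newState, boolean suspendedIsError, boolean hasLeadership) {
        switch (newState) {
            case RECONNECTED:
                // re-register a node unless we are a leader that was allowed to survive the suspension
                return (suspendedIsError || !hasLeadership) ? Action.RESET : Action.NOP;
            case SUSPENDED:
                return suspendedIsError ? Action.DROP_LEADERSHIP : Action.NOP;
            case LOST:
                return Action.DROP_LEADERSHIP; // session is gone: always give up leadership
            default:
                return Action.NOP;
        }
    }

    public static void main(String[] args) {
        for (ConnState s : ConnState.values()) {
            System.out.println(s + " leader=" + decide(s, true, true) + " follower=" + decide(s, true, false));
        }
    }
}
```

The table shows what the session-based policy buys: a leader that only went through SUSPENDED keeps its leadership and does not re-create its node on RECONNECTED.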
Summary
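  • Curator recipes' LeaderLatch provides a convenient way to do leader election, and LeaderLatchListener lets you plug in custom handling
  • LeaderLatch uses ZooKeeper EPHEMERAL_SEQUENTIAL nodes, so node names automatically get a sequence number. The default LOCK_NAME is latch-. For protected creation, PROTECTED_PREFIX (_c_) and the protectedId (a UUID) are prepended. The final node name therefore has the form PROTECTED_PREFIX + UUID + "-" + LOCK_NAME + sequence number, e.g. _c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-0000000045
  • LeaderLatch reacts to connection state changes through its ConnectionStateListener and to node changes through watchers. The node at index 0 becomes the leader, and each non-leader watches the node just before it. When that predecessor is deleted, checkLeadership runs again to see whether this latch's node is now first among the children. If it is, the latch becomes leader and fires the corresponding callbacks; if not, it watches its new predecessor. Chaining the watches link by link like this is a very neat design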
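To see why watching only the predecessor matters, here is an in-memory simulation. WatchChain is hypothetical, and the mapping of client1..client10 to sequence numbers 40..49 is made up; in a real run the order depends on which create() reaches the server first. Deleting the leader's node wakes exactly one participant instead of all of them, which avoids the "herd effect" of every client re-reading the children at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// In-memory simulation of the "watch your predecessor" chain; not Curator code.
// Keys are sequence numbers, values are participant ids.
class WatchChain {
    private final TreeMap<Long, String> nodes = new TreeMap<>();

    void join(long seq, String id) {
        nodes.put(seq, id);
    }

    // The participant owning the lowest sequence number is the leader
    String leader() {
        return nodes.isEmpty() ? null : nodes.firstEntry().getValue();
    }

    // Which node a participant watches: the next lower sequence, or null if it is the leader
    Long predecessorOf(long seq) {
        return nodes.lowerKey(seq);
    }

    // Deleting a node fires only the watch held by its immediate successor
    List<String> delete(long seq) {
        List<String> notified = new ArrayList<>();
        Long successor = nodes.higherKey(seq);
        nodes.remove(seq);
        if (successor != null) {
            notified.add(nodes.get(successor)); // this participant re-runs its leadership check
        }
        return notified;
    }

    public static void main(String[] args) {
        WatchChain chain = new WatchChain();
        for (int i = 0; i < 10; i++) {
            chain.join(40 + i, "client" + (i + 1));
        }
        System.out.println("leader: " + chain.leader());
        System.out.println("woken by deleting 40: " + chain.delete(40));
        System.out.println("new leader: " + chain.leader());
    }
}
```

If a non-leader node in the middle disappears instead, its successor is woken and then simply watches the next lower node. This matches the "otherwise it watches its new predecessor" branch.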
