zookeeper启动类的位置在org.apache.zookeeper.server.ZooKeeperServerMain
,没错,找到它,并运行Main方法,即可启动zookeeper服务器。
请注意,在笔者的环境中只启动了一个zookeeper服务器,所以它并不是一个集群环境。
一、加载配置
第一步就是要加载配置文件,我们来看initializeAndRun
方法。
protected void initializeAndRun(String[] args)throws ConfigException, IOException{ ServerConfig config = new ServerConfig(); if (args.length == 1) { config.parse(args[0]); } else { config.parse(conf); } runFromConfig(config);}复制代码
这里主要就是把zoo.cfg
中的配置加载到ServerConfig对象中,过程比较简单,不再赘述。我们先看几个简单的配置项含义。
配置 | 含义 |
---|---|
clientPort | 对外服务端口,一般2181 |
dataDir | 存储快照文件的目录,默认情况下,事务日志文件也会放在这 |
tickTime | ZK中的一个时间单元。ZK中所有时间都是以这个时间单元为基础,进行整数倍配置 |
minSessionTimeout maxSessionTimeout | Session超时时间,默认是2tickTime ~ 20tickTime 之间 |
preAllocSize | 预先开辟磁盘空间,用于后续写入事务日志,默认64M |
snapCount | 每进行snapCount次事务日志输出后,触发一次快照,默认是100,000 |
maxClientCnxns | 最大并发客户端数,默认是60 |
二、启动服务
我们接着往下看,来到runFromConfig
方法。
public void runFromConfig(ServerConfig config) throws IOException { LOG.info("Starting server"); FileTxnSnapLog txnLog = null; try { final ZooKeeperServer zkServer = new ZooKeeperServer(); final CountDownLatch shutdownLatch = new CountDownLatch(1); //注册服务器关闭事件 zkServer.registerServerShutdownHandler( new ZooKeeperServerShutdownHandler(shutdownLatch)); //操作事务日志和快照日志文件类 txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File( config.dataDir)); txnLog.setServerStats(zkServer.serverStats()); //设置配置属性 zkServer.setTxnLogFactory(txnLog); zkServer.setTickTime(config.tickTime); zkServer.setMinSessionTimeout(config.minSessionTimeout); zkServer.setMaxSessionTimeout(config.maxSessionTimeout); //实例化ServerCnxnFactory抽象类 cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); cnxnFactory.startup(zkServer); shutdownLatch.await(); shutdown(); cnxnFactory.join(); if (zkServer.canShutdown()) { zkServer.shutdown(true); } } catch (InterruptedException e) { LOG.warn("Server interrupted", e); } finally { if (txnLog != null) { txnLog.close(); } }}复制代码
以上代码就是zookeeper服务器从启动到关闭的流程。我们拆分来看。
1、服务关闭事件
我们看到给zkServer注册了服务器关闭的处理类。
final ZooKeeperServer zkServer = new ZooKeeperServer();final CountDownLatch shutdownLatch = new CountDownLatch(1);zkServer.registerServerShutdownHandler( new ZooKeeperServerShutdownHandler(shutdownLatch));复制代码
首先,我们应该知道zookeeper服务器是有状态的。
protected enum State { INITIAL, RUNNING, SHUTDOWN, ERROR;}复制代码
那么,在状态发生变化的时候,就会调用到setState
方法。
public class ZooKeeperServer{ //当zookeeper服务器状态发生变化时候调用此方法 protected void setState(State state) { this.state = state; if (zkShutdownHandler != null) { zkShutdownHandler.handle(state); } else { LOG.debug("ZKShutdownHandler is not registered, so ZooKeeper server " + "won't take any action on ERROR or SHUTDOWN server state changes"); } }}复制代码
然后在这里就会调用到注册的处理器。在处理器中,如果发现状态不对,shutdownLatch.await方法就会被唤醒。
class ZooKeeperServerShutdownHandler { void handle(State state) { if (state == State.ERROR || state == State.SHUTDOWN) { shutdownLatch.countDown(); } }}复制代码
当它被唤醒,事情就变得简单了。关闭、清理各种资源。
2、日志文件
事务日志文件和快照文件的操作,分别对应着两个实现类,在这里就是为了创建文件路径和创建类实例。
public FileTxnSnapLog(File dataDir, File snapDir) throws IOException { LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir); this.dataDir = new File(dataDir, version + VERSION); this.snapDir = new File(snapDir, version + VERSION); if (!this.dataDir.exists()) { if (!this.dataDir.mkdirs()) { throw new IOException("Unable to create data directory " + this.dataDir); } } if (!this.dataDir.canWrite()) { throw new IOException("Cannot write to data directory " + this.dataDir); } if (!this.snapDir.exists()) { if (!this.snapDir.mkdirs()) { throw new IOException("Unable to create snap directory " + this.snapDir); } } if (!this.snapDir.canWrite()) { throw new IOException("Cannot write to snap directory " + this.snapDir); } if(!this.dataDir.getPath().equals(this.snapDir.getPath())){ checkLogDir(); checkSnapDir(); } txnLog = new FileTxnLog(this.dataDir); snapLog = new FileSnap(this.snapDir);}复制代码
上面的好理解,如果文件不存在就去创建,并检查是否拥有写入权限。
中间还有个判断很有意思,如果两个文件路径不相同,还要调用checkLogDir、checkSnapDir
去检查。检查什么呢?就是不能放在一起。
事务日志文件目录下,不能包含快照文件。 快照文件目录下,也不能包含事务日志文件。
最后,就是初始化两个实现类,把创建后的文件对象告诉它们。
3、启动服务
服务器的启动对应着两个实现:NIO服务器和Netty服务器。所以一开始要调用createFactory
来选择实例化一个实现类。
static public ServerCnxnFactory createFactory() throws IOException { String serverCnxnFactoryName = System.getProperty("zookeeper.serverCnxnFactory"); if (serverCnxnFactoryName == null) { serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } try { ServerCnxnFactory serverCnxnFactory = Class.forName(serverCnxnFactoryName) .getDeclaredConstructor().newInstance(); return serverCnxnFactory; } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName); ioe.initCause(e); throw ioe; }}复制代码
先获取zookeeper.serverCnxnFactory
属性值,如果它为空,默认创建的就是NIOServerCnxnFactory
实例。
所以,如果我们希望用Netty启动,就可以这样设置: System.setProperty("zookeeper.serverCnxnFactory", NettyServerCnxnFactory.class.getName());
最后通过反射获取它们的构造器并实例化。然后调用它们的方法来绑定端口,启动服务。两者差异不大,在这里咱们以Netty为例看一下。
- 构造函数
NettyServerCnxnFactory() { bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); bootstrap.setOption("reuseAddress", true); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.soLinger", -1); bootstrap.getPipeline().addLast("servercnxnfactory", channelHandler);}复制代码
在构造函数中,初始化ServerBootstrap对象,设置TCP参数。我们重点关注的是,它的事件处理器channelHandler。
- 事件处理器
这里的channelHandler是一个内部类,继承自SimpleChannelHandler。它被标注为@Sharable,还是一个共享的处理器。
@Sharableclass CnxnChannelHandler extends SimpleChannelHandler { //客户端连接被关闭 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)throws Exception{ //移除相应的Channel allChannels.remove(ctx.getChannel()); } //客户端连接 public void channelConnected(ChannelHandlerContext ctx,ChannelStateEvent e) throws Exception{ allChannels.add(ctx.getChannel()); NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(), zkServer, NettyServerCnxnFactory.this); ctx.setAttachment(cnxn); addCnxn(cnxn); } //连接断开 public void channelDisconnected(ChannelHandlerContext ctx,ChannelStateEvent e) throws Exception{ NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); if (cnxn != null) { cnxn.close(); } } //发生异常 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)throws Exception{ NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); if (cnxn != null) { if (LOG.isDebugEnabled()) { LOG.debug("Closing " + cnxn); } cnxn.close(); } } //有消息可读 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)throws Exception{ try { //找到对应的NettyServerCnxn,调用方法处理请求信息 NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); synchronized(cnxn) { processMessage(e, cnxn); } } catch(Exception ex) { LOG.error("Unexpected exception in receive", ex); throw ex; } } //处理消息 private void processMessage(MessageEvent e, NettyServerCnxn cnxn) { ....省略 }}复制代码
这里面就是处理各种IO事件。比如客户端连接、断开连接、可读消息...
我们看messageReceived
方法。当有消息请求时,调用到此方法。它会找到当前Channel对应的NettyServerCnxn对象,调用其receiveMessage
方法,来完成具体请求的处理。
- 绑定端口
初始化完成之后,通过bootstrap.bind
来绑定端口,正式开始对外提供服务。
public class NettyServerCnxnFactory extends ServerCnxnFactory { public void start() { LOG.info("binding to port " + localAddress); parentChannel = bootstrap.bind(localAddress); }}复制代码
上面我们调用start方法启动了Netty服务,但整个zookeeper的启动过程还没有完成。
public void startup(ZooKeeperServer zks) throws IOException,InterruptedException { start(); setZooKeeperServer(zks); zks.startdata(); zks.startup();}复制代码
三、加载数据
接着我们看zks.startdata();
它要从zookeeper数据库加载数据。
有的同学不禁有疑问,什么,zk竟然还有数据库? 不着急,我们慢慢看。
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { //加载数据 public void startdata()throws IOException, InterruptedException { //刚启动的时候,zkDb为空,先去初始化。 if (zkDb == null) { zkDb = new ZKDatabase(this.txnLogFactory); } //加载数据 if (!zkDb.isInitialized()) { loadData(); } }}复制代码
上面的代码中,在刚启动的时候zkDb为空,所以会进入第一个条件判断,调用构造方法,初始化zkDb。之后,调用loadData方法加载数据。
1、ZKDatabase
事实上,zookeeper并没有数据库,有的只是ZKDatabase
这个类,或者叫它内存数据库。 我们先看看它有哪些属性。
public class ZKDatabase { //数据树 protected DataTree dataTree; //Session超时会话 protected ConcurrentHashMapsessionsWithTimeouts; //事务、快照Log protected FileTxnSnapLog snapLog; //最小、最大事务ID protected long minCommittedLog, maxCommittedLog; public static final int commitLogCount = 500; protected static int commitLogBuffer = 700; //事务日志列表,记录着提案信息 protected LinkedList committedLog = new LinkedList (); protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock(); //初始化标记 volatile private boolean initialized = false;}复制代码
这里面包括会话,数据树和提交日志。所有的数据都保存在DataTree中,它就是数据树,它保存所有的节点数据。
public class DataTree { //哈希表提供对数据节点的快速查找 private final ConcurrentHashMapnodes = new ConcurrentHashMap (); //Watcher相关 private final WatchManager dataWatches = new WatchManager(); private final WatchManager childWatches = new WatchManager(); //zookeeper默认创建的节点 private static final String rootZookeeper = "/"; private static final String procZookeeper = "/zookeeper"; private static final String procChildZookeeper = procZookeeper.substring(1); private static final String quotaZookeeper = "/zookeeper/quota"; private static final String quotaChildZookeeper = quotaZookeeper .substring(procZookeeper.length() + 1);}复制代码
在我们从zookeeper上查询节点数据的时候,就是通过DataTree中的方法去获取。再具体就是通过节点名称去nodes哈希表去查询。比如:
public byte[] getData(String path, Stat stat, Watcher watcher){ DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } synchronized (n) { n.copyStat(stat); if (watcher != null) { dataWatches.addWatch(path, watcher); } return n.data; }}复制代码
那我们也许已经想到了,DataNode才会保存数据的真正载体。
public class DataNode implements Record { //父级节点 DataNode parent; //节点数据内容 byte data[]; //权限信息 Long acl; //节点统计信息 public StatPersisted stat; //子节点集合 private Setchildren = null; //空Set对象 private static final Set EMPTY_SET = Collections.emptySet();}复制代码
在zookeeper中的一个节点就对应一个DataNode对象。它包含一个父级节点和子节点集合、权限信息、节点数据内容、统计信息,都在此类中表示。
2、实例化对象
我们接着回过头来,继续看代码。如果zkDb为空,就要去实例化它。
public ZKDatabase(FileTxnSnapLog snapLog) { dataTree = new DataTree(); sessionsWithTimeouts = new ConcurrentHashMap(); this.snapLog = snapLog;}复制代码
这里就是实例化DataTree对象,初始化超时会话的Map,赋值snapLog 对象。
那么在DataTree的构造函数中,初始化zookeeper默认的节点,就是往nodes哈希表中添加DataNode对象。
public DataTree() { nodes.put("", root); nodes.put(rootZookeeper, root); root.addChild(procChildZookeeper); nodes.put(procZookeeper, procDataNode); procDataNode.addChild(quotaChildZookeeper); nodes.put(quotaZookeeper, quotaDataNode);}复制代码
3、加载数据
如果zkDb还没有被初始化,那就加载数据库,并设置为已初始化状态,然后清理一下过期Session。
public class ZooKeeperServer{ public void loadData() throws IOException, InterruptedException { if(zkDb.isInitialized()){ setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { setZxid(zkDb.loadDataBase()); } //清理过期session LinkedListdeadSessions = new LinkedList (); for (Long session : zkDb.getSessions()) { if (zkDb.getSessionWithTimeOuts().get(session) == null) { deadSessions.add(session); } } zkDb.setDataTreeInit(true); for (long session : deadSessions) { killSession(session, zkDb.getDataTreeLastProcessedZxid()); } }}复制代码
我们看zkDb.loadDataBase()
方法。它将从磁盘文件中加载数据库。
public class ZKDatabase { //从磁盘文件中加载数据库,并返回最大事务ID public long loadDataBase() throws IOException { long zxid = snapLog.restore(dataTree, s essionsWithTimeouts, commitProposalPlaybackListener); initialized = true; return zxid; }}复制代码
既然是磁盘文件,那么肯定就是快照文件和事务日志文件。snapLog.restore
将证实这一点。
public class FileTxnSnapLog { public long restore(DataTree dt, Mapsessions, PlayBackListener listener) throws IOException { //从快照文件中加载数据 snapLog.deserialize(dt, sessions); //从事务日志文件中加载数据 long fastForwardFromEdits = fastForwardFromEdits(dt, sessions, listener); return fastForwardFromEdits; }}复制代码
加载数据的过程看起来比较复杂,但核心就一点:从文件流中读取数据,转换成DataTree对象,放入zkDb中。在这里,咱们先不看解析文件的过程,就看看文件里存放的到底是些啥?
快照文件
我们找到org.apache.zookeeper.server.SnapshotFormatter
,它可以帮我们输出快照文件内容。在main方法中,设置一下快照文件的路径,然后运行它。
public class SnapshotFormatter { public static void main(String[] args) throws Exception { //设置快照文件路径 args = new String[1]; args[0] = "E:\\zookeeper-data\\version-2\\snapshot.6"; if (args.length != 1) { System.err.println("USAGE: SnapshotFormatter snapshot_file"); System.exit(2); } new SnapshotFormatter().run(args[0]); }}复制代码
运行这个main方法,在控制台输出的就是快照文件内容。
ZNode Details (count=8):----/ cZxid = 0x00000000000000 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x00000000000000 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0x00000000000002 cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x00000000000000 dataLength = 0----/zookeeper cZxid = 0x00000000000000 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x00000000000000 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0x00000000000000 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x00000000000000 dataLength = 0----/zookeeper/quota cZxid = 0x00000000000000 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x00000000000000 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0x00000000000000 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x00000000000000 dataLength = 0----/test cZxid = 0x00000000000002 ctime = Sat Feb 23 19:57:43 CST 2019 mZxid = 0x00000000000002 mtime = Sat Feb 23 19:57:43 CST 2019 pZxid = 0x00000000000005 cversion = 3 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x00000000000000 dataLength = 4----/test/t1 cZxid = 0x00000000000003 ctime = Sat Feb 23 19:57:53 CST 2019 mZxid = 0x00000000000003 mtime = Sat Feb 23 19:57:53 CST 2019 pZxid = 0x00000000000003 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x00000000000000 dataLength = 4----/test/t2 cZxid = 0x00000000000004 ctime = Sat Feb 23 19:57:56 CST 2019 mZxid = 0x00000000000004 mtime = Sat Feb 23 19:57:56 CST 2019 pZxid = 0x00000000000004 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x00000000000000 dataLength = 4----/test/t3 cZxid = 0x00000000000005 ctime = Sat Feb 23 19:57:58 CST 2019 mZxid = 0x00000000000005 mtime = Sat Feb 23 19:57:58 CST 2019 pZxid = 0x00000000000005 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x00000000000000 dataLength = 4----Session Details (sid, timeout, ephemeralCount):0x10013d3939a0000, 99999, 00x10013d1adcb0000, 99999, 0复制代码
我们可以看到,格式化后的快照文件内容,除了开头的count信息和结尾的Session信息,中间每一行就是一个DataNode对象。从节点名称可以推算出自己的父级节点和子节点,其它的就是此节点的统计信息对象StatPersisted。
事务日志文件
我们找到org.apache.zookeeper.server.LogFormatter
这个类,在main方法中设置事务日志的文件路径,然后运行它。在zookeeper中的每一个事务操作,都会被记录下来。
19-2-23 下午07时57分32秒 session 0x10013d1adcb0000 cxid 0x0 zxid 0x1 createSession 9999919-2-23 下午07时57分43秒 session 0x10013d1adcb0000 cxid 0x2 zxid 0x2 create '/test,#31323334,v{s{31,s{'world,'anyone}}},F,119-2-23 下午07时57分53秒 session 0x10013d1adcb0000 cxid 0x3 zxid 0x3 create '/test/t1,#31323334,v{s{31,s{'world,'anyone}}},F,119-2-23 下午07时57分56秒 session 0x10013d1adcb0000 cxid 0x4 zxid 0x4 create '/test/t2,#31323334,v{s{31,s{'world,'anyone}}},F,219-2-23 下午07时57分58秒 session 0x10013d1adcb0000 cxid 0x5 zxid 0x5 create '/test/t3,#31323334,v{s{31,s{'world,'anyone}}},F,319-2-23 下午07时58分51秒 session 0x10013d3939a0000 cxid 0x0 zxid 0x6 createSession 9999919-2-23 下午07时59分07秒 session 0x10013d3939a0000 cxid 0x4 zxid 0x7 create '/test/t4,#31323334,v{s{31,s{'world,'anyone}}},F,4复制代码
可以看到,每一个事务对应一行记录。包含操作时间、sessionId、事务ID、操作类型、节点名称和权限信息等。 需要注意的是,只有变更操作才会被记录到事务日志。所以,在这里我们看不到任何读取操作请求。
四、会话管理器
会话是Zookeeper中一个重要的抽象。保证请求有序、临时znode节点、监听点都和会话密切相关。Zookeeper服务器的一个重要任务就是跟踪并维护这些会话。
在zookeeper中,服务器要负责清理掉过期会话,而客户端要保持自己的活跃状态,只能依靠心跳信息或者一个新的读写请求。
而对于过期会话的管理,则依靠“分桶策略”来完成。具体情况是这样的:
- 1、zookeeper会为每个会话设置一个过期时间,我们称它为nextExpirationTime
- 2、将这个过期时间和相对应的Session集合放入Map中
- 3、开启线程,不断轮训这个Map,取出当前过期点nextExpirationTime的Session集合,然后关闭它们
- 4、未活跃的Session被关闭;正在活跃的Session会重新计算自己的过期时间,修改自己的过期时间nextExpirationTime,保证不会被线程扫描到
简而言之,还在活跃的Session依靠不断重置自己的nextExpirationTime时间,就不会被线程扫描到,继而被关闭。
接下来我们看调用到的zks.startup();
方法,具体是怎么做的。
public class ZooKeeperServer public synchronized void startup() { if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); setupRequestProcessors(); registerJMX(); setState(State.RUNNING); notifyAll(); }}复制代码
我们只关注createSessionTracker、startSessionTracker
两个方法,它们和会话相关。
1、创建会话跟踪器
创建会话跟踪器,这里是一个SessionTrackerImpl
对象实例。
protected void createSessionTracker() { sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, 1, getZooKeeperServerListener());}复制代码
在构造方法里,做了一些参数初始化的工作。
public SessionTrackerImpl(SessionExpirer expirer, ConcurrentHashMapsessionsWithTimeout, int tickTime, long sid, ZooKeeperServerListener listener){ super("SessionTracker", listener); this.expirer = expirer; this.expirationInterval = tickTime; this.sessionsWithTimeout = sessionsWithTimeout; nextExpirationTime = roundToInterval(Time.currentElapsedTime()); this.nextSessionId = initializeNextSession(sid); for (Entry e : sessionsWithTimeout.entrySet()) { addSession(e.getKey(), e.getValue()); }}复制代码
我们重点关注下一个过期时间nextExpirationTime
是怎样计算出来的。我们来看roundToInterval
方法。
private long roundToInterval(long time) { return (time / expirationInterval + 1) * expirationInterval;}复制代码
其中,time是基于当前时间的一个时间戳;expirationInterval是我们配置文件中的tickTime。如果我们假定time=10,expirationInterval=2
,那么上面计算后的下一个过期时间为(10/2+1)*2=12
这也就是说,当前的Session会被分配到Id为12的分桶中。我们继续往下看这一过程。 在addSession
方法中,先查询是否有会话Id的SessionImpl,没有则新建并存入。
synchronized public void addSession(long id, int sessionTimeout) { sessionsWithTimeout.put(id, sessionTimeout); //查询对应SessionId的Impl类 if (sessionsById.get(id) == null) { SessionImpl s = new SessionImpl(id, sessionTimeout, 0); sessionsById.put(id, s); } else { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- Existing session 0x" + Long.toHexString(id) + " " + sessionTimeout); } } touchSession(id, sessionTimeout);}复制代码
最后调用touchSession
来激活会话。需要注意的是,zookeeper中的每个请求都会调用到此方法。它来计算活跃Session的下一个过期时间,并迁移到不同桶中。
我们一直在说“分桶”,或许难以理解“桶”到底是个什么东西。在代码中,它其实是个HashSet对象。
public class SessionTrackerImpl{ //过期时间和对应Session集合的映射 HashMapsessionSets = new HashMap (); //Session集合 static class SessionSet { HashSet sessions = new HashSet (); } synchronized public boolean touchSession(long sessionId, int timeout) { SessionImpl s = sessionsById.get(sessionId); //如果session被删除或者已经被标记为关闭状态 if (s == null || s.isClosing()) { return false; } //计算下一个过期时间 long expireTime = roundToInterval(Time.currentElapsedTime() + timeout); if (s.tickTime >= expireTime) { return true; } //获取Session当前的过期时间 SessionSet set = sessionSets.get(s.tickTime); if (set != null) { //从集合中删除 set.sessions.remove(s); } //设置新的过期时间并加入到Session集合中 s.tickTime = expireTime; set = sessionSets.get(s.tickTime); if (set == null) { set = new SessionSet(); sessionSets.put(expireTime, set); } set.sessions.add(s); return true; }}复制代码
我们回头看上面那个公式,如果第一次Session请求计算后的过期时间为12。 那么,对应Session的映射如下: 12=org.apache.zookeeper.server.SessionTrackerImpl$SessionSet@25143a5e
第二次请求,计算后的过期时间为15。就会变成: 15=org.apache.zookeeper.server.SessionTrackerImpl$SessionSet@3045314d
同时,过期时间为12的记录被删除。这样,通过过期时间的变更,不断迁移这个Session的位置。我们就会想到,如果由于网络原因或者客户端假死,请求长时间未能到达服务器,那么对应Session的过期时间就不会发生变化。 **时代在变化,你不变,就会被抛弃。**这句话,同样适用于zookeeper中的会话。
我们接着看startSessionTracker();
protected void startSessionTracker() { ((SessionTrackerImpl)sessionTracker).start();}复制代码
SessionTrackerImpl
继承自ZooKeeperCriticalThread
,所以它本身也是线程类。调用start方法后开启线程,我们看run方法。
synchronized public void run() { try { while (running) { currentTime = Time.currentElapsedTime(); if (nextExpirationTime > currentTime) { this.wait(nextExpirationTime - currentTime); continue; } SessionSet set; //获取过期时间对应的Session集合 set = sessionSets.remove(nextExpirationTime); //循环Session,关闭它们 if (set != null) { for (SessionImpl s : set.sessions) { setSessionClosing(s.sessionId); expirer.expire(s); } } nextExpirationTime += expirationInterval; } } catch (InterruptedException e) { handleException(this.getName(), e); } LOG.info("SessionTrackerImpl exited loop!");}复制代码
这个方法通过死循环的方式,不断获取过期时间对应的Session集合。简直就是发现一起,查处一起 。 这也就解释了为什么活跃Session必须要不断更改自己的过期时间,因为这里有人在监督。
最后就是注册了JMX,并设置服务器的运行状态。
五、总结
本文主要分析了zookeeper服务器启动的具体流程,我们再回顾一下。
- 配置 zoo.cfg文件,运行Main方法
- 注册zk服务器关闭事件,清理资源
- 选择NIO或者Netty服务器绑定端口,开启服务
- 初始化zkDB,加载磁盘文件到内存
- 创建会话管理器,监视过期会话并删除
- 注册JMX,设置zk服务状态为running