ZooKeeper 基础知识

无情 阅读:698 2021-03-31 14:13:09 评论:0

第一、什么是ZooKeeper

Zookeeper是一个分布式开源框架,提供了协调分布式应用的基本服务, 
它向外部应用暴露一组通用服务——分布式同步(Distributed Synchronization)、命名服务(Naming Service)、集群维护(Group Maintenance)等, 
简化分布式应用协调及其管理的难度,提供高性能的分布式服务。 
ZooKeeper本身可以以单机模式安装运行,不过它的长处在于通过分布式ZooKeeper集群(一个Leader,多个Follower), 
基于一定的策略来保证ZooKeeper集群的稳定性和可用性,从而实现分布式应用的可靠性。

1.1 ZooKeeper 集群

Zookeeper集群的角色: Leader 和 follower  
只要集群中有半数以上节点存活,集群就能提供服务

1.2 ZooKeeper 特性

1、Zookeeper:一个leader,多个follower组成的集群 
2、全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的 
3、分布式读写,更新请求转发,由leader实施 
4、更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行 
5、数据更新原子性,一次数据更新要么成功,要么失败 
6、实时性,在一定时间范围内,client能读到最新数据

1.3 ZooKeeper 数据结构

1、层次化的目录结构,命名符合常规文件系统规范(类似文件系统)

2、每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识

3、节点Znode可以包含数据和子节点(但是EPHEMERAL(临时)类型的节点不能有子节点)
ZooKeeper节点类型说明:

a、Znode有两种类型: 
短暂(ephemeral)(create -e /app1/test1 “test1” 客户端断开连接zk删除ephemeral类型节点)  
持久(persistent) (create -s /app1/test2 “test2” 客户端断开连接zk不删除persistent类型节点) 
b、Znode有四种形式的目录节点(默认是persistent ) 
PERSISTENT  
PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )  
EPHEMERAL  
EPHEMERAL_SEQUENTIAL 
c、创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护  
        
d、在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

第二、ZooKeeper应用场景

2.1 ZooKeeper 应用场景

统一命名服务 
       分布式环境下,经常需要对应用/服务进行统一命名,便于识别不同服务。类似于域名与ip之间对应关系,域名容易记住。通过名称来获取资源或服务的地址,提供者等信息按照层次结构组织服务/应用名称可将服务名称以及地址信息写到Zookeeper上,客户端通过Zookeeper获取可用服务列表类。 
 
配置管理 
       分布式环境下,配置文件管理和同步是一个常见问题。一个集群中,所有节点的配置信息是一致的,比如Hadoop。对配置文件修改后,希望能够快速同步到各个节点上配置管理可交由Zookeeper实现。可将配置信息写入Zookeeper的一个znode上。各个节点监听这个znode。一旦znode中的数据被修改,zookeeper将通知各个节点。 
 
集群管理 
       分布式环境中,实时掌握每个节点的状态是必要的。可根据节点实时状态作出一些调整。Zookeeper可将节点信息写入Zookeeper的一个znode上。监听这个znode可获取它的实时状态变化。典型应用比如Hbase中Master状态监控与选举。 
 
分布式通知/协调 
       分布式环境中,经常存在一个服务需要知道它所管理的子服务的状态。例如,NameNode须知道各DataNode的状态,JobTracker须知道各TaskTracker的状态。心跳检测机制和信息推送也是可通过Zookeeper实现。 
 
分布式锁 
       Zookeeper是强一致的。多个客户端同时在Zookeeper上创建相同znode,只有一个创建成功。Zookeeper实现锁的独占性。多个客户端同时在Zookeeper上创建相同znode ,创建成功的那个客户端得到锁,其他客户端等待。Zookeeper 控制锁的时序。各个客户端在某个znode下创建临时znode (类型为CreateMode. EPHEMERAL _SEQUENTIAL),这样,该znode可掌握全局访问时序。 
 
分布式队列 
       先进先出对列。队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。(可通过分布式锁实现) 
       同步队列。一个job由多个task组成,只有所有任务完成后,job才运行完成。可为job创建一个/job目录,然后在该目录下,为每个完成的task创建一个临时znode,一旦临时节点数目达到task总数,则job运行完成。

第三、ZooKeeper  Windows 单机版搭建

3.1 Windows 环境安装:

1、ZooKeeper 下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

2、从Oracle的Java网站下载,安装很简单,就不再详述.

3、下载后,得到的是一个zookeeper-3.4.12.tar.gz,直接解压即可

4、ZooKeeper相关配置

在你执行启动脚本之前,还有几个基本的配置项需要配置一下,Zookeeper 的配置文件在 conf 目录下,这个目录下有 zoo_sample.cfg 和 log4j.properties

你需要做的就是将 zoo_sample.cfg 改名为 zoo.cfg 
 
因为 Zookeeper 在启动时会找这个文件作为默认配置文件。 
 
注意:在windows下配置dataDir和dataLogDir,路径使用双斜线(\\)

4.1 配置文件详解:

# The number of milliseconds of each tick 
tickTime=2000 
# The number of ticks that the initial  
# synchronization phase can take 
initLimit=10 
# The number of ticks that can pass between  
# sending a request and getting an acknowledgement 
syncLimit=5 
# the directory where the snapshot is stored. 
# do not use /tmp for storage, /tmp here is just  
# example sakes. 
dataDir=E:\\zookeeper\\data 
dataLogDir=E:\\zookeeper\\log 
#dataDir=/tmp/zookeeper 
# the port at which the clients will connect 
clientPort=2181 
# the maximum number of client connections. 
# increase this if you need to handle more clients 
#maxClientCnxns=60 
# 
# Be sure to read the maintenance section of the  
# administrator guide before turning on autopurge. 
# 
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance 
# 
# The number of snapshots to retain in dataDir 
#autopurge.snapRetainCount=3 
# Purge task interval in hours 
# Set to "0" to disable auto purge feature 
#autopurge.purgeInterval=1 
1、tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。通过心跳不仅能够用来监听机器的工作状态,还可以通过心跳来控制Flower跟Leader的通信时间,默认情况下FL的会话时常是心跳间隔的两倍。 
 
2、initLimit:集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)。 
syncLimit:集群中flower服务器(F)跟leader(L)服务器之间的发送请求和获取确认最多能容忍的心跳数。 
    
3、dataDir: Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。 
 
4、dataLogDir: Zookeeper 保存日志文件的目录 
 
5、clientPort:客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求,默认是2181。 
 
6、maxClientCnxns:单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制。请注意这个限制的使用范围,仅仅是单台客户端机器与单台ZK服务器之间的连接数限制,不是针对指定客户端IP,也不是ZK集群的连接数限制,也不是单台ZK对所有客户端的连接数限制。 
 
7、autopurge.purgeInterval:Zookeeper 3.4.0及之后版本,ZK提供了自动清理事务日志和快照文件的功能,这个参数指定了清理频率,单位是小时,需要配置一个1或更大的整数,默认是0,表示不开启自动清理功能。 
 
8、autopurge.snapRetainCount:Zookeeper 3.4.0及之后版本,这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留3个。 

5、ZooKeeper 启动

进入bin 目录下,Windows 下的启动脚本是 zkServer.cmd

启动后要检查 Zookeeper 是否已经在服务,可以通过 netstat – ano 命令查看是否有你配置的 clientPort 端口号在监听服务。

1、启动中,Zookeeper会主动加载JDK的安装路径

2、启动中,Zookeeper会加载zoo.cfg的配置信息

 第四、ZooKeeper  客户端

ZooKeeper命令行工具类似于Linux的shell环境,不过功能肯定不及shell啦,但是使用它我们可以简单的对ZooKeeper进行访问,数据创建,数据修改等操作.  使用 zkCli.cmd -server 127.0.0.1:2181 连接到 ZooKeeper 服务,连接成功后,系统会输出 ZooKeeper 的相关环境以及配置信息。

ZooKeeper 客户端常用命令行:

1. 显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容 
2. 显示根目录下、文件: ls2 / 查看当前节点数据并能看到更新次数等数据 
3. 创建文件,并设置初始内容: create /zk "test" 创建一个新的 znode节点“ zk ”以及与它关联的字符串 
4. 获取文件内容: get /zk 确认 znode 是否包含我们所创建的字符串 
5. 修改文件内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置 
6. 删除文件: delete /zk 将刚才创建的 znode 删除 
7. 退出客户端: quit 
8. 帮助命令: help

 第五、Java 操作ZooKeeper 

5.1 同步方式创建节点:

/** 
	 * 同步方式创建节点 
	 *  
	 * @param zk 
	 * @throws InterruptedException 
	 * @throws KeeperException 
	 */ 
	public static void synchronization(ZooKeeper zk) throws Exception { 
		String result = zk.create("/sync2", "sync2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 
		System.out.println("result:" + result); 
	}

同步方式请求参数说明:

参数1,节点路径《名称) : InodeName (不允许递归创建节点,也就是说在父节点不存在 
的情况下,不允许创建子节点) 
 
参数2,节点内容: 要求类型是字节数组(也就是说,不支持序列化方式,如果需要实现序 
列化,可使用java相关序列化框架,如Hessian、Kryo框架) 
 
参數3,节点权限: 使用Ids.OPEN_ACL_UNSAFE开放权限即可。(这个参数一般在权展 
没有太高要求的场景下,没必要关注) 
 
参数4,节点类型: 创建节点的类型: CreateMode,提供四种首点象型 
PERSISTENT(持久节点) 
PERSISTENT SEQUENTIAL(持久顺序节点) 
EPHEMERAL(临时节点) 
EPHEMERAL SEQUENTAL(临时顺序节点)

 

5.2 异步方式创建节点

/** 
	 * 异步方式创建节点 
	 * @param zk 
	 * @throws Exception 
	 */ 
	public static void asynchronous(ZooKeeper zk)  throws Exception { 
		zk.create("/async2", "async2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new ZookeeperCreateNode().new IStringCallback(), "I am context."); 
	} 
 
	class IStringCallback implements AsyncCallback.StringCallback { 
		public void processResult(int rc, String path, Object ctx, String name) { 
			System.out.println("Create path result: [" + rc + ", " + path + ", " + ctx + ", real path name: " + name); 
		} 
	}

异步方式请求参数说明:

AsyncCallback包含了StatCallback、DataCallback、ACLCallback、ChildrenCallback、Children2Callback、StringCallback和VoidCallback 七种不同回调接口。和同步接口最大的区别就是:节点创建过程(包括网络通信和服务端的节点创建过程)是异步的,并且在同步接口调用过程中,我们需要关注接口抛出的异常,但是在异步接口中,接口本身不会抛出异常的,所有的异常都会在回调函数中通过Result Code 来体现。
 

拓展:

5.3 完整测试代码:

pom.xml 添加zookeeper.jar 包依赖

	<!--zookeeper jar包  --> 
		<dependency> 
			<groupId>org.apache.zookeeper</groupId> 
			<artifactId>zookeeper</artifactId> 
			<version>3.4.10</version> 
		</dependency>

 Java 功能代码:

package com.zzg.zookeeper; 
 
import java.io.IOException; 
import java.util.concurrent.CountDownLatch; 
 
import org.apache.zookeeper.AsyncCallback; 
import org.apache.zookeeper.CreateMode; 
import org.apache.zookeeper.KeeperException; 
import org.apache.zookeeper.WatchedEvent; 
import org.apache.zookeeper.Watcher; 
import org.apache.zookeeper.Watcher.Event.EventType; 
import org.apache.zookeeper.Watcher.Event.KeeperState; 
import org.apache.zookeeper.ZooDefs.Ids; 
import org.apache.zookeeper.ZooKeeper; 
 
public class ZookeeperCreateNode { 
 
	/** 
	 * 集群连接地址 
	 */ 
	private static final String CONNECT_ADDR = "127.0.0.1:2181"; 
	/** 
	 * session超时时间 
	 */ 
	private static final int SESSION_OUTTIME = 2000; 
	/** 
	 * 信号量,阻塞程序执行,用户等待zookeeper连接成功,发送成功信号, 
	 */ 
	private static final CountDownLatch countDownLatch = new CountDownLatch(1); 
 
	public static void main(String[] args) throws Exception { 
		ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher() { 
 
			public void process(WatchedEvent event) { 
				// 获取时间的状态 
				KeeperState keeperState = event.getState(); 
				EventType tventType = event.getType(); 
				// 如果是建立连接 
				if (KeeperState.SyncConnected == keeperState) { 
					if (EventType.None == tventType) { 
						// 如果建立连接成功,则发送信号量,让后阻塞程序向下执行 
						countDownLatch.countDown(); 
						System.out.println("zk 建立连接"); 
					} 
				} 
			} 
 
		}); 
		// 进行阻塞 
		countDownLatch.await(); 
		// 同步调用:同步方式创建节点 
		synchronization(zk); 
		// 异步调用:异步方式创建节点 
		asynchronous(zk); 
		 
		Thread.sleep( Integer.MAX_VALUE ); 
		// zk 关闭 
		// zk.close(); 
 
	} 
 
	/** 
	 * 同步方式创建节点 
	 *  
	 * @param zk 
	 * @throws InterruptedException 
	 * @throws KeeperException 
	 */ 
	public static void synchronization(ZooKeeper zk) throws Exception { 
		String result = zk.create("/sync2", "sync2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 
		System.out.println("result:" + result); 
	} 
 
	/** 
	 * 异步方式创建节点 
	 * @param zk 
	 * @throws Exception 
	 */ 
	public static void asynchronous(ZooKeeper zk)  throws Exception { 
		zk.create("/async2", "async2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new ZookeeperCreateNode().new IStringCallback(), "I am context."); 
	} 
 
	class IStringCallback implements AsyncCallback.StringCallback { 
		public void processResult(int rc, String path, Object ctx, String name) { 
			System.out.println("Create path result: [" + rc + ", " + path + ", " + ctx + ", real path name: " + name); 
		} 
	} 
 
} 

5.4 ZooKeeper 之Watcher

      在ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperState和EventType两个枚举类,分别代表了通知状态和事件类型,同时定义了事件的回调方法:process(WatchedEvent event)。

5.4.1 什么是Watcher接口

同一个事件类型在不同的通知状态中代表的含义有所不同,表7-3列举了常见的通知状态和事件类型。

表7-3 Watcher通知状态与事件类型一览

KeeperState

EventType

触发条件

说明

 

None
(-1)

客户端与服务端成功建立连接

 

SyncConnected
(0)

NodeCreated
(1)

Watcher监听的对应数据节点被创建

 

 

NodeDeleted
(2)

Watcher监听的对应数据节点被删除

此时客户端和服务器处于连接状态

 

NodeDataChanged
(3)

Watcher监听的对应数据节点的数据内容发生变更

 

 

NodeChildChanged
(4)

Wather监听的对应数据节点的子节点列表发生变更

 

Disconnected
(0)

None
(-1)

客户端与ZooKeeper服务器断开连接

此时客户端和服务器处于断开连接状态

Expired
(-112)

Node
(-1)

会话超时

此时客户端会话失效,通常同时也会受到SessionExpiredException异常

AuthFailed
(4)

None
(-1)

通常有两种情况,1:使用错误的schema进行权限检查 2:SASL权限检查失败

通常同时也会收到AuthFailedException异常

表7-3中列举了ZooKeeper中最常见的几个通知状态和事件类型。

回调方法process()

process方法是Watcher接口中的一个回调方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理。process方法的定义如下:

abstract public void process(WatchedEvent event);

这个回调方法的定义非常简单,我们重点看下方法的参数定义:WatchedEvent。

WatchedEvent包含了每一个事件的三个基本属性:通知状态(keeperState),事件类型(EventType)和节点路径(path),其数据结构如图7-5所示。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process对服务端事件进行处理。

提到WatchedEvent,不得不讲下WatcherEvent实体。笼统地讲,两者表示的是同一个事物,都是对一个服务端事件的封装。不同的是,WatchedEvent是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象,而WatcherEvent因为实现了序列化接口,因此可以用于网络传输。

服务端在生成WatchedEvent事件之后,会调用getWrapper方法将自己包装成一个可序列化的WatcherEvent事件,以便通过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将WatcherEvent还原成一个WatchedEvent事件,并传递给process方法处理,回调方法process根据入参就能够解析出完整的服务端事件了。

需要注意的一点是,无论是WatchedEvent还是WatcherEvent,其对ZooKeeper服务端事件的封装都是机及其简单的。举个例子来说,当/zk-book这个节点的数据发生变更时,服务端会发送给客户端一个“ZNode数据内容变更”事件,客户端只能够接收到如下信息

Java 之ZooKeeper Watcher 示例代码:

package com.zzg.zookeeper; 
 
import java.util.concurrent.CountDownLatch; 
 
import org.apache.zookeeper.CreateMode; 
import org.apache.zookeeper.KeeperException; 
import org.apache.zookeeper.WatchedEvent; 
import org.apache.zookeeper.Watcher; 
import org.apache.zookeeper.Watcher.Event.EventType; 
import org.apache.zookeeper.Watcher.Event.KeeperState; 
import org.apache.zookeeper.ZooDefs.Ids; 
import org.apache.zookeeper.ZooKeeper; 
import org.apache.zookeeper.data.Stat; 
 
/** 
 * ZooKeeper 事件观察 
 *  
 * @author Administrator 
 * 
 */ 
public class ZooKeeperWatcher implements Watcher { 
	// 集群连接地址 
	private static final String CONNECT_ADDRES = "127.0.0.1:2181"; 
	// 会话超时时间 
	private static final int SESSIONTIME = 2000; 
	// 信号量,让zk在连接之前等待,连接成功后才能往下走. 
	private static final CountDownLatch countDownLatch = new CountDownLatch(1); 
	private static String LOG_MAIN = "【main】 "; 
	private ZooKeeper zk; 
 
	public static void main(String[] args) throws KeeperException, InterruptedException { 
		ZooKeeperWatcher zkClientWatcher = new ZooKeeperWatcher(); 
		zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME); 
//		boolean createResult = zkClientWatcher.createPath("/watcher", "1"); 
 		zkClientWatcher.updateNode("/watcher","2"); 
 
	} 
 
	@Override 
	public void process(WatchedEvent watchedEvent) { 
		// 获取事件状态 
		KeeperState keeperState = watchedEvent.getState(); 
		// 获取事件类型 
		EventType eventType = watchedEvent.getType(); 
		// zk 路径 
		String path = watchedEvent.getPath(); 
		System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path); 
		// 判断是否建立连接 
		if (KeeperState.SyncConnected == keeperState) { 
			if (EventType.None == eventType) { 
				// 如果建立建立成功,让后程序往下走 
				System.out.println(LOG_MAIN + "zk 建立连接成功!"); 
				countDownLatch.countDown(); 
			} else if (EventType.NodeCreated == eventType) { 
				System.out.println(LOG_MAIN + "事件通知,新增node节点" + path); 
			} else if (EventType.NodeDataChanged == eventType) { 
				System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被修改...."); 
			} else if (EventType.NodeDeleted == eventType) { 
				System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被删除...."); 
			} 
 
		} 
		System.out.println("--------------------------------------------------------"); 
 
	} 
 
	public void createConnection(String connectAddres, int sessionTimeOut) { 
		try { 
			zk = new ZooKeeper(connectAddres, sessionTimeOut, this); 
			System.out.println(LOG_MAIN + "zk 开始启动连接服务器...."); 
			countDownLatch.await(); 
		} catch (Exception e) { 
			e.printStackTrace(); 
		} 
	} 
 
	public boolean createPath(String path, String data) { 
		try { 
			this.exists(path, true); 
			this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 
			System.out.println(LOG_MAIN + "节点创建成功, Path:" + path + ",data:" + data); 
		} catch (Exception e) { 
			e.printStackTrace(); 
			return false; 
		} 
		return true; 
	} 
 
	/** 
	 * 判断指定节点是否存在 
	 *  
	 * @param path 节点路径 
	 */ 
	public Stat exists(String path, boolean needWatch) { 
		try { 
			return this.zk.exists(path, needWatch); 
		} catch (Exception e) { 
			e.printStackTrace(); 
			return null; 
		} 
	} 
 
	public boolean updateNode(String path, String data) throws KeeperException, InterruptedException { 
		exists(path, true); 
		this.zk.setData(path, data.getBytes(), -1); 
		return false; 
	} 
 
} 

 第六、ZooKeeper 应用场景之分布式锁

ZooKeeper 应用场景:

1、数据发布订阅 
2、负载均衡 
3、命名服务 
4、分布式协调 
5、集群管理 
6、配置管理 
7、分布式队列 
8、分布式锁

ZooKeeper 应用场景之分布式锁核心代码:

 
#####生成订单号###### 
import java.text.SimpleDateFormat; 
import java.util.Date; 
 
//生成订单号 
public class OrderNumGenerator { 
	private static int count = 0; 
    //生成订单号 
	public String getOrderNumber() { 
		SimpleDateFormat smt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss"); 
		return smt.format(new Date()) + "-" + ++count; 
	} 
 
} 
#####订单业务逻辑###### 
public class OrderService implements Runnable { 
	private OrderNumGenerator orderNumGenerator = new OrderNumGenerator(); 
	private static Object oj = new Object(); 
	private Lock lock = new ZookeeperDistrbuteLock(); 
 
	public void run() { 
		getNumber(); 
	} 
 
	public void getNumber() { 
		// synchronized (oj) { 
		lock.getLock(); 
		String orderNumber = orderNumGenerator.getOrderNumber(); 
		System.out.println("获取订单号:" + orderNumber); 
		lock.unLock(); 
		// } 
 
	} 
 
	public static void main(String[] args) { 
		for (int i = 0; i < 100; i++) { 
			new Thread(new OrderService()).start(); 
		} 
	} 
 
} 
 
#####lock接口 ###### 
public interface Lock { 
	// 获取锁 
	public void getLock(); 
    // 释放锁 
	public void unLock(); 
} 
 
#####ZookeeperAbstractLock抽象类接口 ###### 
public abstract class ZookeeperAbstractLock implements Lock { 
	private static final String CONNECT_ADDRES = "192.168.110.159:2181,192.168.110.160:2181,192.168.110.162:2181"; 
 
	protected ZkClient zkClient = new ZkClient(CONNECT_ADDRES); 
	protected String PATH = "/lock"; 
 
	public void getLock() { 
		// 如果当前节点已经存在,则等待 
		if (tryLock()) { 
			System.out.println("获取到锁 get"); 
		} else { 
			// 等待 
			waitLock(); 
			// 重新获取锁 
			getLock(); 
		} 
	} 
 
	protected abstract void waitLock(); 
 
	protected abstract boolean tryLock(); 
 
	public void unLock() { 
		if (zkClient != null) { 
			zkClient.close(); 
		} 
		System.out.println("已经释放锁..."); 
	} 
#####ZookeeperAbstractLock抽象类接口 ###### 
//实现锁 
public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock { 
	private CountDownLatch countDownLatch = new CountDownLatch(1); 
 
	@Override 
	protected boolean tryLock() { 
		try { 
			zkClient.createEphemeral(PATH); 
			// 创建成功 
			return true; 
		} catch (Exception e) { 
			// 创建失败 
			return false; 
		} 
 
	} 
 
	@Override 
	protected void waitLock() { 
		try { 
			IZkDataListener iZkDataListener = new IZkDataListener() { 
 
				public void handleDataDeleted(String path) throws Exception { 
					// 唤醒等待线程, 继续往下走. 
					if (countDownLatch != null) { 
						countDownLatch.countDown(); 
					} 
				} 
 
				public void handleDataChange(String path, Object data) throws Exception { 
 
				} 
			}; 
			// 注册到zk监听中 
			zkClient.subscribeDataChanges(PATH, iZkDataListener); 
			if (zkClient.exists(PATH)) { 
				countDownLatch = new CountDownLatch(1); 
 
				// 等待 
				countDownLatch.await(); 
 
			} 
			// 删除事件通知 
			zkClient.unsubscribeDataChanges(PATH, iZkDataListener); 
		} catch (Exception e) { 
			// TODO: handle exception 
		} 
	} 
 
}

核心思想:

分布式锁使用zk,在zk上创建一个临时节点(有效期)  ,使用临时节点作为锁,因为节点不允许重复。 
如果能创建节点成功,生成订单号,如果创建节点失败,等待。临时节点zk关闭,释放锁,其他节点就可以重新生成订单号。

 

标签:zookeeper
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

关注我们

一个IT知识分享的公众号