分布式锁
1.介绍
1.1 背景
在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题. 但当我们的应用是分布式部署的情况下,那么就需要一种更加高级的锁机制来处理这个进程级别的代码同步问题
1.2 分布式锁应该具备的条件
- 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
- 高可用的获取锁与释放锁
- 高性能的获取锁与释放锁
- 具备可重入特性
- 具备锁失效机制,防止死锁
- 具备阻塞锁特性,即没有获取到锁将继续等待获取锁
- 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
1.3 分布式锁的三种实现方式
- 基于数据库实现分布式锁
- 基于缓存(Redis等)实现分布式锁
- 基于Zookeeper实现分布式锁
2.基于数据库的实现方式
要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了. 当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录.
2.1 数据库操作
创建数据表
CREATE TABLE `methodLock` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
`desc` varchar(1024) NOT NULL DEFAULT '备注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
锁方法
insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)
释放锁
delete from methodLock where method_name ='method_name'
2.2 问题及优化
- 因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能
- --> 数据库需要双机部署、数据同步、主备切换
- 不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据
- --> 需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁
- 没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁
- --> 需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据
- 不具备阻塞锁特性,获取不到锁直接返回失败
- --> 需要优化获取逻辑,循环多次去获取
2.3 排他锁
public boolean lock(){
connection.setAutoCommit(false)
while(true){
try{
result = select * from methodLock where method_name=xxx for update;
if(result==null){
return true;
}
}catch(Exception e){
}
sleep(1000);
}
return false;
}
释放锁
public void unlock(){
connection.commit();
}
在查询语句后加
for update,数据库会在查询过程中给数据库表增加排它锁.当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁. InnoDB引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁.这里我们希望使用行级锁,就要给method_name添加索引,值得注意的是,这个索引一定要创建成唯一索引,否则会出现多个重载方法之间无法同时被访问的问题.重载方法的话建议把参数类型也加上.
2.4 优缺点
- 优点: 借助数据库,方案简单.
- 缺点: 在实际实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑.
3.基于redis的分布式锁的实现
3.1 背景
选用Redis实现分布式锁原因
- Redis有很高的性能
- Redis命令对此支持较好,实现起来比较方便
在使用Redis实现分布式锁的时候,主要就会使用到这三个命令
- SETNX
- SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0
- expire
- expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁
- delete
- delete key:删除key
- SETNX
3.2 加锁和解锁
加锁操作的正确姿势为:
- 使用setnx命令保证互斥性
- 需要设置锁的过期时间,避免死锁
- setnx和设置过期时间需要保持原子性,避免在设置setnx成功之后在设置过期时间客户端崩溃导致死锁
- 加锁的Value 值为一个唯一标示.可以采用UUID作为唯一标示.加锁成功后需要把唯一标示返回给客户端来用来客户端进行解锁操作 解锁的正确姿势为:
- 需要拿加锁成功的唯一标示要进行解锁,从而保证加锁和解锁的是同一个客户端
- 解锁操作需要比较唯一标示是否相等,相等再执行删除操作.这2个操作可以采用Lua脚本方式使2个命令的原子性
注意问题
- 加锁时:应该保证setnx和expire原子性;要保证解锁和解锁必须是同一个客户端
- 解锁时:如果没有事先判断拥有者而直接解锁,会导致任何客户端都可以随时解锁;保证get锁和解锁操作的原子性
- 举例:客户端A加锁,一段时间之后客户端A进行解锁操作时,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del方法,则客户端A将客户端B的锁给解除了.
3.3 Redis实现
public class RedisTool{
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
/**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
/**
* 释放分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
// 获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)
// 使用Lua语言来实现,是要确保上述操作是原子性的
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
4.基于ZooKeeper的实现方式
ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名.
4.1 Zookeeper性质
- 有序节点:假如当前有一个父节点为/lock,我们可以在这个父节点下面创建子节点;zookeeper提供了一个可选的有序特性,例如我们可以创建子节点“/lock/node-”并且指明有序,那么zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号,也就是说如果是第一个创建的子节点,那么生成的子节点为/lock/node-0000000000,下一个节点则为/lock/node-0000000001,依次类推.
- 临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,zookeeper会自动删除该节点.
- 事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,zookeeper会通知客户端.当前zookeeper有如下四种事件:1)节点创建;2)节点删除;3)节点数据修改;4)子节点变更.
4.2 实现步骤
- 创建一个目录mylock
- 线程A想获取锁就在mylock目录下创建临时顺序节点
- 获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁
- 线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点
- 线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁
4.3 代码实现
public class DistributedLock implements Watcher{
private ZooKeeper zk = null;
// 根节点
private String ROOT_LOCK = "/locks";
// 竞争的资源
private String lockName;
// 等待的前一个锁
private String WAIT_LOCK;
// 当前锁
private String CURRENT_LOCK;
// 计数器
private CountDownLatch countDownLatch;
private int sessionTimeout = 30;
private List<Exception> exceptionList = new ArrayList<>();
/**
* 配置分布式锁
*
* @param config 连接的url
* @param lockName 竞争资源
*/
public DistributedLock(String config, String lockName){
this.lockName = lockName;
try{
// 连接zookeeper
zk = new ZooKeeper(config, sessionTimeout, this);
Stat stat = zk.exists(ROOT_LOCK, false);
if (stat == null){
// 如果根节点不存在,则创建根节点
zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}catch(IOException | InterruptedException | KeeperException e){
e.printStackTrace();
}
}
// 节点监视器
public void process(WatchedEvent event){
if (this.countDownLatch != null){
this.countDownLatch.countDown();
}
}
public void lock(){
if (exceptionList.size() > 0){
throw new RuntimeException(exceptionList.get(0));
}
try{
if (this.tryLock()){
System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁");
}else{
// 等待锁
waitForLock(WAIT_LOCK, sessionTimeout, TimeUnit.SECONDS);
}
}catch(InterruptedException | KeeperException e){
e.printStackTrace();
}
}
public boolean tryLock(){
try{
String splitStr = "_lock_";
if (lockName.contains(splitStr)){
throw new RuntimeException("锁名有误");
}
// 创建临时有序节点
CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(CURRENT_LOCK + " 已经创建");
// 取所有子节点
List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
// 取出所有lockName的锁
List<String> lockObjects = Lists.newArrayList();
for (String node : subNodes){
String _node = node.split(splitStr)[0];
if (_node.equals(lockName)){
lockObjects.add(node);
}
}
Collections.sort(lockObjects);
System.out.println(Thread.currentThread().getName() + " 的锁是 " + CURRENT_LOCK);
// 若当前节点为最小节点,则获取锁成功
if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))){
return true;
}
// 若不是最小节点,则找到自己的前一个节点
String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
}catch(InterruptedException | KeeperException e){
e.printStackTrace();
}
return false;
}
public boolean tryLock(long timeout, TimeUnit unit){
try{
if (this.tryLock()){
return true;
}
return waitForLock(WAIT_LOCK, timeout, unit);
}catch(Exception e){
e.printStackTrace();
}
return false;
}
// 等待锁
private boolean waitForLock(String prev, long waitTime, TimeUnit unit) throws KeeperException, InterruptedException{
Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
if (stat != null){
System.out.println(Thread.currentThread().getName() + "等待锁 " + ROOT_LOCK + "/" + prev);
this.countDownLatch = new CountDownLatch(1);
// 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁
this.countDownLatch.await(waitTime, unit);
this.countDownLatch = null;
System.out.println(Thread.currentThread().getName() + " 等到了锁");
}
return true;
}
public void unlock(){
try{
System.out.println("释放锁 " + CURRENT_LOCK);
zk.delete(CURRENT_LOCK, -1);
CURRENT_LOCK = null;
zk.close();
}catch(InterruptedException | KeeperException e){
e.printStackTrace();
}
}
}
zk客户端curator实现
public class CuratorTest {
public static void main(String[] args) throws Exception {
//创建zookeeper的客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy);
client.start();
//创建分布式锁, 锁空间的根节点路径为/curator/lock
InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
mutex.acquire();
//获得了锁, 进行业务流程
System.out.println("Enter mutex");
//完成业务流程, 释放锁
mutex.release();
//关闭客户端
client.close();
}
}