#分布式锁 #Java

1-分布式锁概念

1.1-什么是分布式锁?

首先分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方法、变量。

在一个进程中,也就是一个jvm 或者说应用中,我们很容易去处理控制,在jdk java.util 并发包中已经为我们提供了这些方法去加锁, 比如synchronized 关键字 或者Lock 锁,都可以处理。

但是我们现在的应用程序如果只部署一台服务器,那并发量是很差的,如果同时有上万的请求那么很有可能造成服务器压力过大,而瘫痪。

1. 常规锁会造成什么情况?

首先说一下我们为什么要搞集群,简单理解就是,需求量(请求并发量)变大了,一个工人处理能力有限,那就多招一些工人来一起处理。

假设1千万个请求平均分配到100台服务器上,每个服务器 接收10w的请求(这10w个请求并不是在同一秒中来的,可能是在1,2个小时内,可以联想下我们三十晚上开红包,等到10.20开始,有的人立马开了,有的人是不是等到12点了才想起来~)

那这样的话,平均到每一秒上的请求也就不到1千个,这种压力一般的服务器还是可以承受的。

第一个请求到来后,是不是需要在1亿里面给他分一部分钱,金额随机,假设第一个人分到了100,那是不是要在这1亿中减去100块,剩下99999900 块~

第二个用户再来分,金额随机,这次分200块,那是不是就需要在剩下的99999900块中再减去200块,剩下99999700 块。

等到第10w个用户来,一看还有1000w,那这1000w全成他的了。

等于是在每个服务器中去分1亿,也就是10w个用户分了一个亿,最后总计有100个服务器,要分100亿。

如果真这样了,虽说马云爸爸不会破产(据最新统计马云有2300亿人民币),那分红包的开发项目组,以及产品经理,可以GG了~

简化结构图如下:

2. 分布式锁怎么去处理?

那么为了解决这个问题,让1000万用户只分1亿,而不是100亿,这个时候分布式锁就派上用处了。

分布式锁可以把整个集群就当作是一个应用一样去处理,那么也就需要这个锁,要独立于每一个服务之外,而不是在服务里面。

假设第一个服务器接收到用户1的请求后,那么这个时候,他就不能只在自己的应用中去判断还有多少钱可以分了,而需要去外部请求专门负责管理这1亿红包的人(服务),问他:哎,我这里要分100块,给我100。

管理红包的妹子(服务)一看,还有1个亿,那好,给你100块,然后剩下99999900块。

第二个请求到来后,被服务器2获取,继续去询问,管理红包的妹子,我这边要分10块,管理红包的妹子先查了下还有99999900,那就说:好,给你10块。那就剩下99999890块

等到第1000w个请求到来后,服务器100拿到请求,继续去询问,管理红包的妹子,你要100,妹子翻了翻白眼,对你说,就剩1块了,爱要不要,那这个时候就只能给你1块了(1块也是钱啊,买根辣条还是可以的)。

这些请求编号1,2不代表执行的先后顺序,正式的场景下,应该是 100台服务器每个服务器持有一个请求去访问负责管理红包的妹子(服务),那在管红包的妹子那里同时会接收到100个请求,这个时候就需要在负责红包的妹子那里加个锁就可以了(抛绣球),你们100个服务器谁拿到锁(抢到绣球),谁就进来和我谈,我给你分,其他人就等着去吧

经过上面的分布式锁的处理后,马云爸爸终于放心了,决定给红包团队每人加一个鸡腿。

简化的结构图如下:

3. 什么是分布式锁?

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在
分布式系统中,常常需要协调他们的动作。如果不同的系统或是同
一个系统的不同主机之间共享了一个或一组资源,那么访问这些资
源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情
况下,便需要使用到分布式锁。

4. 为什么要使用分布式锁?

为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLock或Synchronized)进行互斥控制。在单机环境中,Java中提供了很多并发处理相关的API。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

举个例子:
机器A , 机器B是一个集群, A, B两台机器上的程序都是一样的, 具备高可用性能.
A, B机器都有一个定时任务, 每天晚上凌晨2点需要执行一个定时任务, 但是这个定时任务只能执行一遍, 否则的话就会报错, 那A,B两台机器在执行的时候, 就需要抢锁, 谁抢到锁, 谁执行, 谁抢不到, 就不用执行了.

5. 锁的处理

在某些场景中,多个进程必须以互斥的方式独占共享资源,这时用分布式锁是最直接有效的。

随着互联网技术快速发展,数据规模增大,分布式系统越来越普及,一个应用往往会部署在多台机器上(多节点),在有些场景中,为了保证数据不重复,要求在同一时刻,同一任务只在一个节点上运行,即保证某一方法同一时刻只能被一个线程执行。

在单机环境中,应用是在同一进程下的,只需要保证单进程多线程环境中的线程安全性,通过 JAVA 提供的 volatile、ReentrantLock、synchronized 以及 concurrent 并发包下一些线程安全的类等就可以做到。

而在多机部署环境中,不同机器不同进程,就需要在多进程下保证线程的安全性了。因此,分布式锁应运而生。

6. 分布式锁的实现

  1. 基于数据库实现分布式锁
  2. 基于zookeeper实现分布式锁
  3. 基于Redis缓存实现分布式锁

7. 高效分布式锁

当我们在设计分布式锁的时候,我们应该考虑分布式锁至少要满足的一些条件,同时考虑如何高效的设计分布式锁,这里我认为以下几点是必须要考虑的。

  1. 互斥

在分布式高并发的条件下,我们最需要保证,同一时刻只能有一个线程获得锁,这是最基本的一点。

  1. 防止死锁

在分布式高并发的条件下,比如有个线程获得锁的同时,还没有来得及去释放锁,就因为系统故障或者其它原因使它无法执行释放锁的命令,导致其它线程都无法获得锁,造成死锁。

所以分布式非常有必要设置锁的 有效时间 ,确保系统出现故障后,在一定时间内能够主动去释放锁,避免造成死锁的情况。

  1. 性能

对于访问量大的共享资源,需要考虑减少锁等待的时间,避免导致大量线程阻塞。

所以在锁的设计时,需要考虑两点。

  •  锁的颗粒度要尽量小
    。比如你要通过锁来减库存,那这个锁的名称你可以设置成是商品的ID,而不是任取名称。这样这个锁只对当前商品有效,锁的颗粒度小。

  •  锁的范围尽量要小 。比如只要锁2行代码就可以解决问题的,那就不要去锁10行代码了。

  1. 重入

我们知道ReentrantLock是可重入锁,那它的特点就是:同一个线程可以重复拿到同一个资源的锁。重入锁非常有利于资源的高效利用。关于这点之后会做演示。

  1. 容错性

只要大多数Redis节点正常运行,客户端就能够获取和释放锁。

2-数据库实现分布式锁

2.1- 基于表记录

对于数据库来说,实现分布式锁,最简单的方式就是直接创建一张锁表。然后通过操作表中的数据来实现,当你想要获取锁的时候,在表中添加一条新的记录,想要释放锁的时候就删除这条记录。

1. 创建一张表

CREATE TABLE `database_lock` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`resource` int NOT NULL COMMENT '锁定的资源',
`description` varchar(1024) NOT NULL DEFAULT "" COMMENT '描述',
PRIMARY KEY (`id`),
UNIQUE KEY `uiq_idx_resource` (`resource`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库分布式锁表';

2. 利用这张表去获取或者释放锁

  • 加锁
    当我们需要给某个资源添加锁的时候,就插入一条数据
    INSERT INTO database_lock(resource, description) VALUES (1, ‘lock’);
    resource字段是唯一索引,多个请求请求添加同一条数据,那么其他的就会报错。
  • 释放锁
    释放锁时,删除当前条数据
    DELETE FROM database_lock WHERE resource=1;
    然后其他资源就可以再次添加去获取这个锁了。

3. 特点

  • 没有失效时间
    一旦释放锁的操作失败就会导致锁记录一直在数据库中,其它线程无法获得锁。这个缺陷也很好解决,比如可以做一个定时任务去定时清理。

  • 依赖数据库
    这种锁的可靠性依赖于数据库。建议设置备库,避免单点,进一步提高可靠性。

  • 非阻塞
    这种锁是非阻塞的,因为插入数据失败之后会直接报错,想要获得锁就需要再次操作。如果需要阻塞式的,可以弄个for循环、while循环之类的,直至INSERT成功再返回。

  • 非可重入
    这种锁也是非可重入的,因为同一个线程在没有释放锁之前无法再次获得锁,因为数据库中已经存在同一份记录了。想要实现可重入锁,可以在数据库中添加一些字段,比如获得锁的主机信息、线程信息等,那么在再次获得锁的时候可以先查询数据,如果当前的主机信息和线程信息等能被查到的话,可以直接把锁分配给它。

2.2-乐观锁

认为数据的更新在大多数情况下是不会产生冲突的,只在数据库更新操作提交的时候才对数据作冲突检测。如果检测的结果出现了与预期数据不一致的情况,则返回失败信息。

乐观锁大多数是基于数据版本(version)的记录机制实现的。何谓数据版本号?即为数据增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表添加一个 “version”字段来实现读取出数据时,将此版本号一同读出,之后更新时,对此版本号加1。

在更新过程中,会对版本号进行比较,如果是一致的,没有发生改变,则会成功执行本次操作;如果版本号不一致,则会更新失败。

1. 创建表

	CREATE TABLE `optimistic_lock` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`resource` int NOT NULL COMMENT '锁定的资源',
`version` int NOT NULL COMMENT '版本信息',
`created_at` datetime COMMENT '创建时间',
`updated_at` datetime COMMENT '更新时间',
`deleted_at` datetime COMMENT '删除时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uiq_idx_resource` (`resource`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库分布式锁表';

在使用之前需要添加对应的数据

INSERT INTO optimistic_lock(resource, version, created_at, updated_at) VALUES(20, 1, CURTIME(), CURTIME());

2. 执行过程

现在我们需要对resource字段进行-1操作
  • 原来的执行过程
    STEP1 - 获取资源:SELECT resource FROM optimistic_lock WHERE id = 1
    STEP2 - 执行业务逻辑
    STEP3 - 更新资源:UPDATE optimistic_lock SET resource = resource -1 WHERE id = 1
  • 添加version之后
    STEP1 - 获取资源: SELECT resource, version FROM optimistic_lock WHERE id = 1
    STEP2 - 执行业务逻辑
    STEP3 - 更新资源:UPDATE optimistic_lock SET resource = resource -1, version = version + 1 WHERE id = 1 AND version = oldVersion
    也可以将version改成时间戳,检测库中的更新时间是否和之前的一致。

3. 优缺点

乐观锁的优点比较明显,由于在检测数据冲突时并不依赖数据库本身的锁机制,不会影响请求的性能,当产生并发且并发量较小的时候只有少部分请求会失败。

缺点是需要对表的设计增加额外的字段,增加了数据库的冗余,另外,当应用并发量高的时候,version值在频繁变化,则会导致大量请求失败,影响系统的可用性。我们通过上述sql语句还可以看到,数据库锁都是作用于同一行数据记录上,这就导致一个明显的缺点,在一些特殊场景,如大促、秒杀等活动开展的时候,大量的请求同时请求同一条记录的行锁,会对数据库产生很大的写压力。所以综合数据库乐观锁的优缺点,乐观锁比较适合并发量不高,并且写操作不频繁的场景。

3.1-悲观锁

1. 利用数据库行锁

利用select for update ,数据库的行锁来实现悲观锁,但是mysql的innoDB在加锁的时候,只有明确地指定主键(或者索引)的才能指定被选中的数据,执行行锁,否则会执行表锁。

2. 执行过程

需要关闭mysql的自动提交功能

# mysql关闭自动提交
mysql> SET AUTOCOMMIT = 0; Query OK, 0 rows affected (0.00 sec)

使用@Transaction也会关闭自动提交
然后具体流程
- 获取锁
SELECT * FROM database_lock WHERE id = 1 FOR UPDATE;
- 执行业务逻辑
- 释放锁:COMMIT

3. 注意点

  • 必须能查到数据
    指定主键并且能查询到数据的过程(触发行锁),如果查不到数据那么也就无从“锁”起了。

  • 如果没有命中索引就会变成表锁

  • 悲观锁特点
    在悲观锁中,每一次行数据的访问都是独占的,只有当正在访问该行数据的请求事务提交以后,其他请求才能依次访问该数据,否则将阻塞等待锁的获取。悲观锁可以严格保证数据访问的安全。但是缺点也明显,即每次请求都会额外产生加锁的开销且未获取到锁的请求将会阻塞等待锁的获取,在高并发环境下,容易造成大量请求阻塞,影响系统可用性。另外,悲观锁使用不当还可能产生死锁的情况。

3-ZooKeeper实现分布式锁

3.1-zookeeper客户端选型

  • 原生zookeeper客户端,有watcher一次性、无超时重连机制等一系列问题
  • ZkClient,解决了原生客户端一些问题,一些存量老系统中还在使用
  • curator,提供了各种应用场景(封装了分布式锁,计数器等),新项目首选

3.2-zookeeper分布式锁实现原理

  • zookeeper中规定,在同一时刻,不能有多个客户端创建同一个节点,我们可以利用这个特性实现分布式锁。zookeeper临时节点只在session生命周期存在,session一结束会自动销毁。
  • watcher机制,在代表锁资源的节点被删除,即可以触发watcher解除阻塞重新去获取锁,这也是zookeeper分布式锁较其他分布式锁方案的一大优势。

1. 基于临时节点方案

第一种方案实现较为简单,逻辑就是谁创建成功该节点,谁就持有锁,创建失败的自己进行阻塞,A线程先持有锁,B线程获取失败就会阻塞,同时对/lockPath设置监听,A线程执行完操作后删除节点,触发监听器,B线程此时解除阻塞,重新去获取锁。

我们模仿原生jdk的lock接口设计,采用模板方法设计模式来编写分布式锁,这样的好处是扩展性强,我们可以快速切换到redis分布式锁、数据库分布式锁等实现方式。

  • 创建Lock接口
public interface Lock {  
/**
  * 获取锁
  */
void getLock() throws Exception;

/**
  * 释放锁
  */
void unlock() throws Exception;
}
  • AbstractTemplateLock抽象类

    public abstract class AbstractTemplateLock implements Lock {
    @Override
    public void getLock() {
    if (tryLock()) {
    System.out.println(Thread.currentThread().getName() + "获取锁成功");
    } else {
    //等待
    waitLock();//事件监听 如果节点被删除则可以重新获取
    //重新获取
    getLock();
    }
    }

    protected abstract void waitLock();

    protected abstract boolean tryLock();

    protected abstract void releaseLock();

    @Override
    public void unlock() {
    releaseLock();
    }
    }
  • zookeeper分布式锁逻辑

    @Slf4j
    public class ZkTemplateLock extends AbstractTemplateLock {
    private static final String zkServers = "127.0.0.1:2181";
    private static final int sessionTimeout = 8000;
    private static final int connectionTimeout = 5000;

    private static final String lockPath = "/lockPath";


    private ZkClient client;

    public ZkTemplateLock() {
    client = new ZkClient(zkServers, sessionTimeout, connectionTimeout);
    log.info("zk client 连接成功:{}", zkServers);
    }

    @Override
    protected void waitLock() {
    CountDownLatch latch = new CountDownLatch(1);

    IZkDataListener listener = new IZkDataListener() {
    @Override
    public void handleDataDeleted(String dataPath) throws Exception {
    System.out.println("监听到节点被删除");
    latch.countDown();
    }

    @Override
    public void handleDataChange(String dataPath, Object data) throws Exception {
    }
    };
    //完成 watcher 注册
    client.subscribeDataChanges(lockPath, listener);

    //阻塞自己
    if (client.exists(lockPath)) {
    try {
    latch.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    //取消watcher注册
    client.unsubscribeDataChanges(lockPath, listener);
    }

    @Override
    protected boolean tryLock() {
    try {
    client.createEphemeral(lockPath);
    System.out.println(Thread.currentThread().getName() + "获取到锁");
    } catch (Exception e) {
    log.error("创建失败");
    return false;
    }
    return true;
    }

    @Override
    public void releaseLock() {
    client.delete(this.lockPath);
    }
    }

  • 缺点

每次去竞争锁,都只会有一个线程拿到锁,当线程数庞大时会发生“惊群”现象,zookeeper节点可能会运行缓慢甚至宕机。这是因为其他线程没获取到锁时都会监听/lockPath节点,当A线程释放完毕,海量的线程都同时停止阻塞,去争抢锁,这种操作十分耗费资源,且性能大打折扣。

2. 基于临时顺序节点方案

临时顺序节点与临时节点不同的是产生的节点是有序的,我们可以利用这一特点,只让当前线程监听上一序号的线程,每次获取锁的时候判断自己的序号是否为最小,最小即获取到锁,执行完毕就删除当前节点继续判断谁为最小序号的节点。

临时顺序节点操作源码

@Slf4j
public class ZkSequenTemplateLock extends AbstractTemplateLock {
private static final String zkServers = "127.0.0.1:2181";
private static final int sessionTimeout = 8000;
private static final int connectionTimeout = 5000;
private static final String lockPath = "/lockPath";
private String beforePath;
private String currentPath;
private ZkClient client;

public ZkSequenTemplateLock() {
client = new ZkClient(zkServers);
if (!client.exists(lockPath)) {
client.createPersistent(lockPath);

}
log.info("zk client 连接成功:{}", zkServers);

}

@Override
protected void waitLock() {
CountDownLatch latch = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("监听到节点被删除");
latch.countDown();
}

@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
//给排在前面的节点增加数据删除的watcher,本质是启动另一个线程去监听上一个节点
client.subscribeDataChanges(beforePath, listener);

//阻塞自己
if (client.exists(beforePath)) {
try {
System.out.println("阻塞" + currentPath);
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//取消watcher注册
client.unsubscribeDataChanges(beforePath, listener);
}

@Override
protected boolean tryLock() {
if (currentPath == null) {
//创建一个临时顺序节点
currentPath = client.createEphemeralSequential(lockPath + "/", "lock-data");
System.out.println("current:" + currentPath);
}

//获得所有的子节点并排序。临时节点名称为自增长的字符串
List<String> childrens = client.getChildren(lockPath);
//排序list,按自然顺序排序
Collections.sort(childrens);
if (currentPath.equals(lockPath + "/" + childrens.get(0))) {
return true;
} else {
//如果当前节点不是排第一,则获取前面一个节点信息,赋值给beforePath
int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));
beforePath = lockPath + "/" + childrens.get(curIndex - 1);
}
System.out.println("beforePath" + beforePath);
return false;
}

@Override
public void releaseLock() {
System.out.println("delete:" + currentPath);
client.delete(currentPath);
}
}

3.3-Curator分布式锁工具

curator提供了以下种类的锁:

  • 共享可重入锁(Shared Reentrant Lock):全局同步锁,同一时间不会有两个客户端持有一个锁
  • 共享锁:与共享可重入锁类似,但是不可重入(有时候会因为这个原因造成死锁)
  • 共享可重入读写锁
  • 共享信号量
  • Multi Shared Lock:管理多种锁的容器实体

我们采用第一种Shared Reentrant Lock中的InterProcessMutex来完成上锁、释放锁的的操作

public class ZkLockWithCuratorTemplate implements Lock {
// zk host地址
private String host = "localhost";

// zk自增存储node
private String lockPath = "/curatorLock";

// 重试休眠时间
private static final int SLEEP_TIME_MS = 1000;
// 最大重试1000次
private static final int MAX_RETRIES = 1000;
//会话超时时间
private static final int SESSION_TIMEOUT = 30 * 1000;
//连接超时时间
private static final int CONNECTION_TIMEOUT = 3 * 1000;
//curator核心操作类
private CuratorFramework curatorFramework;

InterProcessMutex lock;

public ZkLockWithCuratorTemplate() {
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(host)
.connectionTimeoutMs(CONNECTION_TIMEOUT)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))
.build();
curatorFramework.start();
lock = new InterProcessMutex(curatorFramework, lockPath);
}

@Override
public void getLock() throws Exception {
//5s后超时释放锁
lock.acquire(5, TimeUnit.SECONDS);
}

@Override
public void unlock() throws Exception {
lock.release();
}
}

4-Redis实现分布式锁

4.1-SETNX+Lua脚本

主要思路:通过 set key value px milliseconds nx命令实现加锁, 通过Lua脚本实现解锁。

SET KEY VALUE [EX seconds] [PX milliseconds] [NX|XX]

- EX seconds 设置指定的到期时间(单位为秒)
- PX milliseconds 设置指定的到期时间(单位毫秒)
- NX: 仅在键不存在时设置键
- XX: 只有在键已存在时设置

核心实现命令如下

//获取锁(unique_value可以是UUID等)  
SET resource_name unique_value NX PX 30000

//释放锁(lua脚本中,一定要比较value,防止误解锁)
if redis.call(&quot;get&quot;,KEYS[1]) == ARGV[1] then
return redis.call(&quot;del&quot;,KEYS[1])
else
return 0
end

这种实现方式主要有以下几个要点:

  • set 命令要用 set key value px milliseconds nx,替代 setnx + expire 需要分两次执行命令的方式,保证了原子性
  • value 要具有唯一性,可以使用UUID.randomUUID().toString()方法生成,用来标识这把锁是属于哪个请求加的,在解锁的时候就可以有依据
  • 释放锁时要验证 value 值,防止误解锁
  • 通过 Lua 脚本来避免 Check And Set 模型的并发问题,因为在释放锁的时候因为涉及到多个Redis操作 (利用了eval命令执行Lua脚本的原子性)

完整代码如下

public class RedisTool {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;

/**
* 获取分布式锁(加锁代码)
*
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean getDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}

/**
* 释放分布式锁(解锁代码)
*
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;

}
}

加锁代码分析

首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。

其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁。

最后,因为我们将value赋值为requestId,用来标识这把锁是属于哪个请求加的,那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。

解锁代码分析

将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。

在执行的时候,首先会获取锁对应的value值,检查是否与requestId相等,如果相等则解锁(删除key)。

这种方式仍存在单点风险,以上实现在 Redis 正常运行情况下是没问题的,但如果存储锁对应key的那个节点挂了的话,就可能存在丢失锁的风险,导致出现多个客户端持有锁的情况,这样就不能实现资源的独享了。

  1. 客户端A从master获取到锁
  2. 在master将锁同步到slave之前,master宕掉了(Redis的主从同步通常是异步的)
  3. 主从切换,slave节点被晋级为master节点
  4. 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。导致存在同一时刻存在不止一个线程获取到锁的情况

所以在这种实现之下,不论Redis的部署架构是单机模式、主从模式、哨兵模式还是集群模式,都存在这种风险,因为Redis的主从同步是异步的。

幸运的是,Redis 之父 antirez 提出了 redlock算法 可以解决这个问题。

4.2-Redlock

Redlock 红锁是为了解决主从架构中当出现主从切换导致多个客户端持有同一个锁而提出的一种算法。

大家可以看官方文档(https://redis.io/topics/distlock),以下来自官方文档的翻译。

想用使用 Redlock,官方建议在不同机器上部署 5 个 Redis 主节点,节点都是完全独立,也不使用主从复制,使用多个节点是为容错。

一个客户端要获取锁有 5 个步骤

  1. 客户端获取当前时间 T1(毫秒级别);

  2. 使用相同的 key和 value顺序尝试从 N个 Redis实例上获取锁。

    • 每个请求都设置一个超时时间(毫秒级别),该超时时间要远小于锁的有效时间,这样便于快速尝试与下一个实例发送请求。
    • 比如锁的自动释放时间 10s,则请求的超时时间可以设置 5~50 毫秒内,这样可以防止客户端长时间阻塞。
  3. 客户端获取当前时间 T2 并减去步骤 1 的 T1 来计算出获取锁所用的时间(T3 = T2 -T1)。当且仅当客户端在大多数实例(N/2 + 1)获取成功,且获取锁所用的总时间 T3 小于锁的有效时间,才认为加锁成功,否则加锁失败。

  4. 如果第 3 步加锁成功,则执行业务逻辑操作共享资源,key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。

  5. 如果因为某些原因,获取锁失败(没有在至少 N/2+1 个 Redis 实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些 Redis 实例根本就没有加锁成功)。

另外部署实例的数量要求是奇数,为了能很好的满足过半原则,如果是 6 台则需要 4 台获取锁成功才能认为成功,所以奇数更合理

事情可没这么简单,Redis 作者把这个方案提出后,受到了业界著名的分布式系统专家的质疑

两人好比神仙打架,两人一来一回论据充足的对一个问题提出很多论断……

4.3-Redisson

1. 什么是Redission?

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。
它不仅提供了一系列的分布式的Java常用对象,还实现了可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等,还提供了许多分布式服务。

Redisson提供了使用Redis的最简单和最便捷的方法。宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

2. Redisson 分布式锁使用

  1. 引入Maven依赖
    基于 SpringBoot starter 方式,添加 starter。
    <dependency>  
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.16.4</version>
    </dependency>
    不过这里需要注意 springboot 与 redisson 的版本,因为官方推荐 redisson 版本与 springboot 版本配合使用。

将 Redisson 与 Spring Boot 库集成,还取决于 Spring Data Redis 模块。

使用 SpringBoot 2.5.x 版本, 所以需要添加 redisson-spring-data-25。

<dependency>  
<groupId>org.redisson</groupId>
<!-- for Spring Data Redis v.2.5.x -->
<artifactId>redisson-spring-data-25</artifactId>
<version>3.16.4</version>
</dependency>
  1. 添加配置文件:
    spring:  
    redis:
    database:
    host:
    port:
    password:
    ssl:
    timeout:
    # 根据实际情况配置 cluster 或者哨兵
    cluster:
    nodes:
    sentinel:
    master:
    nodes:

就这样在 Spring 容器中我们拥有以下几个 Bean 可以使用:

  • RedissonClient
  • RedissonRxClient
  • RedissonReactiveClient
  • RedisTemplate
  • ReactiveRedisTemplate

失败无限重试

RLock lock = redisson.getLock("码哥字节");
try {

// 1.最常用的第一种写法
lock.lock();

// 执行业务逻辑
//.....

} finally {
lock.unlock();
}

拿锁失败时会不停的重试,具有 Watch Dog 自动延期机制,默认续 30s 每隔 30/3=10 秒续到 30s。

失败超时重试,自动续命

// 尝试拿锁10s后停止重试,获取失败返回false,具有Watch Dog 自动延期机制, 默认续30s  
boolean flag = lock.tryLock(10, TimeUnit.SECONDS);

超时自动释放锁

// 没有Watch Dog ,10s后自动释放,不需要调用 unlock 释放锁。  
lock.lock(10, TimeUnit.SECONDS);

超时重试,自动解锁

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁,没有 Watch dog  
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}

Watch Dog 自动延时

如果获取分布式锁的节点宕机,且这个锁还处于锁定状态,就会出现死锁。

为了避免这个情况,我们都会给锁设置一个超时自动释放时间。

然而,还是会存在一个问题。

假设线程获取锁成功,并设置了 30 s 超时,但是在 30s 内任务还没执行完,锁超时释放了,就会导致其他线程获取不该获取的锁。

所以,Redisson 提供了 watch dog 自动延时机制,提供了一个监控锁的看门狗,它的作用是在 Redisson 实例被关闭前,不断的延长锁的有效期。

也就是说,如果一个拿到锁的线程一直没有完成逻辑,那么看门狗会帮助线程不断的延长锁超时时间,锁不会因为超时而被释放。

默认情况下,看门狗的续期时间是 30s,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。

另外 Redisson 还提供了可以指定 leaseTime 参数的加锁方法来指定加锁的时间。

超过这个时间后锁便自动解开了,不会延长锁的有效期。

原理如下图:

有两个点需要注意:

  • watchDog 只有在未显示指定加锁超时时间(leaseTime)时才会生效。
  • lockWatchdogTimeout 设定的时间不要太小 ,比如设置的是 100 毫秒,由于网络直接导致加锁完后,watchdog 去延期时,这个 key 在 redis 中已经被删除了。

4.4-Redisson原理分析

为了更好的理解分布式锁的原理,我这边自己画张图通过这张图来分析。

1. 加锁机制

线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。

线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。

2. watch dog自动延期机制

这个比较难理解,找了些许资料感觉也并没有解释的很清楚。这里我自己的理解就是:

在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。

但在实际开发中会有下面一种情况:

 //设置锁1秒过去  
redissonLock.lock("redisson", 1);
/**
  * 业务逻辑需要咨询2秒
  */
redissonLock.release("redisson");

/**
  * 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业 务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒 后,这个锁就自动过期了,
  * 那么这个时候 线程2 进来了。那么就存在 线程1和线程2 同时在这段业务逻辑里执行代码,这当然是不合理的。
  * 而且如果是这种情况,那么在解锁时系统会抛异常,因为解锁和加锁已经不是同一线程了,具体后面代码演示。
  */

所以这个时候 看门狗 就出现了,它的作用就是 线程1 业务还没有执行完,时间就过了,线程1 还想持有锁的话,就会启动一个watch
dog后台线程,不断的延长锁key的生存时间。

注意 正常这个看门狗线程是不启动的,还有就是这个看门狗启动后对整体性能也会有一定影响,所以不建议开启看门狗。

3. 为啥要用lua脚本呢?

这个不用多说,主要是如果你的业务逻辑复杂的话,通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的
原子性

4. 可重入加锁机制

Redisson可以实现可重入加锁机制的原因,我觉得跟两点有关:

    1、Redis存储锁的数据类型是 Hash类型    
2、Hash数据类型的key值包含了当前线程信息。

下面是redis存储的数据  

这里表面数据类型是Hash类型,Hash类型相当于我们java的 <key,<key1,value>> 类型,这里key是指 ‘redisson’

它的有效期还有9秒,我们再来看里们的key1值为 078e44a3-5f95-4e24-b6aa-80684655a15a:45 它的组成是:

guid + 当前线程的ID。后面的value是就和可重入加锁有关。

举图说明

上面这图的意思就是可重入锁的机制,它最大的优点就是相同线程不需要在等待锁,而是可以直接进行相应操作。

4.5-源码导读

在调用 lock 方法时,会最终调用到 tryAcquireAsync

调用链为:lock()->tryAcquire->tryAcquireAsync,详细解释如下:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
//如果指定了加锁时间,会直接去加锁
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//没有指定加锁时间 会先进行加锁,并且默认时间就是 LockWatchdogTimeout的时间
//这个是异步操作 返回RFuture 类似netty中的future
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}

//这里也是类似netty Future 的addListener,在future内容执行完成后执行
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}

// lock acquired
if (ttlRemaining == null) {
// leaseTime不为-1时,不会自动延期
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//这里是定时执行 当前锁自动延期的动作,leaseTime为-1时,才会自动延期
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}

scheduleExpirationRenewal 中会调用 renewExpiration 启用了一个 timeout 定时,去执行延期动作。

private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}

Timeout task = commandExecutor.getConnectionManager()
.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 省略部分代码....

RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
....

if (res) {
//如果 没有报错,就再次定时延期
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
// 这里我们可以看到定时任务 是 lockWatchdogTimeout 的1/3时间去执行 renewExpirationAsync
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

ee.setTimeout(task);
}

scheduleExpirationRenewal 会调用到 renewExpirationAsync,执行下面这段 lua 脚本。

他主要判断就是 这个锁是否在 redis 中存在,如果存在就进行 pexpire 延期。

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
  • watch dog 在当前节点还存活且任务未完成则每 10 s 给锁续期 30s。
  • 程序释放锁操作时因为异常没有被执行,那么锁无法被释放,所以释放锁操作一定要放到 finally {} 中;
  • 要使 watchLog 机制生效 ,lock 时 不要设置 过期时间。
  • watchlog 的延时时间 可以由 lockWatchdogTimeout 指定默认延时时间,但是不要设置太小。
  • watchdog 会每 lockWatchdogTimeout/3 时间,去延时。
  • 通过 lua 脚本实现延迟。

4.6-Redis分布式锁的缺点

Redis分布式锁会有个缺陷,就是在Redis哨兵模式下:

客户端1 对某个 master节点 写入了redisson锁,此时会异步复制给对应的 slave节点。但是这个过程中一旦发生
master节点宕机,主备切换,slave节点从变为了 master节点。

这时 客户端2 来尝试加锁的时候,在新的master节点上也能加锁,此时就会导致多个客户端对同一个分布式锁完成了加锁。

这时系统在业务语义上一定会出现问题, 导致各种脏数据的产生

缺陷 在哨兵模式或者主从模式下,如果 master实例宕机的时候,可能导致多个客户端同时完成加锁。

参考文章
实战!用Redisson来实现分布式锁,真香! (qq.com)
肝一下ZooKeeper实现分布式锁的方案,附带实例! (qq.com)
Redis分布式锁的最佳实践 - Redisson (qq.com)
Redisson是如何实现分布式锁的? (qq.com)
从源码角度看 Redis 分布式锁的 3 种实现方案! (qq.com)