redis分布式锁深入源码理解
前言
大型互联网平台都是服务多机部署,或者是分布式系统,对于并发问题的处理,使用redis的分布式锁防止多机的并发。这种分布式锁是基于Redlock实现的。
Redlock实现
antirez提出的redlock算法大概是这样的:
在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5个Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行以下操作:
获取当前Unix时间,以毫秒为单位。
依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。
客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。
Redlock源码
redisson已经有对redlock算法封装,接下来对其用法进行简单介绍,并对核心源码进行分析(假设5个redis实例)。
引入pom工程依赖:
<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.3.2</version>
</dependency>
初始化redisson客户端:
public void init() throws IOException {
Config config = new Config();
config.useSingleServer()
.setAddress(host)
.setPassword(password)
.setDatabase(0)
.setFailedAttempts(3)
.setPingTimeout(3000)
.setTimeout(5000)
.setSubscriptionConnectionMinimumIdleSize(1)
.setSubscriptionConnectionPoolSize(256)
.setConnectTimeout(connectTimeout)
.setReconnectionTimeout(3000)
.setConnectionPoolSize(256)
.setConnectionMinimumIdleSize(1)
.setRetryAttempts(3)
.setRetryInterval(3000)
.setIdleConnectionTimeout(30000)
.setClientName("com.fshows.bankliquidation.redisclient");
if (redisson == null) {
redisson = Redisson.create(config);
LOGGER.info("redis连接成功,server={}", host);
} else {
LOGGER.warn("redis 重复连接,config={}", config);
}
}
redission封装的redlock算法实现的分布式锁用法:
RLock redisLock = redisCache.getRedisLock(redisKey);
if (redisLock.isLocked()) {
//分布式锁已经被占用
throw CommonException.REPEAT_ACTION_ERROR;
}
try {
//尝试加锁 第一个时间是加锁等待时间,第二个redis锁失效时间 第三个是时间单位
boolean isLocked = redisLock.tryLock(3L, 10L, TimeUnit.SECONDS);
if (isLocked) {
//锁获取成功 执行你的业务逻辑
} else {
//否则报错重复加锁
throw CommonException.REPEAT_ACTION_ERROR;
}
} catch (InterruptedException e) {
throw CommonException.REPEAT_ACTION_ERROR;
} finally {
//最后解锁
redisLock.unlock();
}
唯一ID
实现分布式锁的一个非常重要的点就是set的value要具有唯一性,redisson的value是怎样保证value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock(“REDLOCK_KEY”),源码在Redisson.java和RedissonLock.java中:
protected final UUID id = UUID.randomUUID();
String getLockName(long threadId) {
return id + ":" + threadId;
}
获取锁
获取锁的代码为redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
获取锁的命令中,
KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key,即REDLOCK_KEY;
ARGV[1]就是internalLockLeaseTime,即锁的租约时间,默认30s;
ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值,即UUID+threadId:
释放锁
释放锁的代码为redLock.unlock(),核心源码如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
Redis几种架构
Redis发展到现在,几种常见的部署架构有:
单机模式;
主从模式;
哨兵模式;
集群模式;
我们首先基于这些架构讲解Redisson普通分布式锁实现,需要注意的是,只有充分了解普通分布式锁是如何实现的,才能更好的了解Redlock分布式锁的实现,因为Redlock分布式锁的实现完全基于普通分布式锁。
哨兵模式
即sentinel模式,实现代码和单机模式几乎一样,唯一的不同就是Config的构造
Config config = new Config();
config.useSentinelServers().addSentinelAddress(
"redis://172.29.3.245:26378","redis://172.29.3.245:26379", "redis://172.29.3.245:26380")
.setMasterName("mymaster")
.setPassword("a123456").setDatabase(0);
集群模式
Config config = new Config();
config.useClusterServers().addNodeAddress(
"redis://172.29.3.245:6375","redis://172.29.3.245:6376", "redis://172.29.3.245:6377",
"redis://172.29.3.245:6378","redis://172.29.3.245:6379", "redis://172.29.3.245:6380")
.setPassword("a123456").setScanInterval(5000);
总结
普通分布式实现非常简单,无论是那种架构,向Redis通过EVAL命令执行LUA脚本即可。
Redlock分布式锁
既然核心变化是使用了RedissonRedLock,那么我们看一下它的源码有什么不同。这个类是RedissonMultiLock的子类,所以调用tryLock方法时,事实上调用了RedissonMultiLock的tryLock方法,精简源码如下:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 实现要点之允许加锁失败节点限制(N-(N/2+1))
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
// 实现要点之遍历所有节点通过EVAL命令执行lua加锁
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 对节点尝试加锁
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
} catch (RedisConnectionClosedException|RedisResponseTimeoutException e) {
// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
// 抛出异常表示获取锁失败
lockAcquired = false;
}
if (lockAcquired) {
// 成功获取锁集合
acquiredLocks.add(lock);
} else {
// 如果达到了允许加锁失败节点限制,那么break,即此次Redlock加锁失败
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
}
}
return true;
}
以sentinel模式架构为例,如下图所示,有sentinel-1,sentinel-2,sentinel-3总计3个sentinel模式集群,如果要获取分布式锁,那么需要向这3个sentinel集群通过EVAL命令执行LUA脚本,需要3/2+1=2,即至少2个sentinel集群响应成功,才算成功的以Redlock算法获取到分布式锁: