继上一篇 SpringBoot 集成 redis 分布式锁 写到最后,我们发现这种分布式锁也存在着缺陷,如果A在 setnx 成功后,A成功获取锁了,也就是锁已经存到 Redis 里面了,此时服务器异常关闭或是重启,将不会执行我们的设置过期时间操作,也就不会设置锁的有效期,这样的话锁就不会释放了,就会产生死锁。
为了解决上篇出现的死锁问题,提出了双重防死锁,可以更好的解决死锁问题。
原理图如下:
过程分析
1、当A通过 setnx(lockkey,currenttime+timeout) 命令能成功设置 lockkey 时,即返回值为1;
2、当A通过 setnx(lockkey,currenttime+timeout) 命令不能成功设置 lockkey 时,这是不能直接断定获取锁失败;因为我们在设置锁时,设置了锁的超时时间 timeout,当前时间大于 redis 中存储键值为 lockkey 的 value 值时,可以认为上一任的拥有者对锁的使用权已经失效了,A就可以强行拥有该锁;具体判定过程如下;
3、A通过 get(lockkey),获取 redis 中的存储键值为 lockkey 的 value 值,即获取锁的相对时间 lockvalueA;
4、lockvalueA!=null && currenttime>lockvalue,A通过当前的时间与锁设置的时间做比较,如果当前时间已经大于锁设置的时间临界,即可以进一步判断是否可以获取锁,否则说明该锁还在被占用,A就还不能获取该锁,结束,获取锁失败;
5、步骤4返回结果为 true 后,通过 getSet 设置新的超时时间,并返回旧值 lockvalueB,以作判断,因为在分布式环境,在进入这里时可能另外的进程获取到锁并对值进行了修改,只有旧值与返回的值一致才能说明中间未被其他进程获取到这个锁;
6、lockvalueB == null || lockvalueA==lockvalueB,判断:若果 lockvalueB 为null,说明该锁已经被释放了,此时该进程可以获取锁;旧值与返回的 lockvalueB 一致说明中间未被其他进程获取该锁,可以获取锁;否则不能获取锁,结束,获取锁失败。
代码实现
项目代码结构图
把上篇的拦截器类(LockMethodInterceptor)的代码修改如下:
package com.tuhu.twosample.chen.distributed.interceptor;
import com.tuhu.twosample.chen.distributed.annotation.CacheLock;
import com.tuhu.twosample.chen.distributed.common.CacheKeyGenerator;
import org.apache.logging.log4j.util.PropertiesUtil;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.StringUtils;
import java.lang.reflect.Method;
/**
* @author chendesheng
* @create 2019/10/11 16:11
*/
@Aspect
@Configuration
public class LockMethodInterceptor {
@Autowired
public LockMethodInterceptor(StringRedisTemplate lockRedisTemplate, CacheKeyGenerator cacheKeyGenerator) {
this.lockRedisTemplate = lockRedisTemplate;
this.cacheKeyGenerator = cacheKeyGenerator;
}
private final StringRedisTemplate lockRedisTemplate;
private final CacheKeyGenerator cacheKeyGenerator;
@Around("execution(public * *(..)) && @annotation(com.tuhu.twosample.chen.distributed.annotation.CacheLock)")
public Object interceptor(ProceedingJoinPoint pjp) {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
CacheLock lock = method.getAnnotation(CacheLock.class);
if (StringUtils.isEmpty(lock.prefix())) {
throw new RuntimeException("lock key can't be null...");
}
final String lockKey = cacheKeyGenerator.getLockKey(pjp);
final long lockTime = lock.expire();
try {
//key不存在才能设置成功,获得了分布式锁,设置锁过期时间
final Boolean success = lockRedisTemplate.opsForValue().setIfAbsent(lockKey, String.valueOf(System.currentTimeMillis()+lockTime));
if (success) {
lockRedisTemplate.expire(lockKey, lock.expire(), lock.timeUnit());
} else {
String lockValueA = lockRedisTemplate.opsForValue().get(lockKey);
//查到锁的值并与当前时间比较检查其是否已经超时,若超时则可以重新获取锁
if (lockValueA!=null && System.currentTimeMillis() > Long.valueOf(lockValueA)){
//通过用当前时间戳 getAndSet 操作会给对应的key设置新的值并返回旧值,这是一个原子操作
String lockValueB = lockRedisTemplate.opsForValue().getAndSet(lockKey,String.valueOf(System.currentTimeMillis()+lockTime));
//redis返回nil,则说明该值已经无效
if (lockValueB == null && StringUtils.pathEquals(lockValueA,lockValueB)){
//获取锁成功
lockRedisTemplate.expire(lockKey, lock.expire(), lock.timeUnit());
}else {
//获取锁失败
throw new RuntimeException("请勿重复请求");
}
}
//按理来说 我们应该抛出一个自定义的 CacheLockException 异常;
throw new RuntimeException("请勿重复请求");
}
try {
return pjp.proceed();
} catch (Throwable throwable) {
throw new RuntimeException("系统异常");
}
} finally {
//如果演示的话需要注释该代码;实际应该放开
// lockRedisTemplate.delete(lockKey);
}
}
}
这样我们双重防死锁的 redis 分布式锁也已经实现了。
优化点
加入了超时时间判断锁是否超时了,即使A在成功设置了锁之后,服务器就立即出现宕机或是重启,也不会出现死锁问题;因为B在尝试获取锁的时候,如果不能setnx成功,会去获取 redis 中锁的超时时间与当前的系统时间做比较,如果当前的系统时间已经大于锁超时时间,说明A已经对锁的使用权失效,B能继续判断能否获取锁,解决了redis分布式锁的死锁问题。