模拟一个电商里面下单减库存的场景。

1.首先在redis里加入商品库存数量。

2.新建一个Spring Boot项目,在pom里面引入相关的依赖。

  <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency> <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

3.接下来,在application.yml配置redis属性和指定应用的端口号:

server:
port: 8090 spring:
redis:
host: 192.168.0.60
port: 6379

4.新建一个Controller类,扣减库存第一版代码:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource;
import java.util.Objects; @RestController
public class StockController { private static final Logger logger = LoggerFactory.getLogger(StockController.class); @Resource
private StringRedisTemplate stringRedisTemplate; @RequestMapping("/reduceStock")
public String reduceStock() {
// 从redis中获取库存数量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
if (stock > 0) {
// 减库存
int restStock = stock - 1;
// 剩余库存再重新设置到redis中
stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
logger.info("扣减成功,剩余库存:{}", restStock);
} else {
logger.info("库存不足,扣减失败。");
} return "success";
}
}

上面第一版的代码存在什么问题:超卖。假如多个线程同时调用获取库存数量的代码,那么每个线程拿到的都是100,判断库存都大于0,都可以执行减库存的操作。假如两个线程都做减库存更新缓存,那么缓存的库存变成99,但实际上,应该是减掉2个库存。

那么很多人的第一个想法是加synchronized同步代码块,因为获取数量和减库存不是原子性操作,有多个线程来执行代码的时候,只允许一个线程执行代码块里的代码。那么改完的第二版的代码如下:

 @RequestMapping("/reduceStock")
public String reduceStock() {
synchronized (this) {
// 从redis中获取库存数量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
if (stock > 0) {
// 减库存
int restStock = stock - 1;
// 剩余库存再重新设置到redis中
stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
logger.info("扣减成功,剩余库存:{}", restStock);
} else {
logger.info("库存不足,扣减失败。");
}
} return "success";
}

但使用synchronize存在的问题,就是只能保证单机环境运行时没有问题的。但现在的软件公司里,基本上都是集群架构,是多实例,前面使用Nginx做负载均衡,大概架构如下:

Nginx分发请求,把请求发送到不同的Tomcat容器,而synchronize只能保证一个应用是没有问题的。

那么代码改进第三版,就是引入redis分布式锁,具体代码如下:

 @RequestMapping("/reduceStock")
public String reduceStock() {
String lockKey = "stockKey";
try {
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
if (!result) {
return "errorCode";
}
// 从redis中获取库存数量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
if (stock > 0) {
// 减库存
int restStock = stock - 1;
// 剩余库存再重新设置到redis中
stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
logger.info("扣减成功,剩余库存:{}", restStock);
} else {
logger.info("库存不足,扣减失败。");
}
} finally {
stringRedisTemplate.delete(lockKey)
}
return "success";
}

如果有一个线程拿到锁,那么其他的线程就会等待。一定要记得在finally里面把使用完的锁要删除掉。否则一旦抛出异常,只有一个线程会一直持有锁,其他线程没有机会获取。

但如果在执行if (stock > 0) {代码块里的代码,因为宕机或重启没有执行完,也会一直持有锁,所以,这里需要把锁加一个超时时间:

   boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

但如果上面两行代码在中间执行出问题了,设置超时时间的代码还没执行,也会出现锁不能释放的问题。好在有对应的方法:就是把上面两行代码设置成一个原子操作:

   // 这里默认设置超时时间为10秒
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);

到此为止,如果并发量不是很大的话,基本上是没有问题的。

但是,如果请求的并发量很大,就会出现新的问题:有种比较特殊的情况,第一个线程执行了15秒,但是执行到10秒钟的时候,锁已经失效释放了,那么在高并发场景下,第二个线程发现锁已经失效,那么它就可以拿到这把锁进行加锁,

假设第二个线程执行需要8秒,它执行到5秒钟后,此时第一个线程已经执行完了,执行完那一刻,进行了删除key的操作,但是此时的锁是第二个线程加的,这样第一个线程把第二个线程加的锁删掉了。

那意味着第三个线程又可以拿到锁,第三个线程执行了3秒钟,此时第二个线程执行完毕,那么第二个线程把第三个线程的锁又删除了。导致锁失效。

那么解决的思路就是,我自己加的锁,不要被别删掉。那么可以为每个进来的请求生成一个唯一的id,作为分布式锁的值,然后在释放时,判断一下当前线程的id,是不是和缓存里的id是否相等。

 @RequestMapping("/reduceStock")
public String reduceStock() {
String lockKey = "stockKey";
String id = UUID.randomUUID().toString();
try {
// 这里默认设置超时时间为30秒
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, id, 30, TimeUnit.SECONDS);
if (!result) {
return "errorCode";
}
// 从redis中获取库存数量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
if (stock > 0) {
// 减库存
int restStock = stock - 1;
// 剩余库存再重新设置到redis中
stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
logger.info("扣减成功,剩余库存:{}", restStock);
} else {
logger.info("库存不足,扣减失败。");
}
} finally {
if (id.contentEquals(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(lockKey)))) {
stringRedisTemplate.delete(lockKey);
}
}
return "success";
}

到此为止,一个比较完善的锁就实现了,可以应付大部分场景。

当然,上面的代码还有一个问题,就是一个线程执行时间超过了过期时间,后面的代码还没有执行完,锁就已经删除了,还是会有些bug存在。解决的方法是给锁续命的操作。

在当前主线程获取到锁以后,可以fork出一个线程,执行Timer定时器操作,假如默认超时时间为30秒,那么定时器每隔10秒去看下这把锁还是否存在,存在就说明这个锁里的逻辑还没有执行完,

那么就可以把当前主线程的超时时间重新设置为30秒;如果不存在,就直接结束掉。

但是上面的逻辑,在高并发场景下,实现比较完善还是比较困难的。好在现在已经有比较成熟的框架,那就是Redisson。官方:https://redisson.org

下面用Redisson来实现分布式锁。

首先引入依赖包:

       <dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>

配置类:

@Configuration
public class RedissonConfig {
@Bean
public Redisson redisson() {
// 单机模式
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.0.60:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
}

接下来用redisson重写上面的减库存操作:

 @Resource
private Redisson redisson; @RequestMapping("/reduceStock")
public String reduceStock() {
String lockKey = "stockKey";
RLock redissonLock = redisson.getLock(lockKey);
try {
// 加锁,锁续命
redissonLock.lock();
// 从redis中获取库存数量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
if (stock > 0) {
// 减库存
int restStock = stock - 1;
// 剩余库存再重新设置到redis中
stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
logger.info("扣减成功,剩余库存:{}", restStock);
} else {
logger.info("库存不足,扣减失败。");
}
} finally {
redissonLock.unlock();
}
return "success";
}

其实就是三个步骤:获取锁,加锁,释放锁。

先简单看下Redisson的实现原理:

这里先说一下Redis很多操作使用Lua脚本来实现原子性操作,关于Lua语法,可以去网上找下相关教程。

使用Lua脚本的好处有:

1.减少网络开销,多个命令可以使用一次请求完成;

2.实现了原子性,Redis会把Lua脚本作为一个整体去执行;

3.实现事务,Redis自带的事务功能有限,而Lua脚本实现了事务的常规操作,而且还支持回滚。

但是Lua实际上不会使用很多,如果Lua脚本执行时间过长,因为Redis是单线程,因此会导致堵塞。

最后,说下Redisson分布式锁的代码实现,

找到上面的redissonLock.lock();

lock方法点进去,一直点到RedissonLock类里面的lockInterruptibly方法:

    @Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 获取线程id
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
} RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future); try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
} // waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}

重点看下tryAcquire方法,把线程id作为一个参数传递进来,在这个方法里面,找到tryLockInnerAsync方法点进去,

  <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

这里就是一堆Lua脚本,先看第一个if命令,先去判断 KEYS[1](就是对应的锁key的名字),如果不存在,在hashmap里,设置一个属性为线程id,值为1,再把map的过期时间设置为internalLockLeaseTime,这个值默认是30秒,



上面的操作对应的命令是:

hset keyname id:thread 1
pexpire keyname 30

然后返回nil,相当于null,那程序return了。

另外,Redisson还支持重入锁,那第二个if就是执行重入锁的操作,会判断锁是否存在,并且传入的线程id是否是当前线程的id,若果是,支持重复加锁进行自增操作;

如果是其他线程调用lock方法,上面两个if判断不会走,会返回锁剩余过期时间。

接着返回到tryAcquireAsync方法里面往下看:

实际上是加了一个监听器,在监听器里面有个很重要的方法scheduleExpirationRenewal,一看这个名字就能大概猜出是什么功能,

里面有个定时任务的轮询,

 private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
} Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 判断传递进来的线程id是否是我们之前主线程设置的id,如果是,则增加续命,增加30秒。
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
} if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}

接着推迟10秒钟(internalLockLeaseTime / 3),再执行续命操作逻辑。

到最后,再回到lockInterruptibly方法,

如果ttl 为null,说明加锁成功了,就返回null,那如果其他线程的话,就会返回剩余过期时间,那么就会进入到while死循环里,一直尝试加锁,调用tryAcquire方法,在琐失效以后,再会尝试获取加锁。

到此为止,分析完毕。

Redis分布式锁的正确使用与实现原理的更多相关文章

  1. Redis全方位详解--数据类型使用场景和redis分布式锁的正确姿势

    一.Redis数据类型 1.string string是Redis的最基本数据类型,一个key对应一个value,每个value最大可存储512M.string一半用来存图片或者序列化的数据. 2.h ...

  2. Redis分布式锁的正确实现方式

    前言 分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介 ...

  3. 【分布式缓存系列】集群环境下Redis分布式锁的正确姿势

    一.前言 在上一篇文章中,已经介绍了基于Redis实现分布式锁的正确姿势,但是上篇文章存在一定的缺陷——它加锁只作用在一个Redis节点上,如果通过sentinel保证高可用,如果master节点由于 ...

  4. Redis(十三):Redis分布式锁的正确实现方式

    前言 分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介 ...

  5. Redis分布式锁的正确实现方式(Java版)

    前言 分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介 ...

  6. Redis 分布式锁的正确实现方式(转)

    _ 前言 分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各 ...

  7. 【转】Redis 分布式锁的正确实现方式( Java 版 )

    链接:wudashan.cn/2017/10/23/Redis-Distributed-Lock-Implement/ 前言 分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布 ...

  8. Redis 分布式锁的正确打开方式

    前言 分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介 ...

  9. Redis 分布式锁的正确实现方式(Java版)[转]

    本文来源: https://www.cnblogs.com/linjiqin/p/8003838.html 前言 分布式锁一般有三种实现方式: 数据库乐观锁: 基于Redis的分布式锁: 基于ZooK ...

  10. Redis分布式锁【正确实现方式】

    前言 分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介 ...

随机推荐

  1. SQLite如何测试

    原文 http://www.sqlite.org/testing.html 目录 1. 介绍 1.1. 执行总结 2. 测试套件 3.异常测试 3.1 内存溢出测试 3.2 I/O错误测试 3.3 c ...

  2. ServletConfig接口默认是哪里实现的?

    问题:Servlet接口默认是哪里实现的? 答:GenericServlet 1.结构 2.ServletConfig.GenericServlet.HttpServlet的关系如下: public ...

  3. windows下php,redis配置

    在windows环境下搭建redis是一般是为了测试,官方redis并没有给windows版redis但是微软有. windows版下载地址:https://github.com/MSOpenTech ...

  4. swift 关于闭包和函数

    调用函数,有闭包参数时: 函数的实现中:闭包为参数时,有参数返回值类型: 调用闭包时,传入参数 调用函数时:闭包为参数,是闭包的实现,当闭包为最后一个参数时,可写在参数括号外面 即===>函数在 ...

  5. C语言实现的OOP

    我倒不是为了OOP而OOP,实在是OOP的一些特征,例如封装,多态其实是软件工程思想,这些思想不分语言,遵循了这些思想可以使得程序更有弹性,更易修改和维护,避免僵化,脆弱 shape.h 该文件定义的 ...

  6. UESTC_秋实大哥下棋 2015 UESTC Training for Data Structures&lt;Problem I&gt;

    I - 秋实大哥下棋 Time Limit: 3000/1000MS (Java/Others)     Memory Limit: 65535/65535KB (Java/Others) Submi ...

  7. Java Annotation 应用 -- 导出Excel表格

    相关知识链接: Introspector(内省) POI 1.声明注解 package com.ciic.component.excel; import java.lang.annotation.El ...

  8. IDEA发布应用时发布到lib下面的包不全

    IDEA发布应用时发布到lib下面的包不全,Tomcate启动时就报:At least one JAR was scanned for TLDs yet contained no TLDs. Enab ...

  9. thinkphp相关

    thinkphp相关1.thinkphp调试sql方法:echo M("table_name")->getLastSql(); 2. 条件查询设置多个条件参数的写法:(1). ...

  10. koa2学习笔记01 - 创建项目 —— koa生成器一键生成koa项目

    前言 从17年开始尝试学习搭建个人网站开始,就开始学习摸索node了,至今差不多快两年了. 说起来现在都9102年了,所以最近打算整体设计重构一下网站,索性node后台也重写一遍. 重温一下node, ...