目录

很有名 の weblog

X

Redis高并发分布式锁

image.png当我们通过redis实现分布式锁的时候,写的时候总是会有很多的细小bug没有考虑到,那么怎么样才是一个真正有效的redis分布式锁

1. 通过redisTemplate实现

这里模拟一个电商项目中减库存的操作

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

@Controller
@RequestMapping("/DistributedLockDemoController")
public class DistributedLockDemoController {

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 比较完善的分布式锁
     */
    @RequestMapping("/distributedLock")
    public void distributedLock() {
        //redis锁的键名
        final String lock = "REDIS_LOCK";
        //当前调用redis 的客户端id,是为了防止其他线程执行到这的时候,删除了不属于它的锁
        String client = UUID.randomUUID().toString();
        try{
            //获取分布式锁并设置失效时间
            Boolean lockFlag = redisTemplate.opsForValue().setIfAbsent(lock, client, 10, TimeUnit.SECONDS);
            if (!lockFlag){
                return;
            }
            //业务操作
            int stock = (int) redisTemplate.opsForValue().get("stock");
            if (stock>0){
                int realStock = stock - 1;
                redisTemplate.opsForValue().set("stock",stock);
                System.out.println("库存扣减成功,剩余库存:" + realStock);
            }else {
                System.out.println("库存扣减失败");
            }
        }finally {
            //释放redis锁
            if (client.equals(redisTemplate.opsForValue().get(lock))){
                redisTemplate.delete(lock);
            }
        }
    }

}

以上的代码实现的redis分布式锁其他没什么大问题,主要考虑的问题,或者说它解决了哪些分布式,集群环境可能产生的问题。

  • 服务器a的调用 删除了服务器b的调用产生的分布式锁
    这里是通过对每一个线程的调用添加UUID进行识别解决的
    String client = UUID.randomUUID().toString();
    

并且在最后finally代码块中通过判断是否当前进程产生的UUID,才能删除创建的锁

  • 服务器a调用到这个方法时,已经创建了分布式锁,但是服务器a宕机。这里是通过添加分布式锁的失效时间解决的
    Boolean lockFlag = redisTemplate.opsForValue().setIfAbsent(lock, client, 10, TimeUnit.SECONDS);
    

但是,即使是这样的分布式锁仍然是不完善的,它没有解决的问题是,当服务器a的线程1调用该方法超时,导致分布式锁自动失效,恰好此时库存已经为0,此时其他服务器b中的线程2继续创建分布式锁,执行业务代码,导致超卖。

此时我们可以通过框架redisson实现更加完善的Redis分布式锁

2. 通过redisson实现分布式锁

redisson实现的分布式锁原理是,在上述场景中,当服务器a的线程1调用该方法快要超时的时候,给这个分布式锁添加续命时间,直到线程1完成代码执行过程,此时再删除锁。而内部实现是通过lua脚本进行实现的,保证了操作的原子性。

Redis在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到Redis中执行。使用脚本的好处如下:

  1. 减少网络开销:本来5次网络请求的操作,可以用一个请求完成,原先5次请求的逻辑放在redis服务器
    上完成。使用脚本,减少了网络往返时延。这点跟管道类似。
  2. 原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。管道不是原子的,不过
    redis的批量操作命令(类似mset)是原子的。
  3. 替代redis的事务功能:redis自带的事务功能很鸡肋,报错不支持回滚,而redis的lua脚本几乎实现了
    常规的事务功能,支持报错回滚操作,官方推荐如果要使用redis的事务功能可以用redis lua替代。

2.1 redisson的使用

可以和redisTemplate一起使用

maven配置

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.13.6</version>
        </dependency>

在springboot启动时注入容器中,这里我是用了集群环境进行测试

    @Value("${spring.redis.cluster.nodes}")
    private String redisNode;

    @Bean
    public Redisson getRedisson(){
        Config config = new Config();
        ClusterServersConfig clusterServersConfig = config.useClusterServers();

        String[] split = redisNode.split(",");
        for (String node : split) {
            clusterServersConfig.addNodeAddress("redis://" + node);
        }
        return (Redisson) Redisson.create(config);
    }

实际场景应用中只需要在加锁的时候调用lock()方法,解锁的时候调用unlock()方法,相比之前的大段代码,可谓简单不少,而且还不用担心分布式锁实现的是否有问题

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;


@Controller
@RequestMapping("/RedissonDemoController")
public class RedissonDemoController {

    @Autowired
    private Redisson redisson;

    @Autowired
    private RedisTemplate redisTemplate;

    @RequestMapping("/distributedLock")
    public void distributedLock() {
        final String lock = "REDIS_LOCK";
        RLock redissonLock = redisson.getLock(lock);
        try{
            //加锁,实现锁续命功能
            redissonLock.lock();
            int stock = (int) redisTemplate.opsForValue().get("stock");
            if (stock > 0){
                int realStock = stock - 1;
                redisTemplate.opsForValue().set("stock",stock);
                System.out.println("库存扣减成功,剩余库存:" + realStock);
            }else {
                System.out.println("库存扣减失败");
            }
        }finally {
            redissonLock.unlock();
        }
    }

}

2.2 redisson原理

image.png


标题:Redis高并发分布式锁
作者:MingGH
地址:https://runnable.run/articles/2021/03/25/1616667262150.html