手写redis实现分布式锁详细教程,满足可续锁、可重入等分布式锁条件

news/2024/9/18 21:30:58 标签: redis, 分布式, 数据库

前言

本文将讨论的做一个高并发场景下避不开的话题,即redis分布式锁。比如在淘宝 的秒杀场景、热点新闻和热搜排行榜等。可见分布式锁是一个程序员面向高级的一门必修课,下面请跟着本篇文章好好学习。

redis_2">redis分布式锁有哪些面试题

1.Redis做分布式的时候需要注意什么问题?
2.你们公司自己实现的分布式锁是否用的setnx命令实现?这个是最合适的吗?你如何考虑分布式锁的可重入问题?
3.如果Redis是单点部署的,会带来什么问题?准备怎么解决单点问题呢?
Redis集群模式下,比如主从模式下,CAP方面有没有什么问题?

1.分布式锁是什么?

1.1 锁的种类介绍

锁的种类锁的概念
单机单机版同一个JVM虚拟机内,synchronized或者lock接口
分布式分布式多个不同的java虚拟机,单机的线程锁机制不再起作用了,资源类在不同的服务器之间共享了。

1.2 一个正经的分布式锁具有哪些刚需

在这里插入图片描述
独占性:任何时刻只能有且仅有一个线程持有
高可用:若redis集群环境下,不能因为某个节点挂了而出现获取锁或者释放锁失败。高并发请求下依旧能够保证良好使用。
防止死锁:杜绝死锁,必须有超时控制或者撤销操作,有个兜底终止跳出方案
不乱抢:防止张冠李戴,不能私下uolock别人的锁,只能自己加锁自己释放,自己约的锁自己要释放,可以设置过期时间,或者业务代码执行完毕以后删除对一个的锁。
可重入:同一个节点的同一个线程如果获得锁之后,他也可以再次获得这个锁。

redis_20">1.3 redis分布式

setnx key values

1.4 java实现分布式锁的案例

先来个乞丐版的分布式锁,并没有遵循上面五大原则。然后慢慢进行优化,乞丐版分布锁案例如下代码所示:

public String sale() {
    String resMessgae = "";
    String key = "luojiaRedisLocak";
    String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();

    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);
    // 抢不到的线程继续重试
    if (!flag) {
        // 线程休眠20毫秒,进行递归重试
        try {
            TimeUnit.MILLISECONDS.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        sale();
    } else {
        try {
            // 1 抢锁成功,查询库存信息
            String result = stringRedisTemplate.opsForValue().get("inventory01");
            // 2 判断库存书否足够
            Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);
            // 3 扣减库存,每次减少一个库存
            if (inventoryNum > 0) {
                stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));
                resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;
                log.info(resMessgae);
            } else {
                resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;
                log.info(resMessgae);
            }
        } finally {
            stringRedisTemplate.delete(key);
        }
    }
    return resMessgae;
}

请看看以上代码有哪些问题?既没有删除过期时间 ,也没有判断redis获取的redis值进行删除,有可能删除错锁。如果进一步优化可以redis可以存一个流水号,业务代码执行完了以后,判断流水号是否相等,然后进行删除。可重入问题可以通过递归实现重试,但是依旧有问题:手工设置5000个线程来抢占锁,压测OK,但是容易导致StackOverflowError,在高并发不推荐使用,需要进一步完善。改进获取重试方法代码如下所示:

public String sale() {
    String resMessgae = "";
    String key = "luojiaRedisLocak";
    // 标记线程id,知道使哪个线程在执行
    String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();

    // 不用递归了,高并发容易出错,我们用自旋代替递归方法重试调用;也不用if,用while代替
    while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)) {
        // 线程休眠20毫秒,进行递归重试
        try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    }
    try {
        // 1 抢锁成功,查询库存信息
        String result = stringRedisTemplate.opsForValue().get("inventory01");
        // 2 判断库存书否足够
        Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);
        // 3 扣减库存,每次减少一个库存
        if (inventoryNum > 0) {
            stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));
            resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;
            log.info(resMessgae);
        } else {
            resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;
            log.info(resMessgae);
        }
    } finally {
        stringRedisTemplate.delete(key);
    }
    return resMessgae;
}

为了防止出现死锁,需要给锁设置过期时,关键点在于过期时间设置,以避免代码异常出现,而该线程持续占有该锁。其java代码如下所示:

while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS)) {
            // 线程休眠20毫秒,进行递归重试
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
        }

为了防止误删key,在执行完了业务代码以后需要删掉锁,在try-catch-finally 中添加如下删除锁的代码 :

 try {
     //和上一个代码块重复,省略掉了
    } finally {
        // v5.0 改进点,判断加锁与解锁是不同客户端,自己只能删除自己的锁,不误删别人的锁
        if (stringRedisTemplate.opsForValue().get(key).equalsIgnoreCase(uuidValue)) {
            stringRedisTemplate.delete(key);
        }
    }

在finally中删除key并不能保持它的原子性,当业务执行时间大于锁的过期时间是,其他线程可以抢占该锁,没法保证业务执行的完整性,所以以下代码借助Lua脚本进行优化,java代码如下所示:

public String sale() {
    String resMessgae = "";
    String key = "luojiaRedisLocak";
    String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();

    // 不用递归了,高并发容易出错,我们用自旋代替递归方法重试调用;也不用if,用while代替
    while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS)) {
        // 线程休眠20毫秒,进行递归重试
        try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    }

    try {
        // 1 抢锁成功,查询库存信息
        String result = stringRedisTemplate.opsForValue().get("inventory01");
        // 2 判断库存书否足够
        Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);
        // 3 扣减库存,每次减少一个库存
        if (inventoryNum > 0) {
            stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));
            resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;
            log.info(resMessgae);
        } else {
            resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;
            log.info(resMessgae);
        }
    } finally {
        // 改进点,修改为Lua脚本的Redis分布式锁调用,必须保证原子性,参考官网脚本案例
        String luaScript =
            "if redis.call('get',KEYS[1]) == ARGV[1] then " +
            "return redis.call('del',KEYS[1]) " +
            "else " +
            "return 0 " +
            "end";
        stringRedisTemplate.execute(new DefaultRedisScript(luaScript, Boolean.class), Arrays.asList(key), uuidValue);

    }
    return resMessgae;
}

上述代码,既然所已经被删除了,如何兼顾锁的可重入问题?这个问题下文会做出解释,希望读者耐心看完。可以在业务代码(同步代码块)前后添加lock和unlock实现加锁。而今重新进入时没必要重新获得一把锁。
可重入锁的概念:
可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。
如果是1个有synchronized修饰的递归调用方法,程序第2次进入被自己阻塞了岂不是天大的笑话,出现了作茧自缚。所以Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可以一定程度避免死锁。下文1.5将展示详细的可重入分布式锁。

1.5 优化分布式

本次优化主要解决的问题有:宕机防止死锁、防止误删key、Lua保证原子性。设置 过期时间的同时,当业务执行时间大于过期时间,自动续锁功能等,分布式锁实现可重入需要使用的hset,记录进入次数。java代码如下所示:

新增续锁功能,java代码如下所示,其中实现步骤为:
步骤一:复原程序为初识无锁版本(即上述乞丐版)
步骤二:新建RedisDistributedLock类实现JUC里面的Lock接口
步骤三:满足JUC里面AQS对Lock锁的接口规范定义来进行实现落地代码
步骤四:结合设计模式开发属于自己的Redis分布式锁工具类
```java
package com.luojia.redislock.mylock;

import cn.hutool.core.util.IdUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 自研的分布式锁,实现了Lock接口
 */
public class RedisDistributedLock implements Lock {

    private StringRedisTemplate stringRedisTemplate;

    private String lockName; // KEYS[1]
    private String uuidValule; // ARGV[1]
    private long expireTime; // ARGV[2]

    public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.lockName = lockName;
        this.uuidValule = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
        this.expireTime = 50L;
    }

    @Override
    public void lock() {
        tryLock();
    }

    @Override
    public boolean tryLock() {
        try {
            tryLock(-1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (-1 == time) {
	        //lua脚本加锁
            String script =
                    "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " +
                        "redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
                        "redis.call('expire', KEYS[1], ARGV[2]) " +
                        "return 1 " +
                    "else " +
                         "return 0 " +
                    "end";
            System.out.println("lockName:" + lockName + "\t" + "uuidValue:" + uuidValule);

            // 加锁失败需要自旋一直获取锁
            while (!stringRedisTemplate.execute(
                    new DefaultRedisScript<>(script, Boolean.class),
                    Arrays.asList(lockName),
                    uuidValule,
                    String.valueOf(expireTime))) {
                // 休眠60毫秒再来重试
                try {TimeUnit.MILLISECONDS.sleep(60);} catch (InterruptedException e) {e.printStackTrace();}
            }
            return true;
        }
        return false;
    }

    @Override
    public void unlock() {
        //lua脚本解锁
        String script = "" +
                "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " +
                "return nil " +
                "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then " +
                "return redis.call('del', KEYS[1]) " +
                "else " +
                "return 0 " +
                "end";
        System.out.println("lockName:" + lockName + "\t" + "uuidValue:" + uuidValule);
        
        // LUA脚本由C语言编写,nil -> false; 0 -> false; 1 -> true;
        // 所以此处DefaultRedisScript构造函数返回值不能是Boolean,Boolean没有nil
        Long flag = stringRedisTemplate.execute(
                new DefaultRedisScript<>(script, Long.class),
                Arrays.asList(lockName),
                uuidValule);
        if (null == flag) {
            throw new RuntimeException("this lock does not exists.");
        }
    }

    // 下面两个暂时用不到,不用重写
    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

其中lua脚本加锁说明,如下图所示:
在这里插入图片描述
返回为1情况说明。
在这里插入图片描述
lua脚本解锁说明
在这里插入图片描述
完整的分布式锁java代码如下所示:

// v7.0 使用自研的lock/unlock+LUA脚本自研的Redis分布式
Lock redisDistributedLock = new RedisDistributedLock(stringRedisTemplate, "luojiaRedisLock");
public String sale() {
    String resMessgae = "";
    redisDistributedLock.lock();
    try {
        // 1 抢锁成功,查询库存信息
        String result = stringRedisTemplate.opsForValue().get("inventory01");
        // 2 判断库存书否足够
        Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);
        // 3 扣减库存,每次减少一个库存
        if (inventoryNum > 0) {
            stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));
            resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;
            log.info(resMessgae);
        } else {
            resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;
            log.info(resMessgae);
        }
    } finally {
        redisDistributedLock.unlock();
    }
    return resMessgae;
}

小总结
在这里插入图片描述
引入工厂模式
可重入测试,在InventoryService类新增可重入测试方法。

// v7.1 使用工厂类创建锁
@Autowired
private DistributedLockFactory distributedLockFactory;
public String sale() {
    String resMessgae = "";
    Lock redisLock = distributedLockFactory.getDistributedLock("REDIS", "luojiaRedisLock");
    redisLock.lock();
    try {
        // 1 抢锁成功,查询库存信息
        String result = stringRedisTemplate.opsForValue().get("inventory01");
        // 2 判断库存书否足够
        Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);
        // 3 扣减库存,每次减少一个库存
        if (inventoryNum > 0) {
            stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));
            resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;
            log.info(resMessgae);
            testReEntry();
        } else {
            resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;
            log.info(resMessgae);
        }
    } finally {
        redisLock.unlock();
    }
    return resMessgae;
}

private void testReEntry() {
    Lock redisLock = distributedLockFactory.getDistributedLock("REDIS", "luojiaRedisLock");
    redisLock.lock();
    try {
        log.info("=================测试可重入锁=================");
    } finally {
        redisLock.unlock();
    }
}

引入工厂模式

package com.luojia.redislock.mylock;

import cn.hutool.core.util.IdUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.locks.Lock;

@Component
public class DistributedLockFactory {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    private String uuid;
    public DistributedLockFactory() {
        this.uuid = IdUtil.simpleUUID();
    }
    public Lock getDistributedLock(String lockType, String lockName) {
        if (lockType == null) {
            return null;
        }
        if ("REDIS".equalsIgnoreCase(lockType)) {
            return new RedisDistributedLock(stringRedisTemplate, lockName, uuid);
        } else if ("ZOOKEEPER".equalsIgnoreCase(lockType)) {
            // 后面存在就返回对应的分布式
        } else if ("MYSQL".equalsIgnoreCase(lockType)) {
            // 后面存在就返回对应的分布式
        }
        return null;
    }
}

在RedisDistributedLock中,修改构造方法:

public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid) {
    this.stringRedisTemplate = stringRedisTemplate;
    this.lockName = lockName;
    this.uuidValule = uuid + ":" + Thread.currentThread().getId();
    this.expireTime = 50L;
}

锁的自动续费功能
确保RedisLock过期时间大于业务执行时间的问题,以便确保时间到了,业务没有执行完需要自动续期,对tryLock进行改正。
自动续锁的Lua脚本:

// 自动续期的LUA脚本
if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then
    return redis.call('expire', KEYS[1], ARGV[2])
else
    return 0
end
```java
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    if (-1 == time) {
        String script =
            "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " +
            "redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
            "redis.call('expire', KEYS[1], ARGV[2]) " +
            "return 1 " +
            "else " +
            "return 0 " +
            "end";
        System.out.println("lock() lockName:" + lockName + "\t" + "uuidValue:" + uuidValule);

        // 加锁失败需要自旋一直获取锁
        while (!stringRedisTemplate.execute(
            new DefaultRedisScript<>(script, Boolean.class),
            Arrays.asList(lockName),
            uuidValule,
            String.valueOf(expireTime))) {
            // 休眠60毫秒再来重试
            try {TimeUnit.MILLISECONDS.sleep(60);} catch (InterruptedException e) {e.printStackTrace();}
        }
        // 新建一个后台扫描程序,来检查Key目前的ttl,是否到我们规定的剩余时间来实现锁续期
        resetExpire();
        return true;
    }
    return false;
}

// 自动续期
private void resetExpire() {
    String script =
        "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " +
        "return redis.call('expire', KEYS[1], ARGV[2]) " +
        "else " +
        "return 0 " +
        "end";
    new Timer().schedule(new TimerTask() {
        @Override
        public void run() {
            if (stringRedisTemplate.execute(
                new DefaultRedisScript<>(script, Boolean.class),
                Arrays.asList(lockName),
                uuidValule,
                String.valueOf(expireTime))) {
                // 续期成功,继续监听
                System.out.println("resetExpire() lockName:" + lockName + "\t" + "uuidValue:" + uuidValule);
                resetExpire();
            }
        }
    }, (this.expireTime * 1000 / 3));
}

总结

1.synchronized单机版OK;
2.Nginx分布式微服务,轮询多台服务器,单机锁不行;
3.取消单机锁,上redis分布式锁setnx,中小企业使用没问题;
4.​ 只是加锁了,没有释放锁,出异常的话,可能无法释放锁,必须要在代码层面finally释放锁
5.​ 如果服务宕机,部署了微服务代码层面根本就没有走到finally这块,没办法保证解锁,这个Key没有被删除,需要对锁设置过期时间 -
6.​ 为redis分布式锁key增加过期时间,还必须要保证setnx+过期时间在同一行,保证原子性
7.​ 程序由于执行超过锁的过期时间,所以在finally中必须规定只能自己删除自己的锁,不能把别人的锁删除了,防止张冠李戴
8.将Lock、unlock变成LUA脚本保证原子性;
9.保证锁的可重入性,hset替代setnx+Lock变成LUA脚本,保障可重入性;
10.锁的自动续期 。


http://www.niftyadmin.cn/n/5664555.html

相关文章

Linux权限理解【Shell的理解】【linux权限的概念、管理、切换】【粘滞位理解】

目录 Linux权限理解1.Xshell命令以及运行原理2.linux权限的学习2.1linux权限的切换2.2linux权限的概念2.3linux权限管理2.3.1linux中文件访问者的分类2.3.2文件类型和访问权限(文件属性)2.3.2.1文件类型2.3.2.2文件权限拓展—文件的起始权限 2.3.3文件权限管理2.3.4文件权限的应…

【我的 PWN 学习手札】Largebin Attack(< glibc-2.30)

目录 前言 一、Largebin 的组织结构 &#xff08;1&#xff09;largebin 里放了单个 chunk &#xff08;2&#xff09;largebin 里放了一组同样大小的 chunk &#xff08;3&#xff09;largebin 里放了多组不同大小的 chunk &#xff08;4&#xff09;重点关注和理解 …

人工智能相关SCI期刊笔记—影响因子、中科院/JCR 分区 、收录方向

人工智能相关SCI期刊网址&#x1f680; 导师给我发了一个总结了人工智能相关内容相关的SCI集合&#xff0c;影响力从大到小&#xff0c;公众号叫IEEE论文那些事&#xff0c;非常感谢&#xff0c;我自己也是简单整理了一下&#xff0c;然后打算以博客的形式记录一下&#xff0c;…

启信产业大脑助力市北高新园区数字化升级,开启智慧园区新篇章

在数字化时代的浪潮中&#xff0c;产业园区的数字化转型已成为推动经济发展、提升管理效率和增强竞争力的关键因素。上海市市北高新技术服务业园区&#xff08;以下简称“市北高新园区”&#xff09;携手合合信息旗下启信产业大脑™推出“一企一画像”创新平台&#xff0c;在国…

ubuntu24系统普通用户免密切换到root用户

普通用户登录系统后需要切换到root用户&#xff0c;这边需要密码&#xff0c;现在不想让用户知道密码是多少。 sudo: 1 incorrect password attempt $ su - Password: root-security-cm5:~#开始配置普通用户免密切换到root用户&#xff0c;编辑配置文件 /etc/sudoers 最后增加…

Redis——常用数据类型hash

目录 hash常用命令hsethgethdelhkeyshvalshgetallhmgethlenhsetnxhincrbyhdecrby 哈希的编码方式哈希的应用 hash 常用命令 hset HSET key field value [field value ...]//时间复杂度O(1) //返回值&#xff1a;设置成功的键值对的个数hget HGET key field//hdel HDEL key…

[yotroy.cool] MGT 388 - Finance for Engineers - notes 笔记

个人博客https://www.yotroy.cool/,感谢关注~ 图片资源可能显示不全,请前往博客查看哦! ============================================================ Lecture 1 What is Accounting? The process of identifying, measuring and communicating economic informati…

数据结构与算法——Java实现 5.链表

目录 一、定义 链表的分类 二、性能 随机访问 插入或删除 三、单向链表 链表内部节点类 ① 增加&#xff08;插入&#xff09; 1.头插法 2.寻找最后一个节点位置 3.尾插法 4.根据索引位置插入 ② 删除 1.删除首个结点 2.获取链表的指定索引节点 3.删除链表指定索引元素节点 4.删…