文章目录1 为什么要使用分布式锁2 基于RedisTemplate实现分布式锁(推荐)21 添加相关maven依赖22 注入分布式锁工具类23 相关配置24 实现分布式锁25 功能测试3 基于Jedis客户端,实现分布式锁31 添加依赖32 创建Jedis实例对象33 相关配置34 实现分布式锁35 功能测试4 基于Lettuce客户端,实现分布式锁41 添加依赖42 创建LettuceConnection对象43 相关配置44 实现分布式锁45 功能测试5 基于Redisson客户端,实现分布式锁(推荐)51 添加依赖52 相关配置53 实现分布式锁

本系列文章,笔者准备对互联网缓存利器Redis的使用,做一下简单的总结,内容大概如下:

博文内容资源链接Linux环境下搭建Redis基础运行环境sblogcsdnnetsmilehappinessarticledetails107298145互联网缓存利器Redis的使用详解(基础篇)sblogcsdnnetsmilehappinessarticledetails107592368Redis基础命令使用Api详解sblogcsdnnetsmilehappinessarticledetails107593218Redis编程客户端Jedis、Lettuce和Redisson的基础使用sblogcsdnnetsmilehappinessarticledetails107301988互联网缓存利器Redis的使用详解(进阶篇)sblogcsdnnetsmilehappinessarticledetails107592336如何基于Redis实现分布式锁sblogcsdnnetsmilehappinessarticledetails107592896基于Redis的主从复制、哨兵模式以及集群的使用,史上最详细的教程来啦~sblogcsdnnetsmilehappinessarticledetails107433525Redis相关的面试题总结sblogcsdnnetsmilehappinessarticledetails1075926861 为什么要使用分布式锁

使用分布式锁的目的,为了保证一个方法在高并发情况下,同一时间只能被同一个线程执行,即保证同一时间只有一个客户端可以对共享资源进行操作。

在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLcok或synchronized)进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。

2 基于RedisTemplate实现分布式锁(推荐)

使用Spring提供的redisTemplate,操作redis还是比较简单的,使用RedisTemplate如何实现分布式锁呢?

21 添加相关maven依赖

这里,建议使用210以上的版本,因为Redis在210以上版本,已经实现了原子性操作,在设置key、value时,直接可以设置超时时间

 ! springdataredis
dependency
groupId orgspringframeorkdata groupId
artifactId springdataredis artifactId
version 210RELEASE version
dependency
dependency
groupId iolettuce groupId
artifactId lettucecore artifactId
version 522RELEASE version
dependency

22 注入分布式锁工具类

 使用RedisTemplate实现分布式锁
private final RedisLockHelper redisLockHelper;
public GoodsServiceImpl(RedisLockHelper redisLockHelper) {
thisredisLockHelper = redisLockHelper;

23 相关配置

【redis配置】

#ip地址
redishostName=127001
redisport=6379
#如果有密码
redispassord=123456
redisdatabase=0

【spring配置文件】

 ?xml version="10" encoding="UTF8"?
beans xmlns="springframeorkorgschemabeans"
xmlnsxsi="3org01XMLSchemainstance"
xmlnscontext="springframeorkorgschemacontext"
xsischemaLocation="springframeorkorgschemabeans
sspringframeorkorgschemabeansspringbeansxsd
springframeorkorgschemacontext
sspringframeorkorgschemacontextspringcontextxsd"
contextcomponentscan basepackage="cnsmilehappinessdistributed"
!读取redisproperties属性配置文件
contextpropertyplaceholder location="classpathredisproperties" ignoreunresolvable="true"
! 配置redis连接工厂
bean id="lettuceConnectionFactory"
!构造方法初始化
constructorarg index="0" ref="redisStandaloneConfiguration"
bean
! lettuce连接配置信息
bean id="redisStandaloneConfiguration"
property name="hostName" value="${redishostName}"
property name="port" value="${redisport}"
property name="database" value="${redisdatabase}"
!配置redis密码
property name="passord" ref="redisPassord"
bean
! lettuce连接密码信息
bean id="redisPassord"
!构造方法初始化
constructorarg index="0" value="${redispassord}"
bean
!手动设置 key 与 value的序列化方式
bean id="keySerializer"
bean id="valueSerializer"
!redis的操作模板
bean id="redisTemplate"
property name="connectionFactory" ref="lettuceConnectionFactory"
property name="keySerializer" ref="keySerializer"
property name="valueSerializer" ref="valueSerializer"
property name="hashKeySerializer" ref="keySerializer"
property name="hashValueSerializer" ref="valueSerializer"
bean
beans

如果你是spring boot项目,参考网上配置即可,网上资源有很多。

下面,重点来啦,如何实现分布式锁?

24 实现分布式锁

【分布式锁工具类】

package cnsmilehappinessdistributedlockredistemplate;
import orgapachecommonslang3StringUtils;
import orgslf4jLogger;
import orgslf4jLoggerFactory;
import orgspringframeorkbeansfactoryannotationAutoired;
import orgspringframeorkcoreannotationOrder;
import orgspringframeorkdataredisconnectionRedisStringCommands;
import orgspringframeorkdatarediscoreRedisTemplate;
import orgspringframeorkdatarediscoretypesExpiration;
import orgspringframeorkdataredisserializerRedisSerializer;
import orgspringframeorkstereotypeComponent;
import orgspringframeorkutilAssert;
import javautilSet;
import javautilUUID;
import javautilconcurrentExecutors;
import javautilconcurrentScheduledExecutorService;
import javautilconcurrentTimeUnit;
* Redis中,基于redisTemplate的分布式锁
* 注解Order或者接口Ordered的作用是定义Spring IOC容器中Bean的执行顺序的优先级(默认是最低优先级,值越小优先级越高),而不是定义Bean的加载顺序
* p
* author smilehappiness
* Date 82 1353
Order(1)
Component
public class RedisLockHelper {
private static Logger logger = LoggerFactorygetLogger(RedisLockHelperclass);
* 设置分布式锁业务前缀
private static final String REDIS_LOCK_PREFIX = "redislock";
* 设置分布式锁超时(过期)时间,单位是秒
private static final Long LOCK_TIME_OUT = 60L;
* 获取分布式锁超时时间,单位是秒,如果指定时间内还未获取到锁,则不能进行业务处理
private static final Long ACQUIRE_LOCK_TIME_OUT = 15L;
Autoired
private RedisTemplate String, Object redisTemplate;
* p
* 创建RedisLockHelper对象的时候,监听redis节点key是否需要续期(不推荐使用)
* 这里进行redis分布式锁的续期,针对业务执行时,如果超过了锁超时时间的一半还没有释放锁,说明该业务方法比较耗时,进行自动续期。
* 防止业务未执行完,锁过期了导致自动释放锁,造成业务数据问题
* p
* 注意:通常情况下不需要考虑续期问题,如果业务方法确实执行的比较耗时,才考虑此种问题
* p
* param
* return
* Date 82 1718
public RedisLockHelper() {
分布式锁的自动续期,因为锁超时时间内,可能还没有执行完业务处理
启动一个后台任务,定时去检查redis的分布式锁是否需要续期(过期时间往后延一下)
ScheduledExecutorService scheduledExecutorService = ExecutorsneSingleThreadScheduledExecutor();
scheduledExecutorServicescheduleAtFixedRate(thischeckRedisExpire, 5, 10, TimeUnitSECONDS);
private void checkRedisExpire() {
Set String keys = redisTemplatekeys(REDIS_LOCK_PREFIX + "*");
keysforEach(key {
目前redis中锁剩下的过期时间
从redis中获取key对应的过期时间:如果该值有过期时间,就返回相应的过期时间,如果该值没有设置过期时间,就返回1,如果没有该值,就返回2
Long expireTime = redisTemplateopsForValue()getOperations()getExpire(key);
如果小于一半过期时间,因为还没有执行完,延长过期时间
if (expireTime = 1 expireTime = LOCK_TIME_OUT 2) {
redisTemplateexpire(key, LOCK_TIME_OUT, TimeUnitSECONDS);
Systemoutprintln("执行redisTemplate分布式锁【" + key + "】续期");
* p
* Redis老版本实现方案(21以下)
* p
* param lockName
* return javalangString
* Date 82 1540
public String getLockOld(String lockName) {
String redisLockKey = REDIS_LOCK_PREFIX + lockName;
String uniqueValue = UUIDrandomUUID()toString();
往后延acquireLockTimeOut秒(比如30秒)
Long endTime = SystemcurrentTimeMillis() + ACQUIRE_LOCK_TIME_OUT * 1000;
如果不超过指定的锁获取时间,有资格重复获取锁
hile (SystemcurrentTimeMillis() endTime) {
注意:这里不是原子操作(不在一个步骤中,是分了两步),有可能会有小问题(如果设置了锁还没来得及设置过期时间,redis服务挂了,可能导致死锁问题)
解决方案:下边判断过期时间,如果没有设置超时时间,来避免死锁问题
if (redisTemplate != null !redisTemplatehasKey(redisLockKey) redisTemplateopsForValue()setIfAbsent(redisLockKey, uniqueValue)) {
redisTemplateexpire(redisLockKey, LOCK_TIME_OUT, TimeUnitSECONDS);
return uniqueValue;
* 从redis中获取key对应的过期时间:
* 如果该值有过期时间,就返回相应的过期时间
* 如果该值没有设置过期时间,就返回1
* 如果没有该值,就返回2
Long expireTime = redisTemplateopsForValue()getOperations()getExpire(redisLockKey);
if (expireTime != null expireTime == 1) {
设置过期时间
redisTemplateexpire(redisLockKey, LOCK_TIME_OUT, TimeUnitSECONDS);
try {
立刻马上去循环再获取锁,其实不是很好也没有什么意义,最好是稍等片刻再去重试获取锁(这里可以根据实际场景去设计,是否需要重试获取锁,如果不需要,就不用设置hile循环)
Threadsleep(100);
} catch (InterruptedException e) {
eprintStackTrace();
return null;
* p
* Redis老版本实现方案(21以下),老版本未提供原子操作,自己实现原子操作
* 注:在21以上版本,已经实现了原子性操作,无需自己实现
* p
* param lockName
* return javalangString
* Date 82 1630
public String getLockOldTo(String lockName) {
try {
String redisLockKey = REDIS_LOCK_PREFIX + lockName;
String uniqueValue = UUIDrandomUUID()toString();
往后延acquireLockTimeOut秒(比如30秒)
Long endTime = SystemcurrentTimeMillis() + ACQUIRE_LOCK_TIME_OUT * 1000;
如果不超过指定的锁获取时间,有资格获取锁
hile (SystemcurrentTimeMillis() endTime) {
上面方案中,虽然可以解决可能的死锁问题,但是,既然本质上是因为不是原子操作导致的问题,那么,能不能改成原子操作呢?
if (redisTemplate != null !redisTemplatehasKey(redisLockKey) thissetIfAbsent(redisLockKey, uniqueValue, LOCK_TIME_OUT, TimeUnitSECONDS)) {
return uniqueValue;
* 从redis中获取key对应的过期时间:
* 如果该值有过期时间,就返回相应的过期时间,如果该值没有设置过期时间,就返回1
* 如果没有该值,就返回2
* 这里由于上面可以保证原子性操作了,所以这里可以不用再判断是否有设置过过期时间
Long expireTime = redisTemplateopsForValue()getOperations()getExpire(redisLockKey);
if (expireTime != null expireTime == 1) {
设置过期时间
redisTemplateexpire(redisLockKey, LOCK_TIME_OUT, TimeUnitSECONDS);
try {
立刻马上去循环再获取锁,其实不是很好也没有什么意义,最好是稍等片刻再去重试获取锁(这里可以根据实际场景去设计,是否需要重试获取锁,如果不需要,就不用设置hile循环)
Threadsleep(100);
} catch (InterruptedException e) {
eprintStackTrace();
} catch (Exception e) {
eprintStackTrace();
return null;
* p
* Set {code key} to hold the string {code value} and expiration {code timeout} if {code key} is absent
* p
* param key
* param value
* param timeout
* param unit
* return javalangBoolean
* Date 313 1103
private Boolean setIfAbsent(Object key, Object value, long timeout, TimeUnit unit) {
byte[] raKey = raKey(key);
byte[] raValue = raValue(value);
Expiration expiration = Expirationfrom(timeout, unit);
return redisTemplateexecute(connection connectionset(raKey, raValue, expiration, RedisStringCommandsSetOptionifAbsent()), true);
byte[] raKey(Object key) {
AssertnotNull(key, "non null key required");
return thiskeySerializer() == null key instanceof byte[] ? (byte[]) ((byte[]) key) thiskeySerializer()serialize(key);
byte[] raValue(Object value) {
return thisvalueSerializer() == null value instanceof byte[] ? (byte[]) ((byte[]) value) thisvalueSerializer()serialize(value);
RedisSerializer keySerializer() {
return thisredisTemplategetKeySerializer();
RedisSerializer valueSerializer() {
return thisredisTemplategetValueSerializer();
* p
* Redis在210以上版本,已经实现了原子性操作,无需自己实现
* p
* param lockName
* return javalangString
* Date 82 1730
public String getLock(String lockName) {
try {
String redisLockKey = REDIS_LOCK_PREFIX + lockName;
String uniqueValue = UUIDrandomUUID()toString();
往后延acquireLockTimeOut秒(比如30秒)
Long endTime = SystemcurrentTimeMillis() + ACQUIRE_LOCK_TIME_OUT * 1000;
如果不超过指定的锁获取时间,有资格获取锁
hile (SystemcurrentTimeMillis() endTime) {
上面方案中,虽然可以解决可能的死锁问题,但是,既然本质上是因为不是原子操作导致的问题,那么,能不能改成原子操作呢?
if (redisTemplate != null !redisTemplatehasKey(redisLockKey) redisTemplateopsForValue()setIfAbsent(redisLockKey, uniqueValue, LOCK_TIME_OUT, TimeUnitSECONDS)) {
return uniqueValue;
* 从redis中获取key对应的过期时间:
* 如果该值有过期时间,就返回相应的过期时间,如果该值没有设置过期时间,就返回1
* 如果没有该值,就返回2
* 这里由于上面可以保证原子性操作了,所以这里可以不用再判断是否有设置过过期时间
Long expireTime = redisTemplateopsForValue()getOperations()getExpire(redisLockKey);
if (expireTime != null expireTime == 1) {
设置过期时间
redisTemplateexpire(redisLockKey, LOCK_TIME_OUT, TimeUnitSECONDS);
try {
立刻马上去循环再获取锁,其实不是很好也没有什么意义,最好是稍等片刻再去重试获取锁(这里可以根据实际场景去设计,是否需要重试获取锁,如果不需要,就不用设置hile循环)
Threadsleep(100);
} catch (InterruptedException e) {
eprintStackTrace();
} catch (Exception e) {
eprintStackTrace();
return null;
* p
* 释放分布式锁,保证只能释放自己的锁(自己的锁自己解,不要把别人的锁给解了)
* p
* param lockName
* param value
* return javalangBoolean
* Date 82 1657
public Boolean releaseLock(String lockName, String value) {
redis锁的key
String redisLockKey = REDIS_LOCK_PREFIX + lockName;
保证只能释放自己的锁(自己的锁自己解,不要把别人的锁给解了)
Object object = redisTemplateopsForValue()get(redisLockKey);
if (object != null StringUtilsequals(value, StringvalueOf(object))) {
return redisTemplatedelete(redisLockKey);
return false;

25 功能测试

**
* p
* 分布式锁测试
* p
* param
* return void
* Date 82 56
private void testDistributedLock() {
String lockName = "lockName";
String lockUniqueValue = null;
try {
获取分布式锁,然后下面的业务代码就会按顺序排队执行
lockUniqueValue = redisLockHelpergetLock(lockName);
拿到redis分布式锁
if (lockUniqueValue != null) {
TODO 执行业务代码
} catch (Exception e) {
thro ne RuntimeException(e);
} finally {
释放分布式锁
redisLockHelperreleaseLock(lockName, lockUniqueValue);

以上,就基于RedisTemplate模板,实现了分布式锁。有需要的童鞋们,可以实际操作一下,该方案适用于多线程且高并发的场景。

3 基于Jedis客户端,实现分布式锁
31 添加依赖

 ! springdataredis
dependency
groupId orgspringframeorkdata groupId
artifactId springdataredis artifactId
version 2110RELEASE version
dependency
! jedis
dependency
groupId redisclients groupId
artifactId jedis artifactId
version 330 version
dependency

32 创建Jedis实例对象

package cnsmilehappinessdistributedlockjedisutil;
import redisclientsjedisJedisPool;
import redisclientsjedisJedisPoolConfig;
* p
* 获取Jedis实例对象
* p
* author smilehappiness
* Date 82 1158
public class JedisPoolInstance {
* redis服务器的ip地址
private static final String HOST = "localhost";
* redis服务器的端口
private static final int PORT = 6379;
* 连接redis服务器的密码
private static final String PASSWORD = "123456";
private static final int TIMEOUT = 10000;
* redis连接池对象,单例的连接池对象
private static JedisPool jedisPool = null;
私有构造方法
private JedisPoolInstance() {
* 获取线程池实例对象
* return
public static JedisPool getJedisPoolInstance() {
双重检测锁
if (null == jedisPool) {
synchronized (JedisPoolInstanceclass) {
if (null == jedisPool) {
对连接池的参数进行配置,根据项目的实际情况配置这些参数
JedisPoolConfig poolConfig = ne JedisPoolConfig();
最大连接数
poolConfigsetMaxTotal(1000);
最大空闲连接数
poolConfigsetMaxIdle(32);
获取连接时的最大等待毫秒数
poolConfigsetMaxWaitMillis(90 * 1000);
在获取连接的时候检查连接有效性
poolConfigsetTestOnBorro(true);
jedisPool = ne JedisPool(poolConfig, HOST, PORT, TIMEOUT, PASSWORD);
return jedisPool;

33 相关配置

【redis配置】

#ip地址
redishostName=127001
redisport=6379
#如果有密码
redispassord=123456
redisdatabase=0

【Spring配置】

 ?xml version="10" encoding="UTF8"?
beans xmlns="springframeorkorgschemabeans"
xmlnsxsi="3org01XMLSchemainstance"
xmlnscontext="springframeorkorgschemacontext"
xsischemaLocation="springframeorkorgschemabeans
sspringframeorkorgschemabeansspringbeansxsd
springframeorkorgschemacontext
sspringframeorkorgschemacontextspringcontextxsd"
!读取redisproperties属性配置文件
contextpropertyplaceholder location="classpathredisproperties" ignoreunresolvable="true"
! 配置redis连接工厂
bean id="jedisConnectionFactory"
!构造方法初始化
constructorarg index="0" ref="redisStandaloneConfiguration"
bean
! jedis连接配置信息
bean id="redisStandaloneConfiguration"
property name="hostName" value="${redishostName}"
property name="port" value="${redisport}"
property name="database" value="${redisdatabase}"
!配置redis密码
property name="passord" ref="redisPassord"
bean
! jedis连接密码信息
bean id="redisPassord"
!构造方法初始化
constructorarg index="0" value="${redispassord}"
bean
!手动设置 key 与 value的序列化方式
bean id="keySerializer"
bean id="valueSerializer"
!redis的操作模板
bean id="redisTemplate"
property name="connectionFactory" ref="jedisConnectionFactory"
property name="keySerializer" ref="keySerializer"
property name="valueSerializer" ref="valueSerializer"
property name="hashKeySerializer" ref="keySerializer"
property name="hashValueSerializer" ref="valueSerializer"
bean
beans

34 实现分布式锁

package cnsmilehappinessdistributedlockjedis;
import cnsmilehappinessdistributedlockjedisutilJedisPoolInstance;
import redisclientsjedisJedis;
import redisclientsjedisJedisPool;
import javautilSet;
import javautilUUID;
import javautilconcurrentExecutors;
import javautilconcurrentScheduledExecutorService;
import javautilconcurrentTimeUnit;
* p
* 基于Jedis客户端,实现分布式锁
* p
* author smilehappiness
* Date 82 15
public class JedisDistributeLock {
private static final String redisLockPrefix = "redislock";
* 设置锁超时时间,单位是毫秒
private static final Long LockTimeOut = 30000L;
* 静态代码块在类加载的时候执行,只会一次
static {
分布式锁的自动续期,因为锁超时时间内,可能还没有执行完业务处理
启动一个后台任务,定时去检查redis的分布式锁是否需要续期(过期时间往后延一下)
ScheduledExecutorService scheduledExecutorService = ExecutorsneSingleThreadScheduledExecutor();
scheduledExecutorServicescheduleAtFixedRate(() {
Systemoutprintln("执行Jedis分布式锁续期");
Jedis jedis = JedisPoolInstancegetJedisPoolInstance()getResource();
try {
Set String setKeys = jediskeys(redisLockPrefix + "*");
setKeysforEach((String key) {
目前redis中锁剩下的过期时间
key不存在的时候返回2
Long leftExpire = jedisttl(key);
if (leftExpire = 1 leftExpire = (LockTimeOut 10000) 1000) {
jedispexpire(key, LockTimeOut);
} finally {
if (jedis != null) {
jedisclose();
TimeUnitSECONDS);
* p
* 获取分布式锁
* p
* param lockName
* param acquireTimeOut 单位是毫秒
* param lockTimeOut 单位是毫秒
* return javalangString
* Date 82 1213
public String getRedisLock(String lockName, Long acquireTimeOut, Long lockTimeOut) {
String redisLockKey = redisLockPrefix + lockName;
String uniqueValue = UUIDrandomUUID()toString();
JedisPool jedisPool = JedisPoolInstancegetJedisPoolInstance();
Jedis jedis = jedisPoolgetResource();
Systemoutprintln("获取jedis连接池:" + ThreadcurrentThread()getName() + "" + jedisPool + ", " + jedisPoolgetNumActive() + ", " + jedisPoolgetNumIdle());
try {
往后延acquireTimeOut秒(比如3秒)
Long endTime = SystemcurrentTimeMillis() + acquireTimeOut;
如果不超过指定的锁获取时间,有资格获取锁
hile (SystemcurrentTimeMillis() endTime) {
设置key和设置key的过期时间
注意:这里不是原子操作(不在一个步骤中,是分了两步),有可能会有小问题(如果设置了锁还没来得及设置过期时间,redis服务挂了,可能导致死锁问题,下边双重判断过期时间,如果没有设置超时时间,来避免死锁问题)
if (jedissetnx(redisLockKey, uniqueValue) == 1) {
设置key成功,表示拿到锁
jedispexpire(redisLockKey, lockTimeOut);
return uniqueValue;
这里如果不做处理,可能产生死锁,因为上边不是原子操作
if (jedisttl(redisLockKey) == 1) {
设置过期时间
jedispexpire(redisLockKey, lockTimeOut);
try {
立刻马上去循环再获取锁,其实不是很好也没有什么意义,最好是稍等片刻再去重试获取锁
Threadsleep(100);
} catch (InterruptedException e) {
eprintStackTrace();
} finally {
if (jedis != null) {
jedisclose();
Systemoutprintln("关闭jedis连接池:" + ThreadcurrentThread()getName() + "" + jedisPool + ", " + jedisPoolgetNumActive() + ", " + jedisPoolgetNumIdle());
return null;
* p
* 释放redis锁
* p
* param lockName
* param uniqueValue
* return void
* Date 82 1212
public void releaseRedisLock(String lockName, String uniqueValue) {
redis锁的key
String redisLockKey = redisLockPrefix + lockName;
Jedis jedis = JedisPoolInstancegetJedisPoolInstance()getResource();
try {
保证只能释放自己的锁(自己的锁自己解,不要把别人的锁给解了)
if (jedisget(redisLockKey)equals(uniqueValue)) {
jedisdel(redisLockKey);
} finally {
if (jedis != null) {
jedisclose();

35 功能测试

**
* p
* 分布式锁测试
* p
* param
* return void
* Date 82 56
private void testDistributedLock() {
使用jedis客户端实现的redis锁
private JedisDistributeLock redisDistributeLock = ne JedisDistributeLock();
String lockName = "lockName";
String lockUniqueValue = null;
try {
获取分布式锁,然后下面的业务代码就会按顺序排队执行
lockUniqueValue = redisDistributeLockgetRedisLock(lockName, 3000L, 30000L);
拿到redis分布式锁
if (lockUniqueValue != null) {
TODO 执行业务代码
} catch (Exception e) {
thro ne RuntimeException(e);
} finally {
释放分布式锁
redisDistributeLockreleaseRedisLock(lockName, lockUniqueValue);

4 基于Lettuce客户端,实现分布式锁
41 添加依赖

 ! springdataredis
dependency
groupId orgspringframeorkdata groupId
artifactId springdataredis artifactId
version 2110RELEASE version
dependency
! lettucecore
dependency
groupId iolettuce groupId
artifactId lettucecore artifactId
version 522RELEASE version
dependency

42 创建LettuceConnection对象

package cnsmilehappinessdistributedlocklettuceutil;
import iolettucecoreRedisClient;
import iolettucecoreapiStatefulRedisConnection;
public class LettuceConnection {
private static final String REDIS_ADDRESS = "redis123456localhost63790";
private static StatefulRedisConnection String, String statefulRedisConnection = null;
private LettuceConnection() {
public static StatefulRedisConnection String, String getStatefulRedisConnection() {
双重检测锁
if (null == statefulRedisConnection) {
synchronized (LettuceConnectionclass) {
if (null == statefulRedisConnection) {
RedisClient redisClient = RedisClientcreate(REDIS_ADDRESS);
statefulRedisConnection = redisClientconnect();
return statefulRedisConnection;

43 相关配置

【redis配置】

#ip地址
redishostName=127001
redisport=6379
#如果有密码
redispassord=123456
redisdatabase=0

【Spring配置】

 ?xml version="10" encoding="UTF8"?
beans xmlns="springframeorkorgschemabeans"
xmlnsxsi="3org01XMLSchemainstance"
xmlnscontext="springframeorkorgschemacontext"
xsischemaLocation="springframeorkorgschemabeans
sspringframeorkorgschemabeansspringbeansxsd
springframeorkorgschemacontext
sspringframeorkorgschemacontextspringcontextxsd"
contextcomponentscan basepackage="cnsmilehappinessdistributed"
!读取redisproperties属性配置文件
contextpropertyplaceholder location="classpathredisproperties" ignoreunresolvable="true"
! 配置redis连接工厂
bean id="lettuceConnectionFactory"
!构造方法初始化
constructorarg index="0" ref="redisStandaloneConfiguration"
bean
! lettuce连接配置信息
bean id="redisStandaloneConfiguration"
property name="hostName" value="${redishostName}"
property name="port" value="${redisport}"
property name="database" value="${redisdatabase}"
!配置redis密码
property name="passord" ref="redisPassord"
bean
! lettuce连接密码信息
bean id="redisPassord"
!构造方法初始化
constructorarg index="0" value="${redispassord}"
bean
!手动设置 key 与 value的序列化方式
bean id="keySerializer"
bean id="valueSerializer"
!redis的操作模板
bean id="redisTemplate"
property name="connectionFactory" ref="lettuceConnectionFactory"
property name="keySerializer" ref="keySerializer"
property name="valueSerializer" ref="valueSerializer"
property name="hashKeySerializer" ref="keySerializer"
property name="hashValueSerializer" ref="valueSerializer"
bean
beans

44 实现分布式锁

package cnsmilehappinessdistributedlocklettuce;
import cnsmilehappinessdistributedlocklettuceutilLettuceConnection;
import iolettucecoreapisyncRedisCommands;
import javautilList;
import javautilUUID;
import javautilconcurrentExecutors;
import javautilconcurrentScheduledExecutorService;
import javautilconcurrentTimeUnit;
* p
* 基于Lettuce客户端,实现分布式锁
* p
* author smilehappiness
* Date 82 15
public class LettuceDistributeLock {
private static final String redisLockPrefix = "redislock";
* 设置锁超时时间,单位是毫秒
private static final Long LockTimeOut = 30000L;
* 静态代码块在类加载的时候执行,只会一次
static {
分布式锁的自动续期,因为锁超时时间内,可能还没有执行完业务处理
启动一个后台任务,定时去检查redis的分布式锁是否需要续期(过期时间往后延一下)
ScheduledExecutorService scheduledExecutorService = ExecutorsneSingleThreadScheduledExecutor();
scheduledExecutorServicescheduleAtFixedRate(() {
Systemoutprintln("执行Lettuce分布式锁续期");
RedisCommands String, String syncCommands = LettuceConnectiongetStatefulRedisConnection()sync();
List String setKeys = syncCommandskeys(redisLockPrefix + "*");
setKeysforEach((String key) {
目前redis中锁剩下的过期时间
key不存在的时候返回2
Long leftExpire = syncCommandsttl(key);
if (leftExpire = 1 leftExpire = (LockTimeOut 10000) 1000) {
syncCommandspexpire(key, LockTimeOut);
TimeUnitSECONDS);
* p
* 获取分布式锁
* p
* param lockName
* param acquireTimeOut 单位是毫秒
* param lockTimeOut 单位是毫秒
* return javalangString
* Date 82 1213
public String getRedisLock(String lockName, Long acquireTimeOut, Long lockTimeOut) {
String redisLockKey = redisLockPrefix + lockName;
String uniqueValue = UUIDrandomUUID()toString();
RedisCommands String, String syncCommands = LettuceConnectiongetStatefulRedisConnection()sync();
往后延acquireTimeOut秒(比如3秒)
Long endTime = SystemcurrentTimeMillis() + acquireTimeOut;
如果不超过指定的锁获取时间,有资格获取锁
hile (SystemcurrentTimeMillis() endTime) {
设置key和设置key的过期时间
注意:这里不是原子操作(不在一个步骤中,是分了两步),有可能会有小问题(如果设置了锁还没来得及设置过期时间,redis服务挂了,可能导致死锁问题,下边双重判断过期时间,如果没有设置超时时间,来避免死锁问题)
if (syncCommandssetnx(redisLockKey, uniqueValue)) {
设置key成功,表示拿到锁
syncCommandspexpire(redisLockKey, lockTimeOut);
return uniqueValue;
if (syncCommandsttl(redisLockKey) == 1) {
设置过期时间
syncCommandspexpire(redisLockKey, lockTimeOut);
try {
立刻马上去循环再获取锁,其实不是很好也没有什么意义,最好是稍等片刻再去重试获取锁
Threadsleep(100);
} catch (InterruptedException e) {
eprintStackTrace();
return null;
* p
* 释放redis锁
* p
* param lockName
* param uniqueValue
* return void
* Date 82 1212
public void releaseRedisLock(String lockName, String uniqueValue) {
redis锁的key
String redisLockKey = redisLockPrefix + lockName;
RedisCommands String, String syncCommands = LettuceConnectiongetStatefulRedisConnection()sync();
保证只能释放自己的锁(自己的锁自己解,不要把别人的锁给解了)
if (syncCommandsget(redisLockKey)equals(uniqueValue)) {
syncCommandsdel(redisLockKey);

45 功能测试

**
* p
* 分布式锁测试
* p
* param
* return void
* Date 82 56
private void testDistributedLock() {
使用Lettuce客户端实现的分布式锁
private LettuceDistributeLock redisDistributeLock = ne LettuceDistributeLock();
String lockName = "lockName";
String lockUniqueValue = null;
try {
获取分布式锁,然后下面的业务代码就会按顺序排队执行
lockUniqueValue = redisDistributeLockgetRedisLock(lockName, 3000L, 30000L);
拿到redis分布式锁
if (lockUniqueValue != null) {
TODO 执行业务代码
} catch (Exception e) {
thro ne RuntimeException(e);
} finally {
释放分布式锁
redisDistributeLockreleaseRedisLock(lockName, lockUniqueValue);

5 基于Redisson客户端,实现分布式锁(推荐)
51 添加依赖

 ! springdataredis
dependency
groupId orgspringframeorkdata groupId
artifactId springdataredis artifactId
version 2110RELEASE version
dependency
! redisson
dependency
groupId orgredisson groupId
artifactId redisson artifactId
version 3125 version
dependency

52 相关配置

【redis配置】

#ip地址
redishostName=127001
redisport=6379
#如果有密码
redispassord=123456
redisdatabase=0

【Spring配置】

 ?xml version="10" encoding="UTF8"?
beans xmlns="springframeorkorgschemabeans"
xmlnsxsi="3org01XMLSchemainstance"
xmlnsredisson="redissonorgschemaredisson"
xsischemaLocation="springframeorkorgschemabeans
sspringframeorkorgschemabeansspringbeansxsd
redissonorgschemaredisson
redissonorgschemaredissonredissonxsd"
!redisson客户端对象
redissonclient
redissonsingleserver address="redislocalhost6379" passord="123456"
redissonclient
beans

53 实现分布式锁

Redisson客户端还是非常强大的,客户端本身提供的有分布式锁,而且支持自动续期等,推荐使用。

**
* p
* 分布式锁测试
* p
* param
* return void
* Date 82 56
private void testDistributedLock() {
String lockName = "lockName";
RLock rLock = redissonClientgetLock(lockName);
try {
获取分布式锁,然后下面的业务代码就会按顺序排队执行
rLocklock();
TODO 拿到redis分布式锁,执行业务代码
} catch (Exception e) {
thro ne RuntimeException(e);
} finally {
释放分布式锁
if (rLockisHeldByCurrentThread() rLockisLocked()) {
rLockunlock();

好啦,本界内容就介绍到这里了,如果对你有帮助,老铁们给个赞支持下呗!

有问题的伙伴们,欢迎评论讨论哈。鉴于笔者理解的深度,可能理解的有不到位的地方,欢迎各路大神指点!

写博客是为了记住自己容易忘记的东西,另外也是对自己工作的总结,希望尽自己的努力,做到更好,大家一起努力进步!
如果有什么问题,欢迎大家评论,一起探讨,代码如有问题,欢迎各位大神指正!
给自己的梦想添加一双翅膀,让它可以在天空中自由自在的飞翔!