Spring自带分布式锁你用过吗?
创始人
2025-07-10 17:20:32
0

环境:SpringBoot2.7.12

本篇文章将会为大家介绍有关spring integration提供的分布式锁功能。

1. 简介

Spring Integration 是一个框架,用于构建事件驱动的应用程序。在 Spring Integration 中,LockRegistry 是一个接口,用于管理分布式锁。分布式锁是一种同步机制,用于确保在分布式系统中的多个节点之间对共享资源的互斥访问。

LockRegistry及相关子接口(如:RenewableLockRegistry) 接口的主要功能:

  • 获取锁:当应用程序需要访问共享资源时,它可以通过 LockRegistry 获取一个锁。
  • 释放锁:当应用程序完成对共享资源的访问后,它应该释放锁,以便其他应用程序可以获取它(第一点中提到,并没有提供直接释放锁的操作,而是内部自动完成)。
  • 续期:提供续期机制,以便在需要时延长锁的持有时间。

常见的 LockRegistry 实现包括基于数据库、ZooKeeper 和 Redis 的实现。

公共依赖


  org.springframework.boot
  spring-boot-starter-integration

2. 基于数据库分布式锁

引入依赖


  org.springframework.integration
  spring-integration-jdbc


  com.zaxxer
  HikariCP
  compile


  mysql
  mysql-connector-java
  8.0.30

配置

spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/spring_lock?serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&useSSL=false
    username: root
    password: xxxooo
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      minimumIdle: 10
      maximumPoolSize: 200
---
spring:
  integration:
    jdbc:
      initialize-schema: always
      # 基于数据库需要执行初始化脚本
      schema: classpath:schema-mysql.sql

注册核心Bean对象

@Bean
public DefaultLockRepository defaultLockRepository(DataSource dataSource) {
  DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
  // 这里根据你的业务需要,配置表前缀,默认:IN_
  lockRepository.setPrefix("T_") ;
  return lockRepository ;
}


// 注册基于数据库的分布式锁
@Bean
public JdbcLockRegistry jdbcLockRegistry(DefaultLockRepository lockRepository) {
  return new JdbcLockRegistry(lockRepository) ;
}

测试用例

@Test
public void testLock() throws Exception
  int len = 10 ;
  CountDownLatch cdl = new CountDownLatch(len) ;
  CountDownLatch waiter = new CountDownLatch(len) ;
  Thread[] ts = new Thread[len] ;
  for (int i = 0; i < len; i++) {
    ts[i] = new Thread(() -> {
      waiter.countDown() ;
      System.out.println(Thread.currentThread().getName() + " - 准备获取锁") ;
      try {
        waiter.await() ;
      } catch (InterruptedException e1) {
        e1.printStackTrace();
      }
      // 获取锁
      Lock lock = registry.obtain("drug_store_key_001") ;
      lock.lock() ;
      System.out.println(Thread.currentThread().getName() + " - 获取锁成功") ;
      try {
        try {
          TimeUnit.SECONDS.sleep(2) ;
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      } finally {
        // 释放锁
        lock.unlock() ;
        cdl.countDown() ;
        System.out.println(Thread.currentThread().getName() + " - 锁释放成功") ;
      }
    }, "T - " + i) ;
  }
  for (int i = 0; i < len; i++) {
    ts[i].start() ; 
  }
  cdl.await() ;
}

数据库

图片图片

锁的实现JdbcLock,该对象实现了java.util.concurrent.locks.Lock,所以该锁是支持重入等操作的。

配置锁获取失败后的重试间隔,默认值100ms

JdbcLockRegistry jdbcLockRegistry = new JdbcLockRegistry(lockRepository);
// 定义锁对象时设置当获取锁失败后重试间隔时间。
jdbcLockRegistry.setIdleBetweenTries(Duration.ofMillis(200)) ;

锁续期

jdbcLockRegistry.renewLock("drug_store_key_001");

3. 基于Redis分布式锁

引入依赖


  org.springframework.integration
  spring-integration-jdbc


  org.springframework.boot
  spring-boot-starter-data-redis

配置

spring:
  redis:
    host: localhost
    port: 6379
    password: xxxooo
    database: 8
    lettuce:
      pool:
        maxActive: 8
        maxIdle: 100
        minIdle: 10
        maxWait: -1

测试用例

测试代码与上面基于JDBC的一样,只需要修改调用加锁的代码即可

Lock lock = redisLockRegistry.obtain("001") ;

设置锁的有效期,默认是60s

// 第三个参数设置了key的有效期,这里改成10s
RedisLockRegistry redisLockRegistry = new RedisLockRegistry(connectionFactory, registryKey, 10000) ;

注意:redis key的有效期设置为10s,如果你的业务执行超过了10s,那么程序将会报错。并没有redission watch dog机制。

Exception in thread "T - 0" java.lang.IllegalStateException: Lock was released in the store due to expiration. The integrity of data protected by this lock may have been compromised.
  at org.springframework.integration.redis.util.RedisLockRegistry$RedisLock.unlock(RedisLockRegistry.java:450)
  at com.pack.SpringIntegrationDemoApplicationTests.lambda$1(SpringIntegrationDemoApplicationTests.java:83)
  at java.lang.Thread.run(Thread.java:748)

如果10s过期后key自动删除后,其它线程是否能立马获取到锁呢?如果是单节点中其它现在也不能获取锁,必须等上一个线程结束后才可以,这是因为在内部还维护了一个ReentrantLock锁,在获取分布式锁前要先获取本地的一个锁。

private abstract class RedisLock implements Lock {
  private final ReentrantLock localLock = new ReentrantLock();
  public final void lock() {
      this.localLock.lock();
      while (true) {
        try {
          if (tryRedisLock(-1L)) {
            return;
          }
        } catch (InterruptedException e) {
        } catch (Exception e) {
          this.localLock.unlock();
          rethrowAsLockException(e);
        }
      }
    }
}

注意:不管是基于数据库还是Redis都要先获取本地的锁

Spring Cloud Task就使用到了Spring Integration中的锁基于数据库的。

总结:Spring Integration 的分布式锁为开发者提供了一种在分布式系统中实现可靠同步的有效方法。通过合理选择和使用这些锁实现,可以确保对共享资源的访问在多个节点之间保持协调一致,从而提高系统的整体可靠性和性能。

完毕!!!

相关内容

热门资讯

如何允许远程连接到MySQL数... [[277004]]【51CTO.com快译】默认情况下,MySQL服务器仅侦听来自localhos...
如何利用交换机和端口设置来管理... 在网络管理中,总是有些人让管理员头疼。下面我们就将介绍一下一个网管员利用交换机以及端口设置等来进行D...
施耐德电气数据中心整体解决方案... 近日,全球能效管理专家施耐德电气正式启动大型体验活动“能效中国行——2012卡车巡展”,作为该活动的...
Windows恶意软件20年“... 在Windows的早期年代,病毒游走于系统之间,偶尔删除文件(但被删除的文件几乎都是可恢复的),并弹...
20个非常棒的扁平设计免费资源 Apple设备的平面图标PSD免费平板UI 平板UI套件24平图标Freen平板UI套件PSD径向平...
德国电信门户网站可实时显示全球... 德国电信周三推出一个门户网站,直观地实时提供其安装在全球各地的传感器网络检测到的网络攻击状况。该网站...
着眼MAC地址,解救无法享受D... 在安装了DHCP服务器的局域网环境中,每一台工作站在上网之前,都要先从DHCP服务器那里享受到地址动...
为啥国人偏爱 Mybatis,... 关于 SQL 和 ORM 的争论,永远都不会终止,我也一直在思考这个问题。昨天又跟群里的小伙伴进行...