Spring自带分布式锁你用过吗?
创始人
2025-07-10 17:20:32
0

环境:SpringBoot2.7.12

本篇文章将会为大家介绍有关spring integration提供的分布式锁功能。

1. 简介

Spring Integration 是一个框架,用于构建事件驱动的应用程序。在 Spring Integration 中,LockRegistry 是一个接口,用于管理分布式锁。分布式锁是一种同步机制,用于确保在分布式系统中的多个节点之间对共享资源的互斥访问。

LockRegistry及相关子接口(如:RenewableLockRegistry) 接口的主要功能:

  • 获取锁:当应用程序需要访问共享资源时,它可以通过 LockRegistry 获取一个锁。
  • 释放锁:当应用程序完成对共享资源的访问后,它应该释放锁,以便其他应用程序可以获取它(第一点中提到,并没有提供直接释放锁的操作,而是内部自动完成)。
  • 续期:提供续期机制,以便在需要时延长锁的持有时间。

常见的 LockRegistry 实现包括基于数据库、ZooKeeper 和 Redis 的实现。

公共依赖


  org.springframework.boot
  spring-boot-starter-integration

2. 基于数据库分布式锁

引入依赖


  org.springframework.integration
  spring-integration-jdbc


  com.zaxxer
  HikariCP
  compile


  mysql
  mysql-connector-java
  8.0.30

配置

spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/spring_lock?serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&useSSL=false
    username: root
    password: xxxooo
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      minimumIdle: 10
      maximumPoolSize: 200
---
spring:
  integration:
    jdbc:
      initialize-schema: always
      # 基于数据库需要执行初始化脚本
      schema: classpath:schema-mysql.sql

注册核心Bean对象

@Bean
public DefaultLockRepository defaultLockRepository(DataSource dataSource) {
  DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
  // 这里根据你的业务需要,配置表前缀,默认:IN_
  lockRepository.setPrefix("T_") ;
  return lockRepository ;
}


// 注册基于数据库的分布式锁
@Bean
public JdbcLockRegistry jdbcLockRegistry(DefaultLockRepository lockRepository) {
  return new JdbcLockRegistry(lockRepository) ;
}

测试用例

@Test
public void testLock() throws Exception
  int len = 10 ;
  CountDownLatch cdl = new CountDownLatch(len) ;
  CountDownLatch waiter = new CountDownLatch(len) ;
  Thread[] ts = new Thread[len] ;
  for (int i = 0; i < len; i++) {
    ts[i] = new Thread(() -> {
      waiter.countDown() ;
      System.out.println(Thread.currentThread().getName() + " - 准备获取锁") ;
      try {
        waiter.await() ;
      } catch (InterruptedException e1) {
        e1.printStackTrace();
      }
      // 获取锁
      Lock lock = registry.obtain("drug_store_key_001") ;
      lock.lock() ;
      System.out.println(Thread.currentThread().getName() + " - 获取锁成功") ;
      try {
        try {
          TimeUnit.SECONDS.sleep(2) ;
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      } finally {
        // 释放锁
        lock.unlock() ;
        cdl.countDown() ;
        System.out.println(Thread.currentThread().getName() + " - 锁释放成功") ;
      }
    }, "T - " + i) ;
  }
  for (int i = 0; i < len; i++) {
    ts[i].start() ; 
  }
  cdl.await() ;
}

数据库

图片图片

锁的实现JdbcLock,该对象实现了java.util.concurrent.locks.Lock,所以该锁是支持重入等操作的。

配置锁获取失败后的重试间隔,默认值100ms

JdbcLockRegistry jdbcLockRegistry = new JdbcLockRegistry(lockRepository);
// 定义锁对象时设置当获取锁失败后重试间隔时间。
jdbcLockRegistry.setIdleBetweenTries(Duration.ofMillis(200)) ;

锁续期

jdbcLockRegistry.renewLock("drug_store_key_001");

3. 基于Redis分布式锁

引入依赖


  org.springframework.integration
  spring-integration-jdbc


  org.springframework.boot
  spring-boot-starter-data-redis

配置

spring:
  redis:
    host: localhost
    port: 6379
    password: xxxooo
    database: 8
    lettuce:
      pool:
        maxActive: 8
        maxIdle: 100
        minIdle: 10
        maxWait: -1

测试用例

测试代码与上面基于JDBC的一样,只需要修改调用加锁的代码即可

Lock lock = redisLockRegistry.obtain("001") ;

设置锁的有效期,默认是60s

// 第三个参数设置了key的有效期,这里改成10s
RedisLockRegistry redisLockRegistry = new RedisLockRegistry(connectionFactory, registryKey, 10000) ;

注意:redis key的有效期设置为10s,如果你的业务执行超过了10s,那么程序将会报错。并没有redission watch dog机制。

Exception in thread "T - 0" java.lang.IllegalStateException: Lock was released in the store due to expiration. The integrity of data protected by this lock may have been compromised.
  at org.springframework.integration.redis.util.RedisLockRegistry$RedisLock.unlock(RedisLockRegistry.java:450)
  at com.pack.SpringIntegrationDemoApplicationTests.lambda$1(SpringIntegrationDemoApplicationTests.java:83)
  at java.lang.Thread.run(Thread.java:748)

如果10s过期后key自动删除后,其它线程是否能立马获取到锁呢?如果是单节点中其它现在也不能获取锁,必须等上一个线程结束后才可以,这是因为在内部还维护了一个ReentrantLock锁,在获取分布式锁前要先获取本地的一个锁。

private abstract class RedisLock implements Lock {
  private final ReentrantLock localLock = new ReentrantLock();
  public final void lock() {
      this.localLock.lock();
      while (true) {
        try {
          if (tryRedisLock(-1L)) {
            return;
          }
        } catch (InterruptedException e) {
        } catch (Exception e) {
          this.localLock.unlock();
          rethrowAsLockException(e);
        }
      }
    }
}

注意:不管是基于数据库还是Redis都要先获取本地的锁

Spring Cloud Task就使用到了Spring Integration中的锁基于数据库的。

总结:Spring Integration 的分布式锁为开发者提供了一种在分布式系统中实现可靠同步的有效方法。通过合理选择和使用这些锁实现,可以确保对共享资源的访问在多个节点之间保持协调一致,从而提高系统的整体可靠性和性能。

完毕!!!

相关内容

热门资讯

PHP新手之PHP入门 PHP是一种易于学习和使用的服务器端脚本语言。只需要很少的编程知识你就能使用PHP建立一个真正交互的...
网络中立的未来 网络中立性是什... 《牛津词典》中对“网络中立”的解释是“电信运营商应秉持的一种原则,即不考虑来源地提供所有内容和应用的...
各种千兆交换机的数据接口类型详... 千兆交换机有很多值得学习的地方,这里我们主要介绍各种千兆交换机的数据接口类型,作为局域网的主要连接设...
什么是大数据安全 什么是大数据... 在《为什么需要大数据安全分析》一文中,我们已经阐述了一个重要观点,即:安全要素信息呈现出大数据的特征...
如何允许远程连接到MySQL数... [[277004]]【51CTO.com快译】默认情况下,MySQL服务器仅侦听来自localhos...
如何利用交换机和端口设置来管理... 在网络管理中,总是有些人让管理员头疼。下面我们就将介绍一下一个网管员利用交换机以及端口设置等来进行D...
P2P的自白|我不生产内容,我... 现在一提起P2P,人们就会联想到正在被有关部门“围剿”的互联网理财服务。×租宝事件使得劳...
Intel将Moblin社区控... 本周二,非营利机构Linux基金会宣布,他们将担负起Moblin社区的管理工作,而这之前,Mobli...
施耐德电气数据中心整体解决方案... 近日,全球能效管理专家施耐德电气正式启动大型体验活动“能效中国行——2012卡车巡展”,作为该活动的...
Windows恶意软件20年“... 在Windows的早期年代,病毒游走于系统之间,偶尔删除文件(但被删除的文件几乎都是可恢复的),并弹...