SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理

发布时间:2022-07-02 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

目录

    • 一、简介
    • 二、maven依赖
    • 三、配置类
      • 3.1、属性配置文件
      • 3.2、属性配置类
      • 3.3、ZookeePErconfig配置类(重要
      • 3.4、ZookeeperClient配置类(重要
    • 四、业务编写
      • 4.1、抽象类AbstractLock
      • 4.2、锁使实现(核心
      • 4.3、controller层
    • 五、测试
      • 5.1、配置文件application.yML
      • 5.2、nginx转发配置
      • 5.3、使用jemeter并发测试
      • 5.4、测试结果
      • 5.5、加锁时间
    • 六、InterPRocessMutex原理分析
      • 6.1、实例化
        • 6.1.1、本地调用实例化
        • 6.1.2、码-实例化
      • 6.2、获取锁acquire()
        • 6.2.1、本地调用获取锁方法
        • 6.2.2、源码-internalLock()
        • 6.2.3、源码-attemptLock()(重要
        • 6.2.4、源码-createstheLock()
        • 6.2.5、源码-internalLockLoop()(核心
        • 6.2.6、源码-getsTheLock()(重要
      • 6.3、释放锁
        • 6.3.1、本地调用释放锁
        • 6.3.2、源码-release()(重要
        • 6.3.3、源码-releaseLock

一、简介@H_139_126@

  我们知道在JDK 的 java.util.concurrent.locks包中提供了可重入锁,读写锁,及超时获取锁的方法等,但在分布式系统中,当多个应用需要共同操作某一个资源时,就没办法使用JDK里的锁实现了,所以今天的主角就是ZooKeeper + Curator 来完成分布式锁,Curator 提供的四种方案F1a;

  • InterProcessMutex:分布式可重入排它锁
  • InterProcesssemaphoreMutex:分布式排它锁
  • InterProcessReadWrITeLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器

本文主要介绍可重入排它锁 InterProcessMutex 的相关使用及源码解读

二、maven依赖

pom.XMl

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
        <relativePath/> <!-- lookup parent From repository -->
    </parent>
    <groupId>com.alian</groupId>
    <artifactId>zookeeper-curator</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>zookeeper-curator</name>
    <description>SpringBoot基于Zookeeper和Curator实现分布式锁</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-LOG4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.14</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

三、配置类

3.1、属性配置文件

# zookeeper服务器地址(ip+port)
zookeeper.server=10.130.3.16:2181
# 休眠时间
zookeeper.sleep-time=1000
# 最大重试次数
zookeeper.max-retries=3
# 会话超时时间
zookeeper.session-timeout=15000
# 连接超时时间
zookeeper.connection-timeout=5000

本机环境有限就不搭建集群了,具体还是在于curator分布式锁的使用及原理。

3.2、属性配置类

  此配置类不懂的可以参考我另一篇文章:Spring Boot读取配置文件常用方式

ZookeeperProperties.java

package com.alian.zookeepercurator.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.component;

@Data
@Component
@ConfigurationProperties(prefix = "zookeeper")
//读取指定路径配置文件,暂不支持*.yaml文件
@PropertySource(value = "classpath:config/zookeeper.properties", encoding = "UTF-8", ignoreResourceNotFound = true)
public class ZookeeperProperties {

    /**
     * zookeeper服务地址
     */
    private String server;

    /**
     * 重试等待时间
     */
    private int sleepTime;

    /**
     * 最大重试次数
     */
    private int maxRetries;

    /**
     * session超时时间
     */
    private int sessionTimeout;

    /**
     * 连接超时时间
     */
    private int connectionTimeout;

}

3.3、ZookeeperConfig配置类(重要

ZookeeperConfig.java

  此配置类主要是使用CuratorFramework来连接zookeeper。

package com.alian.zookeepercurator.config;

import com.alian.zookeepercurator.common.ZookeeperClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.Retrypolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class ZookeeperConfig {

    @Autowired
    private ZookeeperProperties zookeeperProperties;

    @Bean
    public CuratorFramework curatorFrameworkClient() {
        //重试策略,ExponentialBackoffRetry(1000,3)这里表示等待1s重试,最大重试次数为3次
        RetryPolicy policy = new ExponentialBackoffRetry(zookeeperProperties.getSleepTime(), zookeeperProperties.getMaxRetries());
        //构建CuratorFramework实例
        CuratorFramework curatorFrameworkClient = CuratorFrameworkFactory
                .builder()
                .connectString(zookeeperProperties.getServer())
                .sessionTimeoutMs(zookeeperProperties.getSessionTimeout())
                .connectionTimeoutMs(zookeeperProperties.getConnectionTimeout())
                .retryPolicy(policy)
                .build();
        //启动实例
        curatorFrameworkClient.start();
        return curatorFrameworkClient;
    }

    //采用这种方式注册bean可以比较优雅的关闭连接
    @Bean(destroyMethod = "destroy")
    public ZookeeperClient zookeeperClient(CuratorFramework curatorFrameworkClient) {
        return new ZookeeperClient(curatorFrameworkClient);
    }

}

3.4、ZookeeperClient配置类(重要

ZookeeperClient.java

  这个bean是在上面的配置类里定义的,还定义了销毁的方法,这样的好处是,当服务断开后,可以关闭连接,如果直接关闭服务可能会抛出一个异常。使用和其他的使用是一样的,当然如果你为了方便,使用@Component也没有问题。

package com.alian.zookeepercurator.common;

import com.alian.zookeepercurator.lock.AbstractLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

@Slf4j
public class ZookeeperClient {

    private CuratorFramework curatorFramework;

    public ZookeeperClient(CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;
    }

    public <T> T lock(AbstractLock<T> abstractLock) {
        //获取锁路径
        String lockPath = abstractLock.getLockPath();
        //创建InterProcessMutex实例
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath); //创建锁对象
        boolean success = false;
        try {
            try {
                //加锁
                success = lock.acquire(abstractLock.getTime(), abstractLock.getTimeUnit()); //获取锁
            } catch (Exception e) {
                throw new RuntimeException("oBTain lock error " + e.getMessage() + ", lockPath " + lockPath);
            }
            //判断是否加锁成功
            if (success) {
                return abstractLock.execute();
            } else {
            	log.info("获取锁失败,返回null");
                return null;
            }
        } finally {
            try {
                if (success) {
                    //释放锁
                    lock.release();
                }
            } catch (Exception e) {
                log.error("release lock error {}, lockPath {}", e.getMessage(), lockPath);
            }
        }
    }

    //bean的销毁方法
    public void destroy() {
        try {
            log.info("ZookeeperClient销毁方法,如果zookeeper连接不为空,则关闭连接");
            if (getCuratorFramework() != null) {
                //这种方式比较优雅的关闭连接
                getCuratorFramework().close();
            }
        } catch (Exception e) {
            log.error("stop zookeeper client error {}", e.getMessage());
        }
    }

    public CuratorFramework getCuratorFramework() {
        return curatorFramework;
    }
}

四、业务编写

4.1、抽象类AbstractLock

AbstractLock.java   定义一个抽象锁的类,包含锁路径,过期时间及时间单位,子类只需要实现execute方法即可。

package com.alian.zookeepercurator.common;

import java.util.concurrent.TimeUnit;

public abstract class AbstractLock<T> {

    /**
     * 锁路径
     */
    protected String lockPath;

    /**
     * 超时时间
     */
    protected long time;

    protected TimeUnit timeUnit;

    public AbstractLock(String lockPath, long time, TimeUnit timeUnit) {
        this.lockPath = lockPath;
        this.time = time;
        this.timeUnit = timeUnit;
    }

    public void setLockPath(String lockPath) {
        this.lockPath = lockPath;
    }

    public String getLockPath() {
        return lockPath;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    public void setTimeUnit(TimeUnit timeUnit) {
        this.timeUnit = timeUnit;
    }

    public TimeUnit getTimeUnit() {
        return timeUnit;
    }

    /**
     * 执行业务的方法
     *
     * @return
     */
    public abstract T execute();
}

4.2、锁使实现(核心

CuratorLockService.java

package com.alian.zookeepercurator.service;

import com.alian.zookeepercurator.common.ZookeeperClient;
import com.alian.zookeepercurator.common.AbstractLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class CuratorLockService {

    @Autowired
    private ZookeeperClient zookeeperClient;

    @Autowired
    private CuratorFramework curatorFramework;

    //库存存取的路径
    private static final String dataPath = "/root/data/stock";

    //初始化库存的路径
    private static final String initPath = "/root/init/stock";

    /**
     * 此方法系统启动执行,使用zookeeper存一个库存用于测试,这里也使用了锁。(只是一个模拟初始化库存的方法)
     */
    @PostConstruct
    public void init() {
        zookeeperClient.lock(new AbstractLock<Boolean>(initPath, 20, TimeUnit.SECONDS) {
            @override
            public Boolean execute() {
                try {
                    //判断是否存在路径
                    Stat stat = curatorFramework.checkExists().forPath(dataPath);
                    if (stat == null) {
                        //为空则不存在,则创建并设置库存值
                        curatorFramework.create().forPath(dataPath, "1000".getBytes());
                        log.info("初始化数据完成");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return false;
                }
                return true;
            }
        });
    }

    public String inventoryDeduct(String lockId) {
    	//我这里是演示,实际对于不同的业务锁路径设置不同,比如支付和订单设置为"/root/pay/"和"/root/order/"
        String lockPath = "/root/alian/" + lockId;
        //调用加锁方法
        Integer result = zookeeperClient.lock(new AbstractLock<Integer>(lockPath, 10, TimeUnit.SECONDS) {
            @Override
            public Integer execute() {
                try {
                    //模拟业务处理
                    byte[] bytes = curatorFramework.getData().forPath(dataPath);
                    String data = new String(bytes);
                    int stock = Integer.parseint(data);
                    if (stock > 0) {
                        //扣减库存
                        stock--;
                        curatorFramework.setData().forPath(dataPath, (stock + "").getBytes());
                    }
                    return stock;
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
        });
        if (result==null){
            log.info("业务执行失败");
            return "业务执行失败";
        }else {
            log.info("执行成功,剩余库存:"+result);
            return "执行成功,剩余库存:"+result;
        }
    }

}

4.3、controller层

CuratorController.java

package com.alian.zookeepercurator.controller;

import com.alian.zookeepercurator.service.CuratorLockService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;

@Slf4j
@RequestMapping("/test")
@RestController
public class CuratorController {

    @Autowired
    private CuratorLockService curatorLockService;

    @RequestMapping("/deduct")
    public String deduct(HttpServletRequest request) {
        String lockId = request.getParameter("lockId");
        return curatorLockService.inventoryDeduct(lockId);
    }

}

五、测试

5.1、配置文件application.yml

  由于我们是windows环境下的本机开发及测试,我们使用idea启动两个实例,端口分别为70807081。如果不懂的可以参考我另一篇文章:windows下Nginx配置及负载均衡使用 application.yml

server:
  port: 7080
  servlet:
    context-path: /curator

两个实例启动的示例图:

SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理

5.2、nginx转发配置

  自定义配置文件localhost_80.confserver模块里增加转发配置,通过负载均衡到两个实例上。

	location ~ ^/curator/ {
        Proxy_redirect off;
		#端口
        proxy_set_header Host $host;
		#远程地址
        proxy_set_header X-Real-IP $remote_addr;
		#程序可获取远程ip地址
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
		#此处会用的upstream.conf,此文件在nginx.conf已经引入了
        proxy_pass http://curator;
    }

负载均衡配置upstream.conf文件增加下面的配置,其中zookeeper-id 就是localhost_80.conf文件里配置的http://curator;

	upstream zookeeper-id {
	    server 127.0.0.1:7080 ;
		server 127.0.0.1:7081 ;
	}

  如果你的nginx已经启动,最后记得使用命令nginx -t 检查和nginx -s reload应用。

5.3、使用jemeter并发测试

  本文中使用使用50个线程请求我的接口获取id,50表示线程数,0表示0秒内一起发送,1表示请求循环的次数。

SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理

  我们请求的地址是:http://localhost/curator/test/deduct,注意是没有端口的,会通过nginx转发到后台实例。

@H_267_3603@

5.4、测试结果

端口为7081实例的结果:

2021-10-21 17:35:47 168 [http-nio-7081-exec-12] INFO inventoryDeduct 83:执行成功,剩余库存:999
2021-10-21 17:35:47 172 [http-nio-7081-exec-2] INFO inventoryDeduct 83:执行成功,剩余库存:998
2021-10-21 17:35:47 174 [http-nio-7081-exec-9] INFO inventoryDeduct 83:执行成功,剩余库存:997
2021-10-21 17:35:47 176 [http-nio-7081-exec-8] INFO inventoryDeduct 83:执行成功,剩余库存:996
2021-10-21 17:35:47 192 [http-nio-7081-exec-1] INFO inventoryDeduct 83:执行成功,剩余库存:991
2021-10-21 17:35:47 195 [http-nio-7081-exec-3] INFO inventoryDeduct 83:执行成功,剩余库存:990
2021-10-21 17:35:47 201 [http-nio-7081-exec-23] INFO inventoryDeduct 83:执行成功,剩余库存:988
2021-10-21 17:35:47 206 [http-nio-7081-exec-20] INFO inventoryDeduct 83:执行成功,剩余库存:986
2021-10-21 17:35:47 209 [http-nio-7081-exec-16] INFO inventoryDeduct 83:执行成功,剩余库存:985
2021-10-21 17:35:47 216 [http-nio-7081-exec-18] INFO inventoryDeduct 83:执行成功,剩余库存:983
2021-10-21 17:35:47 221 [http-nio-7081-exec-15] INFO inventoryDeduct 83:执行成功,剩余库存:981
2021-10-21 17:35:47 225 [http-nio-7081-exec-14] INFO inventoryDeduct 83:执行成功,剩余库存:980
2021-10-21 17:35:47 228 [http-nio-7081-exec-25] INFO inventoryDeduct 83:执行成功,剩余库存:979
2021-10-21 17:35:47 235 [http-nio-7081-exec-10] INFO inventoryDeduct 83:执行成功,剩余库存:976
2021-10-21 17:35:47 242 [http-nio-7081-exec-21] INFO inventoryDeduct 83:执行成功,剩余库存:973
2021-10-21 17:35:47 246 [http-nio-7081-exec-5] INFO inventoryDeduct 83:执行成功,剩余库存:971
2021-10-21 17:35:47 248 [http-nio-7081-exec-6] INFO inventoryDeduct 83:执行成功,剩余库存:970
2021-10-21 17:35:47 255 [http-nio-7081-exec-24] INFO inventoryDeduct 83:执行成功,剩余库存:967
2021-10-21 17:35:47 261 [http-nio-7081-exec-7] INFO inventoryDeduct 83:执行成功,剩余库存:964
2021-10-21 17:35:47 263 [http-nio-7081-exec-22] INFO inventoryDeduct 83:执行成功,剩余库存:963
2021-10-21 17:35:47 265 [http-nio-7081-exec-4] INFO inventoryDeduct 83:执行成功,剩余库存:962
2021-10-21 17:35:47 272 [http-nio-7081-exec-19] INFO inventoryDeduct 83:执行成功,剩余库存:959
2021-10-21 17:35:47 276 [http-nio-7081-exec-11] INFO inventoryDeduct 83:执行成功,剩余库存:957
2021-10-21 17:35:47 280 [http-nio-7081-exec-13] INFO inventoryDeduct 83:执行成功,剩余库存:955
2021-10-21 17:35:47 283 [http-nio-7081-exec-17] INFO inventoryDeduct 83:执行成功,剩余库存:954

端口为7080实例的结果:

2021-10-21 17:35:47 183 [http-nio-7080-exec-15] INFO inventoryDeduct 83:执行成功,剩余库存:995
2021-10-21 17:35:47 186 [http-nio-7080-exec-21] INFO inventoryDeduct 83:执行成功,剩余库存:994
2021-10-21 17:35:47 188 [http-nio-7080-exec-20] INFO inventoryDeduct 83:执行成功,剩余库存:993
2021-10-21 17:35:47 190 [http-nio-7080-exec-6] INFO inventoryDeduct 83:执行成功,剩余库存:992
2021-10-21 17:35:47 197 [http-nio-7080-exec-17] INFO inventoryDeduct 83:执行成功,剩余库存:989
2021-10-21 17:35:47 203 [http-nio-7080-exec-4] INFO inventoryDeduct 83:执行成功,剩余库存:987
2021-10-21 17:35:47 212 [http-nio-7080-exec-3] INFO inventoryDeduct 83:执行成功,剩余库存:984
2021-10-21 17:35:47 218 [http-nio-7080-exec-16] INFO inventoryDeduct 83:执行成功,剩余库存:982
2021-10-21 17:35:47 230 [http-nio-7080-exec-19] INFO inventoryDeduct 83:执行成功,剩余库存:978
2021-10-21 17:35:47 232 [http-nio-7080-exec-24] INFO inventoryDeduct 83:执行成功,剩余库存:977
2021-10-21 17:35:47 237 [http-nio-7080-exec-23] INFO inventoryDeduct 83:执行成功,剩余库存:975
2021-10-21 17:35:47 239 [http-nio-7080-exec-8] INFO inventoryDeduct 83:执行成功,剩余库存:974
2021-10-21 17:35:47 244 [http-nio-7080-exec-5] INFO inventoryDeduct 83:执行成功,剩余库存:972
2021-10-21 17:35:47 251 [http-nio-7080-exec-1] INFO inventoryDeduct 83:执行成功,剩余库存:969
2021-10-21 17:35:47 253 [http-nio-7080-exec-10] INFO inventoryDeduct 83:执行成功,剩余库存:968
2021-10-21 17:35:47 257 [http-nio-7080-exec-12] INFO inventoryDeduct 83:执行成功,剩余库存:966
2021-10-21 17:35:47 259 [http-nio-7080-exec-11] INFO inventoryDeduct 83:执行成功,剩余库存:965
2021-10-21 17:35:47 268 [http-nio-7080-exec-25] INFO inventoryDeduct 83:执行成功,剩余库存:961
2021-10-21 17:35:47 270 [http-nio-7080-exec-22] INFO inventoryDeduct 83:执行成功,剩余库存:960
2021-10-21 17:35:47 274 [http-nio-7080-exec-14] INFO inventoryDeduct 83:执行成功,剩余库存:958
2021-10-21 17:35:47 278 [http-nio-7080-exec-9] INFO inventoryDeduct 83:执行成功,剩余库存:956
2021-10-21 17:35:47 285 [http-nio-7080-exec-2] INFO inventoryDeduct 83:执行成功,剩余库存:953
2021-10-21 17:35:47 288 [http-nio-7080-exec-18] INFO inventoryDeduct 83:执行成功,剩余库存:952
2021-10-21 17:35:47 290 [http-nio-7080-exec-7] INFO inventoryDeduct 83:执行成功,剩余库存:951
2021-10-21 17:35:47 292 [http-nio-7080-exec-13] INFO inventoryDeduct 83:执行成功,剩余库存:950

从结果上看来我们的库存正常扣减了1000-50=950,

5.5、加锁时间

  关于加锁超时的一些说明和举例:假设我们加锁的超时时间是10秒钟,现在有20个并发线程进行库存扣减,每个线程需要执行1秒钟,那么最后我们成功获取到锁的线程只有10个,其他10个线程获取锁失败了,就只会减10个库存,结果如下:

端口为7081实例的结果:

2021-10-21 17:51:15 081 [http-nio-7081-exec-3] INFO inventoryDeduct 83:执行成功,剩余库存:948
2021-10-21 17:51:19 145 [http-nio-7081-exec-1] INFO inventoryDeduct 83:执行成功,剩余库存:944
2021-10-21 17:51:20 162 [http-nio-7081-exec-9] INFO inventoryDeduct 83:执行成功,剩余库存:943
2021-10-21 17:51:21 175 [http-nio-7081-exec-6] INFO inventoryDeduct 83:执行成功,剩余库存:942
2021-10-21 17:51:23 057 [http-nio-7081-exec-2] INFO lock 33:获取锁失败,返回null
2021-10-21 17:51:23 057 [http-nio-7081-exec-5] INFO lock 33:获取锁失败,返回null
2021-10-21 17:51:23 057 [http-nio-7081-exec-7] INFO lock 33:获取锁失败,返回null
2021-10-21 17:51:23 058 [http-nio-7081-exec-7] INFO inventoryDeduct 80:业务执行失败
2021-10-21 17:51:23 058 [http-nio-7081-exec-2] INFO inventoryDeduct 80:业务执行失败
2021-10-21 17:51:23 058 [http-nio-7081-exec-8] INFO lock 33:获取锁失败,返回null
2021-10-21 17:51:23 058 [http-nio-7081-exec-4] INFO lock 33:获取锁失败,返回null
2021-10-21 17:51:23 058 [http-nio-7081-exec-8] INFO inventoryDeduct 80:业务执行失败
2021-10-21 17:51:23 058 [http-nio-7081-exec-4] INFO inventoryDeduct 80:业务执行失败
2021-10-21 17:51:23 058 [http-nio-7081-exec-5] INFO inventoryDeduct 80:业务执行失败
2021-10-21 17:51:23 059 [http-nio-7081-exec-10] INFO lock 33:获取锁失败,返回null
2021-10-21 17:51:23 060 [http-nio-7081-exec-10] INFO inventoryDeduct 80:业务执行失败

端口为7080实例的结果:

2021-10-21 17:51:14 069 [http-nio-7080-exec-10] INFO inventoryDeduct 83:执行成功,剩余库存:949
2021-10-21 17:51:16 100 [http-nio-7080-exec-1] INFO inventoryDeduct 83:执行成功,剩余库存:947
2021-10-21 17:51:17 118 [http-nio-7080-exec-8] INFO inventoryDeduct 83:执行成功,剩余库存:946
2021-10-21 17:51:18 131 [http-nio-7080-exec-3] INFO inventoryDeduct 83:执行成功,剩余库存:945
2021-10-21 17:51:22 193 [http-nio-7080-exec-5] INFO inventoryDeduct 83:执行成功,剩余库存:941
2021-10-21 17:51:23 056 [http-nio-7080-exec-7] INFO lock 33:获取锁失败,返回null
2021-10-21 17:51:23 056 [http-nio-7080-exec-7] INFO inventoryDeduct 80:业务执行失败
2021-10-21 17:51:23 057 [http-nio-7080-exec-9] INFO lock 33:获取锁失败,返回null
2021-10-21 17:51:23 057 [http-nio-7080-exec-9] INFO inventoryDeduct 80:业务执行失败
2021-10-21 17:51:23 057 [http-nio-7080-exec-6] INFO lock 33:获取锁失败,返回null
2021-10-21 17:51:23 057 [http-nio-7080-exec-2] INFO lock 33:获取锁失败,返回null
2021-10-21 17:51:23 057 [http-nio-7080-exec-6] INFO inventoryDeduct 80:业务执行失败
2021-10-21 17:51:23 057 [http-nio-7080-exec-2] INFO inventoryDeduct 80:业务执行失败
2021-10-21 17:51:23 208 [http-nio-7080-exec-4] INFO inventoryDeduct 83:执行成功,剩余库存:940

六、InterProcessMutex原理分析

  首先声明下我这里的zookeeper服务的版本3.6.3zookeeper版本为3.6.3(jar版本),curator版本为5.2.0

6.1、实例化

6.1.1、本地调用实例化

  传入CuratorFramework和锁路径即可。

//创建InterProcessMutex实例:public InterProcessMutex(CuratorFramework client, String path)
InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath); 

6.1.2、源码-实例化

  首先是我们的实例化方法,这个过程中最重要的就是构建LockInternals对象,这个也是整个锁的最核心的实现。 所在的类的具体路径:org.apache.curator.framework.recipes.locks.InterProcessMutex

	//锁的名字
	private static final String LOCK_NAME = "lock-";

    /**
     * 实例化第一步(增加LockInternalsDriver 驱动)
     * @param client client
     * @param path   the path to lock
     */
    public InterProcessMutex(CuratorFramework client, String path) {
        this(client, path, new StandardLockInternalsDriver());
    }
    /**
     * 实例化第二步
     * @param client client
     * @param path   the path to lock
     * @param driver lock driver
     */
    public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
        this(client, path, LOCK_NAME, 1, driver);
    }
    /**
     * 实例化第三步
     */
    InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
    	//校验路径
        basePath = PathUtils.validatePath(path);
        //实例化LockInternals(所有申请锁与释放锁的核心实现)
        internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }

6.2、获取锁acquire()

6.2.1、本地调用获取锁方法

  传入过期时间和时间单位即可。

	//获取锁:public boolean acquire(long time, TimeUnit unit)
	boolean success = lock.acquire(time, timeUnit); 

6.2.2、源码-internalLock()

    /**
     * 获取锁直到它可用或者时间过期了,同一个线程可以重复调用,也就是可冲入锁
     *
     * @param time time to wait
     * @param unit time unit
     * @return 为true就是获取到了,false就是没有获取到
     * @throws Exception ZK errors, connection interruptions
     */
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return internalLock(time, unit);
    }

	private boolean internalLock(long time, TimeUnit unit) throws Exception {
        Thread currentThread = Thread.currentThread();
		//关于并发性的说明:给定的lockData实例只能由单个线程执行,因此无需锁定
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering,说明是同一个线程:可重入锁
            lockData.lockCount.incrementAndGet();
            return true;
        }
		//核心方法:
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null ) {
        	//路径不为空则获取到锁
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
        //未获取到锁
        return false;
    }

  从上面源码可以知道,此方法中会根据线程号获取数据,如果获取到了说明是同一个线程再次进入了,也就是可重入锁,没有获取到则开始调用核心方法attemptLock去获取锁,此方法会返回锁路径,如果锁路径不为空则表示获取到了锁,并把数据放到线程中,否则表示未获取到锁

6.2.3、源码-attemptLock()(重要

  此方法在另一个核心类org.apache.curator.framework.recipes.locks.LockInternals

	String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        final long startMillis = System.currentTimeMillis();
        //把过期时间转为毫秒数
        final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
        //传入的lockNodeBytes是null
        final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        //尝试次数
        int retryCount = 0;
        //锁路径
        String ourPath = null;
        //是否拥有锁,一般是针对该线程
        boolean hasTheLock = false;
        //是否完成
        boolean isDone = false;
        while (!isDone) {
        	//默认会完成,如果异常则未完成
            isDone = true;
            try {
            	//此处的driver就是实例化时的StandardLockInternalsDriver,创建一个临时序列节点
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                //判断当前节点是否获取到了锁
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            } catch (KeeperException.NoNodeException e) {
                // 当StandardLockInternalsDriver找不到锁节点时抛出异常
                //这可能发生在会话到期时,等等。因此,如果重试允许,请重试
                if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                	//标志为完成
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }
		//判断是否拥有锁
        if (hasTheLock) {
        	//获取到了锁则返回路径
            return ourPath;
        }
        return null;
    }

  完成标志isDone默认为false,只要未完成则循环,先创建临时顺序节点,然后判断当前节点是否获取到了锁,如果获取了则isDone置为true,并返回路径。

6.2.4、源码-createsTheLock()

接下里我们分析下:String ourPath = driver.createsTheLock(client, path, localLockNodeBytes);此方法的具体实现在org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver

    @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
        String ourPath;
        if (lockNodeBytes != null) {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        } else {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

6.2.5、源码-internalLockLoop()(核心

  接下里我们分析下:boolean hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);此方法的实现在org.apache.curator.framework.recipes.locks.LockInternals

	//
	private final watcherRemoveCuratorFramework client;
	//也就是对对象引用进行原子级操作,线程安全的
	private final AtomicReference<RevocationSpec> revocable = new AtomicReference<RevocationSpec>(null);
	//curator监听器
	private final CuratorWatcher revocableWatcher = new CuratorWatcher()

	//判断自身是否能够持有锁。如果不能,进入wait,等待被唤醒
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;
        try {
            if (revocable.get() != null) {
            	//使用CuratorWatcher监听
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
			//客户端状态是启动,并且未获取到锁
            while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            	//获取子节点并排序
                List<String> children = getSortedChildren();
                //就是截取生成路径中的最后一个节点,根据最后一个"/",进行截取
                String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
				//判断是否可以持有锁(当前创建的节点是否在上一步获取到的子节点列表的首位)
				//如果是,说明可以持有锁,那么封装PredicateResults里getsTheLock = true。
				//如果不是,说明有其他线程早已先持有了锁,那么封装PredicateResults里getsTheLock = false,
				//此处还需要获取到自己前一个临时节点的名称pathToWatch
                PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if (predicateResults.getsTheLock()) {
                    haveTheLock = true;
                } else {
                	//未获取到锁,得到前一个节点的完整路径
                    String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized (this) {
                        try {
                            // 使用getData()而不是exists(),以避免留下不必要的观察者,这是一种资源泄漏
                           //添加监听,监听的是前一个节点
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            //判断过期时间
                            if (millisToWait != null) {
                            	//过期时间不为空
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                //判断剩余等待时间,如果小于等于0则置标志位为true,用于删除节点
                                if (millisToWait <= 0) {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }
								//等待响应时间
                                wait(millisToWait);
                            } else {
                            	//过期时间为空,则一直等待
                                wait();
                            }
                        } catch (KeeperException.NoNodeException e) {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        } catch (Exception e) {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        } finally {
            if (doDelete) {
            	//获取到了锁,或者是抛出异常了,删除节点
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

  当客户端状态是启动,并且未获取到锁,进入循环,获取子节点列表并按升序排序,得到当前节点的节点名,然后判断当前节点是否已经获取到到了锁,如果没有获取到则获取它前一个节点名称,然后通过同步方式监听该节点,如果有过期时间则进行响应时间的等待,如果没有则一直等待。如若已经获取到锁则删除相应的节点,这样下一个监听该节点的监听器可以收到通知。

6.2.6、源码-getsTheLock()(重要

    @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
    	//获取当前节点的索引
        int ourIndex = children.indexOf(sequenceNodeName);
        //校验索引号
        validateOurIndex(sequenceNodeName, ourIndex);
		//maxLeases为1,如果获取到时0表示获取到了锁
        boolean getsTheLock = ourIndex < maxLeases;
        //如果未获取到锁则获取前一个节点
        String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
		//把结果封装到PredicateResults里
        return new PredicateResults(pathToWatch, getsTheLock);
    }

  每一段代码都看懂了,我相信大家也就懂了整个的加锁的逻辑了。

6.3、释放锁

6.3.1、本地调用释放锁

	lock.release();

6.3.2、源码-release()(重要

  此方法的实现类在:org.apache.curator.framework.recipes.locks.InterProcessMutex

	/**
	 * 如果调用线程与获取互斥锁的线程相同,则执行一次互斥锁释放
	 * 如线程已多次调用acquire,当此方法返回时,互斥锁仍将保持。
     *
     * @throws Exception ZK errors, interruptions, current thread does not own the lock
     */
    @Override
    public void release() throws Exception {
		//关于并发性的说明:给定的lockData实例,只能由单个线程执行,因此无需锁定
        Thread currentThread = Thread.currentThread();
        InterProcessMutex.LockData lockData = threadData.get(currentThread);
        if (lockData == null) {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
		//可重入计次数减1,因为可重入锁的原因
        int newLockCount = lockData.lockCount.decrementAndGet();
        //判断次数
        if (newLockCount > 0) {
        	//大于则什么都不做,不能释放锁,只是重入锁的处理
            return;
        }
        if (newLockCount < 0) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try {
        	//释放锁
            internals.releaseLock(lockData.lockPath);
        } finally {
        	//移除线程数据
            threadData.remove(currentThread);
        }
    }

  如果调用线程与获取互斥锁的线程相同,则执行一次互斥锁释放。如线程已多次调用acquire,当此方法返回时,互斥锁仍将保持。

6.3.3、源码-releaseLock

	final void releaseLock(String lockPath) throws Exception {
		//移除监听
        client.removeWatchers();
        //把加锁时加入的数据清除
        revocable.set(null);
        //删除节点数据
        deleteOurPath(lockPath);
    }
    
	private void deleteOurPath(String ourPath) throws Exception {
        try  {
            client.delete().guaranteed().forPath(ourPath);
        } catch ( KeeperException.NoNodeException e ) {
            // ignore - already deleted (possibly expired session, etc.)
        }
    }

  加锁和释放锁的代码已经解释的很清楚了,也不是很复杂,我们也可以根据这个思路自己实现加锁和释放锁,后续我们就写一个吧。

脚本宝典总结

以上是脚本宝典为你收集整理的SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理全部内容,希望文章能够帮你解决SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。