之前的博客介绍过 zookeeper 的分布式锁,只不过是基于 Spring 的实现(技术太老了),现在肯定使用 SpringBoot 进行实现,因此有必要再写一篇博客。
有关 zookeeper 的部署,以及分布式锁细节,这里不再赘述,可以访问我之前编写的博客。
zookeeper 的单机和集群部署:https://www.cnblogs.com/studyjobs/p/18227639.html
使用 zookeeper 实现分布式锁:https://www.cnblogs.com/studyjobs/p/16488794.html
一、搭建工程
新建一个名称为 springboot_zk_lock 的 springboot 工程,结构如下图所示:
为了简单,本 demo 操作数据库就不写 service 了,直接使用 mapper 进行操作,首先看一下 pom 文件引用的依赖包:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jobs</groupId>
<artifactId>springboot_zk_lock</artifactId>
<version>1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--导入 curator 的相关依赖包-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
<!-- mysql驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.0</version>
</dependency>
<!--导入 druid 连接池依赖-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.4.5</version>
</plugin>
</plugins>
</build>
</project>
本 demo 实现的简单案例就是:提供一个 http 接口,每次调用就相当于购买了 1 件商品,库存数量减一,直到数量为零为止。
这里简单的创建了一个 test 数据库,里面只有一个结构非常简单的表 stock,具体建表的 sql 语句如下:
DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`num` int(11) NULL DEFAULT 0,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `stock` VALUES (1, 10);
将该工程部署多个,那么互相都是独立的,此时就无法使用 synchronized 来控制线程的并发访问了,只能使用分布式锁。
本 demo 实现使用 zookeeper 实现分布式锁,连接操作 zookeeper 使用的是第三方提供的 curator 相关的依赖包。
我们看一下 application.yml 配置文件,有关 zookeeper 的连接信息,需要我们自己定义配置内容:
server:
port: 8080
spring:
datasource:
# 使用 druid 连接池
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.136.128:3306/test?serverTimeZone=Asia/Shanghai&useSSL=false
username: root
password: root
# 自定义编写的连接 zookeeper 的配置信息
zk:
# 如果是操作 zookeeper 集群,可以配置多个 zookeeper 地址
# 多个地址之间用英文逗号分隔,如 ip1:port1,ip2:port2,ip3:port3
connectString: 192.168.136.128:2181
# zookeeper的会话超时时间(单位:毫秒,默认是 60 秒)
sessionTimeoutMs: 60000
# zookeeper的连接超时时间(单位:毫秒,默认是 15 秒)
connectionTimeoutMs: 15000
# zookeeper默认操作的根节点,所有的增删改查操作,默认在该节点下进行
namespace: jobs
二、代码细节
数据库表 stock 只有 2 个字段,建立一个实体类:
package com.jobs.entity;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@TableName("stock")
@Data
public class Stock {
@TableId
private Integer id;
/**
* 库存量
*/
private Integer num;
}
由于采用了 mybatis plus 技术,因此 mapper 的编写非常简单:
package com.jobs.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jobs.entity.Stock;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface StockMapper extends BaseMapper<Stock> {
}
接下来就是编写 zookeeper 的配置类,从 application.yml 中读取配置信息,创建 CuratorFramework 实例,加载都 Spring 容器中
package com.jobs.config;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class zkconfig {
@Value("${zk.connectString}")
private String connectString;
@Value("${zk.sessionTimeoutMs}")
private Integer sessionTimeoutMs;
@Value("${zk.connectionTimeoutMs}")
private Integer connectionTimeoutMs;
@Value("${zk.namespace}")
private String namespace;
//获取 Curator 的客户端连接
@Bean
public CuratorFramework getCuratorFramework(){
//重试策略,如果连接失败,最多重试 3 次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.namespace(namespace)
.retryPolicy(retryPolicy)
.build();
client.start();
return client;
}
}
最后就是提供一个 http 接口,可以通过浏览器进行访问
package com.jobs.controller;
import com.jobs.entity.Stock;
import com.jobs.mapper.StockMapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
@RequestMapping("/stock")
@RestController
public class StockController {
@Autowired
private CuratorFramework curatorFramework;
@Autowired
private StockMapper stockMapper;
//为了简化逻辑,每次购买 1 件商品
@GetMapping("/buy")
public String stock() {
String result;
InterProcessMutex mutex = new InterProcessMutex(curatorFramework, "/mylock");
try {
//在 2 秒钟内,不断尝试获取锁,如果获得则继续执行,否则直接结束
boolean locked = mutex.acquire(2000, TimeUnit.MILLISECONDS);
if (locked) {
Stock stock = stockMapper.selectById(1);
if (stock.getNum() > 0) {
stock.setNum(stock.getNum() - 1);
stockMapper.updateById(stock);
result = "商品库存扣减成功,剩余库存:" + stock.getNum();
} else {
result = "商品库存不足!";
}
//释放锁
mutex.release();
} else {
result = "没有获取到锁,不能执行减库存操作!";
}
} catch (Exception ex) {
result = "出现异常:" + ex.getMessage();
}
return result;
}
}
有关 zookeeper 的分布式锁的测试效果,这里就介绍了。测试方案就是把本 demo 工程至少部署 2 个节点,然后使用浏览器频繁访问每个节点。
当然你也可以编写程序去频繁调用每个节点的接口,或者使用 nginx 对每个节点进行转发,然后使用 jemeter 压力测试工具去调用 nginx 接口。
本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/springboot_zk_lock.zip