常见的业务场景:x年x月x日x点x分x秒,限时抢购10件商品。前提:分布式的环境,多用户高并发访问。
- 依赖的 jar 包
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qjl</groupId>
<artifactId>hdfs</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- zookeeper所需依赖 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
- 待抢购的商品:
class Resource {
/**
* 公共资源(可以看成10件商品)
*/
private static AtomicInteger count = new AtomicInteger(10);
/**
* 公共的服务(可以看成购买一件商品)
*/
public static void commonService() {
System.out.println("***********抢购开始 **********");
int cur = count.decrementAndGet();
System.out.println("当前剩余商品:" + cur);
// 睡2秒,模拟线程切换
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("***********抢购结束 **********");
}
}
- 抢购任务:
/**
* 抢购任务
* @author qujianlei
*/
class Task implements Runnable {
private InterProcessMutex lock;
public Task(InterProcessMutex lock) {
this.lock = lock;
}
@Override
public void run() {
try {
// 执行任务之前先获得锁
lock.acquire();
} catch (Exception e) {
e.printStackTrace();
}
Resource.commonService();
// 闭锁数量减一
SynLock.latch.countDown();
try {
// 任务完成之后释放锁
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 使用闭锁确保所有线程执行完任务以后再关闭 zookeeper 客户端:
/**
* 通过闭锁来实现异步转同步的操作
* @author qujianlei
*/
class SynLock {
public static CountDownLatch latch = new CountDownLatch(10);
}
- 主程序:
/**
* 利用zookeeper的分布式锁实现秒杀功能
* @author qujianlei
*/
public class TestDistributedLock {
public static void main(String[] args) {
// 定义客户端重试策略
RetryPolicy policy = new ExponentialBackoffRetry(1000, 10);
// 定义ZK的一个客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(policy)
.build();
// 在zk生成锁 --> 就是zk的目录
client.start();
InterProcessMutex lock = new InterProcessMutex(client, "/mylock");
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 启动10个线程去调用公共的服务
for (int i = 0; i < 10; i++) {
executorService.execute(new Task(lock));
}
executorService.shutdown();
try {
// 在闭锁的数量减到0之前,当前线程一直处于阻塞状态,确保zookeeper客户端关闭前,所有的抢购任务可以执行完毕
SynLock.latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
client.close();
}
}