总体思路
Redis服务器
两台虚拟机,2C4G规格
redis服务部署
客户端
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the red-packet Redis test client. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>RedisTemplateTest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Sources are compiled at Java 8 language level and target Java 8 bytecode. -->
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Logging backend: Log4j 2 core. -->
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.23.1</version>
</dependency>
<!-- SLF4J 2.x binding, so the org.slf4j loggers used by the test classes are routed to Log4j 2. -->
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j2-impl -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>2.23.1</version>
<!-- <scope>test</scope>-->
</dependency>
<!-- Conversant Disruptor: only required if the Async appender in log4j2.xml is switched to DisruptorBlockingQueue (that element is commented out there). -->
<!-- https://mvnrepository.com/artifact/com.conversantmedia/disruptor -->
<dependency>
<groupId>com.conversantmedia</groupId>
<artifactId>disruptor</artifactId>
<version>1.2.21</version>
</dependency>
<!-- NOTE(review): the test classes shown in this document import only io.lettuce.core; nothing imports Spring Data Redis. Confirm this dependency is still needed. -->
<!-- NOTE(review): Spring Data Redis 3.x is built for a Java 17 baseline, while this build compiles for Java 8. Confirm the intended JDK. -->
<!-- https://mvnrepository.com/artifact/org.springframework.data/spring-data-redis -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.3.2</version>
</dependency>
<!-- NOTE(review): Jedis is not imported by any of the test classes shown in this document. Confirm it is still needed. -->
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>5.1.4</version>
</dependency>
<!-- Lettuce: the Redis client actually used by the test classes (RedisClient, RedisAsyncCommands, RedisFuture). -->
<!-- https://mvnrepository.com/artifact/io.lettuce/lettuce-core -->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.4.0.RELEASE</version>
</dependency>
<!-- NOTE(review): no Lombok annotation appears in the test classes shown in this document (getters are hand-written). Confirm it is still needed. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
log4j2.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!--
Log4j 2 configuration for the red-packet tests.
The root logger writes at level "info" to the Async appender only, which forwards to the
buffered RollingFile appender. The Console appender is defined but not referenced.
-->
<Configuration status="WARN">
<Appenders>
<!-- Colourised console output; currently unused because its AppenderRef under Root is commented out. -->
<Console name="Console" target="SYSTEM_OUT" follow="true">
<PatternLayout pattern="%style{%d{HH:mm:ss.SSS}}{Magenta} %style{|-}{White}%highlight{%-5p} [%t] %style{%40.40c}{Cyan}:%style{%-3L}{Blue} %style{-|}{White} %m%n%rEx{filters(jdk.internal.reflect,java.lang.reflect,sun.reflect)}" disableAnsi="false" charset="UTF-8"/>
</Console>
<!-- File output to logs/app.log; immediateFlush is off and a 1 MiB (1048576 byte) buffer is configured. -->
<!-- NOTE(review): filePattern has no %i counter although a SizeBasedTriggeringPolicy is configured; two size-triggered rollovers within the same hour would resolve to the same archive name. Confirm whether "-%i" should be added. -->
<RollingFile name="RollingFile" fileName="logs/app.log"
filePattern="logs/app-%d{yyyy-MM-dd-HH}.log" immediateFlush="false" bufferSize="1048576">
<PatternLayout>
<Pattern>%d %p %c{1.} [%t] %m%n</Pattern>
</PatternLayout>
<Policies>
<!-- Time-based rollover, interval 1; the date pattern above has hour granularity. -->
<TimeBasedTriggeringPolicy interval="1" />
<!-- Additional rollover once the active file reaches 200MB. -->
<SizeBasedTriggeringPolicy size="200MB"/>
</Policies>
</RollingFile>
<!-- Asynchronous wrapper around RollingFile, configured as non-blocking and backed by a LinkedTransferQueue. -->
<!-- NOTE(review): with blocking=false, events arriving while the queue is full are presumably not written to RollingFile; verify against the Log4j AsyncAppender documentation before relying on complete logs. -->
<Async name="AsyncAppender">
<AppenderRef ref="RollingFile"/>
<blocking>false</blocking>
<LinkedTransferQueue/>
<!-- <DisruptorBlockingQueue/>-->
</Async>
</Appenders>
<Loggers>
<Root level="info">
<!-- <AppenderRef ref="Console"/>-->
<AppenderRef ref="AsyncAppender"/>
</Root>
</Loggers>
</Configuration>
单机功能测试版本
package org.example;
import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.DecimalFormat;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class RedPacketNumber {
private static final Logger logger = LoggerFactory.getLogger(RedPacketNumber.class);
/**
 * Immutable value holder for a "give red packet" request: the user id, the group id,
 * the packet id, the total amount and the number of shares.
 */
public static class RedPacketGiveReq {
    /** User id carried by the request. */
    private final int userId;
    /** Group id carried by the request. */
    private final int groupId;
    /** Red packet id. */
    private final int id;
    /** Total amount of the red packet. */
    private final double amount;
    /** Number of shares the packet is split into. */
    private final int num;

    /**
     * @param userId  user id
     * @param groupId group id
     * @param id      red packet id
     * @param amount  total amount
     * @param num     number of shares
     */
    public RedPacketGiveReq(int userId, int groupId, int id, double amount, int num) {
        this.id = id;
        this.userId = userId;
        this.groupId = groupId;
        this.num = num;
        this.amount = amount;
    }

    /** @return the user id */
    public int getUserId() {
        return this.userId;
    }

    /** @return the group id */
    public int getGroupId() {
        return this.groupId;
    }

    /** @return the red packet id */
    public int getId() {
        return this.id;
    }

    /** @return the total amount */
    public double getAmount() {
        return this.amount;
    }

    /** @return the number of shares */
    public int getNum() {
        return this.num;
    }
}
/**
 * Immutable value holder for a "grab red packet" request: the user id, the group id
 * and the id of the packet being grabbed.
 */
public static class RedPacketGrabReq {
    /** User id carried by the request. */
    private final int userId;
    /** Group id carried by the request. */
    private final int groupId;
    /** Red packet id. */
    private final int id;

    /**
     * @param userId  user id
     * @param groupId group id
     * @param id      red packet id
     */
    public RedPacketGrabReq(int userId, int groupId, int id) {
        this.id = id;
        this.groupId = groupId;
        this.userId = userId;
    }

    /** @return the user id */
    public int getUserId() {
        return this.userId;
    }

    /** @return the group id */
    public int getGroupId() {
        return this.groupId;
    }

    /** @return the red packet id */
    public int getId() {
        return this.id;
    }
}
/**
 * Two-decimal formatter for the amounts sent to the Lua scripts. The symbols are pinned to
 * {@code Locale.ROOT} so the decimal separator is always '.', independent of the JVM's
 * default locale; the scripts parse these strings with Lua's {@code tonumber}.
 */
private static final DecimalFormat df =
        new DecimalFormat("#.00", java.text.DecimalFormatSymbols.getInstance(java.util.Locale.ROOT));

/**
 * Lua script that creates a red packet hash.
 * <ul>
 *   <li>KEYS[1]: packet hash key</li>
 *   <li>ARGV[1] / ARGV[2]: stored as {@code amount} / {@code num} (remaining values)</li>
 *   <li>ARGV[3] / ARGV[4]: stored as {@code init_amount} / {@code init_num}</li>
 * </ul>
 * Returns {@code false} without writing anything when KEYS[1] already exists, {@code true} otherwise.
 */
private static final String luaPacketGive =
        "local exists = redis.call('EXISTS', KEYS[1]) \n" +
        "if exists == 1 then \n" +
        "    return false \n" +
        "else \n" +
        "    redis.call('HSET', KEYS[1], 'amount', ARGV[1] ) \n" +
        "    redis.call('HSET', KEYS[1], 'num', ARGV[2] ) \n" +
        "    redis.call('HSET', KEYS[1], 'init_amount', ARGV[3] ) \n" +
        "    redis.call('HSET', KEYS[1], 'init_num', ARGV[4] ) \n" +
        "    return true \n" +
        "end";

/**
 * Lua script that grabs one share of a red packet.
 * <ul>
 *   <li>KEYS[1]: packet hash key; KEYS[2]: hash recording the amount each user grabbed</li>
 *   <li>ARGV[1]: not read by the script; ARGV[2]: id of the grabbing user</li>
 * </ul>
 * Returns {@code '-1'} when the user already grabbed, {@code '0.00'} when no share is left,
 * otherwise the grabbed amount formatted with two decimals.
 * <p>
 * The random share is capped so that every share still outstanding keeps at least 0.01;
 * without the cap a draw could round up to the whole remaining amount and a later grabber
 * would receive {@code '0.00'}, which is also the "no share left" sentinel.
 * <p>
 * The script assumes the packet hash exists: a missing {@code num} field makes the numeric
 * comparison raise a script error.
 * <p>
 * NOTE(review): Redis servers older than 7.0 are documented to reseed the script PRNG
 * identically for every script run, which would make {@code math.random()} return the
 * same value on each grab. Confirm the server version.
 */
private static final String luaGrab =
        // A user who already has an entry in the detail hash is rejected.
        "local grabbed = redis.call('HGET', KEYS[2], ARGV[2]) \n" +
        "if grabbed then \n" +
        "    return '-1' \n" +
        "end \n" +
        "local num = tonumber(redis.call('HGET', KEYS[1], 'num')) \n" +
        // Last share: hand out whatever amount is left.
        "if num == 1 then \n" +
        "    redis.call('HSET', KEYS[1], 'num', 0) \n" +
        "    local amount = redis.call('HGET', KEYS[1], 'amount') \n" +
        "    redis.call('HSET', KEYS[2], ARGV[2], amount) \n" +
        "    redis.call('HSET', KEYS[1], 'amount', '0.00') \n" +
        "    return string.format('%.2f', amount) \n" +
        // Several shares left: twice the average of the remainder times a random factor in [0, 1).
        "elseif num > 1 then \n" +
        "    redis.call('HSET', KEYS[1], 'num', num - 1) \n" +
        "    local amount = tonumber(redis.call('HGET', KEYS[1], 'amount')) \n" +
        "    local avg = amount / num \n" +
        "    local result = avg * 2 * math.random() \n" +
        // Leave at least 0.01 for each of the other (num - 1) shares.
        "    local cap = amount - (num - 1) * 0.01 \n" +
        "    if result > cap then \n" +
        "        result = cap \n" +
        "    end \n" +
        "    if result < 0.01 then \n" +
        "        result = 0.01 \n" +
        "    end \n" +
        "    local res = string.format('%.2f', result) \n" +
        "    redis.call('HSET', KEYS[2], ARGV[2], res) \n" +
        "    redis.call('HSET', KEYS[1], 'amount', string.format('%.2f', amount - res)) \n" +
        "    return res \n" +
        // No share left.
        "else \n" +
        "    return '0.00' \n" +
        "end ";
/**
 * Functional test against a single Redis server: removes keys left over from a previous
 * run, creates one red packet (amount 100, 10 shares), lets 12 users grab it over one
 * asynchronous connection, then prints the packet hash and the per-user detail hash.
 *
 * @param args unused
 * @throws InterruptedException if the thread is interrupted while waiting for Redis
 * @throws ExecutionException   if a Redis command awaited synchronously fails
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
    RedisClient redisClient = RedisClient.create("redis://192.168.253.176:6379");
    SocketOptions socketOptions = SocketOptions.builder()
            // TCP_NODELAY is switched off, i.e. Nagle's algorithm stays enabled.
            .tcpNoDelay(false)
            .build();
    ClientOptions options = ClientOptions.builder()
            .socketOptions(socketOptions)
            .build();
    redisClient.setOptions(options);
    try {
        StatefulRedisConnection<String, String> connection1 = redisClient.connect();
        RedisAsyncCommands<String, String> asyncCommands1 = connection1.async();

        // Remove data from the previous run and wait for the deletion to finish.
        // DEL needs at least one key, so it is skipped when nothing matches.
        String prefix = "2";
        List<String> staleKeys = asyncCommands1.keys(prefix + "*").get();
        if (!staleKeys.isEmpty()) {
            asyncCommands1.del(staleKeys.toArray(new String[0])).get();
        }
        System.out.println("清理上次数据完毕");

        // Give the red packet.
        RedPacketGiveReq redPacketGiveReq = new RedPacketGiveReq(1000000, 2000000, 3000000, 100, 10);
        String key = String.join("_",
                String.valueOf(redPacketGiveReq.getGroupId()),
                String.valueOf(redPacketGiveReq.getId()));
        String grabKey = String.join("_",
                String.valueOf(redPacketGiveReq.getGroupId()),
                String.valueOf(redPacketGiveReq.getId()),
                "grab");
        String[] keys = new String[]{key, grabKey};

        String amountStr = df.format(redPacketGiveReq.getAmount());
        String numStr = String.valueOf(redPacketGiveReq.getNum());
        RedisFuture<Boolean> future = asyncCommands1.eval(luaPacketGive, ScriptOutputType.BOOLEAN,
                keys, amountStr, numStr, amountStr, numStr);
        // Null-safe check: a missing reply is treated like "not created".
        if (Boolean.TRUE.equals(future.get())) {
            System.out.println("红包创建成功, key为 " + key + ", 金额为 " + amountStr + ", 红包数为 " + numStr);
        } else {
            amountStr = asyncCommands1.hget(key, "init_amount").get();
            numStr = asyncCommands1.hget(key, "init_num").get();
            System.out.println("红包已经存在, key为 " + key + ", 金额为 " + amountStr + ", 红包数为 " + numStr);
        }

        // Grab the red packet.
        int userNum = 12;
        CountDownLatch latch = new CountDownLatch(userNum);
        for (int i = 0; i < userNum; i++) {
            RedPacketGrabReq redPacketGrabReq = new RedPacketGrabReq(1000000 + i, 2000000, 3000000);
            RedisFuture<String> result1 = asyncCommands1.eval(luaGrab, ScriptOutputType.VALUE,
                    keys, "", redPacketGrabReq.getUserId() + "");
            result1.whenComplete((r, e) -> {
                // Print first and release the latch last, so the summary below cannot
                // overtake the output of the final grab.
                try {
                    printResult(r, e, redPacketGrabReq);
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();

        // Query the final state.
        Map<String, String> result = asyncCommands1.hgetall(key).get();
        Map<String, String> resultDetail = asyncCommands1.hgetall(grabKey).get();
        System.out.println("红包结果: " + result);
        System.out.println("红包明细: " + resultDetail);
    } finally {
        // Releases the client's connections and threads on both success and failure.
        redisClient.shutdown();
    }
    System.exit(0);
}
/**
 * Reports the outcome of one grab attempt: failures go to the logger, results to stdout.
 *
 * @param r                value returned by the grab script ('0.00', '-1' or a grabbed amount)
 * @param e                failure of the Redis call, or {@code null} on success
 * @param redPacketGrabReq the grab request the outcome belongs to
 */
private static void printResult(String r, Throwable e, RedPacketGrabReq redPacketGrabReq) {
    if (e != null) {
        logger.error("用户 " + redPacketGrabReq.getUserId() + " error", e);
        return;
    }
    final String who = "用户 " + redPacketGrabReq.getUserId();
    final String outcome;
    if ("-1".equals(r)) {
        // The user already holds a share.
        outcome = " 抢红包失败,已经抢过红包";
    } else if ("0.00".equals(r)) {
        // No share left.
        outcome = " 抢红包失败,红包已抢完";
    } else {
        outcome = " 抢到金额: " + r;
    }
    System.out.println(who + outcome);
}
}
单机性能测试-单连接-版本
package org.example;
import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.DecimalFormat;
import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class RedPacketTest {
private static final Logger logger = LoggerFactory.getLogger(RedPacketTest.class);
/**
 * Immutable value holder for a "give red packet" request: the user id, the group id,
 * the packet id, the total amount and the number of shares.
 */
public static class RedPacketGiveReq {
    /** User id carried by the request. */
    private final int userId;
    /** Group id carried by the request. */
    private final int groupId;
    /** Red packet id. */
    private final int id;
    /** Total amount of the red packet. */
    private final double amount;
    /** Number of shares the packet is split into. */
    private final int num;

    /**
     * @param userId  user id
     * @param groupId group id
     * @param id      red packet id
     * @param amount  total amount
     * @param num     number of shares
     */
    public RedPacketGiveReq(int userId, int groupId, int id, double amount, int num) {
        this.id = id;
        this.userId = userId;
        this.groupId = groupId;
        this.num = num;
        this.amount = amount;
    }

    /** @return the user id */
    public int getUserId() {
        return this.userId;
    }

    /** @return the group id */
    public int getGroupId() {
        return this.groupId;
    }

    /** @return the red packet id */
    public int getId() {
        return this.id;
    }

    /** @return the total amount */
    public double getAmount() {
        return this.amount;
    }

    /** @return the number of shares */
    public int getNum() {
        return this.num;
    }
}
/**
 * Immutable value holder for a "grab red packet" request: the user id, the group id
 * and the id of the packet being grabbed.
 */
public static class RedPacketGrabReq {
    /** User id carried by the request. */
    private final int userId;
    /** Group id carried by the request. */
    private final int groupId;
    /** Red packet id. */
    private final int id;

    /**
     * @param userId  user id
     * @param groupId group id
     * @param id      red packet id
     */
    public RedPacketGrabReq(int userId, int groupId, int id) {
        this.id = id;
        this.groupId = groupId;
        this.userId = userId;
    }

    /** @return the user id */
    public int getUserId() {
        return this.userId;
    }

    /** @return the group id */
    public int getGroupId() {
        return this.groupId;
    }

    /** @return the red packet id */
    public int getId() {
        return this.id;
    }
}
/**
 * Two-decimal formatter for the amounts sent to the Lua scripts. The symbols are pinned to
 * {@code Locale.ROOT} so the decimal separator is always '.', independent of the JVM's
 * default locale; the scripts parse these strings with Lua's {@code tonumber}.
 */
private static final DecimalFormat df =
        new DecimalFormat("#.00", java.text.DecimalFormatSymbols.getInstance(java.util.Locale.ROOT));

/**
 * Lua script that creates a red packet hash.
 * <ul>
 *   <li>KEYS[1]: packet hash key</li>
 *   <li>ARGV[1]: stored as {@code user_id}</li>
 *   <li>ARGV[2] / ARGV[3]: stored as {@code amount} / {@code num} (remaining values)</li>
 *   <li>ARGV[4] / ARGV[5]: stored as {@code init_amount} / {@code init_num}</li>
 * </ul>
 * Returns {@code false} without writing anything when KEYS[1] already exists, {@code true} otherwise.
 */
private static final String luaPacketGive =
        "local exists = redis.call('EXISTS', KEYS[1]) \n" +
        "if exists == 1 then \n" +
        "    return false \n" +
        "else \n" +
        "    redis.call('HSET', KEYS[1], 'user_id', ARGV[1] ) \n" +
        "    redis.call('HSET', KEYS[1], 'amount', ARGV[2] ) \n" +
        "    redis.call('HSET', KEYS[1], 'num', ARGV[3] ) \n" +
        "    redis.call('HSET', KEYS[1], 'init_amount', ARGV[4] ) \n" +
        "    redis.call('HSET', KEYS[1], 'init_num', ARGV[5] ) \n" +
        "    return true \n" +
        "end";

/**
 * Lua script that grabs one share of a red packet.
 * <ul>
 *   <li>KEYS[1]: packet hash key; KEYS[2]: hash recording the amount each user grabbed</li>
 *   <li>ARGV[1]: not read by the script; ARGV[2]: id of the grabbing user</li>
 * </ul>
 * Returns {@code '-1'} when the user already grabbed, {@code '0.00'} when no share is left,
 * otherwise the grabbed amount formatted with two decimals.
 * <p>
 * The random share is capped so that every share still outstanding keeps at least 0.01;
 * without the cap a draw could round up to the whole remaining amount and a later grabber
 * would receive {@code '0.00'}, which is also the "no share left" sentinel.
 * <p>
 * The script assumes the packet hash exists: a missing {@code num} field makes the numeric
 * comparison raise a script error.
 * <p>
 * NOTE(review): Redis servers older than 7.0 are documented to reseed the script PRNG
 * identically for every script run, which would make {@code math.random()} return the
 * same value on each grab. Confirm the server version.
 */
private static final String luaGrab =
        // A user who already has an entry in the detail hash is rejected.
        "local grabbed = redis.call('HGET', KEYS[2], ARGV[2]) \n" +
        "if grabbed then \n" +
        "    return '-1' \n" +
        "end \n" +
        "local num = tonumber(redis.call('HGET', KEYS[1], 'num')) \n" +
        // Last share: hand out whatever amount is left.
        "if num == 1 then \n" +
        "    redis.call('HSET', KEYS[1], 'num', 0) \n" +
        "    local amount = redis.call('HGET', KEYS[1], 'amount') \n" +
        "    redis.call('HSET', KEYS[2], ARGV[2], amount) \n" +
        "    redis.call('HSET', KEYS[1], 'amount', '0.00') \n" +
        "    return string.format('%.2f', amount) \n" +
        // Several shares left: twice the average of the remainder times a random factor in [0, 1).
        "elseif num > 1 then \n" +
        "    redis.call('HSET', KEYS[1], 'num', num - 1) \n" +
        "    local amount = tonumber(redis.call('HGET', KEYS[1], 'amount')) \n" +
        "    local avg = amount / num \n" +
        "    local result = avg * 2 * math.random() \n" +
        // Leave at least 0.01 for each of the other (num - 1) shares.
        "    local cap = amount - (num - 1) * 0.01 \n" +
        "    if result > cap then \n" +
        "        result = cap \n" +
        "    end \n" +
        "    if result < 0.01 then \n" +
        "        result = 0.01 \n" +
        "    end \n" +
        "    local res = string.format('%.2f', result) \n" +
        "    redis.call('HSET', KEYS[2], ARGV[2], res) \n" +
        "    redis.call('HSET', KEYS[1], 'amount', string.format('%.2f', amount - res)) \n" +
        "    return res \n" +
        // No share left.
        "else \n" +
        "    return '0.00' \n" +
        "end ";

/** Source of the random ids, amounts and share counts generated by the benchmark. */
private static final Random random = new Random();

/** Base value that the generated user, group and packet ids are offset from. */
private static final int base = 100000000;

/**
 * Threshold of in-flight requests used by {@code main} for throttling, computed once at
 * class initialisation as free heap bytes / 1024 / 4.
 */
private static final int LIMIT = (int) (Runtime.getRuntime().freeMemory() / 1024 / 4);

static {
    System.out.println("LIMIT: " + LIMIT);
}
/**
 * Single-server, single-connection throughput test. For each round it creates one random
 * red packet and issues a random number of grab requests, all asynchronously on the same
 * connection, then reports request count, elapsed time, average latency and TPS.
 *
 * @param args optional first argument: number of rounds (default 1)
 * @throws InterruptedException if the thread is interrupted while waiting
 * @throws ExecutionException   if a Redis command awaited synchronously fails
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
    int times = args.length > 0 ? Integer.parseInt(args[0]) : 1;
    // Redis server: virtual machine, 2C4G.
    RedisClient redisClient = RedisClient.create("redis://192.168.253.176:6379");
    SocketOptions socketOptions = SocketOptions.builder()
            // TCP_NODELAY is switched off, i.e. Nagle's algorithm stays enabled.
            .tcpNoDelay(false)
            .build();
    ClientOptions options = ClientOptions.builder()
            .socketOptions(socketOptions)
            .build();
    redisClient.setOptions(options);
    try {
        StatefulRedisConnection<String, String> connection1 = redisClient.connect();
        RedisAsyncCommands<String, String> asyncCommands1 = connection1.async();

        // Remove data from the previous run. DEL needs at least one key, so it is
        // skipped when nothing matches (e.g. on a fresh database).
        logger.info("清理上次数据");
        String prefix = "2";
        List<String> staleKeys = asyncCommands1.keys(prefix + "*").get();
        if (!staleKeys.isEmpty()) {
            asyncCommands1.del(staleKeys.toArray(new String[0])).get();
        }
        System.out.println("清理上次数据完毕");

        int counter = 0;                                   // requests issued
        AtomicInteger latchCount = new AtomicInteger(0);   // requests still in flight
        long begin = System.currentTimeMillis();
        for (int c = 0; c < times; c++) {
            int userId = base + random.nextInt(base);
            int groupId = base * 2 + random.nextInt(base);
            int id = base * 3 + random.nextInt(base);
            double amount = 100 + random.nextInt(100);
            int num = 1 + random.nextInt(10);

            // Give the red packet.
            RedPacketGiveReq redPacketGiveReq = new RedPacketGiveReq(userId, groupId, id, amount, num);
            String key = String.join("_",
                    String.valueOf(redPacketGiveReq.getGroupId()),
                    String.valueOf(redPacketGiveReq.getId()));
            String grabKey = String.join("_",
                    String.valueOf(redPacketGiveReq.getGroupId()),
                    String.valueOf(redPacketGiveReq.getId()),
                    "grab");
            String[] keys = new String[]{key, grabKey};

            final String userIdStr = String.valueOf(redPacketGiveReq.getUserId());
            final String amountStr = df.format(redPacketGiveReq.getAmount());
            final String numStr = String.valueOf(redPacketGiveReq.getNum());
            RedisFuture<Boolean> future = asyncCommands1.eval(luaPacketGive,
                    ScriptOutputType.BOOLEAN, keys, userIdStr,
                    amountStr, numStr, amountStr, numStr);
            latchCount.incrementAndGet();
            counter++;
            future.whenComplete((r, e) -> {
                latchCount.decrementAndGet();
                print(r, e, userIdStr, key, amountStr, numStr, asyncCommands1);
            });

            // Grab the red packet.
            int userNum = num + random.nextInt(10);
            for (int i = 0; i < userNum; i++) {
                RedPacketGrabReq redPacketGrabReq =
                        new RedPacketGrabReq(base + random.nextInt(base), groupId, id);
                RedisFuture<String> result1 = asyncCommands1.eval(luaGrab, ScriptOutputType.VALUE,
                        keys, "", redPacketGrabReq.getUserId() + "");
                latchCount.incrementAndGet();
                counter++;
                result1.whenComplete((r, e) -> {
                    latchCount.decrementAndGet();
                    printResult(r, e, key, redPacketGrabReq);
                });
            }

            // Back-pressure: keep waiting until the in-flight count is back under the
            // limit (a single yield would not throttle anything).
            while (latchCount.get() > LIMIT) {
                Thread.yield();
            }
        }
        // Wait for every outstanding reply.
        while (latchCount.get() > 0) {
            Thread.yield();
        }
        long end = System.currentTimeMillis();
        long delta = end - begin;
        Thread.sleep(2000);
        System.out.printf("总计 %s 次请求 %n", counter);
        System.out.printf("总计耗时 %s ms %n", delta);
        // Floating-point average: integer division reported 0 ms for sub-millisecond latencies.
        System.out.printf("平均耗时 %.3f ms %n", counter == 0 ? 0.0 : (double) delta / counter);
        // Guard against a zero elapsed time on very short runs.
        System.out.printf("TPS: %s 次/秒 %n", counter * 1000L / Math.max(delta, 1L));
    } finally {
        // Releases the client's connections and threads on both success and failure.
        redisClient.shutdown();
    }
    System.exit(0);
}
/**
 * Logs the outcome of a "give red packet" script call.
 * <p>
 * This method is invoked from a Lettuce completion callback, so it must not block: the
 * follow-up lookup of the existing packet's initial values is chained asynchronously
 * instead of being awaited with {@code get()}. The lookup's log line may therefore appear
 * after this method has returned.
 *
 * @param r              script result: {@code true} when the packet was created
 * @param e              failure of the Redis call, or {@code null} on success
 * @param userIdStr      id of the giving user
 * @param key            packet hash key
 * @param amountStr      requested amount
 * @param numStr         requested number of shares
 * @param asyncCommands1 commands used to look up an already existing packet
 */
private static void print(Boolean r, Throwable e, String userIdStr, String key, String amountStr, String numStr, RedisAsyncCommands<String, String> asyncCommands1) {
    if (e != null) {
        // Keep the cause: the trailing Throwable is logged with its stack trace.
        logger.error("用户 {} 红包创建失败, key为 {}, 金额为 {}, 红包数为 {}", userIdStr, key, amountStr, numStr, e);
        return;
    }
    // Null-safe: a missing reply is treated like "not created".
    if (Boolean.TRUE.equals(r)) {
        logger.info("用户 {} 红包创建成功, key为 {}, 金额为 {}, 红包数为 {}", userIdStr, key, amountStr, numStr);
        return;
    }
    asyncCommands1.hget(key, "init_amount")
            .thenCombine(asyncCommands1.hget(key, "init_num"),
                    (initAmountStr, initNumStr) -> new String[]{initAmountStr, initNumStr})
            .whenComplete((init, ex) -> {
                if (ex != null) {
                    logger.info("用户 {} 的红包已经存在, key为 {}, 异常:{}", userIdStr, key, ex.getMessage());
                } else {
                    logger.info("用户 {} 的红包已经存在, key为 {}, 金额为 {}, 红包数为 {}", userIdStr, key, init[0], init[1]);
                }
            });
}
/**
 * Logs the outcome of one grab attempt.
 *
 * @param r                value returned by the grab script ('0.00', '-1' or a grabbed amount)
 * @param e                failure of the Redis call, or {@code null} on success
 * @param key              packet hash key
 * @param redPacketGrabReq the grab request the outcome belongs to
 */
private static void printResult(String r, Throwable e,String key, RedPacketGrabReq redPacketGrabReq) {
    final int grabber = redPacketGrabReq.getUserId();
    if (e != null) {
        logger.error("用户 {}抢红包{}, error", grabber, key, e);
        return;
    }
    if ("-1".equals(r)) {
        // The user already holds a share.
        logger.info("用户 {} 抢红包 {} 失败,已经抢过红包", grabber, key);
        return;
    }
    if ("0.00".equals(r)) {
        // No share left.
        logger.info("用户 {} 抢红包 {} 失败,红包已抢完", grabber, key);
        return;
    }
    logger.info("用户 {} 抢红包 {} 抢到金额: {}", grabber, key, r);
}
}
单机性能测试-2个连接-版本
package org.example;
import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.DecimalFormat;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class RedPacketTest2 {
private static final Logger logger = LoggerFactory.getLogger(RedPacketTest2.class);
/**
 * Immutable value holder for a "give red packet" request: the user id, the group id,
 * the packet id, the total amount and the number of shares.
 */
public static class RedPacketGiveReq {
    /** User id carried by the request. */
    private final int userId;
    /** Group id carried by the request. */
    private final int groupId;
    /** Red packet id. */
    private final int id;
    /** Total amount of the red packet. */
    private final double amount;
    /** Number of shares the packet is split into. */
    private final int num;

    /**
     * @param userId  user id
     * @param groupId group id
     * @param id      red packet id
     * @param amount  total amount
     * @param num     number of shares
     */
    public RedPacketGiveReq(int userId, int groupId, int id, double amount, int num) {
        this.id = id;
        this.userId = userId;
        this.groupId = groupId;
        this.num = num;
        this.amount = amount;
    }

    /** @return the user id */
    public int getUserId() {
        return this.userId;
    }

    /** @return the group id */
    public int getGroupId() {
        return this.groupId;
    }

    /** @return the red packet id */
    public int getId() {
        return this.id;
    }

    /** @return the total amount */
    public double getAmount() {
        return this.amount;
    }

    /** @return the number of shares */
    public int getNum() {
        return this.num;
    }
}
/**
 * Immutable value holder for a "grab red packet" request: the user id, the group id
 * and the id of the packet being grabbed.
 */
public static class RedPacketGrabReq {
    /** User id carried by the request. */
    private final int userId;
    /** Group id carried by the request. */
    private final int groupId;
    /** Red packet id. */
    private final int id;

    /**
     * @param userId  user id
     * @param groupId group id
     * @param id      red packet id
     */
    public RedPacketGrabReq(int userId, int groupId, int id) {
        this.id = id;
        this.groupId = groupId;
        this.userId = userId;
    }

    /** @return the user id */
    public int getUserId() {
        return this.userId;
    }

    /** @return the group id */
    public int getGroupId() {
        return this.groupId;
    }

    /** @return the red packet id */
    public int getId() {
        return this.id;
    }
}
/**
 * Two-decimal formatter for the amounts sent to the Lua scripts. The symbols are pinned to
 * {@code Locale.ROOT} so the decimal separator is always '.', independent of the JVM's
 * default locale; the scripts parse these strings with Lua's {@code tonumber}.
 */
private static final DecimalFormat df =
        new DecimalFormat("#.00", java.text.DecimalFormatSymbols.getInstance(java.util.Locale.ROOT));

/**
 * Lua script that creates a red packet hash.
 * <ul>
 *   <li>KEYS[1]: packet hash key</li>
 *   <li>ARGV[1]: stored as {@code user_id}</li>
 *   <li>ARGV[2] / ARGV[3]: stored as {@code amount} / {@code num} (remaining values)</li>
 *   <li>ARGV[4] / ARGV[5]: stored as {@code init_amount} / {@code init_num}</li>
 * </ul>
 * Returns {@code false} without writing anything when KEYS[1] already exists, {@code true} otherwise.
 */
private static final String luaPacketGive =
        "local exists = redis.call('EXISTS', KEYS[1]) \n" +
        "if exists == 1 then \n" +
        "    return false \n" +
        "else \n" +
        "    redis.call('HSET', KEYS[1], 'user_id', ARGV[1] ) \n" +
        "    redis.call('HSET', KEYS[1], 'amount', ARGV[2] ) \n" +
        "    redis.call('HSET', KEYS[1], 'num', ARGV[3] ) \n" +
        "    redis.call('HSET', KEYS[1], 'init_amount', ARGV[4] ) \n" +
        "    redis.call('HSET', KEYS[1], 'init_num', ARGV[5] ) \n" +
        "    return true \n" +
        "end";

/**
 * Lua script that grabs one share of a red packet.
 * <ul>
 *   <li>KEYS[1]: packet hash key; KEYS[2]: hash recording the amount each user grabbed</li>
 *   <li>ARGV[1]: not read by the script; ARGV[2]: id of the grabbing user</li>
 * </ul>
 * Returns {@code '-1'} when the user already grabbed, {@code '-2'} when the packet hash has
 * no {@code num} field (the packet does not exist yet), {@code '0.00'} when no share is
 * left, otherwise the grabbed amount formatted with two decimals.
 * <p>
 * The random share is capped so that every share still outstanding keeps at least 0.01;
 * without the cap a draw could round up to the whole remaining amount and a later grabber
 * would receive {@code '0.00'}, which is also the "no share left" sentinel.
 * <p>
 * NOTE(review): Redis servers older than 7.0 are documented to reseed the script PRNG
 * identically for every script run, which would make {@code math.random()} return the
 * same value on each grab. Confirm the server version.
 */
private static final String luaGrab =
        // A user who already has an entry in the detail hash is rejected.
        "local grabbed = redis.call('HGET', KEYS[2], ARGV[2]) \n" +
        "if grabbed then \n" +
        "    return '-1' \n" +
        "end \n" +
        "local num = tonumber(redis.call('HGET', KEYS[1], 'num')) \n" +
        // Packet not created (yet).
        "if num == nil then \n" +
        "    return '-2' \n" +
        // Last share: hand out whatever amount is left.
        "elseif num == 1 then \n" +
        "    redis.call('HSET', KEYS[1], 'num', 0) \n" +
        "    local amount = redis.call('HGET', KEYS[1], 'amount') \n" +
        "    redis.call('HSET', KEYS[2], ARGV[2], amount) \n" +
        "    redis.call('HSET', KEYS[1], 'amount', '0.00') \n" +
        "    return string.format('%.2f', amount) \n" +
        // Several shares left: twice the average of the remainder times a random factor in [0, 1).
        "elseif num > 1 then \n" +
        "    redis.call('HSET', KEYS[1], 'num', num - 1) \n" +
        "    local amount = tonumber(redis.call('HGET', KEYS[1], 'amount')) \n" +
        "    local avg = amount / num \n" +
        "    local result = avg * 2 * math.random() \n" +
        // Leave at least 0.01 for each of the other (num - 1) shares.
        "    local cap = amount - (num - 1) * 0.01 \n" +
        "    if result > cap then \n" +
        "        result = cap \n" +
        "    end \n" +
        "    if result < 0.01 then \n" +
        "        result = 0.01 \n" +
        "    end \n" +
        "    local res = string.format('%.2f', result) \n" +
        "    redis.call('HSET', KEYS[2], ARGV[2], res) \n" +
        "    redis.call('HSET', KEYS[1], 'amount', string.format('%.2f', amount - res)) \n" +
        "    return res \n" +
        // No share left.
        "else \n" +
        "    return '0.00' \n" +
        "end ";

/** Source of the random ids, amounts and share counts generated by the benchmark. */
private static final Random random = new Random();

/** Base value that the generated user, group and packet ids are offset from. */
private static final int base = 100000000;

/**
 * Threshold of in-flight requests used by {@code main} for throttling, computed once at
 * class initialisation as free heap bytes / 1024 / 4.
 */
private static final int LIMIT = (int) (Runtime.getRuntime().freeMemory() / 1024 / 4);

static {
    System.out.println("LIMIT: " + LIMIT);
}
/**
 * Single-server throughput test over two connections. Requests are distributed over the
 * two connections by the parity of the running request counter, so a grab may reach the
 * server before the matching give; the grab script answers '-2' in that case.
 *
 * @param args optional first argument: number of rounds (default 1)
 * @throws InterruptedException if the thread is interrupted while waiting
 * @throws ExecutionException   if a Redis command awaited synchronously fails
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
    int times = args.length > 0 ? Integer.parseInt(args[0]) : 1;
    RedisClient redisClient = RedisClient.create("redis://192.168.253.176:6379");
    SocketOptions socketOptions = SocketOptions.builder()
            // TCP_NODELAY is switched off, i.e. Nagle's algorithm stays enabled.
            .tcpNoDelay(false)
            .build();
    ClientOptions options = ClientOptions.builder()
            .socketOptions(socketOptions)
            .build();
    redisClient.setOptions(options);
    try {
        StatefulRedisConnection<String, String> connection1 = redisClient.connect();
        StatefulRedisConnection<String, String> connection2 = redisClient.connect();
        RedisAsyncCommands<String, String> asyncCommands1 = connection1.async();
        RedisAsyncCommands<String, String> asyncCommands2 = connection2.async();
        // Generic array creation is unchecked; both elements are known to be <String, String>.
        @SuppressWarnings("unchecked")
        RedisAsyncCommands<String, String>[] commands = new RedisAsyncCommands[]{
                asyncCommands1,
                asyncCommands2
        };

        // Remove data from the previous run with one awaited DEL. DEL needs at least
        // one key, so it is skipped when nothing matches (e.g. on a fresh database).
        logger.info("清理上次数据");
        String prefix = "2";
        List<String> staleKeys = asyncCommands1.keys(prefix + "*").get();
        if (!staleKeys.isEmpty()) {
            asyncCommands1.del(staleKeys.toArray(new String[0])).get();
        }
        System.out.println("清理上次数据完毕");

        int counter = 0;                                   // requests issued
        AtomicInteger latchCount = new AtomicInteger(0);   // requests completed
        long begin = System.currentTimeMillis();
        for (int c = 0; c < times; c++) {
            int userId = base + random.nextInt(base);
            int groupId = base * 2 + random.nextInt(base);
            int id = base * 3 + random.nextInt(base);
            double amount = 100 + random.nextInt(100);
            int num = 1 + random.nextInt(10);

            // Give the red packet.
            RedPacketGiveReq redPacketGiveReq = new RedPacketGiveReq(userId, groupId, id, amount, num);
            String key = String.join("_",
                    String.valueOf(redPacketGiveReq.getGroupId()),
                    String.valueOf(redPacketGiveReq.getId()));
            String grabKey = String.join("_",
                    String.valueOf(redPacketGiveReq.getGroupId()),
                    String.valueOf(redPacketGiveReq.getId()),
                    "grab");
            String[] keys = new String[]{key, grabKey};

            final String userIdStr = String.valueOf(redPacketGiveReq.getUserId());
            final String amountStr = df.format(redPacketGiveReq.getAmount());
            final String numStr = String.valueOf(redPacketGiveReq.getNum());
            RedisFuture<Boolean> future = commands[counter & 1].eval(luaPacketGive,
                    ScriptOutputType.BOOLEAN, keys, userIdStr,
                    amountStr, numStr, amountStr, numStr);
            counter++;
            future.whenComplete((r, e) -> {
                latchCount.incrementAndGet();
                print(r, e, userIdStr, key, amountStr, numStr, asyncCommands1);
            });

            // Grab the red packet.
            int userNum = num + random.nextInt(10);
            for (int i = 0; i < userNum; i++) {
                RedPacketGrabReq redPacketGrabReq =
                        new RedPacketGrabReq(base + random.nextInt(base), groupId, id);
                RedisFuture<String> result1 = commands[counter & 1]
                        .eval(luaGrab, ScriptOutputType.VALUE, keys, "", redPacketGrabReq.getUserId() + "");
                counter++;
                result1.whenComplete((r, e) -> {
                    latchCount.incrementAndGet();
                    printResult(r, e, key, redPacketGrabReq);
                });
            }

            // Back-pressure: keep waiting until the in-flight count is back under the
            // limit (a single yield would not throttle anything).
            while (counter - latchCount.get() > LIMIT) {
                Thread.yield();
            }
        }
        // Wait for every outstanding reply.
        while (latchCount.get() < counter) {
            Thread.yield();
        }
        long end = System.currentTimeMillis();
        long delta = end - begin;
        Thread.sleep(2000);
        System.out.printf("总计 %s 次请求 %n", counter);
        System.out.printf("总计耗时 %s ms %n", delta);
        // Floating-point average: integer division reported 0 ms for sub-millisecond latencies.
        System.out.printf("平均耗时 %.3f ms %n", counter == 0 ? 0.0 : (double) delta / counter);
        // Guard against a zero elapsed time on very short runs.
        System.out.printf("TPS: %s 次/秒 %n", counter * 1000L / Math.max(delta, 1L));
    } finally {
        // Releases the client's connections and threads on both success and failure.
        redisClient.shutdown();
    }
    System.exit(0);
}
/**
 * Logs the outcome of a "give red packet" script call.
 * <p>
 * This method is invoked from a Lettuce completion callback, so it must not block: the
 * follow-up lookup of the existing packet's initial values is chained asynchronously
 * instead of being awaited with {@code get()}. The lookup's log line may therefore appear
 * after this method has returned.
 *
 * @param r              script result: {@code true} when the packet was created
 * @param e              failure of the Redis call, or {@code null} on success
 * @param userIdStr      id of the giving user
 * @param key            packet hash key
 * @param amountStr      requested amount
 * @param numStr         requested number of shares
 * @param asyncCommands1 commands used to look up an already existing packet
 */
private static void print(Boolean r, Throwable e, String userIdStr, String key, String amountStr, String numStr, RedisAsyncCommands<String, String> asyncCommands1) {
    if (e != null) {
        // Keep the cause: the trailing Throwable is logged with its stack trace.
        logger.error("用户 {} 红包创建失败, key为 {}, 金额为 {}, 红包数为 {}", userIdStr, key, amountStr, numStr, e);
        return;
    }
    // Null-safe: a missing reply is treated like "not created".
    if (Boolean.TRUE.equals(r)) {
        logger.info("用户 {} 红包创建成功, key为 {}, 金额为 {}, 红包数为 {}", userIdStr, key, amountStr, numStr);
        return;
    }
    asyncCommands1.hget(key, "init_amount")
            .thenCombine(asyncCommands1.hget(key, "init_num"),
                    (initAmountStr, initNumStr) -> new String[]{initAmountStr, initNumStr})
            .whenComplete((init, ex) -> {
                if (ex != null) {
                    logger.info("用户 {} 的红包已经存在, key为 {}, 异常:{}", userIdStr, key, ex.getMessage());
                } else {
                    logger.info("用户 {} 的红包已经存在, key为 {}, 金额为 {}, 红包数为 {}", userIdStr, key, init[0], init[1]);
                }
            });
}
/**
 * Logs the outcome of one grab attempt.
 *
 * @param r                value returned by the grab script ('0.00', '-1', '-2' or a grabbed amount)
 * @param e                failure of the Redis call, or {@code null} on success
 * @param key              packet hash key
 * @param redPacketGrabReq the grab request the outcome belongs to
 */
private static void printResult(String r, Throwable e,String key, RedPacketGrabReq redPacketGrabReq) {
    final int grabber = redPacketGrabReq.getUserId();
    if (e != null) {
        logger.error("用户 {}抢红包{}, error", grabber, key, e);
        return;
    }
    if ("-2".equals(r)) {
        // Out-of-order arrival: the grab was processed before the packet existed.
        logger.info("用户 {} 抢红包 {} 失败,红包不存在", grabber, key);
        return;
    }
    if ("-1".equals(r)) {
        // The user already holds a share.
        logger.info("用户 {} 抢红包 {} 失败,已经抢过红包", grabber, key);
        return;
    }
    if ("0.00".equals(r)) {
        // No share left.
        logger.info("用户 {} 抢红包 {} 失败,红包已抢完", grabber, key);
        return;
    }
    logger.info("用户 {} 抢红包 {} 抢到金额: {}", grabber, key, r);
}
}
2台Redis服务-性能测试版本
package org.example;
import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.DecimalFormat;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class RedPacketTest3 {
private static final Logger logger = LoggerFactory.getLogger(RedPacketTest3.class);
/**
 * Immutable value holder for a "give red packet" request: the user id, the group id,
 * the packet id, the total amount and the number of shares.
 */
public static class RedPacketGiveReq {
    /** User id carried by the request. */
    private final int userId;
    /** Group id carried by the request. */
    private final int groupId;
    /** Red packet id. */
    private final int id;
    /** Total amount of the red packet. */
    private final double amount;
    /** Number of shares the packet is split into. */
    private final int num;

    /**
     * @param userId  user id
     * @param groupId group id
     * @param id      red packet id
     * @param amount  total amount
     * @param num     number of shares
     */
    public RedPacketGiveReq(int userId, int groupId, int id, double amount, int num) {
        this.id = id;
        this.userId = userId;
        this.groupId = groupId;
        this.num = num;
        this.amount = amount;
    }

    /** @return the user id */
    public int getUserId() {
        return this.userId;
    }

    /** @return the group id */
    public int getGroupId() {
        return this.groupId;
    }

    /** @return the red packet id */
    public int getId() {
        return this.id;
    }

    /** @return the total amount */
    public double getAmount() {
        return this.amount;
    }

    /** @return the number of shares */
    public int getNum() {
        return this.num;
    }
}
/**
 * Immutable value holder for a "grab red packet" request: the user id, the group id
 * and the id of the packet being grabbed.
 */
public static class RedPacketGrabReq {
    /** User id carried by the request. */
    private final int userId;
    /** Group id carried by the request. */
    private final int groupId;
    /** Red packet id. */
    private final int id;

    /**
     * @param userId  user id
     * @param groupId group id
     * @param id      red packet id
     */
    public RedPacketGrabReq(int userId, int groupId, int id) {
        this.id = id;
        this.groupId = groupId;
        this.userId = userId;
    }

    /** @return the user id */
    public int getUserId() {
        return this.userId;
    }

    /** @return the group id */
    public int getGroupId() {
        return this.groupId;
    }

    /** @return the red packet id */
    public int getId() {
        return this.id;
    }
}
/**
 * Two-decimal formatter for the amounts sent to the Lua scripts. The symbols are pinned to
 * {@code Locale.ROOT} so the decimal separator is always '.', independent of the JVM's
 * default locale; the scripts parse these strings with Lua's {@code tonumber}.
 */
private static final DecimalFormat df =
        new DecimalFormat("#.00", java.text.DecimalFormatSymbols.getInstance(java.util.Locale.ROOT));

/**
 * Lua script that creates a red packet hash.
 * <ul>
 *   <li>KEYS[1]: packet hash key</li>
 *   <li>ARGV[1]: stored as {@code user_id}</li>
 *   <li>ARGV[2] / ARGV[3]: stored as {@code amount} / {@code num} (remaining values)</li>
 *   <li>ARGV[4] / ARGV[5]: stored as {@code init_amount} / {@code init_num}</li>
 * </ul>
 * Returns {@code false} without writing anything when KEYS[1] already exists, {@code true} otherwise.
 */
private static final String luaPacketGive =
        "local exists = redis.call('EXISTS', KEYS[1]) \n" +
        "if exists == 1 then \n" +
        "    return false \n" +
        "else \n" +
        "    redis.call('HSET', KEYS[1], 'user_id', ARGV[1] ) \n" +
        "    redis.call('HSET', KEYS[1], 'amount', ARGV[2] ) \n" +
        "    redis.call('HSET', KEYS[1], 'num', ARGV[3] ) \n" +
        "    redis.call('HSET', KEYS[1], 'init_amount', ARGV[4] ) \n" +
        "    redis.call('HSET', KEYS[1], 'init_num', ARGV[5] ) \n" +
        "    return true \n" +
        "end";

/**
 * Lua script that grabs one share of a red packet.
 * <ul>
 *   <li>KEYS[1]: packet hash key; KEYS[2]: hash recording the amount each user grabbed</li>
 *   <li>ARGV[1]: not read by the script; ARGV[2]: id of the grabbing user</li>
 * </ul>
 * Returns {@code '-1'} when the user already grabbed, {@code '-2'} when the packet hash has
 * no {@code num} field (the packet does not exist), {@code '0.00'} when no share is left,
 * otherwise the grabbed amount formatted with two decimals.
 * <p>
 * The random share is capped so that every share still outstanding keeps at least 0.01;
 * without the cap a draw could round up to the whole remaining amount and a later grabber
 * would receive {@code '0.00'}, which is also the "no share left" sentinel.
 * <p>
 * NOTE(review): Redis servers older than 7.0 are documented to reseed the script PRNG
 * identically for every script run, which would make {@code math.random()} return the
 * same value on each grab. Confirm the server version.
 */
private static final String luaGrab =
        // A user who already has an entry in the detail hash is rejected.
        "local grabbed = redis.call('HGET', KEYS[2], ARGV[2]) \n" +
        "if grabbed then \n" +
        "    return '-1' \n" +
        "end \n" +
        "local num = tonumber(redis.call('HGET', KEYS[1], 'num')) \n" +
        // Packet not created.
        "if num == nil then \n" +
        "    return '-2' \n" +
        // Last share: hand out whatever amount is left.
        "elseif num == 1 then \n" +
        "    redis.call('HSET', KEYS[1], 'num', 0) \n" +
        "    local amount = redis.call('HGET', KEYS[1], 'amount') \n" +
        "    redis.call('HSET', KEYS[2], ARGV[2], amount) \n" +
        "    redis.call('HSET', KEYS[1], 'amount', '0.00') \n" +
        "    return string.format('%.2f', amount) \n" +
        // Several shares left: twice the average of the remainder times a random factor in [0, 1).
        "elseif num > 1 then \n" +
        "    redis.call('HSET', KEYS[1], 'num', num - 1) \n" +
        "    local amount = tonumber(redis.call('HGET', KEYS[1], 'amount')) \n" +
        "    local avg = amount / num \n" +
        "    local result = avg * 2 * math.random() \n" +
        // Leave at least 0.01 for each of the other (num - 1) shares.
        "    local cap = amount - (num - 1) * 0.01 \n" +
        "    if result > cap then \n" +
        "        result = cap \n" +
        "    end \n" +
        "    if result < 0.01 then \n" +
        "        result = 0.01 \n" +
        "    end \n" +
        "    local res = string.format('%.2f', result) \n" +
        "    redis.call('HSET', KEYS[2], ARGV[2], res) \n" +
        "    redis.call('HSET', KEYS[1], 'amount', string.format('%.2f', amount - res)) \n" +
        "    return res \n" +
        // No share left.
        "else \n" +
        "    return '0.00' \n" +
        "end ";

/** Source of the random ids, amounts and share counts generated by the benchmark. */
private static final Random random = new Random();

/** Base value that the generated user, group and packet ids are offset from. */
private static final int base = 100000000;

/**
 * Threshold of in-flight requests used by {@code main} for throttling, computed once at
 * class initialisation as free heap bytes / 1024 / 4.
 */
private static final int LIMIT = (int) (Runtime.getRuntime().freeMemory() / 1024 / 4);

static {
    System.out.println("LIMIT: " + LIMIT);
}
/**
 * Throughput test against two Redis servers. Each red packet is routed to one server by
 * the lowest bit of its key's hash code, and the give plus all grabs for that packet use
 * the same connection.
 *
 * @param args optional first argument: number of rounds (default 1)
 * @throws InterruptedException if the thread is interrupted while waiting
 * @throws ExecutionException   if a Redis command awaited synchronously fails
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
    int times = args.length > 0 ? Integer.parseInt(args[0]) : 1;
    // Two Redis servers, both virtual machines with a 2C4G spec.
    RedisClient redisClient1 = RedisClient.create("redis://192.168.253.176:6379");
    RedisClient redisClient2 = RedisClient.create("redis://192.168.253.201:6379");
    SocketOptions socketOptions = SocketOptions.builder()
            // TCP_NODELAY is switched off, i.e. Nagle's algorithm stays enabled.
            .tcpNoDelay(false)
            .build();
    ClientOptions options = ClientOptions.builder()
            .socketOptions(socketOptions)
            .build();
    redisClient1.setOptions(options);
    redisClient2.setOptions(options);
    try {
        StatefulRedisConnection<String, String> connection1 = redisClient1.connect();
        StatefulRedisConnection<String, String> connection2 = redisClient2.connect();
        RedisAsyncCommands<String, String> asyncCommands1 = connection1.async();
        RedisAsyncCommands<String, String> asyncCommands2 = connection2.async();
        // Generic array creation is unchecked; both elements are known to be <String, String>.
        @SuppressWarnings("unchecked")
        RedisAsyncCommands<String, String>[] commands = new RedisAsyncCommands[]{
                asyncCommands1,
                asyncCommands2
        };

        // Remove data from the previous run on both servers. DEL needs at least one
        // key, so it is skipped for a server where nothing matches.
        logger.info("清理上次数据");
        String prefix = "2";
        for (RedisAsyncCommands<String, String> server : commands) {
            List<String> staleKeys = server.keys(prefix + "*").get();
            if (!staleKeys.isEmpty()) {
                server.del(staleKeys.toArray(new String[0])).get();
            }
        }
        System.out.println("清理上次数据完毕");

        int counter = 0;                                   // requests issued
        AtomicInteger latchCount = new AtomicInteger(0);   // requests still in flight
        long begin = System.currentTimeMillis();
        for (int c = 0; c < times; c++) {
            int userId = base + random.nextInt(base);
            int groupId = base * 2 + random.nextInt(base);
            int id = base * 3 + random.nextInt(base);
            double amount = 100 + random.nextInt(100);
            int num = 1 + random.nextInt(10);

            // Give the red packet.
            RedPacketGiveReq redPacketGiveReq = new RedPacketGiveReq(userId, groupId, id, amount, num);
            String key = String.join("_",
                    String.valueOf(redPacketGiveReq.getGroupId()),
                    String.valueOf(redPacketGiveReq.getId()));
            String grabKey = String.join("_",
                    String.valueOf(redPacketGiveReq.getGroupId()),
                    String.valueOf(redPacketGiveReq.getId()),
                    "grab");
            // "& 1" yields 0 or 1 even for negative hash codes.
            RedisAsyncCommands<String, String> command = commands[key.hashCode() & 1];
            String[] keys = new String[]{key, grabKey};

            final String userIdStr = String.valueOf(redPacketGiveReq.getUserId());
            final String amountStr = df.format(redPacketGiveReq.getAmount());
            final String numStr = String.valueOf(redPacketGiveReq.getNum());
            RedisFuture<Boolean> future = command.eval(luaPacketGive,
                    ScriptOutputType.BOOLEAN, keys, userIdStr,
                    amountStr, numStr, amountStr, numStr);
            counter++;
            latchCount.incrementAndGet();
            future.whenComplete((r, e) -> {
                latchCount.decrementAndGet();
                print(r, e, userIdStr, key, amountStr, numStr, command);
            });

            // Grab the red packet.
            int userNum = num + random.nextInt(10);
            for (int i = 0; i < userNum; i++) {
                RedPacketGrabReq redPacketGrabReq =
                        new RedPacketGrabReq(base + random.nextInt(base), groupId, id);
                RedisFuture<String> result1 = command
                        .eval(luaGrab, ScriptOutputType.VALUE, keys, "", redPacketGrabReq.getUserId() + "");
                counter++;
                latchCount.incrementAndGet();
                result1.whenComplete((r, e) -> {
                    latchCount.decrementAndGet();
                    printResult(r, e, key, redPacketGrabReq);
                });
            }

            // Back-pressure: keep waiting until the in-flight count is back under the
            // limit (a single yield would not throttle anything).
            while (latchCount.get() > LIMIT) {
                Thread.yield();
            }
        }
        // Wait for every outstanding reply.
        while (latchCount.get() > 0) {
            Thread.yield();
        }
        long end = System.currentTimeMillis();
        long delta = end - begin;
        Thread.sleep(2000);
        System.out.printf("总计 %s 次请求 %n", counter);
        System.out.printf("总计耗时 %s ms %n", delta);
        // Floating-point average: integer division reported 0 ms for sub-millisecond latencies.
        System.out.printf("平均耗时 %.3f ms %n", counter == 0 ? 0.0 : (double) delta / counter);
        // Guard against a zero elapsed time on very short runs.
        System.out.printf("TPS: %s 次/秒 %n", counter * 1000L / Math.max(delta, 1L));
    } finally {
        // Releases both clients' connections and threads on success and failure alike.
        try {
            redisClient1.shutdown();
        } finally {
            redisClient2.shutdown();
        }
    }
    System.exit(0);
}
/**
 * Logs the outcome of a "give red packet" script call.
 * <p>
 * This method is invoked from a Lettuce completion callback, so it must not block: the
 * follow-up lookup of the existing packet's initial values is chained asynchronously
 * instead of being awaited with {@code get()}. The lookup's log line may therefore appear
 * after this method has returned.
 *
 * @param r              script result: {@code true} when the packet was created
 * @param e              failure of the Redis call, or {@code null} on success
 * @param userIdStr      id of the giving user
 * @param key            packet hash key
 * @param amountStr      requested amount
 * @param numStr         requested number of shares
 * @param asyncCommands1 commands used to look up an already existing packet
 */
private static void print(Boolean r, Throwable e, String userIdStr, String key, String amountStr, String numStr, RedisAsyncCommands<String, String> asyncCommands1) {
    if (e != null) {
        // Keep the cause: the trailing Throwable is logged with its stack trace.
        logger.error("用户 {} 红包创建失败, key为 {}, 金额为 {}, 红包数为 {}", userIdStr, key, amountStr, numStr, e);
        return;
    }
    // Null-safe: a missing reply is treated like "not created".
    if (Boolean.TRUE.equals(r)) {
        logger.info("用户 {} 红包创建成功, key为 {}, 金额为 {}, 红包数为 {}", userIdStr, key, amountStr, numStr);
        return;
    }
    asyncCommands1.hget(key, "init_amount")
            .thenCombine(asyncCommands1.hget(key, "init_num"),
                    (initAmountStr, initNumStr) -> new String[]{initAmountStr, initNumStr})
            .whenComplete((init, ex) -> {
                if (ex != null) {
                    logger.info("用户 {} 的红包已经存在, key为 {}, 异常:{}", userIdStr, key, ex.getMessage());
                } else {
                    logger.info("用户 {} 的红包已经存在, key为 {}, 金额为 {}, 红包数为 {}", userIdStr, key, init[0], init[1]);
                }
            });
}
/**
 * Logs the outcome of one grab attempt.
 *
 * @param r                value returned by the grab script ('0.00', '-1', '-2' or a grabbed amount)
 * @param e                failure of the Redis call, or {@code null} on success
 * @param key              packet hash key
 * @param redPacketGrabReq the grab request the outcome belongs to
 */
private static void printResult(String r, Throwable e,String key, RedPacketGrabReq redPacketGrabReq) {
    final int grabber = redPacketGrabReq.getUserId();
    if (e != null) {
        logger.error("用户 {}抢红包{}, error", grabber, key, e);
        return;
    }
    if ("-2".equals(r)) {
        // Out-of-order handling: the packet hash was not found.
        logger.info("用户 {} 抢红包 {} 失败,红包不存在", grabber, key);
        return;
    }
    if ("-1".equals(r)) {
        // The user already holds a share.
        logger.info("用户 {} 抢红包 {} 失败,已经抢过红包", grabber, key);
        return;
    }
    if ("0.00".equals(r)) {
        // No share left.
        logger.info("用户 {} 抢红包 {} 失败,红包已抢完", grabber, key);
        return;
    }
    logger.info("用户 {} 抢红包 {} 抢到金额: {}", grabber, key, r);
}
}