redis 接口
Redis 是用 C 语言编写的,官方客户端库 hiredis 只提供 C 接口,没有 C++ 接口。我在一个负载均衡服务器项目中用过一个基于 hiredis 封装的 C++ 接口类,这里直接贴出代码,有需要的读者可以自行移植。
/*
* redis_interface.cpp
* Author: jacky
* Date: 2019-4-12
*
*/
#include <stdio.h>
#include <sstream>
#include <string.h>
#include <assert.h>
#include <pthread.h>
#include <hiredis/hiredis.h>
#include <stdlib.h>
#include <hiredis/sds.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <iostream>
#include <fstream>
#include "varstr.h"
#include "redis_interface.h"
#include <sys/syslog.h>
#include <linux/limits.h>
using namespace ws_redis;
using namespace std;
// Storage for the lazily created RedisManager instance; null until
// RedisManager::singleton() allocates it.
RedisManager *RedisManager::_instance = 0;
// Log severity levels. Each enumerator is an alias for the matching syslog
// priority constant, so a smaller numeric value means a more severe message.
enum LogLevel {
Fatal = LOG_EMERG, //0 highest: fatal
Alert = LOG_ALERT, //1 highest: action must be taken
Crit = LOG_CRIT, //2 highest: critical condition
Error = LOG_ERR, //3 high: error
Warn = LOG_WARNING, //4 high: warning
Notice = LOG_NOTICE, //5 high: normal but significant
Info = LOG_INFO, //6 high: general information
Debug = LOG_DEBUG, //7 normal: debugging output
};
// Base size unit for formatted log lines; logxxx() formats into a buffer of
// MAXLOGLINE * 4 bytes.
#define MAXLOGLINE 1024
std::ofstream fileout; // log file stream (opened by initLog)
//static long __log_lock = 0; // lock counter (currently unused)
char * pDevLevel = getenv("DEV_LOG_LEVEL"); // read the default log level from the environment
// Active log threshold: Debug when DEV_LOG_LEVEL is unset, otherwise the
// numeric value parsed from it by atol().
int syslogDevLevel = (NULL == pDevLevel) ? Debug : atol(pDevLevel);
// Display prefixes for the log levels, indexed by numeric level (0..7).
// The final entry is the fallback prefix for any level outside that range.
const char * levelnames[] = { "Fatal - ", "Alert - ", "Crit - ", "Error - ", "Warn - ", "Notice - ", "Info - ", "Debug - ", " - " };

/**
 * Map a numeric log level to its display prefix.
 *
 * @param l  Log level; 0..7 select the named prefixes.
 * @return   The prefix for `l`, or the generic " - " prefix when `l` is
 *           negative or beyond the table (previously an out-of-bounds read).
 */
const char * LevelToName(int l) {
    const int count = static_cast<int>(sizeof(levelnames) / sizeof(levelnames[0]));
    if (l < 0 || l >= count) {
        return levelnames[count - 1];
    }
    return levelnames[l];
}
/**
 * Return today's local date formatted as "YYYY-MM-DD".
 *
 * Uses the reentrant localtime_r() so concurrent callers do not share the
 * static buffer behind localtime().
 *
 * @return The formatted date, or an empty string if the time could not be
 *         converted or formatted.
 */
std::string Get_Current_Date(void)
{
    const time_t nowtime = time(NULL); // current calendar time
    struct tm local_tm;
    char tmp[64] = { 0 };
    if (localtime_r(&nowtime, &local_tm) == NULL ||
        strftime(tmp, sizeof(tmp), "%Y-%m-%d", &local_tm) == 0)
    {
        return std::string();
    }
    return tmp;
}
/**
 * Return the current local date and time formatted as "YYYY-MM-DD HH:MM:SS".
 *
 * Uses the reentrant localtime_r() so concurrent callers do not share the
 * static buffer behind localtime().
 *
 * @return The formatted timestamp, or an empty string if the time could not
 *         be converted or formatted.
 */
std::string Get_Current_Date_All(void)
{
    const time_t nowtime = time(NULL); // current calendar time
    struct tm local_tm;
    char tmp[64] = { 0 };
    if (localtime_r(&nowtime, &local_tm) == NULL ||
        strftime(tmp, sizeof(tmp), "%Y-%m-%d %H:%M:%S", &local_tm) == 0)
    {
        return std::string();
    }
    return tmp;
}
/**
 * Return the current time in ctime() layout (e.g. "Wed Jun 30 21:49:08 1993")
 * without the trailing newline.
 *
 * Uses ctime_r() with a caller-owned buffer instead of ctime(), whose result
 * lives in a static buffer shared by every caller.
 *
 * @return The formatted time, or "empty time" if conversion fails.
 */
std::string safe_time()
{
    const time_t t = time(NULL);
    char buf[64] = { 0 }; // ctime_r requires at least 26 bytes
    if (ctime_r(&t, buf) != NULL) {
        const size_t len = strlen(buf);
        if (len > 0) {
            // Drop the final character (the newline ctime appends).
            return std::string(buf, len - 1);
        }
    }
    return "empty time";
}
// Initialise logging: open (in append mode) the log file for today's date
// under ./log and write a start-up banner line to it.
void initLog(void) {
    // File name layout: ./log/<YYYY-MM-DD>protocol.log
    std::string logPath("./log/");
    logPath += Get_Current_Date();
    logPath += "protocol.log";

    fileout.open(logPath.c_str(), std::ios_base::app);
    fileout << "program start: " << Get_Current_Date_All() << std::endl;
}
// Return the log threshold currently in effect for this process.
int getLogLevel()
{
    const int activeLevel = syslogDevLevel;
    return activeLevel;
}
// Set the log threshold and return the previous one. Only the in-process
// variable is changed; the DEV_LOG_LEVEL environment variable is left alone
// so other processes keep their own output level.
int setLogLevel(int l)
{
    // Swap the new level in and hand the old one back.
    const int previous = syslogDevLevel;
    syslogDevLevel = l;
    l = previous;
    return l;
}
void logxxx(int l, char *file, int line, const char *fmt, ...)
{
va_list param;
va_start(param, fmt);
// 锁
/*while (true)
{
if (InterlockedIncrement(&__log_lock) == 1)
{
break;
}
InterlockedDecrement(&__log_lock);
}*/
if (l <= syslogDevLevel)
{
char buf[MAXLOGLINE * 4];
vsnprintf(buf, MAXLOGLINE * 4, fmt, param);
fileout << Get_Current_Date_All() << " " << file << line << ":" << LevelToName(l) << buf << std::endl;
fileout.flush();
}
//InterlockedDecrement(&__log_lock);
va_end(param);
}
//#define log(...)
// Logging entry point used throughout this file: forwards to logxxx() and
// stamps the message with the call site's __FILE__ and __LINE__.
// NOTE(review): a function-like macro named `log` also rewrites any call to
// the math-library log() that follows it in this translation unit — confirm
// that no such call exists.
#define log(l, ...) do{logxxx(l, (char *)__FILE__, __LINE__, __VA_ARGS__); }while(0)
//--------------------------------------------------------
// Redis command execution timer
//--------------------------------------------------------
// Start the measurement: capture the construction time in m_begin.
RedisCmdTimeTesting::RedisCmdTimeTesting()
{
    ::gettimeofday(&m_begin, 0);
}
// Stop the measurement and log the elapsed time in microseconds together
// with the message stored by Log().
//
// The cost is computed in signed 64-bit arithmetic and printed with %lld:
// the former "%lu" did not match uint64_t on platforms where long is 32 bits,
// and an unsigned result wrapped to a huge value if the wall clock stepped
// backwards between construction and destruction.
RedisCmdTimeTesting::~RedisCmdTimeTesting()
{
    gettimeofday(&m_end, NULL);
    const long long seconds = static_cast<long long>(m_end.tv_sec) - static_cast<long long>(m_begin.tv_sec);
    const long long micros = static_cast<long long>(m_end.tv_usec) - static_cast<long long>(m_begin.tv_usec);
    const long long ucost = seconds * 1000000LL + micros; // seconds * 1 000 000 + microseconds
    log(Info,"redis_cmd_time! log:%s, ucost:%lld ", m_log.c_str(), ucost); // emit the timing record
}
/**
 * Format a description of the command being timed and store it in m_log;
 * the destructor prints it next to the elapsed time.
 *
 * @param pRD  Redis context the command ran on (currently unused).
 * @param fmt  printf-style format string; a NULL format is ignored.
 * @param ...  Arguments consumed by `fmt`.
 *
 * The scratch buffer is thread_local so concurrent callers no longer write
 * into one shared static buffer. When the message is truncated, only the
 * characters actually stored are copied; the old clamp also copied the
 * terminating NUL into m_log.
 */
void RedisCmdTimeTesting::Log(PRedisContext pRD, const char* fmt, ...)
{
    (void)pRD;
    if (fmt == NULL)
    {
        return;
    }

    static thread_local char buf[WS_REDIS_LOG_LENGTH_MAX];
    va_list ap;
    va_start(ap, fmt);
    const int len = vsnprintf(buf, sizeof(buf), fmt, ap);
    va_end(ap);

    if (len > 0)
    {
        // vsnprintf returns the untruncated length; at most sizeof(buf) - 1
        // characters are really present in buf.
        const size_t stored = (static_cast<size_t>(len) < sizeof(buf)) ? static_cast<size_t>(len) : sizeof(buf) - 1;
        m_log.assign(buf, stored);
    }
}
// Allocate a zero-filled redisContext, clear its error state and attach a
// freshly created reply reader. Returns NULL when the allocation fails.
static PRedisContext redisContextInit(void) {
    PRedisContext ctx = (PRedisContext)calloc(1, sizeof(redisContext));
    if (!ctx) {
        return NULL;
    }

    ctx->err = 0;
    ctx->errstr[0] = '\0';
    //ctx->obuf = sdsempty();
    ctx->reader = redisReaderCreate();
#ifdef TRACE_EAGLE_EYE
    // tracing support
    ctx->tag.trace = (kitty::TraceSimulate *) new kitty::TraceSimulate;
#endif
    return ctx;
}
// strerror_r() comes in two flavours. The XSI version returns int and always
// writes the message into the supplied buffer; the GNU version (what g++ gets
// on glibc, since _GNU_SOURCE is defined) returns a char* that may point at a
// static string and leave the buffer untouched. These two overloads accept
// either return type and make sure the message ends up in `buf`.
static inline void redisStoreStrerror(int /*rc*/, char * /*buf*/, size_t /*len*/) {
    // XSI flavour: the message is already in the buffer.
}
static inline void redisStoreStrerror(const char *msg, char *buf, size_t len) {
    // GNU flavour: copy the message unless it was written in place.
    if (msg != NULL && msg != buf && len > 0) {
        strncpy(buf, msg, len - 1);
        buf[len - 1] = '\0';
    }
}

/**
 * Record an error on a redis context.
 *
 * @param c     Context whose err / errstr fields are updated.
 * @param type  Error code stored in c->err.
 * @param str   Description copied (truncated to fit) into c->errstr. May be
 *              NULL only for REDIS_ERR_IO, in which case the text for the
 *              current errno is stored instead.
 */
void __redisSetError(redisContext *c, int type, const char *str) {
    c->err = type;
    if (str != NULL) {
        const size_t capacity = sizeof(c->errstr) - 1;
        size_t len = strlen(str);
        if (len > capacity) {
            len = capacity;
        }
        memcpy(c->errstr, str, len);
        c->errstr[len] = '\0';
    } else {
        /* Only REDIS_ERR_IO may lack a description! */
        assert(type == REDIS_ERR_IO);
        c->errstr[0] = '\0';
        redisStoreStrerror(strerror_r(errno, c->errstr, sizeof(c->errstr)), c->errstr, sizeof(c->errstr));
    }
}
// TCP connect routine for a redis context.
// NOTE: the whole implementation below is compiled out with `#if 0`, so the
// function as built ignores all of its arguments and simply returns 0.
// The disabled code resolves `addr`/`port` with getaddrinfo(), connects a
// non-blocking socket (waiting up to `timeout`), restores blocking mode if
// the context asks for it, and records local/peer address data in c->tag.
int redisContextConnectTcp(redisContext *c, const char *addr, int port, struct timeval *timeout) {
#if 0
int s, rv;
char _port[6]; /* strlen("65535"); */
struct addrinfo hints, *servinfo, *p;
int blocking = (c->flags & REDIS_BLOCK);
snprintf(_port, 6, "%d", port);
memset(&hints,0,sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
if ((rv = getaddrinfo(addr,_port,&hints,&servinfo)) != 0) {
__redisSetError(c,REDIS_ERR_OTHER,gai_strerror(rv));
return REDIS_ERR;
}
// Try each resolved address until one connects.
for (p = servinfo; p != NULL; p = p->ai_next) {
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
continue;
if (redisSetBlocking(c,s,0) != REDIS_OK)
goto error;
if (connect(s,p->ai_addr,p->ai_addrlen) == -1) {
if (errno == EHOSTUNREACH) {
close(s);
continue;
} else if (errno == EINPROGRESS && !blocking) {
/* This is ok. */
} else {
if (redisContextWaitReady(c,s,timeout) != REDIS_OK)
goto error;
}
}
if (blocking && redisSetBlocking(c,s,1) != REDIS_OK)
goto error;
if (redisSetTcpNoDelay(c,s) != REDIS_OK)
goto error;
c->fd = s;
c->flags |= REDIS_CONNECTED;
rv = REDIS_OK;
// modified by xwz, add the socket information into mapi tag for tracing
// in case of redis using ipV6, it is safe to call getsockname again
struct sockaddr_in localAddr;
socklen_t addrLen = sizeof(struct sockaddr_in);
if (getsockname(c->fd, (struct sockaddr *)&localAddr, &addrLen) == 0) {
c->tag.localPort = ntohs(localAddr.sin_port);
c->tag.localHost = ntohl(localAddr.sin_addr.s_addr);
// fprintf(stderr, "get sockname sucess, localHost = %s, localPort = %u\n",
// inet_ntoa(localAddr.sin_addr),
// c->tag.localPort);
} else {
c->tag.localHost = 0;
c->tag.localPort = 0;
// fprintf(stderr, "get sockname failed");
}
addrLen = sizeof(struct sockaddr_in);
if (getpeername(c->fd, (struct sockaddr *)&localAddr, &addrLen) == 0) {
c->tag.peerPort = ntohs(localAddr.sin_port);
c->tag.peerHost = ntohl(localAddr.sin_addr.s_addr);
// fprintf(stderr, "get sockname sucess, peerHost = %s, peerPort = %u\n",
// inet_ntoa(localAddr.sin_addr),
// c->tag.peerPort);
} else {
c->tag.peerHost = 0;
c->tag.peerPort = 0;
// fprintf(stderr, "get peername failed");
}
goto end;
}
if (p == NULL) {
char buf[128];
snprintf(buf,sizeof(buf),"Can't create socket: %s",strerror(errno));
__redisSetError(c,REDIS_ERR_OTHER,buf);
goto error;
}
error:
rv = REDIS_ERR;
end:
freeaddrinfo(servinfo);
return rv; // Need to return REDIS_OK if alright
#endif
// Active code path: nothing is connected; always reports 0.
return 0;
}
// Return the shared RedisManager, creating it on first use.
// NOTE(review): no synchronisation is visible around the first-use check —
// confirm the first call happens before any other thread can call this.
RedisManager * RedisManager::singleton()
{
    if (_instance)
    {
        return _instance;
    }
    _instance = new RedisManager();
    return _instance;
}
//--------------------------------------------------------
// reids 连接管理器
//--------------------------------------------------------
// -----------------redis连接相关---------------------
// 创建链接
uint32_t RedisManager::CreateContext(std::string ip, uint32_t port)
{
std::lock_guard<std::mutex> lock(m_RedisLock); // 加锁
// 遍历查找该id是否已经建立了链接
PRedisContext_ext pRDEXT;
PRedisContext pRD = NULL;
uint32_t key_num = 0;
map<uint32_t, PRedisContext_ext>::iterator it = m_allRedisConn.begin();
for (; it != m_allRedisConn.end(); ++it)
{
pRDEXT = it->second;
pRD = pRDEXT.pRD;
if (NULL != pRD)
{
if (ip.compare(pRDEXT.str_ip.c_str()) == 0)
{
if (port == (uint32_t)pRDEXT.port)
{
key_num = it->first;
log(Info, "RedisManager::CreateContext this ip&port have create !");
return key_num;
}
}
}
}
if (0 == key_num) // 该ip没有建立链接
{
pRD = redisConnect(ip.c_str(), (int)port);
if (NULL != pRD && !pRD->err)
{
pRDEXT.pRD = pRD;
pRDEXT.str_ip = ip;
pRDEXT.port = port;
key_num = ++m_MaxRedisContext; // 获取key
m_allRedisConn.insert(make_pair(key_num, pRDEXT));
log(Info, "RedisManager::CreateContext Success to restore redis connection.");
std::cout << "RedisManager::CreateContext Success to restore redis connection." << std::endl;
}
else
{
log(Error, "RedisManager::CreateContext fail! ip: %s, port: %u, err: %d, desc: %s", ip.c_str(), port, pRD->err, pRD->errstr);
std::cout << ip.c_str() << port << pRD->err << pRD->errstr << std::endl;
}
}
return key_num;
}
// Apply the same timeout `tv` to both receive and send operations on the
// context's socket. Returns REDIS_ERR as soon as one setsockopt() call
// fails, REDIS_OK when both succeed.
int redisContextSetTimeout(redisContext *c, struct timeval tv)
{
    // Receive timeout first, then send timeout — same order as before.
    const int timeoutOptions[] = { SO_RCVTIMEO, SO_SNDTIMEO };
    const size_t optionCount = sizeof(timeoutOptions) / sizeof(timeoutOptions[0]);

    for (size_t i = 0; i < optionCount; ++i)
    {
        if (setsockopt(c->fd, SOL_SOCKET, timeoutOptions[i], &tv, sizeof(tv)) == -1)
        {
            return REDIS_ERR;
        }
    }
    return REDIS_OK;
}
/**
 * Create a blocking-mode redis context.
 *
 * The actual TCP connect call is commented out, so the returned context is
 * initialised but not connected; `ip`, `port` and `tv` are currently unused.
 *
 * @return The new context, or NULL when allocation fails (previously a NULL
 *         context was dereferenced to set its flags).
 */
PRedisContext my_redisConnectWithTimeout(const char *ip, int port, struct timeval tv)
{
    (void)ip;
    (void)port;
    (void)tv;

    PRedisContext c = redisContextInit();
    if (c == NULL)
    {
        return NULL;
    }
    c->flags |= REDIS_BLOCK;
    //redisContextConnectTcp(c,ip,port,&tv);
    return c;
}
// Disabled (compiled out) variant of CreateContext that connects with a
// timeout. It is written against an older map layout
// (map<uint32_t, PRedisContext>) and the context's `tag` fields.
// NOTE(review): if this is ever re-enabled, the failure branch dereferences
// pRD even when the connect call returned NULL.
#if 0
// Create a connection, with a timeout
uint32_t RedisManager::CreateContextWithOutTime(std::string ip, uint32_t port, uint32_t time)
{
std::lock_guard<std::mutex> lock(m_RedisLock); // take the lock
// Walk the table to see whether a connection to this endpoint already exists
PRedisContext pRD = NULL;
uint32_t key_num = 0;
map<uint32_t, PRedisContext>::iterator it = m_allRedisConn.begin();
for (; it != m_allRedisConn.end(); ++it)
{
pRD = it->second;
if (NULL != pRD)
{
std::string str_ip;
str_ip.assign(&(pRD->tag.ip[0]), sizeof(char)*32);
if (ip.compare(str_ip.c_str()) == 0)
{
if (port == (uint32_t)pRD->tag.port)
{
key_num = it->first;
log(Info, "RedisManager::CreateContext this ip&port have create !");
return key_num;
}
}
}
}
if (0 == key_num) // no connection to this ip yet
{
struct timeval timeout = { (long)time, 500000 }; // `time` seconds plus 0.5 s
pRD = my_redisConnectWithTimeout(ip.c_str(), (int)port, timeout);
if (NULL != pRD && 0 == pRD->err)
{
key_num = ++m_MaxRedisContext; // allocate the key
timeout.tv_sec = 6;
redisContextSetTimeout(pRD, timeout); // set the socket timeout
m_allRedisConn.insert(make_pair(key_num, pRD));
log(Info, "RedisManager::CreateContext Success to restore redis connection.");
}
else
{
log(Error, "RedisManager::CreateContext fail! ip: %s, port: %u, err: %d, desc: %s", ip.c_str(), port, pRD->err, pRD->errstr);
}
}
return key_num;
}
#endif
/**
 * Close one managed connection.
 *
 * @param index_link  Key of the connection, as returned by CreateContext().
 * @return            Always true (an unknown key is treated as already closed).
 *
 * The entry is now erased from m_allRedisConn after the context is freed.
 * Before, the freed pointer stayed in the table, so getRedis() could hand
 * out a dangling context and CloseAllContext() would free it a second time.
 */
bool RedisManager::CloseContext(uint32_t index_link)
{
    std::lock_guard<std::mutex> lock(m_RedisLock); // serialise access to the connection table

    map<uint32_t, PRedisContext_ext>::iterator it = m_allRedisConn.find(index_link);
    if (it != m_allRedisConn.end())
    {
        if (NULL != it->second.pRD)
        {
            redisFree(it->second.pRD);
        }
        m_allRedisConn.erase(it);
    }
    log(Warn, "CloseContext: index_link %u ", index_link);
    return true;
}
// 关闭所有链接
bool RedisManager::CloseAllContext()
{
std::lock_guard<std::mutex> lock(m_RedisLock); // 加锁
// 遍历查找该id是否已经建立了链接
PRedisContext_ext pRDEXT;
PRedisContext pRD = NULL;
map<uint32_t, PRedisContext_ext>::iterator it = m_allRedisConn.begin();
for (; it != m_allRedisConn.end(); ++it)
{
pRDEXT = it->second;
pRD = pRDEXT.pRD;
redisFree(&(*pRD));
}
m_allRedisConn.clear();
log(Warn, "CloseAllContext: ");
return true;
}
/**
 * Pick a redis connection for a data key by hashing the key, so that with
 * several connections different keys are spread across them.
 *
 * @param key  Data key to hash.
 * @return     The selected context, or NULL when no connection exists.
 *
 * The previous index computation (hash % (max + 1), with the top value
 * folded down by one) could never select the highest connection key, mapped
 * some hashes to key 0 — which CreateContext() never assigns — and underflowed
 * when there were no connections. The hash now selects directly among the
 * entries that are actually in the table, which also tolerates gaps in the
 * key sequence.
 */
PRedisContext RedisManager::getRedis(std::string key)
{
    // Hash the key: hash = hash * 33 + c, in 32-bit arithmetic.
    uint32_t hash = WS_REDIS_CONTEXT_HASH_MARK;
    for (std::string::size_type i = 0; i < key.length(); ++i)
    {
        hash = ((hash << 5) + hash) + key[i];
    }

    std::lock_guard<std::mutex> lock(m_RedisLock); // serialise access to the connection table
    if (m_allRedisConn.empty())
    {
        return NULL;
    }

    // Walk to the (hash mod size)-th live entry.
    map<uint32_t, PRedisContext_ext>::iterator chosen = m_allRedisConn.begin();
    for (size_t skip = hash % m_allRedisConn.size(); skip > 0; --skip)
    {
        ++chosen;
    }
    return chosen->second.pRD;
}
// Look up a connection directly by its key in the connection table.
// Returns the stored context, or NULL when no entry has that key.
PRedisContext RedisManager::getRedis(uint32_t index_link)
{
    std::lock_guard<std::mutex> lock(m_RedisLock); // serialise access to the connection table

    map<uint32_t, PRedisContext_ext>::iterator found = m_allRedisConn.find(index_link);
    if (found == m_allRedisConn.end())
    {
        return NULL;
    }
    return found->second.pRD;
}
/**
 * Switch the connection to database number `index_db` (redis SELECT).
 *
 * @param pRD       Connection to operate on.
 * @param index_db  Database index.
 * @return          true when the server accepted the command, false when the
 *                  context is NULL or the command failed.
 *
 * The trace message now prints the reply/context addresses with %p; passing
 * pointers for %u is undefined behaviour. A NULL context is rejected up
 * front instead of being dereferenced.
 */
bool RedisManager::selectDB(PRedisContext& pRD, uint32_t index_db)
{
    if (pRD == NULL)
    {
        log(Warn, "err: redis select db %u failed! redis context is NULL", index_db);
        return false;
    }

    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "SELECT %u", index_db);
    cTime.Log(pRD, "SELECT %u ; reply:%p, pRD:%p", index_db, static_cast<void*>(reply), static_cast<void*>(pRD)); // trace the addresses
    if (reply == NULL)
    {
        log(Warn, "err: redis select db %u failed! Error:%d, err_info:%s", index_db, pRD->err, pRD->errstr);
        return false;
    }

    //pRD->tag.dbIndex = index_db;
    freeReplyObject(reply); // release the reply
    return true;
}
// ----------------- helper tools --------------------------
// Describe a connection as text: the peer's IPv4 address and port (falling
// back to 0.0.0.0:0 when the peer address cannot be read) followed by the
// context's current error code and error string. Returns "NULL" for a NULL
// context.
std::string RedisManager::contextAddrInfo(PRedisContext& pRD)
{
    if (!pRD)
    {
        return "NULL";
    }

    char peerIp[32] = "0.0.0.0";
    int peerPort = 0;

    // Ask the socket for the address of the remote end.
    struct sockaddr_in peer;
    unsigned int peerLen = sizeof(peer);
    const bool havePeer = (getpeername(pRD->fd, (struct sockaddr *)&peer, &peerLen) == 0);
    if (havePeer)
    {
        // Convert the binary address into dotted-decimal text.
        inet_ntop(AF_INET, &peer.sin_addr, peerIp, sizeof(peerIp));
        peerPort = ntohs(peer.sin_port);
    }

    std::stringstream out;
    out << "redis_info:[" << peerIp << ":" << peerPort << "]"
        << "context_info:[" << pRD->err << ":" << pRD->errstr << "]";
    return out.str();
}
// Summarise a reply as "type(<name>),int(<n>),str(<text>)" for logging.
// Returns "NULL" for a NULL reply. The summary is limited to the size of a
// 100-byte buffer, so long strings are truncated.
std::string RedisManager::replayInfo(redisReply* pReply)
{
    if (!pReply)
    {
        return "NULL";
    }

    const char * typeName = "NaN";
    const char * text = "";
    long long int number = 0;

    const int kind = pReply->type;
    if (kind == REDIS_REPLY_STRING)
    {
        typeName = "std::string";
        text = pReply->str;
    }
    else if (kind == REDIS_REPLY_ARRAY)
    {
        typeName = "ARRAY";
    }
    else if (kind == REDIS_REPLY_INTEGER)
    {
        typeName = "INTEGER";
        number = pReply->integer;
    }
    else if (kind == REDIS_REPLY_NIL)
    {
        typeName = "NIL";
    }
    else if (kind == REDIS_REPLY_STATUS)
    {
        typeName = "STATUS";
        text = pReply->str;
    }
    else if (kind == REDIS_REPLY_ERROR)
    {
        typeName = "ERROR";
        text = pReply->str;
    }

    // A reply may carry a null string pointer; print it as the word NULL.
    if (!text)
    {
        text = "NULL";
    }

    char summary[100] = { 0 };
    snprintf(summary, sizeof(summary), "type(%s),int(%lld),str(%s)", typeName, number, text);
    return summary;
}
// ----------------- redis commands ---------------------
/**
 * Execute a hiredis-formatted command on `pRD`, time it, and feed the timing
 * into buildReport() keyed by the first 10 characters of the format string.
 *
 * @param pRD     Connection to run the command on.
 * @param format  hiredis format string (printf-like, plus %b for binary).
 * @param ...     Arguments consumed by `format`.
 * @return        The reply on success — the caller must release it with
 *                freeReplyObject() — or NULL when the context/format is NULL,
 *                no reply was received, the context is in an error state, or
 *                the server answered with an error reply.
 *
 * Fixes: the context is checked for NULL before it is used rather than
 * after, and a reply that is being rejected is freed here; previously it was
 * dropped on the floor, leaking one reply per failed command.
 */
redisReply* RedisManager::redisCommand(PRedisContext& pRD, const char *format, ...)
{
    if (pRD == NULL || format == NULL)
    {
        return NULL;
    }

    struct timeval tBeginTime, tEndTime;
    gettimeofday(&tBeginTime, NULL); // start of execution

    // Run the command.
    va_list ap;
    va_start(ap, format);
    redisReply* reply = static_cast<redisReply*>(redisvCommand(pRD, format, ap));
    va_end(ap);

    gettimeofday(&tEndTime, NULL); // end of execution

    // Use the leading part of the format string as the report key.
    std::string strKey(format);
    strKey = strKey.substr(0, 10);
    buildReport(strKey, tBeginTime, tEndTime); // record the timing

    if (reply == NULL)
    {
        return NULL;
    }
    if (pRD->err != 0 || reply->type == REDIS_REPLY_ERROR)
    {
        // Keep the server's error text in the log before the reply is released;
        // callers only see NULL and cannot recover it afterwards.
        log(Warn, "err REDIS_REPLY ! reply_info:%s, format:`%s`", replayInfo(reply).c_str(), format);
        freeReplyObject(reply);
        return NULL;
    }
    return reply;
}
// key
/**
 * Collect the names of all keys in the current database that match a
 * pattern (redis KEYS).
 *
 * @param pRD          Connection to query.
 * @param value        Receives the matching key names (existing content is kept).
 * @param strMatching  Glob-style pattern such as "*" or "t[w]*".
 * @return             true when an array reply was received, false otherwise.
 *
 * Fixes: key names are copied with their explicit length so names containing
 * NUL bytes are not truncated; trace arguments now match their conversion
 * specifiers (%p for pointers, %lu for the set size); and the failure message
 * names the command actually issued (it said SMEMBERS).
 */
bool RedisManager::getAllKeys(PRedisContext& pRD, set<std::string>& value, std::string strMatching)
{
    RedisCmdTimeTesting cTime; // timing record

    redisReply* reply = redisCommand(pRD, "KEYS %s", strMatching.c_str());
    if (reply == NULL)
    {
        log(Warn, "err: getAllKeys! KEYS reply is NULL!");
        return false;
    }

    if (reply->type != REDIS_REPLY_ARRAY)
    {
        cTime.Log(pRD, "ERROR KEYS %s ! repl: %p, size:0", strMatching.c_str(), static_cast<void*>(reply));
        freeReplyObject(reply);
        return false;
    }

    for (size_t j = 0; j < reply->elements; ++j)
    {
        const redisReply* item = reply->element[j];
        if (item != NULL && item->str != NULL)
        {
            value.insert(std::string(item->str, item->len));
        }
    }
    cTime.Log(pRD, "getAllKeys KEYS %s! reply:%p, size:%lu", strMatching.c_str(), static_cast<void*>(reply), static_cast<unsigned long>(value.size()));
    freeReplyObject(reply);
    return true;
}
/**
 * Check whether a key exists (redis EXISTS).
 *
 * @param pRD       Connection to query.
 * @param key       Key to look up.
 * @param keyExist  Set to true when the server reports 1, false when it
 *                  reports any other integer; left untouched when no integer
 *                  reply is received.
 * @return          true only when the key exists. Note that "key absent" and
 *                  "command failed" both return false — use `keyExist`
 *                  together with the log to tell them apart.
 *
 * The trace message now prints the reply address with %p; passing a pointer
 * for %u is undefined behaviour.
 */
bool RedisManager::isKeyExist(PRedisContext& pRD, const string& key, bool& keyExist)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "EXISTS %s", key.c_str());
    cTime.Log(pRD, "IsKeyExist EXISTS %s! reply:%p", key.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Error, "IsKeyExist: EXISTS reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    if (reply->type != REDIS_REPLY_INTEGER)
    {
        log(Error, "IsKeyExist: unexpected reply type:%d", reply->type);
        freeReplyObject(reply);
        return false;
    }

    const long long answer = reply->integer;
    keyExist = (answer == 1);
    if (answer != 0 && answer != 1)
    {
        log(Error, "IsKeyExist: unexpected integer:%lld", answer);
    }
    freeReplyObject(reply);
    return keyExist;
}
/**
 * Delete a key (redis DEL).
 *
 * @param pRD  Connection to operate on.
 * @param key  Key to delete.
 * @return     true when the server answered (whether or not the key existed),
 *             false when the command failed.
 *
 * The trace message now prints the reply address with %p instead of %u.
 */
bool RedisManager::delKey(PRedisContext& pRD, const std::string& key)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "DEL %s", key.c_str());
    cTime.Log(pRD, "delKey DEL %s! reply:%p", key.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "err: delKey: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    freeReplyObject(reply);
    return true;
}
/**
 * Transaction-aware delete (redis DEL).
 *
 * @param pRD  Connection to operate on.
 * @param key  Key to delete.
 * @return     0 on failure, 1 when the command was executed, 2 when the
 *             server answered "QUEUED" (the command was queued inside a
 *             transaction).
 *
 * Fixes: the reply address is traced with %p instead of %u, and the status
 * text is checked for NULL before it is compared.
 */
uint32_t RedisManager::delKey_T(PRedisContext& pRD, const std::string& key)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "DEL %s", key.c_str());
    cTime.Log(pRD, "delKey DEL %s! reply:%p", key.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "err: delKey: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return 0;
    }

    const bool queued = (REDIS_REPLY_STATUS == reply->type)
        && reply->str != NULL
        && strcmp(reply->str, "QUEUED") == 0;
    freeReplyObject(reply);
    return queued ? 2 : 1;
}
/**
 * Query the type of a key (redis TYPE).
 *
 * @param pRD   Connection to query.
 * @param key   Key to inspect.
 * @param type  Receives the type name reported by the server: none (key does
 *              not exist), string, list, set, zset or hash.
 * @return      true when a status reply was received, false otherwise.
 *
 * Fixes: a NULL reply is no longer dereferenced while logging the failure,
 * a reply of an unexpected type is freed instead of leaked, and the reply
 * address is traced with %p instead of %u.
 */
bool RedisManager::keyType(PRedisContext& pRD, const std::string& key, std::string& type)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "TYPE %s", key.c_str());
    cTime.Log(pRD, "keyType TYPE %s! reply:%p", key.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "err: keyType: reply is NULL! key:%s, error:%d, error_info:%s", key.c_str(), pRD->err, pRD->errstr);
        return false;
    }

    if (reply->type == REDIS_REPLY_STATUS && reply->str != NULL)
    {
        type.assign(reply->str, reply->len);
        freeReplyObject(reply);
        return true;
    }

    log(Warn, "err: Wrong reply type received! key:%s, type:%d, error:%d, error_info:%s,", key.c_str(), reply->type, pRD->err, pRD->errstr);
    freeReplyObject(reply);
    return false;
}
/**
 * Set a key's time-to-live in seconds (redis EXPIRE).
 *
 * @param pRD   Connection to operate on.
 * @param key   Key to expire.
 * @param time  Lifetime in seconds.
 * @return      true when the server answered, false when the command failed.
 *
 * The trace message now prints the reply address with %p instead of %u.
 */
bool RedisManager::setKeyTTL(PRedisContext& pRD, const std::string& key, uint32_t time)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "expire %s %s", key.c_str(), ws::toString(time).c_str());
    cTime.Log(pRD, "setKeyTTL key %s! time:%u, reply:%p", key.c_str(), time, static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "err: setKeyTTL: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    freeReplyObject(reply);
    return true;
}
/**
 * Set the absolute expiry time of a key (redis EXPIREAT), replacing any
 * previous time-to-live.
 *
 * @param pRD      Connection to operate on.
 * @param key      Key to expire.
 * @param calTime  Expiry time as a calendar time in seconds.
 * @return         true when the server returned a non-zero integer, false
 *                 when it returned 0, an unexpected reply type, or nothing.
 *
 * Fixes: the reply address is traced with %p instead of %u, and the
 * unexpected-type branch no longer claims the reply was NULL.
 */
bool RedisManager::expireAt(PRedisContext& pRD, const std::string& key, time_t calTime)
{
    RedisCmdTimeTesting cTime;
    const std::string when = ws::toString(calTime);
    redisReply* reply = redisCommand(pRD, "EXPIREAT %s %s", key.c_str(), when.c_str());
    cTime.Log(pRD, "expireAt EXPIREAT %s ! time:%s, reply:%p", key.c_str(), when.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "expireAt: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    bool retCode = false;
    if (reply->type == REDIS_REPLY_INTEGER)
    {
        retCode = (reply->integer != 0);
    }
    else
    {
        log(Warn, "expireAt: unexpected reply type:%d! error:%d, error_info:%s", reply->type, pRD->err, pRD->errstr);
    }
    freeReplyObject(reply);
    return retCode;
}
// lock
// Try to take a simple redis-based lock on `key` in database 0: the caller
// that increments the key's counter to exactly 1 owns the lock, and the key
// is then given a lifetime of `ttl` seconds. Up to 101 attempts are made
// with a 200-microsecond pause between them.
// NOTE(review): the original comment said "sleep at most two seconds", but
// 100 pauses of usleep(200) add up to about 20 ms — confirm which is intended.
// Returns true when the lock was obtained, false otherwise.
bool RedisManager::lock(PRedisContext& pRD, const std::string &key, uint32_t ttl)
{
    // The lock always lives in the first database.
    const bool ready = pRD && selectDB(pRD, 0);
    if (!ready)
    {
        log(Error, "%s, select db failed ,key:%s ", __FUNCTION__, key.c_str());
        return false;
    }

    long long int counter;
    for (uint32_t attemptsLeft = 101; attemptsLeft > 0; --attemptsLeft)
    {
        if (incrValue(pRD, key, counter, 1) && 1 == counter)
        {
            setKeyTTL(pRD, key, ttl);
            return true;
        }
        // Pause before retrying, except after the final attempt.
        if (attemptsLeft > 1)
        {
            usleep(200);
        }
    }
    return false;
}
// Release a lock taken with lock(): switch to database 0 and delete the
// lock key. Returns true when the delete command succeeded.
bool RedisManager::unlock(PRedisContext& pRD, const std::string &key)
{
    const bool ready = pRD && selectDB(pRD, 0);
    if (!ready)
    {
        log(Warn, "%s, select db failed ,key:%s ", __FUNCTION__, key.c_str());
        return false;
    }
    return delKey(pRD, key) ? true : false;
}
//--- std::string
/**
 * Store a string value under a key (redis SET). The value is sent
 * binary-safe, so it may contain NUL bytes.
 *
 * @param pRD    Connection to operate on.
 * @param key    Key to write.
 * @param value  Value to store.
 * @return       true when the server answered, false when the command failed.
 *
 * The trace message now prints the reply address with %p instead of %u.
 */
bool RedisManager::setStrValue(PRedisContext& pRD, const std::string& key, const std::string& value)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "SET %s %b", key.c_str(), value.data(), value.size());
    cTime.Log(pRD, "setStrValue SET %s ! reply:%p", key.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "err: setStrValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    freeReplyObject(reply);
    return true;
}
/**
 * Store a string value with an expiry (redis SETEX).
 *
 * @param pRD    Connection to operate on.
 * @param key    Key to write.
 * @param value  Value to store.
 * @param time   Lifetime in seconds.
 * @return       true only when the server answered with status "OK".
 *
 * Fixes: the value is now sent with %b (pointer + length) like the SET
 * overload, so values containing NUL bytes are no longer cut short; the
 * reply address is traced with %p instead of %u; and the status text is
 * checked for NULL before it is compared.
 */
bool RedisManager::setStrValue(PRedisContext& pRD, const std::string& key, const std::string& value, const uint32_t& time)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "SETEX %s %s %b", key.c_str(), ws::toString(time).c_str(), value.data(), value.size());
    cTime.Log(pRD, "setStrValue SETEX %s %u %s ! reply:%p", key.c_str(), time, value.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "err: setStrValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    bool retCode = false;
    if (REDIS_REPLY_STATUS != reply->type)
    {
        log(Error, "SETEX return error type");
    }
    else if (reply->str != NULL && strcmp(reply->str, "OK") == 0)
    {
        retCode = true;
    }
    else
    {
        log(Error, "SETEX return failure");
    }
    freeReplyObject(reply);
    return retCode;
}
/**
 * Read a string value (redis GET).
 *
 * @param pRD    Connection to query.
 * @param key    Key to read.
 * @param value  Receives the stored bytes when the reply is a string; left
 *               untouched for any other reply type (e.g. a missing key).
 * @return       true when the server answered — including when the key does
 *               not exist — and false when the command failed.
 *
 * The trace message now prints the reply address with %p instead of %u.
 */
bool RedisManager::getStrValue(PRedisContext& pRD, const std::string& key, std::string& value)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "GET %s", key.c_str());
    cTime.Log(pRD, "getStrValue GET %s ! reply:%p", key.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "err: getStrValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    if (reply->type == REDIS_REPLY_STRING)
    {
        value.assign(reply->str, reply->len);
    }
    freeReplyObject(reply);
    return true;
}
/**
 * Store several key/value pairs in one round trip (redis MSET).
 *
 * @param pRD    Connection to operate on.
 * @param value  Pairs to store; an empty map is rejected.
 * @return       true when the server accepted the command, false on an
 *               empty map, a NULL context, or a failed command.
 *
 * The command used to be assembled by concatenating keys and values into one
 * string that was then passed to hiredis as the *format string*. Any '%' in
 * the data was interpreted as a conversion specifier (reading arguments that
 * were never passed), and any space split a key or value into extra
 * arguments. The command is now sent through redisCommandArgv(), which takes
 * every argument as an explicit pointer + length and is binary-safe.
 */
bool RedisManager::setAnyValue(PRedisContext &pRD, map<std::string, std::string> value)
{
    if (value.empty())
    {
        return false;
    }
    if (pRD == NULL)
    {
        log(Warn, "err: setBatchStrValue: redis context is NULL!");
        return false;
    }

    // Build the argument vector: MSET k1 v1 k2 v2 ...
    std::vector<const char*> argv;
    std::vector<size_t> argvlen;
    argv.reserve(value.size() * 2 + 1);
    argvlen.reserve(value.size() * 2 + 1);
    argv.push_back("MSET");
    argvlen.push_back(4);
    for (map<string, string>::const_iterator it = value.begin(); it != value.end(); ++it)
    {
        argv.push_back(it->first.data());
        argvlen.push_back(it->first.size());
        argv.push_back(it->second.data());
        argvlen.push_back(it->second.size());
    }

    RedisCmdTimeTesting cTime;
    struct timeval tBeginTime, tEndTime;
    gettimeofday(&tBeginTime, NULL);
    redisReply* reply = static_cast<redisReply*>(::redisCommandArgv(pRD, static_cast<int>(argv.size()), &argv[0], &argvlen[0]));
    gettimeofday(&tEndTime, NULL);
    std::string reportKey("MSET");
    buildReport(reportKey, tBeginTime, tEndTime); // keep the per-command timing report

    cTime.Log(pRD, "setBatchStrValue MSET ! pairs:%lu, reply:%p", static_cast<unsigned long>(value.size()), static_cast<void*>(reply));
    if (reply != NULL && reply->type != REDIS_REPLY_ERROR)
    {
        freeReplyObject(reply);
        return true;
    }
    if (reply != NULL)
    {
        freeReplyObject(reply);
    }
    log(Warn, "err: setBatchStrValue: command failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
    return false;
}
/**
 * Read several keys in one round trip (redis MGET).
 *
 * @param pRD         Connection to query.
 * @param setKeys     Keys to read; an empty set is rejected.
 * @param outMapInfo  Receives key -> value for every key whose reply element
 *                    is a string; keys with no value are not inserted.
 * @return            true when the server answered, false on an empty set,
 *                    a NULL context, or a failed command.
 *
 * Fixes:
 *  - keys were concatenated into the hiredis format string, so '%' or
 *    spaces in a key corrupted the command; arguments are now passed
 *    individually through redisCommandArgv();
 *  - the element count was only checked with assert(), leaving an
 *    out-of-bounds read in release builds; the loop is now bounded;
 *  - a failed command returned true; it now returns false;
 *  - values are copied with their explicit length (binary-safe).
 */
bool RedisManager::getAnyValue(PRedisContext& pRD, const set<std::string>& setKeys, map<std::string, std::string>& outMapInfo)
{
    if (setKeys.empty())
    {
        return false;
    }
    if (pRD == NULL)
    {
        log(Warn, "err: getBatchStrValue: redis context is NULL!");
        return false;
    }

    // Build the argument vector: MGET k1 k2 ...
    std::vector<const char*> argv;
    std::vector<size_t> argvlen;
    argv.reserve(setKeys.size() + 1);
    argvlen.reserve(setKeys.size() + 1);
    argv.push_back("MGET");
    argvlen.push_back(4);
    set<string>::const_iterator itSet;
    for (itSet = setKeys.begin(); itSet != setKeys.end(); ++itSet)
    {
        argv.push_back(itSet->data());
        argvlen.push_back(itSet->size());
    }

    RedisCmdTimeTesting cTime;
    struct timeval tBeginTime, tEndTime;
    gettimeofday(&tBeginTime, NULL);
    redisReply* reply = static_cast<redisReply*>(::redisCommandArgv(pRD, static_cast<int>(argv.size()), &argv[0], &argvlen[0]));
    gettimeofday(&tEndTime, NULL);
    std::string reportKey("MGET");
    buildReport(reportKey, tBeginTime, tEndTime); // keep the per-command timing report

    cTime.Log(pRD, "getBatchStrValue MGET ! keys:%lu, reply:%p", static_cast<unsigned long>(setKeys.size()), static_cast<void*>(reply));
    if (reply == NULL || reply->type == REDIS_REPLY_ERROR)
    {
        if (reply != NULL)
        {
            freeReplyObject(reply);
        }
        log(Warn, "err: getBatchStrValue: command failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    if (reply->type == REDIS_REPLY_ARRAY)
    {
        if (reply->elements != setKeys.size())
        {
            log(Warn, "err: getBatchStrValue: asked for %lu keys, got %lu elements", static_cast<unsigned long>(setKeys.size()), static_cast<unsigned long>(reply->elements));
        }
        // Reply elements come back in the same order as the requested keys.
        size_t i = 0;
        for (itSet = setKeys.begin(); itSet != setKeys.end() && i < reply->elements; ++itSet, ++i)
        {
            const redisReply* item = reply->element[i];
            if (item != NULL && item->type == REDIS_REPLY_STRING && item->str != NULL)
            {
                outMapInfo[*itSet].assign(item->str, item->len);
            }
        }
    }
    freeReplyObject(reply);
    return true;
}
//std::string incr
/**
 * Increase the integer stored at a key (redis INCR / INCRBY).
 *
 * @param pRD      Connection to operate on.
 * @param key      Key holding the counter.
 * @param incrNum  Amount to add; 1 uses INCR, anything else INCRBY.
 * @return         true when the server returned an integer reply.
 *
 * Fixes: the INCR branch used the format "INCR %s %d" without supplying an
 * argument for %d — undefined behaviour, and it produced an INCR command
 * with a bogus extra argument that the server rejects. The reply address is
 * also traced with %p instead of %u.
 */
bool RedisManager::incrValue(PRedisContext& pRD, const std::string& key, int incrNum)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = NULL;
    if (1 == incrNum)
    {
        reply = redisCommand(pRD, "INCR %s", key.c_str());
    }
    else
    {
        reply = redisCommand(pRD, "INCRBY %s %d", key.c_str(), incrNum);
    }
    cTime.Log(pRD, "incrByValue INCRBY %s %d ! reply:%p", key.c_str(), incrNum, static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "err: incrByValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const bool isInteger = (REDIS_REPLY_INTEGER == reply->type);
    if (!isInteger)
    {
        log(Error, "err: INCRBY return error type");
    }
    freeReplyObject(reply);
    return isInteger;
}
/**
 * Increase the integer stored at a key and return the new value
 * (redis INCR / INCRBY).
 *
 * @param pRD       Connection to operate on.
 * @param key       Key holding the counter.
 * @param outValue  Receives the value after the increment; untouched on failure.
 * @param incrNum   Amount to add; 1 uses INCR, anything else INCRBY.
 * @return          true when the server returned an integer reply.
 *
 * Fixes: the INCR branch used the format "INCR %s %d" without supplying an
 * argument for %d — undefined behaviour, and it produced an INCR command
 * with a bogus extra argument that the server rejects. The reply address is
 * also traced with %p instead of %u.
 */
bool RedisManager::incrValue(PRedisContext& pRD, const std::string& key, long long &outValue, int incrNum)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = NULL;
    if (1 == incrNum)
    {
        reply = redisCommand(pRD, "INCR %s", key.c_str());
    }
    else
    {
        reply = redisCommand(pRD, "INCRBY %s %d", key.c_str(), incrNum);
    }
    cTime.Log(pRD, "incrByValue INCRBY %s %d ! reply:%p", key.c_str(), incrNum, static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "err: incrByValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const bool isInteger = (REDIS_REPLY_INTEGER == reply->type);
    if (isInteger)
    {
        outValue = reply->integer;
    }
    else
    {
        log(Error, "err: INCRBY return error type");
    }
    freeReplyObject(reply);
    return isInteger;
}
//std::string decr
/**
 * Decrease the integer stored at a key (redis DECR / DECRBY).
 *
 * @param pRD      Connection to operate on.
 * @param key      Key holding the counter.
 * @param decrNum  Amount to subtract; 1 uses DECR, anything else DECRBY.
 * @return         true when the server returned an integer reply.
 *
 * Fixes: the trace format contained four conversion specifiers but only
 * three arguments were passed (undefined behaviour), and the reply address
 * was printed with %u; the format now matches its arguments.
 */
bool RedisManager::decrValue(PRedisContext& pRD, const std::string& key, int decrNum)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = NULL;
    if (1 == decrNum)
    {
        reply = redisCommand(pRD, "DECR %s", key.c_str());
    }
    else
    {
        reply = redisCommand(pRD, "DECRBY %s %s", key.c_str(), ws::toString(decrNum).c_str());
    }
    cTime.Log(pRD, "DECR %s %d ! reply:%p", key.c_str(), decrNum, static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "decrValue: reply is NULL ! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const bool isInteger = (REDIS_REPLY_INTEGER == reply->type);
    if (!isInteger)
    {
        log(Error, "DECR return error type");
    }
    freeReplyObject(reply);
    return isInteger;
}
/**
 * Decrease the integer stored at a key and return the new value
 * (redis DECR / DECRBY).
 *
 * @param pRD       Connection to operate on.
 * @param key       Key holding the counter.
 * @param outValue  Receives the value after the decrement; untouched on failure.
 * @param decrNum   Amount to subtract; 1 uses DECR, anything else DECRBY.
 * @return          true when the server returned an integer reply.
 *
 * Fixes: the trace format contained four conversion specifiers but only
 * three arguments were passed (undefined behaviour), and the reply address
 * was printed with %u; the format now matches its arguments.
 */
bool RedisManager::decrValue(PRedisContext& pRD, const std::string& key, long long &outValue, int decrNum)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = NULL;
    if (1 == decrNum)
    {
        reply = redisCommand(pRD, "DECR %s", key.c_str());
    }
    else
    {
        reply = redisCommand(pRD, "DECRBY %s %s", key.c_str(), ws::toString(decrNum).c_str());
    }
    cTime.Log(pRD, "DECR %s %d ! reply:%p", key.c_str(), decrNum, static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "decrValue: reply is NULL ! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const bool isInteger = (REDIS_REPLY_INTEGER == reply->type);
    if (isInteger)
    {
        outValue = reply->integer;
    }
    else
    {
        log(Error, "DECR return error type");
    }
    freeReplyObject(reply);
    return isInteger;
}
//--- list
/**
 * Get the length of a list (redis LLEN).
 *
 * @param pRD  Connection to query.
 * @param key  List key.
 * @param len  Receives the list length; 0 when the key does not exist or
 *             the command fails.
 * @return     true when the server answered, false when the command failed.
 *
 * The function used to call isKeyExist() first and intended to return
 * "true, len = 0" for a missing key. But isKeyExist() returns false for a
 * missing key, so that branch was unreachable and a missing list was
 * reported as a failure. LLEN itself already answers 0 for a missing key,
 * so the extra EXISTS round trip — and the window between the two commands —
 * is dropped.
 */
bool RedisManager::getListLen(PRedisContext& pRD, const std::string& key, uint32_t& len)
{
    len = 0;

    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "LLEN %s", key.c_str());
    cTime.Log(pRD, "getListLen LLEN %s ! reply:%p", key.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "getListLen: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    if (reply->type == REDIS_REPLY_STRING && reply->str != NULL)
    {
        len = static_cast<uint32_t>(strtoul(reply->str, 0, 0));
    }
    else if (reply->type == REDIS_REPLY_INTEGER)
    {
        len = static_cast<uint32_t>(reply->integer);
    }
    else
    {
        log(Warn, "getListLen: LLEN reply is unexpected!");
    }
    freeReplyObject(reply);
    return true;
}
/**
 * Push a value onto the head of a list, creating the list if it does not
 * exist (redis LPUSH).
 *
 * @param pRD    Connection to operate on.
 * @param key    List key.
 * @param value  Value to push.
 * @return       true when the server answered, false when the command failed.
 *
 * The trace message now prints the reply address with %p instead of %u.
 */
bool RedisManager::lPushList(PRedisContext& pRD, const std::string& key, std::string& value)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "LPUSH %s %s", key.c_str(), value.c_str());
    cTime.Log(pRD, "LPUSHList LPUSH %s %s ! reply:%p", key.c_str(), value.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Error, "LPUSHList: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    freeReplyObject(reply);
    return true;
}
/**
 * Push a value onto the tail of a list, creating the list if it does not
 * exist (redis RPUSH).
 *
 * @param pRD    Connection to operate on.
 * @param key    List key.
 * @param value  Value to push.
 * @return       true when the server answered, false when the command failed.
 *
 * The trace message now prints the reply address with %p instead of %u.
 */
bool RedisManager::rPushList(PRedisContext& pRD, const std::string& key, std::string& value)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "RPUSH %s %s", key.c_str(), value.c_str());
    cTime.Log(pRD,"RPUSHList RPUSH %s %s ! reply:%p", key.c_str(), value.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "RPUSHList: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    freeReplyObject(reply);
    return true;
}
/**
 * Push a value onto the head of a list only if the list already exists
 * (redis LPUSHX); a missing list is not created.
 *
 * @param pRD    Connection to operate on.
 * @param key    List key.
 * @param value  Value to push.
 * @return       true when the server answered, false when the command failed.
 *
 * The trace message now prints the reply address with %p instead of %u.
 */
bool RedisManager::lPushXList(PRedisContext& pRD, const std::string& key, std::string& value)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "LPUSHX %s %s", key.c_str(), value.c_str());
    cTime.Log(pRD, "LPUSHXList LPUSHX %s %s ! reply:%p", key.c_str(), value.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "LPUSHXList: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    freeReplyObject(reply);
    return true;
}
/**
 * Push a value onto the tail of a list only if the list already exists
 * (redis RPUSHX); a missing list is not created. (The previous comment
 * wrongly said the list would be created.)
 *
 * @param pRD    Connection to operate on.
 * @param key    List key.
 * @param value  Value to push.
 * @return       true when the server answered, false when the command failed.
 *
 * The trace message now prints the reply address with %p instead of %u.
 */
bool RedisManager::rPushXList(PRedisContext& pRD, const std::string& key, std::string& value)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "RPUSHX %s %s", key.c_str(), value.c_str());
    cTime.Log(pRD, "RPUSHXList RPUSHX %s %s ! reply:%p", key.c_str(), value.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "RPUSHXList: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    freeReplyObject(reply);
    return true;
}
/**
 * Push several values onto the head of a list in one command (redis LPUSH).
 * Values are pushed in order, so the first value ends up closest to the tail.
 *
 * @param pRD     Connection to operate on.
 * @param key     List key.
 * @param values  Values to push; an empty vector is a successful no-op.
 * @return        true when the server accepted the command, false on a NULL
 *                context or a failed command.
 *
 * The values used to be joined with spaces and sent through a single %s
 * placeholder. hiredis treats each placeholder as ONE argument, so the list
 * received a single element like " a b c" instead of three. Each value is
 * now passed as its own binary-safe argument via redisCommandArgv().
 */
bool RedisManager::lPushAnyList(PRedisContext& pRD, const std::string& key, vector<std::string>& values)
{
    if (values.empty())
    {
        return true;
    }
    if (pRD == NULL)
    {
        log(Warn, "LPUSHList: redis context is NULL!");
        return false;
    }

    // Build the argument vector: LPUSH key v1 v2 ...
    std::vector<const char*> argv;
    std::vector<size_t> argvlen;
    argv.reserve(values.size() + 2);
    argvlen.reserve(values.size() + 2);
    argv.push_back("LPUSH");
    argvlen.push_back(5);
    argv.push_back(key.data());
    argvlen.push_back(key.size());
    for (size_t i = 0; i < values.size(); ++i)
    {
        argv.push_back(values[i].data());
        argvlen.push_back(values[i].size());
    }

    RedisCmdTimeTesting cTime;
    struct timeval tBeginTime, tEndTime;
    gettimeofday(&tBeginTime, NULL);
    redisReply* reply = static_cast<redisReply*>(::redisCommandArgv(pRD, static_cast<int>(argv.size()), &argv[0], &argvlen[0]));
    gettimeofday(&tEndTime, NULL);
    std::string reportKey("LPUSH");
    buildReport(reportKey, tBeginTime, tEndTime); // keep the per-command timing report

    cTime.Log(pRD, "LPUSHList LPUSH %s ! values:%lu, reply:%p", key.c_str(), static_cast<unsigned long>(values.size()), static_cast<void*>(reply));
    if (reply != NULL && reply->type != REDIS_REPLY_ERROR)
    {
        freeReplyObject(reply);
        return true;
    }
    if (reply != NULL)
    {
        freeReplyObject(reply);
    }
    log(Warn, "LPUSHList: command failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
    return false;
}
/**
 * Push several values onto the tail of a list in one command (redis RPUSH).
 * Values are pushed in order, so the first value ends up closest to the head.
 *
 * @param pRD     Connection to operate on.
 * @param key     List key.
 * @param values  Values to push; an empty vector is a successful no-op.
 * @return        true when the server accepted the command, false on a NULL
 *                context or a failed command.
 *
 * The values used to be joined with spaces and sent through a single %s
 * placeholder. hiredis treats each placeholder as ONE argument, so the list
 * received a single element like " a b c" instead of three. Each value is
 * now passed as its own binary-safe argument via redisCommandArgv().
 */
bool RedisManager::rPushAnyList(PRedisContext& pRD, const std::string& key, vector<std::string>& values)
{
    if (values.empty())
    {
        return true;
    }
    if (pRD == NULL)
    {
        log(Warn, "RPUSHList: redis context is NULL!");
        return false;
    }

    // Build the argument vector: RPUSH key v1 v2 ...
    std::vector<const char*> argv;
    std::vector<size_t> argvlen;
    argv.reserve(values.size() + 2);
    argvlen.reserve(values.size() + 2);
    argv.push_back("RPUSH");
    argvlen.push_back(5);
    argv.push_back(key.data());
    argvlen.push_back(key.size());
    for (size_t i = 0; i < values.size(); ++i)
    {
        argv.push_back(values[i].data());
        argvlen.push_back(values[i].size());
    }

    RedisCmdTimeTesting cTime;
    struct timeval tBeginTime, tEndTime;
    gettimeofday(&tBeginTime, NULL);
    redisReply* reply = static_cast<redisReply*>(::redisCommandArgv(pRD, static_cast<int>(argv.size()), &argv[0], &argvlen[0]));
    gettimeofday(&tEndTime, NULL);
    std::string reportKey("RPUSH");
    buildReport(reportKey, tBeginTime, tEndTime); // keep the per-command timing report

    cTime.Log(pRD, "RPUSHList RPUSH %s ! values:%lu, reply:%p", key.c_str(), static_cast<unsigned long>(values.size()), static_cast<void*>(reply));
    if (reply != NULL && reply->type != REDIS_REPLY_ERROR)
    {
        freeReplyObject(reply);
        return true;
    }
    if (reply != NULL)
    {
        freeReplyObject(reply);
    }
    log(Warn, "RPUSHList: command failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
    return false;
}
/**
 * Push a value onto the head of a list and report the resulting list length
 * (redis LPUSH).
 *
 * @param pRD    Connection to operate on.
 * @param key    List key.
 * @param value  Value to push.
 * @param len    Receives the list length after the push; 0 on failure.
 * @return       true when the server returned an integer reply.
 *
 * The trace message now prints the reply address with %p instead of %u.
 */
bool RedisManager::lPushGetLen(PRedisContext& pRD, const std::string& key, std::string& value, uint32_t& len)
{
    len = 0;
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "LPUSH %s %s", key.c_str(), value.c_str());
    cTime.Log(pRD, "lPushGetLen LPUSH %s %s reply %p", key.c_str(), value.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "lPushGetLen: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const bool isInteger = (reply->type == REDIS_REPLY_INTEGER);
    if (isInteger)
    {
        len = static_cast<uint32_t>(reply->integer);
    }
    else
    {
        log(Error, "lPushGetLen: reply is not INTEGER. reply->type:%d", reply->type);
    }
    freeReplyObject(reply);
    return isInteger;
}
/**
 * Push a value onto the tail of a list and report the resulting list length
 * (redis RPUSH).
 *
 * @param pRD    Connection to operate on.
 * @param key    List key.
 * @param value  Value to push.
 * @param len    Receives the list length after the push; 0 on failure.
 * @return       true when the server returned an integer reply.
 *
 * The trace message now prints the reply address with %p instead of %u.
 */
bool RedisManager::rPushGetLen(PRedisContext& pRD, const std::string& key, std::string& value, uint32_t& len)
{
    len = 0;
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "RPUSH %s %s", key.c_str(), value.c_str());
    cTime.Log(pRD, "rPushGetLen RPUSH %s %s reply %p", key.c_str(), value.c_str(), static_cast<void*>(reply));
    if (reply == NULL)
    {
        log(Warn, "rPushGetLen: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const bool isInteger = (reply->type == REDIS_REPLY_INTEGER);
    if (isInteger)
    {
        len = static_cast<uint32_t>(reply->integer);
    }
    else
    {
        log(Error, "rPushGetLen: reply is not INTEGER. reply->type:%d", reply->type);
    }
    freeReplyObject(reply);
    return isInteger;
}
// Append one value to the tail of the list at `key` (RPUSH). The value is
// passed with the "%b" specifier plus its byte length, so `binMem` is sent
// using binMem.size() bytes rather than stopping at the first NUL.
// Returns true only on an integer reply; false on an error reply, on any other
// reply type, or when no reply was received.
bool RedisManager::rPushListBit(PRedisContext& pRD, const std::string& key, std::string& binMem)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "RPUSH %s %b", key.c_str(), binMem.c_str(), binMem.size());
    cTime.Log(pRD, "rPushListBit RPUSH %s ! size:%d,reply:%u", key.c_str(), binMem.size(), reply);
    if (!reply)
    {
        log(Warn, "RPUSHList: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (REDIS_REPLY_INTEGER == reply->type)
    {
        freeReplyObject(reply);
        return true;
    }
    if (REDIS_REPLY_ERROR == reply->type)
    {
        // The list name is a C string: print it with %s (it used to be %u,
        // which printed part of the pointer value instead of the name).
        log(Error, "RPUSH return error, list name: %s", key.c_str());
    }
    else
    {
        log(Error, "Not expected reply type received for RPUSH: %u", reply->type);
    }
    freeReplyObject(reply);
    return false;
}
// Pop the head element of the list at `key` (LPOP) into `binMem`, copying
// exactly reply->len bytes.
// `keyExist` is reset to false and set to true only when a string reply was
// received; a NIL reply leaves it false.
// Returns true for a string or NIL reply; false for any other reply type or
// when no reply was received.
bool RedisManager::lPopListBit(PRedisContext& pRD, const std::string& key, std::string& binMem, bool& keyExist)
{
    keyExist = false;
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "LPOP %s", key.c_str());
    cTime.Log(pRD, "lPopListBit LPOP %s ! reply:%u", key.c_str(), resp);
    if (!resp)
    {
        log(Warn, "lPopListBit: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    bool handled = true;
    switch (resp->type)
    {
    case REDIS_REPLY_STRING:
        binMem.assign(resp->str, resp->len);
        keyExist = true;
        break;
    case REDIS_REPLY_NIL:
        // Nothing to pop: still a successful call.
        break;
    default:
        handled = false;
        break;
    }
    freeReplyObject(resp);
    return handled;
}
// Pop the tail element of the list at `key` (RPOP) into `value`.
// `keyExist` is reset to false and set to true only when a string reply was
// received; a NIL reply leaves it false.
// Returns true for a string or NIL reply; false for any other reply type or
// when no reply was received.
bool RedisManager::rPopList(PRedisContext& pRD, const std::string& key, std::string& value, bool& keyExist)
{
    keyExist = false;
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "RPOP %s", key.c_str());
    cTime.Log(pRD, "rPopList RPOP %s ! reply:%u", key.c_str(), resp);
    if (!resp)
    {
        log(Warn, "rPopList: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    const bool isString = (REDIS_REPLY_STRING == resp->type);
    const bool isNil = (REDIS_REPLY_NIL == resp->type);
    if (isString)
    {
        value.assign(resp->str, resp->len);
        keyExist = true;
    }
    freeReplyObject(resp);
    return isString || isNil;
}
// Pop the head element of the list at `key` (LPOP) into `value`.
// `keyExist` is reset to false and set to true only when a string reply was
// received; a NIL reply leaves it false.
// Returns true for a string or NIL reply; false for any other reply type or
// when no reply was received.
bool RedisManager::lPopList(PRedisContext& pRD, const std::string& key, std::string& value, bool& keyExist)
{
    keyExist = false;
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "LPOP %s", key.c_str());
    cTime.Log(pRD, "lPopList LPOP %s ! reply:%u", key.c_str(), resp);
    if (!resp)
    {
        log(Warn, "lPopList: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    bool accepted = false;
    if (REDIS_REPLY_STRING == resp->type)
    {
        value.assign(resp->str, resp->len);
        keyExist = true;
        accepted = true;
    }
    else if (REDIS_REPLY_NIL == resp->type)
    {
        // No element available; reported as success with keyExist == false.
        accepted = true;
    }
    freeReplyObject(resp);
    return accepted;
}
// Remove elements equal to `value` from the list at `key` (LREM).
// `count` is forwarded verbatim as the LREM count argument (per the original
// note: positive = remove that many from head to tail, negative = from tail
// to head, 0 = remove all).
// Returns true when any reply was received, false when no reply was received;
// the reply content itself is not inspected.
bool RedisManager::lremList(PRedisContext& pRD, const std::string& key, std::string& value, std::string count)
{
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "LREM %s %s %s", key.c_str(), count.c_str(), value.c_str());
    cTime.Log(pRD, "lremList LREM %s %s %s ! reply:%u", key.c_str(), count.c_str(), value.c_str(), resp);
    if (!resp)
    {
        log(Warn, "lremList: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    freeReplyObject(resp);
    return true;
}
// Fetch the elements of the list at `key` between `startPos` and `endPos`
// (LRANGE) and append them to `result`; existing entries of `result` are kept.
// Returns true when any reply was received (elements are appended only for an
// array reply), false when no reply was received.
bool RedisManager::lrangeList(PRedisContext& pRD, const std::string& key, std::vector<std::string>& result, uint32_t startPos, uint32_t endPos)
{
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "LRANGE %s %s %s", key.c_str(), ws::toString(startPos).c_str(), ws::toString(endPos).c_str());
    cTime.Log(pRD, "lrangeList LRANGE %s %s %s ! reply:%u", key.c_str(), ws::toString(startPos).c_str(), ws::toString(endPos).c_str(), resp);
    if (!resp)
    {
        log(Warn, "lrangeList: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (REDIS_REPLY_ARRAY == resp->type)
    {
        unsigned int idx = 0;
        while (idx < resp->elements)
        {
            result.push_back(resp->element[idx]->str);
            ++idx;
        }
    }
    freeReplyObject(resp);
    return true;
}
//--- hash
// Set `field` of the hash at `key` to `strValue` (HSET).
// Returns true when any reply was received, false when no reply was received;
// the reply content itself is not inspected.
bool RedisManager::setHashValue(PRedisContext& pRD, const std::string& key, std::string field, std::string strValue)
{
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "HSET %s %s %s", key.c_str(), field.c_str(), strValue.c_str());
    cTime.Log(pRD, "setHashValue HSET %s %s %s ! reply:%u", key.c_str(), field.c_str(), strValue.c_str(), resp);
    if (!resp)
    {
        log(Warn, "setHashValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    freeReplyObject(resp);
    return true;
}
// Read `field` of the hash at `key` (HGET) into `strValue`.
// Returns true only for a string reply. Any other reply type (including NIL)
// is logged as an error and reported as false, as is a missing reply;
// `strValue` is left untouched in those cases.
bool RedisManager::getHashValue(PRedisContext& pRD, const std::string& key, std::string field, std::string& strValue)
{
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "HGET %s %s", key.c_str(), field.c_str());
    cTime.Log(pRD, "getHashValue HGET %s %s ! reply:%u", key.c_str(), field.c_str(), resp);
    if (!resp)
    {
        log(Warn, "getHashValue: SMEMBERS reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    const bool isString = (REDIS_REPLY_STRING == resp->type);
    if (isString)
    {
        strValue.assign(resp->str, resp->len);
    }
    else
    {
        log(Error, "getHashValue: reply is not STRING. reply->type:%u", resp->type);
    }
    freeReplyObject(resp);
    return isString;
}
// Set several field/value pairs of the hash at `key` in one HMSET command.
// The command is assembled as an argument vector (command, key, then each
// field followed by its value) with explicit byte lengths.
// Returns true only for a status reply whose text is "OK"; false for any other
// status text, any other reply type, or when no reply was received.
bool RedisManager::setAnyHashValue(PRedisContext& pRD, const std::string& key, map<std::string, std::string>& values)
{
    static char hmsetName[] = "HMSET";
    const size_t argCount = values.size() * 2 + 2;
    std::vector<const char *> argv;
    std::vector<size_t> argvlen;
    argv.reserve(argCount);
    argvlen.reserve(argCount);

    argv.push_back(hmsetName);
    argvlen.push_back(sizeof(hmsetName) - 1);
    argv.push_back(key.c_str());
    argvlen.push_back(key.size());
    for (std::map<string, string>::const_iterator pair = values.begin(); pair != values.end(); ++pair)
    {
        argv.push_back(pair->first.c_str());
        argvlen.push_back(pair->first.size());
        argv.push_back(pair->second.c_str());
        argvlen.push_back(pair->second.size());
    }

    RedisCmdTimeTesting cTime;
    redisReply* resp = (redisReply*)redisCommandArgv(pRD, argv.size(), &(argv[0]), &(argvlen[0]));
    cTime.Log(pRD, "setAnyHashValue HMSET size %d ! reply:%u", argv.size(), resp);
    if (!resp)
    {
        log(Warn, "setAnyHashValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (REDIS_REPLY_STATUS != resp->type)
    {
        log(Error, "setAnyHashValue: return error type");
        freeReplyObject(resp);
        return false;
    }
    const bool accepted = (strcmp(resp->str, "OK") == 0);
    if (!accepted)
    {
        log(Error, "setAnyHashValue: reply is not INTEGER. reply->type:%u, reply->str: %s",resp->type, resp->str);
    }
    freeReplyObject(resp);
    return accepted;
}
// Read several fields of the hash at `key` in one HMGET command.
//
// The command is sent as an argument vector (command, key, then every entry
// of `fields`) with explicit byte lengths. Previously the key and field names
// were concatenated into a string that was then used as the redisCommand
// FORMAT string, so a '%' in a name was parsed as a conversion specifier and a
// space split one name into two arguments.
//
// For each returned element, values[fields[j]] receives the element's bytes,
// or an empty string when the element is NIL. Elements beyond fields.size()
// are ignored.
// Returns true for an array reply; false for any other reply type or when no
// reply was received.
bool RedisManager::getAnyHashValue(PRedisContext& pRD, const std::string& key, vector<std::string>& fields, map<std::string, std::string>& values)
{
    static char hmgetcmd[] = "HMGET";
    std::vector<const char *> argv(fields.size() + 2);
    std::vector<size_t> argvlen(fields.size() + 2);
    argv[0] = hmgetcmd;
    argvlen[0] = sizeof(hmgetcmd) - 1;
    argv[1] = key.c_str();
    argvlen[1] = key.size();
    // Readable copy of the command; used for the trace log only.
    stringstream cmd;
    cmd << "HMGET " << key;
    for (size_t i = 0; i < fields.size(); ++i)
    {
        argv[i + 2] = fields[i].c_str();
        argvlen[i + 2] = fields[i].size();
        cmd << " " << fields[i];
    }
    RedisCmdTimeTesting cTime;
    redisReply* reply = (redisReply*)redisCommandArgv(pRD, argv.size(), &(argv[0]), &(argvlen[0]));
    cTime.Log(pRD, "getAnyHashValue %s ! reply %u", cmd.str().c_str(), reply);
    if (!reply)
    {
        log(Warn, "getAnyHashValue: HMGET reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (REDIS_REPLY_ARRAY != reply->type)
    {
        log(Error, "getAnyHashValue: reply is not ARRAY. reply->type:%u", reply->type);
        freeReplyObject(reply);
        return false;
    }
    // Never index `fields` past its end, whatever the reply size is.
    for (size_t j = 0; j < reply->elements && j < fields.size(); ++j)
    {
        const redisReply* item = reply->element[j];
        if (REDIS_REPLY_NIL != item->type && item->str)
        {
            values[fields[j]].assign(item->str, item->len);
        }
        else
        {
            values[fields[j]] = ""; // field could not be read
        }
    }
    freeReplyObject(reply);
    return true;
}
// Read every field/value pair of the hash at `key` (HGETALL) into `valueMap`.
//
// The array reply is consumed two elements at a time (field, then value).
// The loop only reads a pair when both elements are inside the array; the
// previous inner check `j < reply->elements` was always true and would have
// read one element past the end for an odd-sized reply. Field names and
// values are copied using the reply's byte length.
//
// Returns true for an array reply; false for any other reply type or when no
// reply was received.
bool RedisManager::getHashAllValue(PRedisContext& pRD, const std::string& key, map<std::string, std::string>& valueMap)
{
    string strField;
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "HGETALL %s", key.c_str());
    cTime.Log(pRD, "getHashAllBinValue HGETALL %s ! reply:%u", key.c_str(), reply);
    if (!reply)
    {
        log(Warn, "getHashAllValue: SMEMBERS reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (reply->type != REDIS_REPLY_ARRAY)
    {
        log(Error, "getHashAllValue: reply is not ARRAY. reply->type:%u", reply->type);
        freeReplyObject(reply);
        return false;
    }
    for (size_t j = 0; j + 1 < reply->elements; j += 2)
    {
        const redisReply* fieldItem = reply->element[j];
        const redisReply* valueItem = reply->element[j + 1];
        strField.assign(fieldItem->str, fieldItem->len);
        valueMap[strField].assign(valueItem->str, valueItem->len);
    }
    freeReplyObject(reply);
    return true;
}
// Append every field name of the hash at `key` (HKEYS) to `fields`; existing
// entries of `fields` are kept.
// Returns true for an array reply; false for any other reply type or when no
// reply was received.
bool RedisManager::getHashAllFields(PRedisContext& pRD, const std::string& key, vector<std::string>& fields)
{
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "HKEYS %s", key.c_str());
    cTime.Log(pRD, "getHashAllFields HKEYS %s ! reply:%u", key.c_str(), resp);
    if (!resp)
    {
        log(Warn, "getHashAllFields: HKEYS reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    const bool isArray = (REDIS_REPLY_ARRAY == resp->type);
    if (isArray)
    {
        unsigned int idx = 0;
        while (idx < resp->elements)
        {
            fields.push_back(resp->element[idx]->str);
            ++idx;
        }
    }
    else
    {
        log(Error, "getHashAllFields: reply is not ARRAY. reply->type:%u", resp->type);
    }
    freeReplyObject(resp);
    return isArray;
}
// Binary variant of the hash setter: set `field` of the hash at `key` (HSET),
// passing `binValue` with the "%b" specifier and its byte length.
// Returns true when any reply was received, false when no reply was received;
// the reply content itself is not inspected.
bool RedisManager::setHashValueBit(PRedisContext& pRD, const std::string& key, std::string field, std::string binValue)
{
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "HSET %s %s %b", key.c_str(), field.c_str(), binValue.c_str(), binValue.size());
    cTime.Log(pRD, "setHashValueBit HSET %s %s size %d ! reply:%u", key.c_str(), field.c_str(), binValue.size(), resp);
    if (!resp)
    {
        log(Warn, "setHashValueBit: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    freeReplyObject(resp);
    return true;
}
// Binary variant: read every field/value pair of the hash at `key` (HGETALL)
// into `valueMap`.
//
// The array reply is consumed two elements at a time (field, then value).
// A pair is only read when both elements are inside the array; the previous
// inner check `j < reply->elements` was always true and would have read one
// element past the end for an odd-sized reply. Field names are now copied
// using the reply's byte length, like the values, so they are no longer cut
// at the first NUL byte.
//
// Returns true for an array reply; false for any other reply type or when no
// reply was received.
bool RedisManager::getAnyHashValueBit(PRedisContext& pRD, const std::string& key, map<std::string, std::string>& valueMap)
{
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "HGETALL %s", key.c_str());
    cTime.Log(pRD, "getAnyHashValueBit HGETALL %s reply %u", key.c_str(), reply);
    if (!reply)
    {
        log(Warn, "getAnyHashValueBit: SMEMBERS reply is NULL ! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (reply->type != REDIS_REPLY_ARRAY)
    {
        log(Error, "getAnyHashValueBit: reply is not ARRAY. reply->type:%u", reply->type);
        freeReplyObject(reply);
        return false;
    }
    for (size_t j = 0; j + 1 < reply->elements; j += 2)
    {
        const redisReply* fieldItem = reply->element[j];
        const redisReply* valueItem = reply->element[j + 1];
        const std::string strField(fieldItem->str, fieldItem->len);
        valueMap[strField].assign(valueItem->str, valueItem->len);
    }
    freeReplyObject(reply);
    return true;
}
// Delete a single `field` from the hash at `key` (HDEL).
// Returns true when any reply was received, false when no reply was received;
// the reply content itself is not inspected.
bool RedisManager::delHashValue(PRedisContext& pRD, const std::string& key, std::string field)
{
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "HDEL %s %s ", key.c_str(), field.c_str());
    cTime.Log(pRD, "delHashValue HDEL %s %s ! reply:%u", key.c_str(), field.c_str(), resp);
    if (!resp)
    {
        log(Warn, "delHashValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    freeReplyObject(resp);
    return true;
}
// Delete several fields from the hash at `key` in one HDEL command and report
// the integer the server replies with through `rmCount`.
//
// Fixes over the previous version:
//  * `rmCount` is an output reference, yet its incoming value was tested
//    (`rmCount != 0`) before assigning. A caller passing a zero-initialised
//    variable never got the count and a bogus "Unexpected reply type" error
//    was logged. The integer reply is now always stored.
//  * The command is sent as an argument vector instead of using the
//    concatenated key/field text as the redisCommand format string, so '%'
//    and spaces inside names are transmitted literally.
//
// Returns true when any reply was received (a non-integer reply is logged and
// leaves `rmCount` untouched), false when no reply was received.
bool RedisManager::delHashValue(PRedisContext& pRD, const std::string& key, const set<std::string> & fields, int64_t& rmCount)
{
    static char hdelcmd[] = "HDEL";
    std::vector<const char *> argv(fields.size() + 2);
    std::vector<size_t> argvlen(fields.size() + 2);
    argv[0] = hdelcmd;
    argvlen[0] = sizeof(hdelcmd) - 1;
    argv[1] = key.c_str();
    argvlen[1] = key.size();
    // Readable copy of the command; used for the trace log only.
    stringstream cmd;
    cmd << "HDEL " << key <<" ";
    uint32_t j = 2;
    for (set<std::string>::const_iterator it = fields.begin(); it != fields.end(); ++it, ++j)
    {
        argv[j] = it->c_str();
        argvlen[j] = it->size();
        cmd << *it << " ";
    }
    RedisCmdTimeTesting cTime;
    redisReply* reply = (redisReply*)redisCommandArgv(pRD, argv.size(), &(argv[0]), &(argvlen[0]));
    cTime.Log(pRD, "%s ! reply:%u", cmd.str().c_str(), reply);
    if (!reply)
    {
        log(Warn, "delHashValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (REDIS_REPLY_INTEGER == reply->type)
    {
        rmCount = reply->integer;
    }
    else
    {
        log(Error, "delHashValue: Unexpected reply type %d", reply->type);
    }
    freeReplyObject(reply);
    return true;
}
// Check whether `field` exists in the hash at `key` (HEXISTS).
// `isMem` is reset to false and set to true only when the server answers with
// integer 1.
// Note the return value mirrors `isMem`: true only for integer 1; false for
// integer 0 (logged at Debug), any other integer or reply type (logged at
// Error), or when no reply was received.
bool RedisManager::isMemInHash(PRedisContext& pRD, const std::string& key, std::string field, bool& isMem)
{
    isMem = false;
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "HEXISTS %s %s", key.c_str(), field.c_str());
    cTime.Log( pRD, "isMemInHash HEXISTS %s %s ! reply:%u", key.c_str(), field.c_str(), resp );
    if (!resp)
    {
        log(Warn, "isMemInHash: HEXISTS reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    bool found = false;
    if (REDIS_REPLY_INTEGER != resp->type)
    {
        log( Error, "isMemInHash: unexpected reply type:%u", resp->type );
    }
    else if (1 == resp->integer)
    {
        isMem = true;
        found = true;
    }
    else if (0 == resp->integer)
    {
        log( Debug, "isMemInHash: Mem %s not in Hash:%s", field.c_str(), key.c_str() );
    }
    else
    {
        log(Error, "isMemInHash: unexpected integer:%lld", resp->integer);
    }
    freeReplyObject(resp);
    return found;
}
// Add `value` to the number stored in `field` of the hash at `key` (HINCRBY).
// `value` is forwarded verbatim as the increment argument.
// Returns true when any reply was received, false when no reply was received;
// the reply content itself is not inspected.
bool RedisManager::incrHashValue(PRedisContext& pRD, const std::string& key, std::string field, std::string value)
{
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "HINCRBY %s %s %s", key.c_str(), field.c_str(), value.c_str());
    cTime.Log(pRD, "incrHashValue HINCRBY %s %s %s ! reply:%u", key.c_str(), field.c_str(), value.c_str(), resp);
    if (!resp)
    {
        log(Warn, "incrHashValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    freeReplyObject(resp);
    return true;
}
// Transaction variant of the multi-field hash setter: issues HMSET for `key`
// with every pair of `fieldValue` and expects the server to answer with the
// status text "QUEUED".
// The command is assembled as an argument vector (command, key, then each
// field followed by its value) with explicit byte lengths.
// Returns true only for a status reply equal to "QUEUED"; false for any other
// status text, any other reply type, or when no reply was received.
bool RedisManager::setHashValue_T(PRedisContext& pRD, const std::string& key, map<std::string, std::string>& fieldValue)
{
    static char hmsetName[] = "HMSET";
    const size_t argCount = fieldValue.size() * 2 + 2;
    std::vector<const char *> argv;
    std::vector<size_t> argvlen;
    argv.reserve(argCount);
    argvlen.reserve(argCount);

    argv.push_back(hmsetName);
    argvlen.push_back(sizeof(hmsetName) - 1);
    argv.push_back(key.c_str());
    argvlen.push_back(key.size());
    for (std::map<string, string>::const_iterator pair = fieldValue.begin(); pair != fieldValue.end(); ++pair)
    {
        argv.push_back(pair->first.c_str());
        argvlen.push_back(pair->first.size());
        argv.push_back(pair->second.c_str());
        argvlen.push_back(pair->second.size());
    }

    RedisCmdTimeTesting cTime;
    redisReply* resp = (redisReply*)redisCommandArgv(pRD, argv.size(), &(argv[0]), &(argvlen[0]));
    cTime.Log(pRD, "setHashValue_T HMSET size %d ! reply:%u", argv.size(), resp);
    if (!resp)
    {
        log(Warn, "setHashValue_T: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (REDIS_REPLY_STATUS != resp->type)
    {
        log(Error, "setHashValue_T return error type");
        freeReplyObject(resp);
        return false;
    }
    const bool queued = (strcmp(resp->str, "QUEUED") == 0);
    if (!queued)
    {
        log(Error, "setHashValue_T: reply is not INTEGER. reply->type:%u, reply->str: %s",resp->type, resp->str);
    }
    freeReplyObject(resp);
    return queued;
}
//--- set
// Check whether `value` is a member of the set at `key` (SISMEMBER).
// `isMem` is reset to false and set to true only when the server answers with
// integer 1.
// Note the return value mirrors `isMem`: true only for integer 1; false for
// integer 0 (logged at Debug), any other integer or reply type (logged at
// Error), or when no reply was received.
bool RedisManager::isMemInSet(PRedisContext& pRD, const std::string& key, std::string& value, bool& isMem)
{
    isMem = false;
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "SISMEMBER %s %s", key.c_str(), value.c_str());
    cTime.Log(pRD, "isMemInSet SISMEMBER %s %s ! reply:%u", key.c_str(), value.c_str(), resp);
    if (!resp)
    {
        log(Warn, "isMemInSet: SISMEMBER reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    bool found = false;
    if (REDIS_REPLY_INTEGER != resp->type)
    {
        log(Error, "isMemInSet: unexpected reply type:%u", resp->type);
    }
    else if (1 == resp->integer)
    {
        isMem = true;
        found = true;
    }
    else if (0 == resp->integer)
    {
        log(Debug, "isMemInSet: Mem %s not in set:%s", value.c_str(), key.c_str());
    }
    else
    {
        log(Error, "isMemInSet: unexpected integer:%lld", resp->integer);
    }
    freeReplyObject(resp);
    return found;
}
// Read the number of members of the set at `key` (SCARD) into `count`.
//
// `count` is reset to 0 and `keyExist` to false on entry, so both hold defined
// values on every return path (previously `count` was left untouched on
// failure). SCARD answers 0 for a key that does not exist, so `keyExist` is
// now derived from the count instead of being set to true for every
// successful reply.
//
// Returns true when the reply carried a count (integer reply, or a string
// reply that is parsed with strtoul); false for any other reply type or when
// no reply was received.
bool RedisManager::getSetCount(PRedisContext& pRD, const std::string& key, uint32_t& count, bool& keyExist)
{
    count = 0;
    keyExist = false;
    RedisCmdTimeTesting cTime;
    redisReply* reply = redisCommand(pRD, "SCARD %b", key.c_str(), key.size());
    cTime.Log(pRD, "getSetCount SCARD key size %d ! reply:%u", key.size(), reply);
    if (!reply)
    {
        log(Warn, "getSetCount: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    bool parsed = true;
    if (reply->type == REDIS_REPLY_STRING)
    {
        count = strtoul(reply->str, 0, 0);
    }
    else if (reply->type == REDIS_REPLY_INTEGER)
    {
        count = reply->integer;
    }
    else
    {
        parsed = false;
        log(Error, "getSetCount: SACRD reply is unexpected!");
    }
    if (parsed)
    {
        // A cardinality of 0 is what the server reports for a missing key.
        keyExist = (count != 0);
    }
    freeReplyObject(reply);
    return parsed;
}
// Add `value` to the set at `key` (SADD); per the original note, a member
// that already exists is ignored by the server.
// Returns true for an integer reply; false for any other reply type (logged at
// Error) or when no reply was received.
bool RedisManager::setSetValue(PRedisContext& pRD, const std::string& key, std::string& value)
{
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "SADD %s %s", key.c_str(), value.c_str());
    cTime.Log(pRD, "setSetValue SADD %s %s reply %u", key.c_str(), value.c_str(), resp);
    if (!resp)
    {
        log(Warn, "setSetValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    const bool isInteger = (REDIS_REPLY_INTEGER == resp->type);
    if (!isInteger)
    {
        log(Error, "setSetValue: reply is not INTEGER. reply->type:%u", resp->type);
    }
    freeReplyObject(resp);
    return isInteger;
}
// Add every entry of `value` to the set at `key` in one SADD command; per the
// original note, members that already exist are ignored by the server.
// The command is assembled as an argument vector (command, key, members) with
// explicit byte lengths.
//
// Fix: on a non-integer reply the old code called freeReplyObject(reply) and
// THEN read reply->type for the log message (use-after-free). The type is now
// logged before the reply is released.
//
// Returns true for an integer reply; false for any other reply type or when no
// reply was received.
bool RedisManager::setAnySetValue(PRedisContext& pRD, const std::string& key, set<std::string>& value)
{
    std::vector<const char *> argv(value.size() + 2);
    std::vector<size_t> argvlen(value.size() + 2);
    uint32_t j = 0;
    static char saddcmd[] = "SADD";
    argv[j] = saddcmd;
    argvlen[j] = sizeof(saddcmd) - 1;
    ++j;
    argv[j] = key.c_str();
    argvlen[j] = key.size();
    ++j;
    for (std::set<string>::const_iterator i = value.begin(); i != value.end(); ++i, ++j)
    {
        argv[j] = i->c_str();
        argvlen[j] = i->size();
    }
    RedisCmdTimeTesting cTime;
    redisReply* reply = (redisReply*)redisCommandArgv(pRD, argv.size(), &(argv[0]), &(argvlen[0]));
    cTime.Log(pRD, "setAnySetValue SADD size %d reply %u", argv.size(), reply);
    if (!reply)
    {
        log(Warn, "setAnySetValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    const bool isInteger = (reply->type == REDIS_REPLY_INTEGER);
    if (!isInteger)
    {
        // Must happen before freeReplyObject: it reads from the reply.
        log(Error, "setAnySetValue: reply is not INTEGER. reply->type:%u", reply->type);
    }
    freeReplyObject(reply);
    return isInteger;
}
// Insert every member of the set at `key` (SMEMBERS) into `value`; existing
// entries of `value` are kept.
// Returns true for an array reply; false for any other reply type (logged at
// Error) or when no reply was received.
bool RedisManager::getAnySetValue(PRedisContext& pRD, const std::string& key, set<std::string>& value)
{
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "SMEMBERS %s", key.c_str());
    cTime.Log(pRD, "getAnySetValue str SMEMBERS %s size %d ! reply:%u", key.c_str(), value.size(), resp);
    if (!resp)
    {
        log(Warn, "getAnySetValue: SMEMBERS reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    const bool isArray = (REDIS_REPLY_ARRAY == resp->type);
    if (isArray)
    {
        unsigned int idx = 0;
        while (idx < resp->elements)
        {
            value.insert(resp->element[idx]->str);
            ++idx;
        }
    }
    else
    {
        log(Error, "getAnySetValue: reply is not ARRAY. reply->type:%u", resp->type);
    }
    freeReplyObject(resp);
    return isArray;
}
// Remove `value` from the set at `key` (SREM).
// Returns true when any reply was received, false when no reply was received;
// the reply content itself is not inspected.
bool RedisManager::remSetValue(PRedisContext& pRD, const std::string& key, std::string& value)
{
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "SREM %s %s", key.c_str(), value.c_str());
    cTime.Log(pRD, "remSetValue SREM %s %s ! reply:%u", key.c_str(), value.c_str(), resp);
    if (!resp)
    {
        log(Warn, "remSetValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    freeReplyObject(resp);
    return true;
}
// Remove every entry of `value` from the set at `key` in one SREM command.
// The command is assembled as an argument vector (command, key, members) with
// explicit byte lengths.
// Returns true for an integer reply; false for any other reply type (logged at
// Error) or when no reply was received.
bool RedisManager::remAnySetValue(PRedisContext& pRD, const std::string &key, const std::set<std::string> &value)
{
    static char sremName[] = "SREM";
    const size_t argCount = value.size() + 2;
    std::vector<const char *> argv;
    std::vector<size_t> argvlen;
    argv.reserve(argCount);
    argvlen.reserve(argCount);

    argv.push_back(sremName);
    argvlen.push_back(sizeof(sremName) - 1);
    argv.push_back(key.c_str());
    argvlen.push_back(key.size());
    for (std::set<string>::const_iterator member = value.begin(); member != value.end(); ++member)
    {
        argv.push_back(member->c_str());
        argvlen.push_back(member->size());
    }

    RedisCmdTimeTesting cTime;
    redisReply* resp = (redisReply*)redisCommandArgv(pRD, argv.size(), &(argv[0]), &(argvlen[0]));
    cTime.Log(pRD, "remAnySetValue SREM size %d ! reply:%u", argv.size(), resp);
    if (!resp)
    {
        log(Warn, "remAnySetValue: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    const bool isInteger = (REDIS_REPLY_INTEGER == resp->type);
    if (!isInteger)
    {
        log(Error, "remAnySetValue: reply is not INTEGER. reply->type:%u", resp->type);
    }
    freeReplyObject(resp);
    return isInteger;
}
// Compute the union of the sets named in `keys` (SUNION) and insert every
// resulting member into `value`; existing entries of `value` are kept.
//
// The command is sent as an argument vector (command, then each key) with
// explicit byte lengths. Previously the key names were concatenated into a
// string that was used as the redisCommand FORMAT string, so a '%' in a key
// was parsed as a conversion specifier and a space split one key into two.
// Members are copied using the reply's byte length.
//
// Returns true for an array reply; false for any other reply type or when no
// reply was received.
bool RedisManager::getSetSunion(PRedisContext& pRD, set<std::string>& keys, set<std::string>& value)
{
    static char sunioncmd[] = "SUNION";
    std::vector<const char *> argv(keys.size() + 1);
    std::vector<size_t> argvlen(keys.size() + 1);
    argv[0] = sunioncmd;
    argvlen[0] = sizeof(sunioncmd) - 1;
    // Readable copy of the command; used for the trace log only.
    ostringstream cmdStr;
    cmdStr << "SUNION";
    uint32_t j = 1;
    for (set<std::string>::iterator it = keys.begin(); it != keys.end(); ++it, ++j)
    {
        argv[j] = it->c_str();
        argvlen[j] = it->size();
        cmdStr << " " << *it;
    }
    RedisCmdTimeTesting cTime;
    redisReply* reply = (redisReply*)redisCommandArgv(pRD, argv.size(), &(argv[0]), &(argvlen[0]));
    cTime.Log(pRD, "getSetSunion %s ! reply:%u", cmdStr.str().c_str(), reply);
    if (!reply)
    {
        log(Warn, "getSetSunion: SUNION reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (reply->type != REDIS_REPLY_ARRAY)
    {
        log(Error, "getSetSunion: reply is not ARRAY. reply->type:%u", reply->type);
        freeReplyObject(reply);
        return false;
    }
    for (size_t k = 0; k < reply->elements; ++k)
    {
        value.insert(std::string(reply->element[k]->str, reply->element[k]->len));
    }
    freeReplyObject(reply);
    return true;
}
// Compute the difference of the sets named in `keys` (SDIFF) and insert every
// resulting member into `value`; existing entries of `value` are kept.
// `keys` is a std::set, so the keys are sent in its sorted iteration order.
//
// The command is sent as an argument vector (command, then each key) with
// explicit byte lengths. Previously the key names were concatenated into a
// string that was used as the redisCommand FORMAT string, so a '%' in a key
// was parsed as a conversion specifier and a space split one key into two.
// The NULL-reply log message now names SDIFF (it said SUNION).
//
// Returns true for an array reply; false for any other reply type or when no
// reply was received.
bool RedisManager::getSetSdiff(PRedisContext& pRD, set<std::string>& keys, set<std::string>& value)
{
    static char sdiffcmd[] = "SDIFF";
    std::vector<const char *> argv(keys.size() + 1);
    std::vector<size_t> argvlen(keys.size() + 1);
    argv[0] = sdiffcmd;
    argvlen[0] = sizeof(sdiffcmd) - 1;
    // Readable copy of the command; used for the trace log only.
    ostringstream cmdStr;
    cmdStr << "SDIFF";
    uint32_t j = 1;
    for (set<std::string>::iterator it = keys.begin(); it != keys.end(); ++it, ++j)
    {
        argv[j] = it->c_str();
        argvlen[j] = it->size();
        cmdStr << " " << *it;
    }
    RedisCmdTimeTesting cTime;
    redisReply* reply = (redisReply*)redisCommandArgv(pRD, argv.size(), &(argv[0]), &(argvlen[0]));
    cTime.Log(pRD, "getSetSdiff %s ! reply:%u", cmdStr.str().c_str(), reply);
    if (!reply)
    {
        log(Warn, "getSetSdiff: SDIFF reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (reply->type != REDIS_REPLY_ARRAY)
    {
        log(Error, "getSetSdiff: reply is not ARRAY. reply->type:%u", reply->type);
        freeReplyObject(reply);
        return false;
    }
    for (size_t k = 0; k < reply->elements; ++k)
    {
        value.insert(std::string(reply->element[k]->str, reply->element[k]->len));
    }
    freeReplyObject(reply);
    return true;
}
// Compute the intersection of the sets named in `keys` (SINTER) and insert
// every resulting member into `value`; existing entries of `value` are kept.
//
// The command is sent as an argument vector (command, then each key) with
// explicit byte lengths. Previously the key names were concatenated into a
// string that was used as the redisCommand FORMAT string, so a '%' in a key
// was parsed as a conversion specifier and a space split one key into two.
// The NULL-reply log message now names SINTER (it said SUNION).
//
// Returns true for an array reply; false for any other reply type or when no
// reply was received.
bool RedisManager::getSetSinter(PRedisContext& pRD, set<std::string>& keys, set<std::string>& value)
{
    static char sintercmd[] = "SINTER";
    std::vector<const char *> argv(keys.size() + 1);
    std::vector<size_t> argvlen(keys.size() + 1);
    argv[0] = sintercmd;
    argvlen[0] = sizeof(sintercmd) - 1;
    // Readable copy of the command; used for the trace log only.
    ostringstream cmdStr;
    cmdStr << "SINTER";
    uint32_t j = 1;
    for (set<std::string>::iterator it = keys.begin(); it != keys.end(); ++it, ++j)
    {
        argv[j] = it->c_str();
        argvlen[j] = it->size();
        cmdStr << " " << *it;
    }
    RedisCmdTimeTesting cTime;
    redisReply* reply = (redisReply*)redisCommandArgv(pRD, argv.size(), &(argv[0]), &(argvlen[0]));
    cTime.Log(pRD, "getSetSinter %s ! reply:%u", cmdStr.str().c_str(), reply);
    if (!reply)
    {
        log(Warn, "getSetSinter: SINTER reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (reply->type != REDIS_REPLY_ARRAY)
    {
        log(Error, "getSetSinter: reply is not ARRAY. reply->type:%u", reply->type);
        freeReplyObject(reply);
        return false;
    }
    for (size_t k = 0; k < reply->elements; ++k)
    {
        value.insert(std::string(reply->element[k]->str, reply->element[k]->len));
    }
    freeReplyObject(reply);
    return true;
}
// Transaction variant of the set insert: issues SADD for `key`/`value` and
// expects the server to answer with the status text "QUEUED".
// Returns true only for a status reply equal to "QUEUED"; false for any other
// status text, any other reply type, or when no reply was received.
bool RedisManager::setSetValue_T(PRedisContext& pRD, const std::string& key, std::string& value)
{
    RedisCmdTimeTesting cTime;
    redisReply* resp = redisCommand(pRD, "SADD %s %s", key.c_str(), value.c_str());
    cTime.Log(pRD, "addSetValue_T SADD %s %s ! reply:%u", key.c_str(), value.c_str(), resp);
    if (!resp)
    {
        log(Warn, "setSetValue_T: reply is NULL! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    bool queued = false;
    if (REDIS_REPLY_STATUS != resp->type)
    {
        log(Error, "setSetValue_T Wrong reply type received! Type: %d, key: %s", resp->type, key.c_str());
    }
    else if (strcmp(resp->str, "QUEUED") == 0)
    {
        queued = true;
    }
    else
    {
        log(Error, "setSetValue_T in transaction return failure");
    }
    freeReplyObject(resp);
    return queued;
}
// --- sortset
// Check whether `member` belongs to the sorted set at `key`, using ZSCORE.
//
// Fixes over the previous version:
//  * A NIL reply (ZSCORE found no score for the member) used to set
//    `isMem = true`, so the function reported membership for everything.
//    NIL now leaves `isMem` false.
//  * The "Wrong reply" log had a "%d" conversion but no matching argument, so
//    the two strings were consumed by the wrong specifiers; reply->type is now
//    passed.
//  * Key and member are passed as "%s" arguments instead of being pasted into
//    the format string, so '%' or spaces in them are transmitted literally.
//
// `isMem` is reset to false on entry and set to true only for a string reply.
// Returns true for a NIL or string reply; false for any other reply type or
// when no reply was received.
bool RedisManager::isMemInSortedSet(PRedisContext& pRD, const std::string& key, const std::string& member, bool& isMem)
{
    isMem = false;
    // Readable copy of the command; used for the trace log only.
    ostringstream cmdStr;
    cmdStr << "ZSCORE " << key << " " << member;
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "ZSCORE %s %s", key.c_str(), member.c_str());
    cTime.Log(pRD, "isMemInSortedSet %s reply %u", cmdStr.str().c_str(), reply);
    if (!reply)
    {
        log(Warn, "isMemInSortedSet failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    bool understood = true;
    if (REDIS_REPLY_STRING == reply->type)
    {
        isMem = true;
    }
    else if (REDIS_REPLY_NIL != reply->type)
    {
        understood = false;
        log(Error, "isMemInSortedSet Wrong reply received. Type: %d,. sortedSet: %s, member: %s", reply->type, key.c_str(), member.c_str());
    }
    freeReplyObject(reply);
    return understood;
}
// Read the number of members of the sorted set at `key` (ZCARD) into
// `totalCount`.
//
// Fixes over the previous version:
//  * `totalCount` was never written, so callers always saw whatever value they
//    passed in. It is now reset to 0 on entry and set from the integer reply.
//  * The key is passed as a "%s" argument instead of being pasted into the
//    format string, so '%' or spaces in it are transmitted literally.
//
// Returns true for an integer reply; false for any other reply type or when no
// reply was received.
bool RedisManager::getCountSortedSet(PRedisContext& pRD, const std::string& key, uint32_t& totalCount)
{
    totalCount = 0;
    // Readable copy of the command; used for the trace log only.
    ostringstream cmdStr;
    cmdStr << "ZCARD " << key;
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "ZCARD %s", key.c_str());
    cTime.Log(pRD, "getCountSortedSet %s ! reply:%u", cmdStr.str().c_str(), reply);
    if (!reply)
    {
        log(Warn, "getCountSortedSet failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    const bool isInteger = (REDIS_REPLY_INTEGER == reply->type);
    if (isInteger)
    {
        totalCount = reply->integer;
    }
    else
    {
        log(Warn, "getCountSortedSet Wrong reply received. Type: %d, key: %s", reply->type, key.c_str());
    }
    freeReplyObject(reply);
    return isInteger;
}
// Add `mem` with the given `score` to the sorted set at `key` (ZADD).
//
// Fixes over the previous version:
//  * The key is passed as a "%s" argument instead of being pasted into the
//    format string, so '%' or spaces in it are transmitted literally.
//  * `score` is a 64-bit value but was logged with "%u"; the log messages now
//    print the same decimal text that is sent to the server.
//
// Returns true for an integer reply; false for an error reply, any other reply
// type, or when no reply was received.
bool RedisManager::setSortSet(PRedisContext& pRD, const std::string& key, std::string& mem, const uint64_t& score)
{
    // Decimal text of the score, used both as command argument and in logs.
    ostringstream scoreText;
    scoreText << score;
    const std::string scoreArg = scoreText.str();
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "ZADD %s %s %s", key.c_str(), scoreArg.c_str(), mem.c_str());
    cTime.Log(pRD, "setSortSet key %s, mem %s, score %s ! reply:%u", key.c_str(), mem.c_str(), scoreArg.c_str(), reply);
    if (!reply)
    {
        log(Warn, "setSortSet failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (REDIS_REPLY_INTEGER == reply->type)
    {
        freeReplyObject(reply);
        return true;
    }
    if (REDIS_REPLY_ERROR == reply->type)
    {
        log(Error, "setSortSet return error, sortedSet: %s, score: %s", key.c_str(), scoreArg.c_str());
    }
    else
    {
        log(Error, "setSortSet Not expected reply type received for ZADD: %u", reply->type);
    }
    freeReplyObject(reply);
    return false;
}
// Add several members to the sorted set at `key` in one ZADD command.
// `scores` maps member -> score text; each pair is sent as "score member".
// Pairs with an empty member or an empty score are skipped with a warning.
//
// The command is sent as an argument vector with explicit byte lengths.
// Previously key, scores and members were concatenated into a string that was
// used as the redisCommand FORMAT string, so a '%' in any of them was parsed
// as a conversion specifier and a space split one member into two arguments.
//
// Returns true for an integer reply; false for an error reply, any other reply
// type, or when no reply was received.
bool RedisManager::setAnySortSet(PRedisContext& pRD, const std::string& key, const map<std::string, std::string>& scores)
{
    static char zaddcmd[] = "ZADD";
    std::vector<const char *> argv;
    std::vector<size_t> argvlen;
    argv.reserve(scores.size() * 2 + 2);
    argvlen.reserve(scores.size() * 2 + 2);
    argv.push_back(zaddcmd);
    argvlen.push_back(sizeof(zaddcmd) - 1);
    argv.push_back(key.c_str());
    argvlen.push_back(key.size());
    for (map<string, string>::const_iterator it = scores.begin(); it != scores.end(); ++it)
    {
        if (it->first != "" && it->second != "")
        {
            // ZADD expects the score first, then the member.
            argv.push_back(it->second.c_str());
            argvlen.push_back(it->second.size());
            argv.push_back(it->first.c_str());
            argvlen.push_back(it->first.size());
        }
        else
        {
            log(Warn, "setAnySortSet: Empty mem or score. mem: %s, score: %s", it->first.c_str(), it->second.c_str());
        }
    }
    RedisCmdTimeTesting cTime;
    redisReply *reply = (redisReply*)redisCommandArgv(pRD, argv.size(), &(argv[0]), &(argvlen[0]));
    cTime.Log(pRD, "setAnySortSet key %s scores size %d ! reply:%u", key.c_str(), scores.size(), reply);
    if (!reply)
    {
        log(Warn, "setAnySortSet failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (REDIS_REPLY_INTEGER == reply->type)
    {
        freeReplyObject(reply);
        return true;
    }
    if (REDIS_REPLY_ERROR == reply->type)
    {
        log(Error, "setAnySortSet return error, sortedSet: %s, size: %u", key.c_str(), scores.size());
    }
    else
    {
        log(Error, "setAnySortSet Not expected reply type received for ZADD: %u", reply->type);
    }
    freeReplyObject(reply);
    return false;
}
// Increase the score of `mem` in the sorted set at `key` by `incrScore`
// (ZINCRBY).
//
// The key and the increment are passed as "%s" arguments instead of being
// pasted into the format string, so a '%' or a space inside the key is
// transmitted literally rather than being interpreted by the formatter.
//
// Returns true for a string reply; false for an error reply, any other reply
// type, or when no reply was received.
bool RedisManager::incrSortSet(PRedisContext& pRD, const std::string& key, const std::string& mem, const uint32_t& incrScore)
{
    // Decimal text of the increment, sent as its own command argument.
    ostringstream deltaText;
    deltaText << incrScore;
    const std::string deltaArg = deltaText.str();
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "ZINCRBY %s %s %s", key.c_str(), deltaArg.c_str(), mem.c_str());
    cTime.Log(pRD, "incrSortSet key %s incrScore %u ! reply:%u", key.c_str(), incrScore, reply);
    if (!reply)
    {
        log(Warn, "incrSortSet failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (REDIS_REPLY_STRING == reply->type)
    {
        freeReplyObject(reply);
        return true;
    }
    if (REDIS_REPLY_ERROR == reply->type)
    {
        log(Error, "incrSortSet return error, sortedSet: %s, incrScore: %u, member: %s", key.c_str(), incrScore, mem.c_str());
    }
    else
    {
        log(Error, "incrSortSet Not expected reply type received for ZINCRBY: %u", reply->type);
    }
    freeReplyObject(reply);
    return false;
}
// Decrease the score of `mem` in the sorted set at `key` by `decrScore`.
// Implemented as ZINCRBY with the increment "-<decrScore>".
//
// The key and the increment are passed as "%s" arguments instead of being
// pasted into the format string, so a '%' or a space inside the key is
// transmitted literally rather than being interpreted by the formatter.
//
// Returns true for a string reply; false for an error reply, any other reply
// type, or when no reply was received.
bool RedisManager::decrScore(PRedisContext& pRD, const std::string& key, std::string& mem, const uint32_t& decrScore)
{
    // Negative decimal text of the amount, sent as its own command argument.
    ostringstream deltaText;
    deltaText << "-" << decrScore;
    const std::string deltaArg = deltaText.str();
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "ZINCRBY %s %s %s", key.c_str(), deltaArg.c_str(), mem.c_str());
    cTime.Log(pRD, "decrScore key %s ! reply:%u", key.c_str(), reply);
    if (!reply)
    {
        log(Warn, "decrScore failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (REDIS_REPLY_STRING == reply->type)
    {
        freeReplyObject(reply);
        return true;
    }
    if (REDIS_REPLY_ERROR == reply->type)
    {
        log(Warn, "decrScore return error, sortedSet: %s, decrScore: %u, member: %s", key.c_str(), decrScore, mem.c_str());
    }
    else
    {
        log(Warn, "decrScore Not expected reply type received for ZINCRBY: %u", reply->type);
    }
    freeReplyObject(reply);
    return false;
}
// Remove `mem` from the sorted set at `key` (ZREM).
//
// The key is passed as a "%s" argument instead of being pasted into the format
// string, so a '%' or a space inside the key is transmitted literally rather
// than being interpreted by the formatter.
//
// Returns true for an integer reply; false for any other reply type or when no
// reply was received.
bool RedisManager::remMemFromSortedSet(PRedisContext& pRD, const std::string& key, const std::string& mem)
{
    // Trace text only, kept in its historical form: the member is shown as
    // the literal "%s" placeholder.
    ostringstream cmdStr;
    cmdStr << "ZREM " << key << " %s";
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "ZREM %s %s", key.c_str(), mem.c_str());
    cTime.Log(pRD, "remMemFromSortedSet %s ! reply:%u", cmdStr.str().c_str(), reply);
    if (!reply)
    {
        log(Warn, "remMemFromSortedSet failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    const bool isInteger = (REDIS_REPLY_INTEGER == reply->type);
    if (!isInteger)
    {
        log(Warn, "remMemFromSortedSet Wrong reply received. Type: %d, key: %s, mem: %s", reply->type, key.c_str(), mem.c_str());
    }
    freeReplyObject(reply);
    return isInteger;
}
// Remove every member of the sorted set at `key` whose score lies between
// `min` and `max` (ZREMRANGEBYSCORE).
// Rejects the call (warning logged, returns false, nothing sent) when
// min > max.
//
// The key and both bounds are passed as "%s" arguments instead of being pasted
// into the format string, so a '%' or a space inside the key is transmitted
// literally rather than being interpreted by the formatter.
//
// Returns true for an integer reply; false for any other reply type or when no
// reply was received.
bool RedisManager::remRangeByScore(PRedisContext& pRD, const std::string& key, const uint32_t& min, const uint32_t& max)
{
    if (min > max)
    {
        log(Warn, "min %u greater than max %u", min, max);
        return false;
    }
    // Decimal text of the bounds, each sent as its own command argument.
    ostringstream minText;
    minText << min;
    ostringstream maxText;
    maxText << max;
    const std::string minArg = minText.str();
    const std::string maxArg = maxText.str();
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "ZREMRANGEBYSCORE %s %s %s", key.c_str(), minArg.c_str(), maxArg.c_str());
    cTime.Log(pRD, "remRangeByScore %s ! reply:%u", key.c_str(), reply);
    if (!reply)
    {
        log(Warn, "remRangeByScore failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    const bool isInteger = (REDIS_REPLY_INTEGER == reply->type);
    if (!isInteger)
    {
        log(Warn, "remRangeByScore Wrong reply received. Type: %d, key: %s, min: %u, max: %u", reply->type, key.c_str(), min, max);
    }
    freeReplyObject(reply);
    return isInteger;
}
// Read the score of `member` in the sorted set at `key` (ZSCORE) into `score`.
// `score` is reset to 0 on entry; a NIL reply leaves it at 0, a string reply
// is parsed with strtoul.
//
// Fixes over the previous version:
//  * The "Wrong reply" log had a "%d" conversion but no matching argument, so
//    the two strings were consumed by the wrong specifiers; reply->type is now
//    passed.
//  * Key and member are passed as "%s" arguments instead of being pasted into
//    the format string, so '%' or spaces in them are transmitted literally.
//
// Returns true for a NIL or string reply; false for any other reply type or
// when no reply was received.
bool RedisManager::getScoreByMember(PRedisContext& pRD, const std::string& key, const std::string& member, uint32_t& score)
{
    score = 0;
    // Readable copy of the command; used for the trace log only.
    ostringstream cmdStr;
    cmdStr << "ZSCORE " << key << " " << member;
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "ZSCORE %s %s", key.c_str(), member.c_str());
    cTime.Log(pRD, "getScoreByMember %s ! reply:%u", cmdStr.str().c_str(), reply);
    if (!reply)
    {
        log(Warn, "getScoreByMember failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    bool understood = true;
    if (REDIS_REPLY_STRING == reply->type)
    {
        score = strtoul(reply->str, 0, 0);
    }
    else if (REDIS_REPLY_NIL != reply->type)
    {
        understood = false;
        log(Error, "getScoreByMember Wrong reply received. Type: %d,. sortedSet: %s, member: %s", reply->type, key.c_str(), member.c_str());
    }
    freeReplyObject(reply);
    return understood;
}
// Read the descending rank of `member` in the sorted set at `key` (ZREVRANK).
// On an integer reply `rank` is the returned value plus one (1-based); on a
// NIL reply `rank` is set to 0. Other outcomes leave `rank` untouched.
//
// Key and member are passed as "%s" arguments instead of being pasted into the
// format string, so '%' or spaces in them are transmitted literally rather
// than being interpreted by the formatter.
//
// Returns true for a NIL or integer reply; false for any other reply type or
// when no reply was received.
bool RedisManager::getDescRankByMember(PRedisContext& pRD, const std::string& key, const std::string& member, uint32_t& rank)
{
    // Readable copy of the command; used for the trace log only.
    ostringstream cmdStr;
    cmdStr << "ZREVRANK " << key << " " << member;
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "ZREVRANK %s %s", key.c_str(), member.c_str());
    cTime.Log(pRD, "getDescRankByMember %s ! reply:%u", cmdStr.str().c_str(), reply);
    if (!reply)
    {
        log(Warn, "getDescRankByMember failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    bool understood = true;
    if (REDIS_REPLY_NIL == reply->type)
    {
        rank = 0;
    }
    else if (REDIS_REPLY_INTEGER == reply->type)
    {
        rank = reply->integer;
        ++rank; // convert the zero-based reply to a 1-based rank
    }
    else
    {
        understood = false;
        log(Warn, "getDescRankByMember Wrong reply received. Type: %d,. sortedSet: %s, member: %s", reply->type, key.c_str(), member.c_str());
    }
    freeReplyObject(reply);
    return understood;
}
// Looks up the rank of `member` in sorted set `key` with ZRANK.
// On an integer reply `rank` receives the returned index plus one; on a nil
// reply `rank` is set to 0. Both cases return true. Any other reply type is
// logged and returns false, as does a missing reply.
bool RedisManager::getAscRankByMember(PRedisContext& pRD, const std::string& key, const std::string& member, uint32_t& rank)
{
    std::ostringstream builder;
    builder << "ZRANK " << key << " " << member;

    RedisCmdTimeTesting cTime;
    const std::string command = builder.str();
    redisReply *reply = (redisReply*)redisCommand(pRD, command.c_str());
    cTime.Log(pRD, "getAscRankByMember %s ! reply:%u", command.c_str(), reply);
    if (NULL == reply)
    {
        log(Warn, "getAscRankByMember failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const int replyType = reply->type;
    const bool isNil = (REDIS_REPLY_NIL == replyType);
    const bool isInteger = (REDIS_REPLY_INTEGER == replyType);
    if (isNil)
    {
        rank = 0;
    }
    else if (isInteger)
    {
        rank = reply->integer;
        rank += 1;
    }
    else
    {
        log(Warn, "getAscRankByMember Wrong reply received. Type: %d,. sortedSet: %s, member: %s", reply->type, key.c_str(), member.c_str());
    }
    freeReplyObject(reply);
    return isNil || isInteger;
}
// (Binary member) ZINCRBY: adds incrScore to the score of `mem` in sorted set `key`.
// The member is supplied as a (pointer, length) pair for the %b placeholder at
// the end of the command format.
// Returns true only for a string reply. An error reply or any other reply type
// is logged and returns false, as does a missing reply.
bool RedisManager::incrSortSetBit(PRedisContext& pRD, const std::string& key, std::string& mem, const uint32_t& incrScore)
{
    std::ostringstream builder;
    builder << "ZINCRBY " << key << " " << incrScore << " %b";

    RedisCmdTimeTesting cTime;
    const std::string format = builder.str();
    redisReply *reply = redisCommand(pRD, format.c_str(), mem.c_str(), mem.size());
    cTime.Log(pRD, "incrSortSetBit key %s incrScore %u ! reply:%u", key.c_str(), incrScore, reply);
    if (NULL == reply)
    {
        log(Warn, "incrSortSetBit failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const bool accepted = (REDIS_REPLY_STRING == reply->type);
    if (!accepted)
    {
        if (REDIS_REPLY_ERROR == reply->type)
        {
            log(Error, "incrSortSetBit return error, sortedSet: %s, incrScore: %u", key.c_str(), incrScore);
        }
        else
        {
            log(Error, "incrSortSetBit Not expected reply type received for ZINCRBY: %u", reply->type);
        }
    }
    freeReplyObject(reply);
    return accepted;
}
// Runs "ZREVRANGE key offset (offset + limit - 1) WITHSCORES" on pRD.
// Returns true for a nil or array reply; false for any other reply type or when
// no reply was received.
// NOTE(review): the original comment says entries with score 0 are filtered out,
// but the only datas.push_back() lives inside the "#if 0" region below, so as
// compiled this function never appends anything to `datas` - confirm intent.
// NOTE(review): offset + limit - 1 is computed in uint32_t, so offset == 0 with
// limit == 0 wraps to 4294967295.
bool RedisManager::getDescSortedData(PRedisContext& pRD, const std::string& key, const uint32_t& offset, const uint32_t& limit, std::vector<SortData>& datas)
{
uint32_t stopIndex = offset + limit - 1;
ostringstream cmdStr;
cmdStr << "ZREVRANGE " << key << " " << offset << " " << stopIndex << " WITHSCORES";
RedisCmdTimeTesting cTime;
redisReply *reply = redisCommand(pRD, cmdStr.str().c_str());
cTime.Log(pRD, "getDescSortedData %s ! reply:%u", cmdStr.str().c_str(), reply);
if (reply)
{
if (REDIS_REPLY_NIL == reply->type)
{
freeReplyObject(reply);
return true;
}
else if (REDIS_REPLY_ARRAY == reply->type)
{
if (reply->elements > 0)
{
// Two reply elements are consumed per iteration (i is advanced inside the body).
for (uint32_t i = 0; i < reply->elements - 1; )
{
SortData data;
// First element of the pair goes to m_score, second to m_number.
// NOTE(review): confirm this matches the field meaning callers expect.
data.m_score = reply->element[i++]->str;
data.m_number = reply->element[i++]->str;
// Filter out entries whose score is <= 0 (currently compiled out).
#if 0
if (ws::atou32(data.m_number.c_str()) > 0)
{
datas.push_back(data);
}
else
{
log(Info, "getDescSortedData Not accepted score: %d, id: %u", data.m_score.c_str(), data.m_number.c_str());
}
#endif
}
}
freeReplyObject(reply);
return true;
}
else
{
log(Error, "getDescSortedData Wrong reply received. Type: %d, key: %s, offset: %u, limit: %u", reply->type, key.c_str(), offset, limit);
}
freeReplyObject(reply);
return false;
}
// No reply object: report the connection-level error.
log(Warn, "getDescSortedData failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
return false;
}
// Runs "ZREVRANGE key offset (offset + limit - 1) WITHSCORES" and appends one
// SortData per pair of reply elements to `datas`, without filtering by score.
// For each pair the first element is stored in m_score and the second in m_number.
// Returns true for a nil or array reply; false for any other reply type or when
// no reply was received.
bool RedisManager::getDescSortedAllData(PRedisContext& pRD, const std::string& key, const uint32_t& offset, const uint32_t& limit, std::vector<SortData>& datas)
{
    const uint32_t lastIndex = offset + limit - 1;
    std::ostringstream builder;
    builder << "ZREVRANGE " << key << " " << offset << " " << lastIndex << " WITHSCORES";

    RedisCmdTimeTesting cTime;
    const std::string command = builder.str();
    redisReply *reply = redisCommand(pRD, command.c_str());
    cTime.Log(pRD, "getDescSortedAllData %s ! reply:%u", command.c_str(), reply);
    if (NULL == reply)
    {
        log(Warn, "getDescSortedAllData failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    bool succeeded = true;
    switch (reply->type)
    {
    case REDIS_REPLY_NIL:
        break;
    case REDIS_REPLY_ARRAY:
        // Walk the reply two elements at a time; a trailing unpaired element is ignored.
        for (size_t pos = 0; pos + 1 < reply->elements; pos += 2)
        {
            SortData entry;
            entry.m_score = reply->element[pos]->str;
            entry.m_number = reply->element[pos + 1]->str;
            datas.push_back(entry);
        }
        break;
    default:
        log(Error, "getDescSortedAllData Wrong reply received. Type: %d, key: %s, offset: %u, limit: %u", reply->type, key.c_str(), offset, limit);
        succeeded = false;
        break;
    }
    freeReplyObject(reply);
    return succeeded;
}
// Runs "ZRANGE key start end WITHSCORES" on pRD.
// Returns true for a nil or array reply; false for any other reply type or when
// no reply was received.
// NOTE(review): the original comment says entries with score 0 are filtered out,
// but the only datas.push_back() lives inside the "#if 0" region below, so as
// compiled this function never appends anything to `datas` - confirm intent.
bool RedisManager::getAscSortedData(PRedisContext& pRD, const string& key, const uint32_t& start, const uint32_t& end, std::vector<SortData> &datas)
{
ostringstream cmdStr;
cmdStr << "ZRANGE " << key << " " << start << " " << end << " WITHSCORES";
RedisCmdTimeTesting cTime;
redisReply *reply = redisCommand(pRD, cmdStr.str().c_str());
cTime.Log(pRD, "getAscSortedData %s ! reply:%u", cmdStr.str().c_str(), reply);
if (reply)
{
if (REDIS_REPLY_NIL == reply->type)
{
freeReplyObject(reply);
return true;
}
else if (REDIS_REPLY_ARRAY == reply->type)
{
if (reply->elements > 0)
{
// Two reply elements are consumed per iteration (i is advanced inside the body).
for (uint32_t i = 0; i < reply->elements - 1; )
{
SortData data;
// First element of the pair goes to m_score, second to m_number.
// NOTE(review): confirm this matches the field meaning callers expect.
data.m_score = reply->element[i++]->str;
data.m_number = reply->element[i++]->str;
#if 0
// Filter out entries whose score is <= 0 (currently compiled out).
if (ws::atou32(data.m_number.c_str()) > 0)
{
datas.push_back(data);
}
else
{
log(Info, "getAscSortedData Not accepted score: %s, id: %s", data.m_score.c_str(), data.m_number.c_str());
}
#endif
}
}
freeReplyObject(reply);
return true;
}
else
{
// The log labels say offset/limit, but the values passed are start/end.
log(Error, "getAscSortedData Wrong reply received. Type: %d, key: %s, offset: %u, limit: %u", reply->type, key.c_str(), start, end);
}
freeReplyObject(reply);
return false;
}
// No reply object: report the connection-level error.
log(Warn, "getAscSortedData failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
return false;
}
// Runs "ZRANGE key start end WITHSCORES" and appends one SortData per pair of
// reply elements to `datas`, without filtering by score.
// For each pair the first element is stored in m_score and the second in m_number.
// Returns true for a nil or array reply; false for any other reply type or when
// no reply was received.
bool RedisManager::getAscSortedAllData(PRedisContext& pRD, const std::string& key, const uint32_t& start, const uint32_t& end, std::vector<SortData>& datas)
{
    std::ostringstream builder;
    builder << "ZRANGE " << key << " " << start << " " << end << " WITHSCORES";

    RedisCmdTimeTesting cTime;
    const std::string command = builder.str();
    redisReply *reply = redisCommand(pRD, command.c_str());
    cTime.Log(pRD, "getAscSortedAllData %s ! reply:%u", command.c_str(), reply);
    if (NULL == reply)
    {
        log(Warn, "getAscSortedAllData failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    bool succeeded = true;
    switch (reply->type)
    {
    case REDIS_REPLY_NIL:
        break;
    case REDIS_REPLY_ARRAY:
        // Walk the reply two elements at a time; a trailing unpaired element is ignored.
        for (size_t pos = 0; pos + 1 < reply->elements; pos += 2)
        {
            SortData entry;
            entry.m_score = reply->element[pos]->str;
            entry.m_number = reply->element[pos + 1]->str;
            datas.push_back(entry);
        }
        break;
    default:
        log(Error, "getAscSortedAllData Wrong reply received ");
        succeeded = false;
        break;
    }
    freeReplyObject(reply);
    return succeeded;
}
// (Transaction) Sends "ZADD key score member" and expects it to be queued.
//   pRD   : Redis connection the command is sent on.
//   key   : sorted-set key.
//   mem   : member to add; passed through a %s placeholder.
//   score : score for the member.
// Returns true only when the reply is a status reply whose text is "QUEUED".
// Any other status text or reply type is logged and returns false, as does a
// missing reply.
bool RedisManager::setSortedSet_T(PRedisContext& pRD, const std::string& key, std::string& mem, const uint32_t& score)
{
    std::ostringstream cmdStr;
    cmdStr << "ZADD " << key << " " << score << " %s";
    RedisCmdTimeTesting cTime;
    // The format contains a single %s, so only the member pointer is passed.
    // The original also passed mem.size(), which no conversion consumed.
    redisReply *reply = redisCommand(pRD, cmdStr.str().c_str(), mem.c_str());
    // %d expects an int; mem.size() is size_t, so convert explicitly.
    cTime.Log(pRD, "setSortedSet_T ZADD key %s score %u size %d reply %u", key.c_str(), score, static_cast<int>(mem.size()), reply);
    if (!reply)
    {
        log(Warn, "setSortedSet_T failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }
    if (REDIS_REPLY_STATUS == reply->type)
    {
        if (strcmp(reply->str, "QUEUED") == 0)
        {
            freeReplyObject(reply);
            return true;
        }
        log(Error, "setSortedSet_T in transaction return failure, key: %s", key.c_str());
    }
    else
    {
        log(Warn, "setSortedSet_T Wrong reply type received! Type: %d, key: %s", reply->type, key.c_str());
    }
    freeReplyObject(reply);
    return false;
}
// (Transaction) Sends "ZINCRBY key -decrScore member", i.e. lowers the member's
// score by decrScore, and expects the command to be queued.
// Returns true only when the reply is a status reply whose text is "QUEUED".
// Any other status text or reply type is logged and returns false, as does a
// missing reply.
bool RedisManager::decrScore_T(PRedisContext& pRD, const std::string& key, std::string& mem, const uint32_t& decrScore)
{
    std::ostringstream builder;
    builder << "ZINCRBY " << key << " -" << decrScore << " %s";

    RedisCmdTimeTesting cTime;
    const std::string format = builder.str();
    redisReply *reply = redisCommand(pRD, format.c_str(), mem.c_str());
    cTime.Log(pRD, "decrScore_T ZINCRBY %s -%u !reply:%u", key.c_str(), decrScore, reply);
    if (NULL == reply)
    {
        log(Warn, "decrScore_T failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const bool isStatus = (REDIS_REPLY_STATUS == reply->type);
    const bool queued = isStatus && (0 == strcmp(reply->str, "QUEUED"));
    if (!queued)
    {
        if (isStatus)
        {
            log(Error, "decrScore_T in transaction return failure");
        }
        else
        {
            log(Warn, "decrScore_T Wrong reply type received! Type: %d, key: %s", reply->type, key.c_str());
        }
    }
    freeReplyObject(reply);
    return queued;
}
// (Transaction) Sends "ZINCRBY key incrScore member" and expects the command to
// be queued.
// Returns true only when the reply is a status reply whose text is "QUEUED".
// Any other status text or reply type is logged and returns false, as does a
// missing reply.
bool RedisManager::incrMemScore_T(PRedisContext& pRD, const std::string& key, std::string& mem, const uint32_t& incrScore)
{
    std::ostringstream builder;
    builder << "ZINCRBY " << key << " " << incrScore << " %s";

    RedisCmdTimeTesting cTime;
    const std::string format = builder.str();
    redisReply *reply = redisCommand(pRD, format.c_str(), mem.c_str());
    cTime.Log(pRD, "incrMemScore_T key %s mem %s score %u ! reply:%u", key.c_str(), mem.c_str(), incrScore, reply);
    if (NULL == reply)
    {
        log(Warn, "incrMemScore_T failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const bool isStatus = (REDIS_REPLY_STATUS == reply->type);
    const bool queued = isStatus && (0 == strcmp(reply->str, "QUEUED"));
    if (!queued)
    {
        if (isStatus)
        {
            log(Error, "incrMemScore_T in transaction return failure, key: %s", key.c_str());
        }
        else
        {
            log(Warn, "incrMemScore_T Wrong reply type received! Type: %d, key: %s", reply->type, key.c_str());
        }
    }
    freeReplyObject(reply);
    return queued;
}
// --- Transaction(事务)
// Marks the start of a transaction by sending MULTI.
// Returns true when a status reply comes back; any other reply type is logged
// and returns false, as does a missing reply.
bool RedisManager::beginTransaction(PRedisContext& pRD)
{
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "MULTI");
    cTime.Log(pRD, "beginTransaction MULTI reply %u", reply);
    if (NULL == reply)
    {
        log(Warn, "MULTI failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const bool started = (REDIS_REPLY_STATUS == reply->type);
    if (!started)
    {
        log(Warn, "Wrong reply received. Type: %d", reply->type);
    }
    freeReplyObject(reply);
    return started;
}
// Ends the transaction by sending EXEC, which runs every queued command.
//   pRD       : Redis connection the command is sent on.
//   isAborted : out; false when EXEC returned an array (the queued commands
//               ran), true when EXEC returned nil (the transaction was aborted,
//               which is what Redis replies when a WATCHed key was modified).
// Returns true for an array or nil reply; false for any other reply type or
// when no reply was received. The reply object is always released here.
bool RedisManager::endTransaction(PRedisContext& pRD, bool& isAborted)
{
    isAborted = false;
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "EXEC");
    cTime.Log(pRD, "endTransaction EXEC reply %u", reply);
    if (!reply)
    {
        log(Warn, "EXEC failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    bool handled = true;
    if (REDIS_REPLY_ARRAY == reply->type)
    {
        // An array reply means the transaction executed. The original set
        // isAborted to true here, making success indistinguishable from an abort.
        isAborted = false;
    }
    else if (REDIS_REPLY_NIL == reply->type)
    {
        isAborted = true;
    }
    else
    {
        log(Warn, "Wrong reply received. Type: %d", reply->type);
        handled = false;
    }
    freeReplyObject(reply);
    return handled;
}
// Ends the transaction by sending EXEC and hands the raw reply to the caller.
//   pRD       : Redis connection the command is sent on.
//   isAborted : out; false when EXEC returned an array (the queued commands
//               ran), true when EXEC returned nil (the transaction was aborted).
//   pReply    : out; receives the reply object for an array reply and for an
//               unexpected reply type. The caller then owns it and must release
//               it with freeReplyObject(). Set to NULL on the nil and no-reply
//               paths, which the original left untouched.
// Returns true for an array or nil reply; false for any other reply type or
// when no reply was received.
bool RedisManager::endTransaction(PRedisContext& pRD, bool& isAborted, redisReply** pReply)
{
    isAborted = false;
    if (pReply)
    {
        // Give the caller a defined value on every path.
        *pReply = NULL;
    }
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "EXEC");
    cTime.Log(pRD, "endTransaction EXEC reply %u", reply);
    if (!reply)
    {
        log(Warn, "EXEC failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    if (REDIS_REPLY_NIL == reply->type)
    {
        isAborted = true;
        freeReplyObject(reply);
        return true;
    }

    const bool executed = (REDIS_REPLY_ARRAY == reply->type);
    if (!executed)
    {
        log(Warn, "Wrong reply received. Type: %d", reply->type);
    }
    // An array reply means the transaction executed, so isAborted stays false
    // (the original set it to true here).
    if (pReply)
    {
        *pReply = reply;
    }
    else
    {
        // Nobody to hand the reply to; release it instead of leaking.
        freeReplyObject(reply);
    }
    return executed;
}
// Cancels the current transaction by sending DISCARD.
// Returns true when a status reply comes back; any other reply type is logged
// and returns false, as does a missing reply.
bool RedisManager::discardTransaction(PRedisContext& pRD)
{
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "DISCARD");
    cTime.Log(pRD, "discardTransaction DISCARD ! reply:%u", reply);
    if (NULL == reply)
    {
        log(Warn, "DISCARD failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const bool discarded = (REDIS_REPLY_STATUS == reply->type);
    if (!discarded)
    {
        log(Warn, "Wrong reply received. Type: %d", reply->type);
    }
    freeReplyObject(reply);
    return discarded;
}
// Watches a single key by sending "WATCH key".
//   pRD : Redis connection the command is sent on.
//   key : key to watch.
// Returns true when a status reply comes back; any other reply type is logged
// and returns false, as does a missing reply.
bool RedisManager::watch(PRedisContext& pRD, const std::string& key)
{
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "WATCH %s", key.c_str());
    cTime.Log(pRD, "watch WATCH ! reply:%u", reply);
    if (!reply)
    {
        // The format starts with %s for the key. The original passed only
        // (err, errstr), so %s was fed an int and the final %s had no argument.
        log(Warn, "Watch key %s failed! error:%d, error_info:%s", key.c_str(), pRD->err, pRD->errstr);
        return false;
    }
    if (REDIS_REPLY_STATUS == reply->type)
    {
        freeReplyObject(reply);
        return true;
    }
    log(Warn, "Wrong reply received. Type: %d", reply->type);
    freeReplyObject(reply);
    return false;
}
// Watches several keys with a single WATCH command built as an argv/argvlen
// pair (the command name followed by every key with its byte length).
// Returns true when a status reply comes back; any other reply type is logged
// and returns false, as does a missing reply.
bool RedisManager::watch(PRedisContext& pRD, const vector<std::string>& keys)
{
    static char watchCmd[] = "WATCH";

    std::vector<const char *> args;
    std::vector<size_t> argLengths;
    args.reserve(keys.size() + 1);
    argLengths.reserve(keys.size() + 1);

    args.push_back(watchCmd);
    argLengths.push_back(sizeof(watchCmd) - 1);
    for (size_t k = 0; k < keys.size(); ++k)
    {
        args.push_back(keys[k].c_str());
        argLengths.push_back(keys[k].size());
    }

    RedisCmdTimeTesting cTime;
    redisReply* reply = (redisReply*)redisCommandArgv(pRD, args.size(), &(args[0]), &(argLengths[0]));
    cTime.Log(pRD, "watch WATCH ! reply:%u", reply);
    if (NULL == reply)
    {
        log(Warn, "Watch key failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const bool watching = (REDIS_REPLY_STATUS == reply->type);
    if (!watching)
    {
        log(Warn, "Wrong reply received. Type: %d", reply->type);
    }
    freeReplyObject(reply);
    return watching;
}
// Stops watching keys by sending UNWATCH.
// Returns true when a status reply comes back; any other reply type is logged
// and returns false, as does a missing reply.
bool RedisManager::unwatch(PRedisContext& pRD)
{
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "UNWATCH");
    cTime.Log(pRD, "unwatch UNWATCH reply %u", reply);
    if (NULL == reply)
    {
        log(Warn, "UnWatch failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    const bool released = (REDIS_REPLY_STATUS == reply->type);
    if (!released)
    {
        log(Warn, "Wrong reply received. Type: %d", reply->type);
    }
    freeReplyObject(reply);
    return released;
}
// --- lua 脚本控制
// Loads a script with "SCRIPT LOAD" and records the returned string under `key`
// through setScriptInfo().
// Returns true only for a string reply. For any other reply type, or when no
// reply was received, the failure is logged and false is returned.
bool RedisManager::loadScript(PRedisContext& pRD, const std::string& key, const std::string& script)
{
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "SCRIPT LOAD %s", script.c_str());
    cTime.Log(pRD, "loadScript LOAD ! reply:%u", reply);

    if (NULL != reply && REDIS_REPLY_STRING == reply->type)
    {
        // Copy the reply text before the reply object is released.
        const std::string digest(reply->str);
        setScriptInfo(pRD, key, digest);
        freeReplyObject(reply);
        log(Info, "loadScript LOAD success! checkNum:%s", digest.c_str());
        return true;
    }

    if (NULL != reply)
    {
        freeReplyObject(reply);
    }
    log(Warn, "loadScript LOAD failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
    return false;
}
// Checks whether the script registered under `key` is still present in the
// server's script cache.
//   pRD : Redis connection the command is sent on.
//   key : local name the script was stored under by loadScript().
// Returns true only when the server reports the script's checksum as cached.
// Returns false when no checksum is recorded for `key`, when the script is
// missing, on an unexpected reply, or when no reply was received.
bool RedisManager::existsScript(PRedisContext& pRD, const std::string& key)
{
    std::string checkSum;
    getScriptInfo(pRD, key, checkSum);
    if (checkSum.empty())
    {
        return false;
    }

    RedisCmdTimeTesting cTime;
    // The Redis subcommand is "SCRIPT EXISTS" (plural); "SCRIPT EXIST" is not a
    // valid subcommand, so the original always received an error reply.
    redisReply *reply = redisCommand(pRD, "SCRIPT EXISTS %s", checkSum.c_str());
    cTime.Log(pRD, "existsScript SCRIPT EXIST ! reply:%u", reply);
    if (!reply)
    {
        log(Warn, "existsScript SCRIPT EXIST failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    bool exists = false;
    if (REDIS_REPLY_ARRAY == reply->type)
    {
        // SCRIPT EXISTS answers with one integer per checksum asked about.
        if (reply->elements >= 1 && reply->element[0] != NULL
            && REDIS_REPLY_INTEGER == reply->element[0]->type)
        {
            exists = (1 == reply->element[0]->integer);
        }
    }
    else if (REDIS_REPLY_INTEGER == reply->type)
    {
        // Kept from the original for backward compatibility.
        exists = (1 == reply->integer);
    }
    freeReplyObject(reply);
    return exists;
}
// Returns, through `keys`, the keys of all scripts recorded for this connection.
// Simply forwards to getScriptAllKeys() and returns its result.
bool RedisManager::getAllScriptKeys(PRedisContext& pRD, std::vector<string> &keys)
{
return getScriptAllKeys(pRD, keys);
}
// Sends "SCRIPT FLUSH" and, whenever any reply object comes back, also clears
// the locally recorded script keys for this connection.
// Returns true if a reply was received (its type is not inspected), false otherwise.
bool RedisManager::clearAllScript(PRedisContext& pRD)
{
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "SCRIPT FLUSH");
    cTime.Log(pRD, "SCRIPT FLUSH ! reply:%u", reply);
    if (NULL == reply)
    {
        log(Warn, "SCRIPT FLUSH failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    clearScriptAllKeys(pRD);
    freeReplyObject(reply);
    return true;
}
// Executes the script registered under `key` via "EVALSHA <checksum> 0".
//   pRD      : Redis connection the command is sent on.
//   key      : local name the script was stored under by loadScript().
//   outValue : out; results are appended. A string reply adds its text, an
//              integer reply adds ws::toString() of the value, and an array
//              reply adds one entry per element.
// Returns true whenever a reply object was received (reply types other than
// string/integer/array add nothing). Returns false when no checksum is
// recorded for `key` or when no reply was received.
bool RedisManager::exceScript(PRedisContext& pRD, const std::string& key, std::vector<std::string> &outValue)
{
    std::string checkSum;
    getScriptInfo(pRD, key, checkSum);
    log(Warn, "exceScript 1 %s ", checkSum.c_str());
    if (checkSum.empty())
    {
        return false;
    }

    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "EVALSHA %s 0", checkSum.c_str());
    cTime.Log(pRD, "exceScript EVALSHA! reply:%u", reply);
    if (!reply)
    {
        log(Warn, "exceScript failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    switch (reply->type)
    {
    case REDIS_REPLY_STRING: // script returned a string
    {
        std::string value = reply->str;
        outValue.push_back(value);
        break;
    }
    case REDIS_REPLY_INTEGER: // script returned an integer
    {
        std::string value = ws::toString(reply->integer);
        outValue.push_back(value);
        break;
    }
    case REDIS_REPLY_ARRAY: // script returned an array
    {
        // Visit every element. The original stopped at elements - 1 and so
        // silently dropped the last entry of the result.
        for (size_t i = 0; i < reply->elements; ++i)
        {
            const redisReply *item = reply->element[i];
            if (item != NULL && item->str != NULL)
            {
                outValue.push_back(item->str);
            }
            else if (item != NULL && REDIS_REPLY_INTEGER == item->type)
            {
                std::string value = ws::toString(item->integer);
                outValue.push_back(value);
            }
            else
            {
                // Element without text: keep the position with an empty
                // string instead of constructing std::string from NULL.
                outValue.push_back(std::string());
            }
        }
        break;
    }
    default:
        break;
    }
    freeReplyObject(reply);
    return true;
}
// Sends "SCRIPT KILL" and, whenever any reply object comes back, clears the
// locally recorded script keys for this connection.
// Returns true if a reply was received (its type is not inspected), false otherwise.
// NOTE(review): the local key cache is cleared even though the command sent is
// KILL rather than FLUSH - confirm this is intended.
bool RedisManager::stopRuningScript(PRedisContext& pRD)
{
    RedisCmdTimeTesting cTime;
    redisReply *reply = redisCommand(pRD, "SCRIPT KILL");
    cTime.Log(pRD, "SCRIPT KILL ! reply:%u", reply);
    if (NULL == reply)
    {
        log(Warn, "SCRIPT KILL failed! error:%d, error_info:%s", pRD->err, pRD->errstr);
        return false;
    }

    clearScriptAllKeys(pRD);
    freeReplyObject(reply);
    return true;
}
// -----------------reids执行报告----------------------
// Resets the execution report and switches it on.
// pid: thread id to monitor; 0 means every thread is monitored (buildReport()
// only filters by thread when the stored pid is non-zero).
// Returns the pid that was passed in.
uint32_t RedisManager::startRedisReport(uint32_t pid)
{
    // Wipe the counters from any previous run.
    m_RedisReport.m_KeyInfo.clear();
    m_RedisReport.m_RunCount = 0;
    m_RedisReport.m_RunTime = 0;

    // Arm the report for the requested thread.
    m_RedisReport.m_Pid = pid;
    m_RedisReport.m_StartTime = time(NULL);
    m_RedisReport.m_State = 1;

    log(Info, "First time init for new reportRedisStruct in thread %u.", pid);
    return pid;
}
// Switches the execution report off; the collected counters are left as they are.
void RedisManager::stopRedisReport()
{
m_RedisReport.m_State = 0; // 0 = off
}
// Renders the current execution report as a single line of text.
// The line contains the state, monitored thread id, run count, accumulated run
// time, start time, current time and, for every recorded key, a
// "(key:time:count);" entry.
std::string RedisManager::getRedisReport()
{
    std::stringstream os;
    os << "[redis report!]:";
    // Stream insertion does not interpret printf conversions. The original
    // literals carried "%u"/"%lu", which showed up verbatim in the report.
    os << "state:" << m_RedisReport.m_State << " ";
    os << "pid:" << m_RedisReport.m_Pid << " ";
    os << "run count:" << m_RedisReport.m_RunCount << " ";
    os << "run time:" << m_RedisReport.m_RunTime << " ";
    os << "start time:" << m_RedisReport.m_StartTime << " ";
    os << "cur time:" << (uint64_t)time(NULL) << " ";
    os << "key info:[ ";
    std::map<std::string, std::map<std::string, uint32_t> >::iterator itF = m_RedisReport.m_KeyInfo.begin();
    for (; itF != m_RedisReport.m_KeyInfo.end(); ++itF)
    {
        os << "(" << itF->first << ":" << itF->second["time"] << ":" << itF->second["count"] << ");";
    }
    os << "] ";
    return os.str();
}
// 产生报告
void RedisManager::buildReport(std::string key, timeval startTime, timeval endTime)
{
// 检查是不是开启检测
if (0 == m_RedisReport.m_State)
{
return;
}
// 判断是不是全线程
if (0 != m_RedisReport.m_Pid)
{
uint32_t pid = (uint32_t)pthread_self(); // 当前线程
if (pid != m_RedisReport.m_Pid)
{
return; // 不检测
}
}
// 一条记录的执行时间
uint64_t uTime = (endTime.tv_sec - startTime.tv_sec) * 1000 * 1000 + (endTime.tv_usec - startTime.tv_usec);
m_RedisReport.m_RunCount++; // 执行语句总数加1
m_RedisReport.m_RunTime += uTime; // 执行时间相加
m_RedisReport.m_KeyInfo[key]["time"] += uTime;
m_RedisReport.m_KeyInfo[key]["count"] ++;
}
// Records <key, checkSum> for the given connection while holding m_RedisLock.
// A key that is already recorded for this connection keeps its existing value.
// Always returns true.
bool RedisManager::setScriptInfo(PRedisContext& pRD, const std::string& key, const std::string& checkSum)
{
    std::lock_guard<std::mutex> guard(m_RedisLock);
    // operator[] creates the per-connection map on first use; insert() leaves an
    // existing entry for `key` untouched.
    m_Script[pRD].insert(make_pair(key, checkSum));
    return true;
}
// Looks up the checksum recorded for `key` on the given connection.
// checkSum is emptied first; it receives the stored value and true is returned
// when an entry exists, otherwise it stays empty and false is returned.
// The lookup runs under m_RedisLock.
bool RedisManager::getScriptInfo(PRedisContext& pRD, const std::string& key, std::string& checkSum)
{
    checkSum.clear();
    std::lock_guard<std::mutex> guard(m_RedisLock);

    map<PRedisContext, map<string, string> >::iterator perLink = m_Script.find(pRD);
    if (perLink == m_Script.end())
    {
        return false;
    }
    map<string, string>::iterator entry = perLink->second.find(key);
    if (entry == perLink->second.end())
    {
        return false;
    }
    checkSum = entry->second;
    return true;
}
// Replaces the contents of `keys` with every script key recorded for the given
// connection (left empty when nothing is recorded). Runs under m_RedisLock.
// Always returns true.
bool RedisManager::getScriptAllKeys(PRedisContext& pRD, std::vector<string> &keys)
{
    keys.clear();
    std::lock_guard<std::mutex> guard(m_RedisLock);

    map<PRedisContext, map<string, string> >::iterator perLink = m_Script.find(pRD);
    if (perLink == m_Script.end())
    {
        return true;
    }
    const map<string, string>& scripts = perLink->second;
    for (map<string, string>::const_iterator entry = scripts.begin(); entry != scripts.end(); ++entry)
    {
        keys.push_back(entry->first);
    }
    return true;
}
// Drops every script entry recorded for the given connection; the connection's
// slot itself is kept. Runs under m_RedisLock. Always returns true.
bool RedisManager::clearScriptAllKeys(PRedisContext& pRD)
{
    std::lock_guard<std::mutex> guard(m_RedisLock);
    map<PRedisContext, map<string, string> >::iterator perLink = m_Script.find(pRD);
    if (perLink == m_Script.end())
    {
        return true;
    }
    perLink->second.clear();
    return true;
}
// Placeholder: performs no work and always returns true.
bool RedisManager::clean_all_server(void)
{
return true;
}
bool RedisManager::load_servers(const std::set <std::string> & server_list)
{
std::lock_guard<std::mutex> lock(server_online_mutex);
std::cout << "cfg.json文件,初始化数据代理服务器在线状态:" << std::endl;
for(std::set<std::string>::iterator it=server_list.begin (); it!=server_list.end (); it++)
{
std::string server_ip = *it;
server_status ser_status;
memset(&ser_status, 0, sizeof(ser_status));
ser_status._since_enpoch = 0;
ser_status.gateway_count = 0;
proxy_server_online.insert(std::make_pair(server_ip, ser_status));
}
//std::cout << "注册服务器" << std::endl;
return true;
}
bool RedisManager::print_serverlist(void)
{
std::cout << "打印数据代理服务器在线状态:" << std::endl;
for(std::map<std::string, RedisManager::server_status>::iterator it=RedisManager::singleton()->proxy_server_online.begin ();
it!=RedisManager::singleton()->proxy_server_online.end (); it++)
{
RedisManager::server_status status;
status = it->second;
std::cout << it->first << " : " << status._since_enpoch << status.gateway_count << std::endl;
}
return true;
}
// Removes from Redis (127.0.0.1:6379, DB 0) every gateway registration that
// belongs to proxy server `server_ip`, then marks that server offline locally.
//   server_ip : key of the server; its Redis set lists the gateway ids to delete.
// Returns false when no Redis connection could be obtained (nothing is changed
// in that case); true otherwise.
bool RedisManager::unregister_server(const std::string & server_ip)
{
    RedisManager::server_status status;
    uint32_t key = RedisManager::singleton()->CreateContext("127.0.0.1",6379);
    PRedisContext pRD = RedisManager::singleton()->getRedis(key);
    // Check the connection before using it. The original issued selectDB() on
    // pRD first and only tested for NULL afterwards.
    if (pRD == NULL)
    {
        return false;
    }
    RedisManager::singleton()->selectDB(pRD, 0);

    // Delete every gateway key listed in the server's set, then the set itself.
    std::set<std::string> gateway;
    if (true == getAnySetValue(pRD, server_ip, gateway)) {
        for (std::set<std::string>::iterator it1 = gateway.begin(); it1 != gateway.end(); it1++) {
            delKey(pRD, *it1);
        }
        delKey(pRD, server_ip);
    }

    // Mark the server offline.
    status._since_enpoch = 0;
    status.gateway_count = 0;
    proxy_server_online[server_ip] = status;
    return true;
}
// Calls unregister_server() for every address in server_list, ignoring the
// individual results. Always returns true.
bool RedisManager::unregister_allserver(const std::set <std::string> & server_list)
{
    for (const std::string &address : server_list)
    {
        std::string server_ip = address;
        unregister_server(server_ip);
    }
    return true;
}
// Scans all known proxy servers and takes offline those whose last heartbeat is
// older than `timeout` (timestamps are microseconds since the epoch).
// For a timed-out server, every gateway registration listed in its Redis set
// (127.0.0.1:6379, DB 0) is deleted and its local status is reset to
// timestamp 0 / gateway count 0. Servers still inside the timeout window are
// reported as online on stdout.
// Runs under server_online_mutex. Always returns true.
bool RedisManager::manage_server_online(void)
{
    std::lock_guard<std::mutex> lock(server_online_mutex);
    chrono::time_point<chrono::system_clock, chrono::microseconds> now = chrono::time_point_cast<chrono::microseconds>(
        chrono::system_clock::now());
    int64_t microSeconds = now.time_since_epoch().count();
    for (std::map<std::string, RedisManager::server_status>::iterator it = RedisManager::singleton()->proxy_server_online.begin();
         it != RedisManager::singleton()->proxy_server_online.end(); it++)
    {
        RedisManager::server_status status = it->second;
        // A server that has sent a heartbeat (timestamp > 0) but has gone silent
        // for longer than `timeout` loses all of its gateway registrations.
        if ((status._since_enpoch > 0) && ((status._since_enpoch + timeout) < microSeconds)) {
            std::cout << "proxy server : " << it->first << "should offline" << std::endl;
            uint32_t key = RedisManager::singleton()->CreateContext("127.0.0.1",6379);
            PRedisContext pRD = RedisManager::singleton()->getRedis(key);
            if (pRD != NULL)
            {
                // Select the DB only once the connection is known to be valid.
                // The original called selectDB() before this NULL check.
                RedisManager::singleton()->selectDB(pRD, 0);
                std::set<std::string> gateway;
                if (true == getAnySetValue(pRD, it->first, gateway)) {
                    for (std::set<std::string>::iterator it1 = gateway.begin(); it1 != gateway.end(); it1++) {
                        delKey(pRD, *it1);
                    }
                    delKey(pRD, it->first);
                }
            }
            // Mark the server offline even if Redis could not be reached.
            status._since_enpoch = 0;
            status.gateway_count = 0;
            proxy_server_online[it->first] = status;
        }
        if ((status._since_enpoch + timeout) > microSeconds) {
            std::cout << "proxy server : " << it->first << "online" << std::endl;
        }
    }
    return true;
}
bool RedisManager::update_server_Timestamp(std::string server_ip, int server_port)
{
std::cout << "刷新合法服务器的心跳时间戳" << std::endl;
chrono::time_point<chrono::system_clock, chrono::microseconds> now = chrono::time_point_cast<chrono::microseconds>(
chrono::system_clock::now());
int64_t microSeconds = now.time_since_epoch().count();
RedisManager::server_status ser_status = RedisManager::singleton()->proxy_server_online[server_ip + "-" + std::to_string(server_port)];
ser_status._since_enpoch = microSeconds;
RedisManager::singleton()->proxy_server_online[server_ip + "-" + std::to_string(server_port)] = ser_status;
RedisManager::server_status ser_statusssss = RedisManager::singleton()->proxy_server_online[server_ip + "-" + std::to_string(server_port)];
cout << "代理服务器multi" << server_ip + "-" + std::to_string(server_port) << "心跳时间戳为:" << ser_statusssss._since_enpoch << std::endl;
return true;
}
bool RedisManager::get_server_online(std::string & serve_ip, std::string & id)
{
std::lock_guard<std::mutex> lock(server_online_mutex);
//查询redis,注册则返回老的注册服务器ip port。不存在则新注册。
int server_min_gateway_count = INT_MAX;
std::string ip;
RedisManager::server_status status;
for(std::map<std::string, RedisManager::server_status>::iterator it=RedisManager::singleton()->proxy_server_online.begin ();
it!=RedisManager::singleton()->proxy_server_online.end (); it++)
{
status = it->second;
if ((status._since_enpoch > 1000) && (server_min_gateway_count > status.gateway_count) ) {
server_min_gateway_count = status.gateway_count;
ip = it->first;
std::cout << "尝试获取在线ip: " << it->first << std::endl;
}
else {
//std::cout << "时间戳" << status._since_enpoch << "当前连接数量" << server_min_gateway_count << "遍历连接数量" << status.gateway_count << std::endl;
}
//std::cout << "变量服务器ip" << it->first << "时间戳" << status._since_enpoch << "在线数量" << status.gateway_count << std::endl;
}
if (server_min_gateway_count == INT_MAX) {
return false;
}
//不存在,则新注册到redis
bool if_id_exist;
uint32_t key = RedisManager::singleton()->CreateContext("127.0.0.1",6379);
PRedisContext pRD = RedisManager::singleton()->getRedis(key);
RedisManager::singleton()->selectDB(pRD, 0);
if (false == RedisManager::singleton()->isKeyExist(pRD, id, if_id_exist)) {
//注册id到服务器,id:ip;ip-id
if (true == RedisManager::singleton()->setStrValue(pRD, id, ip)) {
if (true == RedisManager::singleton()->setSetValue(pRD, ip, id)) {
serve_ip = ip;
std::cout << "网关id=[" << id << "]" << "注册成功" <<std::endl;
return true;
}
else {
RedisManager::singleton()->delKey(pRD, id);
return false;
}
}
else {
return false;
}
}
else {
std::cout << "网关id=[" << id << "]" << "已注册" <<std::endl;
serve_ip = ip;
return true;
}
}
/*
* redis_interface.h
* Author: jacky
* Date: 2019-4-12
*
*/
#pragma once
#include <string>
#include <map>
#include <set>
#include <time.h>
#include <vector>
#include <mutex>
#include <limits.h>
#include <condition_variable>
#include <hiredis/hiredis.h>
using namespace std;
namespace ws_redis
{
#define WS_REDIS_LOG_LENGTH_MAX 528 // maximum length of a redis log line
#define WS_REDIS_CONTEXT_HASH_MARK 5381 // hash constant used when looking up a redis connection
// Pointer alias for a hiredis connection context.
typedef redisContext* PRedisContext;
//--------------------------------------------------------
// Redis command execution-time probe.
// Callers in redis_interface.cpp create one right before issuing a command and
// call Log() right after the reply comes back.
//--------------------------------------------------------
class RedisCmdTimeTesting
{
public:
RedisCmdTimeTesting();
virtual ~RedisCmdTimeTesting();
void Log(PRedisContext pRD, const char* fmt, ...); // printf-style log entry for the executed command
private:
std::string m_log; // log text
timeval m_begin; // start time
timeval m_end; // end time
};
//--------------------------------------------------------
// Redis execution report record.
// Every counter starts at zero and the per-key table starts empty.
//--------------------------------------------------------
struct RedisReport_t
{
    uint32_t m_State;     // report state: 0 = off, 1 = on
    uint32_t m_Pid;       // id of the monitored thread
    uint32_t m_RunCount;  // total number of executed commands
    uint64_t m_RunTime;   // execution time, summed over all commands
    uint64_t m_StartTime; // start time
    std::map<std::string, std::map<std::string, uint32_t> > m_KeyInfo; // per-statement counters: <key, <"time"/"count", value>>

    // m_RunTime is now initialized as well; the original constructor left it
    // indeterminate until startRedisReport() was called.
    RedisReport_t() : m_State(0), m_Pid(0), m_RunCount(0), m_RunTime(0), m_StartTime(0)
    {
    }
};
// One entry read from a sorted set.
// NOTE(review): the ZRANGE/ZREVRANGE readers in redis_interface.cpp store the
// first element of each reply pair in m_score and the second in m_number -
// confirm which field callers expect to hold the member and which the score.
struct SortData
{
std::string m_number; // declared as the sorted-set member
std::string m_score; // declared as the sorted-set score
};
//--------------------------------------------------------
// reids 连接管理器
//--------------------------------------------------------
class RedisManager
{
private:
static RedisManager *_instance;
public:
static RedisManager *singleton();
RedisManager() {
proxy_server_online.clear();
}
~RedisManager() {};
public:
// -----------------redis连接相关---------------------
uint32_t CreateContext(std::string ip, uint32_t port); // 创建链接
uint32_t CreateContextWithOutTime(std::string ip, uint32_t port, uint32_t time); // 建立链接,带超时时间
bool CloseContext(uint32_t index_link); // 指定管理某个链接
bool CloseAllContext(); // 关闭所有链接
PRedisContext getRedis(std::string key); // 根据key获取redis连接(多连接情况下)通过hash key找到该key存储的redis连接,并发情况下可以提高并发率
PRedisContext getRedis(uint32_t index_link); // 直接指明hashmap中第几个redis连接
bool selectDB(PRedisContext& pRD, uint32_t index_db); // 获取指定redis连接中的第N个DB
// -----------------tool工具--------------------------
std::string contextAddrInfo(PRedisContext& pRD); // 链接信息
std::string replayInfo(redisReply* pReply); // 报告信息
std::string replaceCmdCrlf(const char* format); // 将命令中的一些符号替换掉
// -----------------redis命令相关---------------------
redisReply* redisCommand(PRedisContext& pRD, const char *format, ...);
// key
bool getAllKeys(PRedisContext& pRD, set<std::string>& value, std::string strMatching = "*"); // (消耗性能)获取数据库中所有的key; value: 为返回的所有key集合, strMatching:为查找匹配,默认为"*"所有,可以为如"t[w]*"表达式
bool isKeyExist(PRedisContext& pRD, const string& key, bool& keyExist); // 检查key是否存在
bool delKey(PRedisContext& pRD, const std::string& key); // 删除指定的key
bool keyType(PRedisContext& pRD, const std::string& key, std::string& type); // 获取key的类型;返回值:none(key不存在),std::string(字符串),list(列表),set(集合),zset(有序集),hash(哈希表)
bool setKeyTTL(PRedisContext& pRD, const std::string& key, uint32_t time); // 设置key的生存时间,单位s
bool expireAt(PRedisContext& pRD, const std::string& key, time_t calTime); // 更新key的生存时间, 单位time;
uint32_t delKey_T(PRedisContext& pRD, const std::string& key); // 事务模式:删除指定的key, 返回0表示失败, 1表示成功, 2表示存入队列(因为有事务处理)
// lock 模拟,阻塞(最大阻塞2秒)
bool lock(PRedisContext& pRD, const std::string &key, uint32_t ttl); // 加锁
bool unlock(PRedisContext& pRD, const std::string &key); // 解锁
//--- string
bool setStrValue(PRedisContext& pRD, const std::string& key, const std::string& value); // 设置key-value
bool setStrValue(PRedisContext& pRD, const std::string& key, const std::string& value, const uint32_t& time);// 设置key-value,带时间
bool getStrValue(PRedisContext& pRD, const std::string& key, std::string& value); // 获取key-value
bool setAnyValue(PRedisContext &pRD, map<std::string, std::string> value); // 一次插入多个值
bool getAnyValue(PRedisContext& pRD, const set<std::string>& setKeys, map<std::string, std::string>& outMapInfo); // 一次获取多个数据的值
bool incrValue(PRedisContext& pRD, const std::string& key, int incrNum = 1); // 每次增加指定值,默认为1
bool incrValue(PRedisContext& pRD, const std::string& key, long long &outValue, int incrNum = 1); // 每次增加指定值,默认为1,返回增加后的值
bool decrValue(PRedisContext& pRD, const std::string& key, int decrNum = 1); // 每次减去指定值,默认为1
bool decrValue(PRedisContext& pRD, const std::string& key, long long &outValue, int decrNum = 1); // 每次减去指定值,默认为1,返回增加后的值
//--- list
bool getListLen(PRedisContext& pRD, const std::string& key, uint32_t& len); // get the list length
bool lPushList(PRedisContext& pRD, const std::string& key, std::string& value); // insert at the head; the list is created if it is empty
bool rPushList(PRedisContext& pRD, const std::string& key, std::string& value); // insert at the tail; the list is created if it is empty
bool lPushXList(PRedisContext& pRD, const std::string& key, std::string& value); // insert at the head; the list is NOT created if it is empty
// NOTE(review): the original comment said the list IS created when empty, which contradicts lPushXList
// and the "X" in the name; likely a copy-paste slip -- confirm against the implementation.
bool rPushXList(PRedisContext& pRD, const std::string& key, std::string& value); // insert at the tail
bool lPushAnyList(PRedisContext& pRD, const std::string& key, vector<std::string>& values); // insert several values at the head; values inserted first end up toward the tail
bool rPushAnyList(PRedisContext& pRD, const std::string& key, vector<std::string>& values); // insert several values at the tail; values inserted first end up toward the head
bool lPushGetLen(PRedisContext& pRD, const std::string& key, std::string& value, uint32_t& len); // insert at the head and get the length
bool rPushGetLen(PRedisContext& pRD, const std::string& key, std::string& value, uint32_t& len); // insert at the tail and get the length
bool rPushListBit(PRedisContext& pRD, const std::string& key, std::string& binMem); // (binary) append a single character sequence to the list tail, e.g. "a b c"
bool lPopListBit(PRedisContext& pRD, const std::string& key, std::string& binMem, bool& keyExist); // (binary) remove a character sequence; keyExist tells whether data was obtained
bool rPopList(PRedisContext& pRD, const std::string& key, std::string& value, bool& keyExist); // remove the tail element
bool lPopList(PRedisContext& pRD, const std::string& key, std::string& value, bool& keyExist); // remove the head element
bool lremList(PRedisContext& pRD, const std::string& key, std::string& value, std::string count = "1"); // remove elements equal to value; count is how many to remove (positive: n from front to back, negative: n from back to front, 0: all)
bool lrangeList(PRedisContext& pRD, const std::string& key, std::vector<std::string>& result, uint32_t startPos = 0, uint32_t endPos = 0); // return the elements within the given range
//--- hash
bool setHashValue(PRedisContext& pRD, const std::string& key, std::string field, std::string strValue); // set a hash value
bool getHashValue(PRedisContext& pRD, const std::string& key, std::string field, std::string& strValue);// get a hash value
bool setAnyHashValue(PRedisContext& pRD, const std::string& key, map<std::string, std::string>& values);// set several values
bool getAnyHashValue(PRedisContext& pRD, const std::string& key, vector<std::string>& fields, map<std::string, std::string>& values); // get several values
bool getHashAllValue(PRedisContext& pRD, const std::string& key, map<std::string, std::string>& valueMap); // get all field/value pairs of the hash
bool getHashAllFields(PRedisContext& pRD, const std::string& key, vector<std::string>& fields); // get all fields of the target key
bool setHashValueBit(PRedisContext& pRD, const std::string& key, std::string field, std::string binValue);// (binary) set a hash value
bool getAnyHashValueBit(PRedisContext& pRD, const std::string& key, map<std::string, std::string>& valueMap);// (binary) get all key/value pairs
bool delHashValue(PRedisContext& pRD, const std::string& key, std::string field); // delete a hash value
bool delHashValue(PRedisContext& pRD, const std::string& key, const set<std::string> & fields, int64_t& rmCount);// delete several hash values (rmCount is presumably the number removed -- confirm)
bool isMemInHash(PRedisContext& pRD, const std::string& key, std::string field, bool& isMem); // test whether the field is a member of the hash
bool incrHashValue(PRedisContext& pRD,const std::string& key, std::string field,std::string value = "1");// add the given amount to the existing value
bool setHashValue_T(PRedisContext& pRD, const std::string& key, map<std::string, std::string>& fieldValue); // (transaction) set hash values
//--- set
bool isMemInSet(PRedisContext& pRD, const std::string& key, std::string& value, bool& isMem); // test whether the value is a member of the set
bool getSetCount(PRedisContext& pRD, const std::string& key, uint32_t& count, bool& keyExist); // get the number of elements in the set
bool setSetValue(PRedisContext& pRD, const std::string& key, std::string& value); // add an element; elements that already exist are ignored
bool setAnySetValue(PRedisContext& pRD, const std::string& key, set<std::string>& value); // add several values; elements that already exist are ignored
bool getAnySetValue(PRedisContext& pRD, const std::string& key, set<std::string>& value); // get the members of the set
bool remSetValue(PRedisContext& pRD, const std::string& key, std::string& value); // remove an element from the set
bool remAnySetValue(PRedisContext& pRD, const std::string &key, const std::set<std::string> &value); // remove several specified elements
bool getSetSunion(PRedisContext& pRD, set<std::string>& keys, set<std::string>& value); // get the union of several keys
bool getSetSdiff(PRedisContext& pRD, set<std::string>& keys, set<std::string>& value); // get the difference of several keys
bool getSetSinter(PRedisContext& pRD, set<std::string>& keys, set<std::string>& value); // get the intersection of several keys
bool setSetValue_T(PRedisContext& pRD, const std::string& key, std::string& value); // (transaction) add a set element
// --- sortset
bool isMemInSortedSet(PRedisContext& pRD, const std::string& key, const std::string& member, bool& isMem); // check whether the target is a member of the sorted set
bool getCountSortedSet(PRedisContext& pRD, const std::string& key, uint32_t& totalCount); // get the number of members
bool setSortSet(PRedisContext& pRD, const std::string& key, std::string& mem, const uint64_t& score); // add an element to key
bool setAnySortSet(PRedisContext& pRD, const std::string& key, const map<std::string, std::string>& scores);// add several elements to key
bool incrSortSet(PRedisContext& pRD, const std::string& key, const std::string& mem, const uint32_t& incrScore);// increase the given element's score by incrScore
bool decrScore(PRedisContext& pRD, const std::string& key, std::string& mem, const uint32_t& decrScore); // decrease the member's sort score
bool remMemFromSortedSet(PRedisContext& pRD, const std::string& key, const std::string& mem); // delete the specified member
bool remRangeByScore(PRedisContext& pRD, const std::string& key, const uint32_t& min, const uint32_t& max); // delete the members whose sort score lies in the given range
bool getScoreByMember(PRedisContext& pRD, const std::string& key, const std::string& member, uint32_t& score); // return the sort score of the given member (note: the score, not the rank)
bool getDescRankByMember(PRedisContext& pRD, const std::string& key, const std::string& member, uint32_t& rank); // get the member's rank in descending order
bool getAscRankByMember(PRedisContext& pRD, const std::string& key, const std::string& member, uint32_t& rank); // get the member's rank in ascending order
bool incrSortSetBit(PRedisContext& pRD, const std::string& key, std::string& mem, const uint32_t& incrScore); // (binary) increase the given element's score by incrScore
bool getDescSortedData(PRedisContext& pRD, const std::string& key, const uint32_t& offset, const uint32_t& limit, std::vector<SortData>& datas); // (descending) fetch the given segment from largest to smallest; entries with score 0 are filtered out
bool getDescSortedAllData(PRedisContext& pRD, const std::string& key, const uint32_t& offset, const uint32_t& limit, std::vector<SortData>& datas); // (descending) fetch the given segment from largest to smallest; entries with score 0 are NOT filtered out
// Note: this declaration names the context parameter pRC, unlike its siblings (pRD).
bool getAscSortedData(PRedisContext& pRC, const string& key, const uint32_t& start, const uint32_t& end, std::vector<SortData> &datas); // (ascending) fetch the given segment from smallest to largest; entries with score 0 are filtered out
bool getAscSortedAllData(PRedisContext& pRD, const std::string& key, const uint32_t& start, const uint32_t& end, std::vector<SortData>& datas); // (ascending) fetch the given segment from smallest to largest; entries with score 0 are NOT filtered out
bool setSortedSet_T(PRedisContext& pRD, const std::string& key, std::string& mem, const uint32_t& score); // (transaction) add an element
// NOTE(review): the original comment said "(transaction) delete the member"; the name and the decrScore
// parameter suggest it decreases the member's score instead -- confirm against the implementation.
bool decrScore_T(PRedisContext& pRD, const std::string& key, std::string& mem, const uint32_t& decrScore);
bool incrMemScore_T(PRedisContext& pRD, const std::string& key, std::string& mem, const uint32_t& incrScore); // (transaction) increase the member's score
// --- Transaction
bool beginTransaction(PRedisContext& pRD); // begin a transaction; marks the start of the transaction
bool endTransaction(PRedisContext& pRD, bool& isAborted); // end the transaction; executes all queued commands
bool endTransaction(PRedisContext& pRD, bool& isAborted, redisReply** pReply); // end the transaction; executes all queued commands
bool discardTransaction(PRedisContext& pRD); // cancel the transaction (roll back); none of the queued commands are executed
bool watch(PRedisContext& pRD, const std::string& key); // watch a single key
bool watch(PRedisContext& pRD, const vector<std::string>& keys); // watch several keys
bool unwatch(PRedisContext& pRD); // stop watching keys
// --- Lua script control
bool loadScript(PRedisContext& pRD, const std::string& key,const std::string& script);// load a script; the script is associated with the given key
bool existsScript(PRedisContext& pRD, const std::string& key); // check whether the script identified by key has already been loaded
bool getAllScriptKeys(PRedisContext& pRD, std::vector<string> &keys); // get the keys of all scripts loaded so far
bool clearAllScript(PRedisContext& pRD); // clear the cache of all loaded scripts
bool exceScript(PRedisContext& pRD, const std::string& key, std::vector<std::string> &outValue); // execute the script; outValue receives the returned data
bool stopRuningScript(PRedisContext& pRD); // stop the script that is currently running (succeeds only if that script has not performed any write)
// ----------------- redis execution report ----------------------
// Initialise the redis execution report; pid: 0 means monitor all threads, any other value means a single thread
uint32_t startRedisReport(uint32_t pid = 0);
void stopRedisReport(); // stop recording the redis report
std::string getRedisReport(); // print the execution report (returned as a string)
void buildReport(std::string key, timeval startTime, timeval endTime); // generate a report entry
public:
// A redis connection handle bundled with the ip string and port stored alongside it.
typedef struct {
PRedisContext pRD;
string str_ip;
int port;
} PRedisContext_ext;
map<PRedisContext, map<string, string> > m_Script; // scripts, per connection: <key, checksum of the script>
map<uint32_t, PRedisContext_ext> m_allRedisConn; // all redis connections
uint32_t m_MaxRedisContext; // maximum number of redis connections
RedisReport_t m_RedisReport; // redis execution report
//mutex m_RedisLock; // thread lock
std::mutex m_RedisLock;
private:
bool setScriptInfo(PRedisContext& pRD, const std::string& key, const std::string& checkSum); // store a script's <key, checksum>; a key that already exists may not be changed
bool getScriptInfo(PRedisContext& pRD, const std::string& key, std::string& checkSum);// get the checksum of the target script
bool getScriptAllKeys(PRedisContext& pRD, std::vector<string> &keys);// get all keys
bool clearScriptAllKeys(PRedisContext& pRD);// clear the cache
public:
// Server-list management entry points.
// NOTE(review): these declarations carry no comments in the original and their exact semantics are not
// visible in this declaration-only view; consult the implementations before relying on the names.
bool clean_all_server(void);
bool load_servers(const std::set <std::string> & server_list);
bool print_serverlist(void);
bool unregister_server(const std::string & server_list);
bool unregister_allserver(const std::set <std::string> & server_ip);
bool manage_server_online(void);
bool update_server_Timestamp(std::string server_ip, int server_port);
bool get_server_online(std::string & serve_ip, std::string & id);
// Per-server status record for a data-proxy server.
typedef struct server_status {
int64_t _since_enpoch; // timestamp at which the data-proxy server last received a heartbeat
int gateway_count; // number of gateways connected to the data-proxy server
}server_status;
private:
std::mutex server_online_mutex; // NOTE(review): presumably guards proxy_server_online -- confirm in the implementation
std::map<std::string, server_status> proxy_server_online; // server_status records keyed by string (key format not visible here)
// 1000 * 1000 * 120 = 120,000,000.
// NOTE(review): presumably microseconds (i.e. 120 s) compared against _since_enpoch -- confirm the unit.
const int64_t timeout = 1000 * 1000 * 120;
};
}