目的:
验证redis 集群模式,高可用场景
insert.py
from rediscluster import ClusterBlockingConnectionPool, RedisCluster, ClusterConnectionPool def init(): startup_nodes = [ {'host': '10.12.16.50', 'port': 6379}, {'host': '10.12.16.50', 'port': 6380}, {'host': '10.12.16.51', 'port': 6379}, {'host': '10.12.16.51', 'port': 6380}, {'host': '10.12.16.52', 'port': 6379}, {'host': '10.12.16.52', 'port': 6380}] # 构建连接池 pool = ClusterConnectionPool(startup_nodes=startup_nodes) # 创建redis集群客户端 redis_client = RedisCluster(connection_pool=pool) print("redis集群连接成功!!!") return redis_client def insert(key, i): redis_client = init() # 数据写入测试 redis_client.lpush(key, i) print("{}-->{}".format(key,i)) # print("测试数据写入成功!!!") def delete(key): redis_client = init() # 数据写入测试 redis_client.delete(key) print("{}-->{}".format("测试数据","删除成功")) if __name__=="__main__": key = "alamTest" delete(key) for i in range(1,1000): import time time.sleep(1) insert(key,i) print("-----time={}".format(time.asctime()))
select.py
from rediscluster import ClusterBlockingConnectionPool, RedisCluster, ClusterConnectionPool def init(): startup_nodes = [ {'host': '10.12.16.50', 'port': 6379}, {'host': '10.12.16.50', 'port': 6380}, {'host': '10.12.16.51', 'port': 6379}, {'host': '10.12.16.51', 'port': 6380}, {'host': '10.12.16.52', 'port': 6379}, {'host': '10.12.16.52', 'port': 6380}] # 构建连接池 pool = ClusterConnectionPool(startup_nodes=startup_nodes) # 创建redis集群客户端 redis_client = RedisCluster(connection_pool=pool) # print("redis集群连接成功!!!") return redis_client def select(keys="alamTest"): redis_client = init() # 数据写入测试 count = redis_client.llen(keys) print("{}-->{}".format(keys,count)) # print("测试数据查看成功!!!") if __name__=="__main__": while 1: import time time.sleep(1) select() print("-----time={}".format(time.asctime()))
标签:python,redis,host,client,print,操作,10.12,port From: https://www.cnblogs.com/alamZ/p/17388881.html