首页 > 其他分享 >.net 分布式架构之分布式锁实现

.net 分布式架构之分布式锁实现

时间:2022-11-11 14:03:15浏览次数:40  
标签:lockresult LockResult 架构 string lock redisserver net 分布式

.net分布式锁,包括redis分布式锁和zookeeper分布式锁的.net实现。 分布式锁在解决分布式环境下的业务一致性是非常有用的。

分布式锁

经常用于在解决分布式环境下的业务一致性和协调分布式环境。

实际业务场景中,比如说解决并发一瞬间的重复下单,重复确认收货,重复发现金券等。

使用分布式锁的场景一般不能太多。


开源相关群: .net 开源基础服务 238543768

这里整理了C#.net关于redis分布式锁和zookeeper分布式锁的实现,仅用于研究。(可能有bug)

采用ServiceStack.Redis实现Redis分布式锁

/*     
* Redis分布式锁
* 采用ServiceStack.Redis实现的Redis分布式锁
* 详情可阅读其开源代码
* 备注:不同版本的 ServiceStack.Redis 实现reidslock机制不同 xxf里面默认使用2.2版本
*/ public class RedisDistributedLock : BaseRedisDistributedLock
{
private ServiceStack.Redis.RedisLock _lock;
private RedisClient _client;
public RedisDistributedLock(string redisserver, string key)
: base(redisserver, key)
{

}

public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut)
{
if (lockresult == LockResult.Success)
throw new DistributedLockException("检测到当前锁已获取");
_client = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient();
/* * 阅读源码发现当其获取锁后,redis连接资源会一直占用,知道获取锁的资源释放后,连接才会跳出,可能会导致连接池资源的浪费。 */

try {
this._lock = new ServiceStack.Redis.RedisLock(_client, key, getlockTimeOut);
lockresult = LockResult.Success;
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁系统级别严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
lockresult = LockResult.LockSystemExceptionFailure;
}
return lockresult;
}

public override void Dispose()
{
try {
if (this._lock != null)
this._lock.Dispose();
if (_client != null)
this._client.Dispose();
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁释放严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
}
}
}

 

 

来自网络的java实现Redis分布式锁(C#版)

/*     
* Redis分布式锁
* 采用网络上java实现的Redis分布式锁
* 参考 http://www.blogjava.net/hello-yun/archive/2014/01/15/408988.html
* 详情可阅读其开源代码
*/ public class RedisDistributedLockFromJava : BaseRedisDistributedLock
{


public RedisDistributedLockFromJava(string redisserver, string key)
: base(redisserver, key)
{


}

public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut)
{
if (lockresult == LockResult.Success)
throw new DistributedLockException("检测到当前锁已获取");
try {
// 1. 通过SETNX试图获取一个lock
         string @lock = key;
long taskexpiredMilliseconds = (taskrunTimeOut != null ? (long)taskrunTimeOut.Value.TotalMilliseconds : (long)DistributedLockConfig.MaxLockTaskRunTime);
long getlockexpiredMilliseconds = (getlockTimeOut != null ? (long)getlockTimeOut.Value.TotalMilliseconds : 0);
long hassleepMilliseconds = 0;
while (true)
{
using (var redisclient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
{
long value = CurrentUnixTimeMillis() + taskexpiredMilliseconds + 1;
/*Java以前版本都是用SetNX,但是这种是无法设置超时时间的,不是很理解为什么,
              * 可能是因为原来的redis命令比较少导致的?现在用Add不知道效果如何.
                因对redis细节不了解,但个人怀疑若异常未释放锁经常发生,可能会导致内存逐步溢出*/

              bool acquired = redisclient.Add<long>(@lock, value, TimeSpan.FromMilliseconds(taskexpiredMilliseconds + DistributedLockConfig.TaskLockDelayCleepUpTime));
//SETNX成功,则成功获取一个锁
if (acquired == true)
{
lockresult = LockResult.Success;
}
//SETNX失败,说明锁仍然被其他对象保持,检查其是否已经超时
               else
               {
var oldValueBytes = redisclient.Get(@lock);
//超时
if (oldValueBytes != null && BitConverter.ToInt64(oldValueBytes, 0) < CurrentUnixTimeMillis())
{
/*此处虽然重设并获取锁,但是超时时间可能被覆盖,故重设超时时间;若有进程一直在尝试获取锁,那么锁存活时间应该被延迟*/

                  var getValueBytes = redisclient.GetSet(@lock, BitConverter.GetBytes(value));
var o1 = redisclient.ExpireEntryIn(@lock, TimeSpan.FromMilliseconds(taskexpiredMilliseconds + DistributedLockConfig.TaskLockDelayCleepUpTime));//这里如果程序异常终止,依然会有部分锁未释放的情况。 // 获取锁成功 if (getValueBytes == oldValueBytes)
{
lockresult = LockResult.Success;
}
// 已被其他进程捷足先登了
else
{
lockresult = LockResult.GetLockTimeOutFailure;
}
}
//未超时,则直接返回失败
else
{
lockresult = LockResult.GetLockTimeOutFailure;
}
}
}

//成功拿到锁
if (lockresult == LockResult.Success)
break;

//获取锁超时
if (hassleepMilliseconds >= getlockexpiredMilliseconds)
{
lockresult = LockResult.GetLockTimeOutFailure;
break;
}

//继续等待
System.Threading.Thread.Sleep(DistributedLockConfig.GetLockFailSleepTime);
hassleepMilliseconds += DistributedLockConfig.GetLockFailSleepTime;
}
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁系统级别严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
lockresult = LockResult.LockSystemExceptionFailure;
}
return lockresult;
}

private long CurrentUnixTimeMillis()
{
return (long)(System.DateTime.UtcNow - new System.DateTime(1970, 1, 1, 0, 0, 0, System.DateTimeKind.Utc)).TotalMilliseconds;
}

public override void Dispose()
{
if (lockresult == LockResult.Success || lockresult == LockResult.LockSystemExceptionFailure)
{
try {
long current = CurrentUnixTimeMillis();
using (var redisclient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
{
var v = redisclient.Get(key);
if (v != null)

{
// 避免删除非自己获取得到的锁
if (current < BitConverter.ToInt64(v, 0))
{
redisclient.Del(key);
}
}
}
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁释放严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
}
}
}
}

 

ServiceStack.Redis内部实现版本(较旧)

/*   
* Redis分布式锁
* 采用ServiceStack.Redis实现的Redis分布式锁
* 详情可阅读其开源代码
* 备注:不同版本的 ServiceStack.Redis 实现reidslock机制不同
* 拷贝自网络开源代码 较旧的实现版本
*/ public class RedisDistributedLockFromServiceStack : BaseRedisDistributedLock
{
public RedisDistributedLockFromServiceStack(string redisserver, string key)
: base(redisserver, key)
{


}
public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut)
{
if (lockresult == LockResult.Success)
throw new DistributedLockException("检测到当前锁已获取");
try
{
using (var redisClient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
{
ExecExtensions.RetryUntilTrue(
() =>
{
//This pattern is taken from the redis command for SETNX http://redis.io/commands/setnx
//Calculate a unix time for when the lock should expire
                  TimeSpan realSpan = taskrunTimeOut ?? TimeSpan.FromMilliseconds(DistributedLockConfig.MaxLockTaskRunTime); //new TimeSpan(365, 0, 0, 0); //if nothing is passed in the timeout hold for a year DateTime expireTime = DateTime.UtcNow.Add(realSpan);
string lockString = (expireTime.ToUnixTimeMs() + 1).ToString();

//Try to set the lock, if it does not exist this will succeed and the lock is obtained
var nx = redisClient.SetEntryIfNotExists(key, lockString);
if (nx)
{
lockresult = LockResult.Success;
return true;
}

//If we've gotten here then a key for the lock is present. This could be because the lock is
//correctly acquired or it could be because a client that had acquired the lock crashed (or didn't release it properly).
//Therefore we need to get the value of the lock to see when it should expire
redisClient.Watch(key);
string lockExpireString = redisClient.Get<string>(key);
long lockExpireTime;
if (!long.TryParse(lockExpireString, out lockExpireTime))
{
redisClient.UnWatch(); // since the client is scoped externally
                      lockresult = LockResult.GetLockTimeOutFailure;
return false;
}

//If the expire time is greater than the current time then we can't let the lock go yet
                   if (lockExpireTime > DateTime.UtcNow.ToUnixTimeMs())
{
redisClient.UnWatch(); // since the client is scoped externally
                     lockresult = LockResult.GetLockTimeOutFailure;
return false;
}

//If the expire time is less than the current time then it wasn't released properly and we can attempt to //acquire the lock. The above call to Watch(_lockKey) enrolled the key in monitoring, so if it changes //before we call Commit() below, the Commit will fail and return false, which means that another thread //was able to acquire the lock before we finished processing. using (var trans = redisClient.CreateTransaction()) // we started the "Watch" above; this tx will succeed if the value has not moved {
trans.QueueCommand(r => r.Set(key, lockString));
//return trans.Commit(); //returns false if Transaction failed var t = trans.Commit();
if (t == false)
lockresult = LockResult.GetLockTimeOutFailure;
else
lockresult = LockResult.Success;
return t;
}
},
getlockTimeOut
);

}
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁系统级别严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
lockresult = LockResult.LockSystemExceptionFailure;
}
return lockresult;
}

public override void Dispose()
{
try {
using (var redisClient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
{
redisClient.Remove(key);
}
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁释放严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
}
}
}

 

 

Zookeeper 版本实现分布式锁

/*  * 来源java网络源码的zookeeper分布式锁实现(目前仅翻译并简单测试ok,未来集成入sdk)  
* 备注: 共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,
然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,
如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,
从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。
*/ public class ZooKeeprDistributedLockFromJava : IWatcher
{
private ZooKeeper zk;
private string root = "/locks"; //根
private string lockName; //竞争资源的标志
private string waitNode; //等待前一个锁
private string myZnode; //当前锁
//private CountDownLatch latch; //计数器
private AutoResetEvent autoevent;
private TimeSpan sessionTimeout = TimeSpan.FromMilliseconds(30000);
private IList<Exception> exception = new List<Exception>();

/// <summary>
/// 创建分布式锁,使用前请确认config配置的zookeeper服务可用 </summary>
/// <param name="config"> 127.0.0.1:2181 </param>
/// <param name="lockName"> 竞争资源标志,lockName中不能包含单词lock </param>
public ZooKeeprDistributedLockFromJava(string config, string lockName)
{
this.lockName = lockName;
// 创建一个与服务器的连接
try
       {
zk = new ZooKeeper(config, sessionTimeout, this);
var stat = zk.Exists(root, false);
if (stat == null)
{
// 创建根节点
zk.Create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent);
}
}
catch (KeeperException e)
{
throw e;
}
}

/// <summary>
/// zookeeper节点的监视器
/// </summary>
public virtual void Process(WatchedEvent @event)
{
if (this.autoevent != null)
{
this.autoevent.Set();
}
}

public virtual bool tryLock()
{
try {
string splitStr = "_lock_";
if (lockName.Contains(splitStr))
{
//throw new LockException("lockName can not contains \\u000B");
          }
//创建临时子节点
myZnode = zk.Create(root + "/" + lockName + splitStr, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EphemeralSequential);
Console.WriteLine(myZnode + " is created ");
//取出所有子节点
IList<string> subNodes = zk.GetChildren(root, false);
//取出所有lockName的锁
IList<string> lockObjNodes = new List<string>();
foreach (string node in subNodes)
{
if (node.StartsWith(lockName))
{
lockObjNodes.Add(node);
}
}
Array alockObjNodes = lockObjNodes.ToArray();
Array.Sort(alockObjNodes);
Console.WriteLine(myZnode + "==" + lockObjNodes[0]);
if (myZnode.Equals(root + "/" + lockObjNodes[0]))
{
//如果是最小的节点,则表示取得锁
return true;
}
//如果不是最小的节点,找到比自己小1的节点
string subMyZnode = myZnode.Substring(myZnode.LastIndexOf("/", StringComparison.Ordinal) + 1);
waitNode = lockObjNodes[Array.BinarySearch(alockObjNodes, subMyZnode) - 1];
}
catch (KeeperException e)
{
throw e;
}
return false;
}

public virtual bool tryLock(TimeSpan time)
{
try {
if (this.tryLock())
{
return true;
}
return waitForLock(waitNode, time);
}
catch (KeeperException e)
{
throw e;
}
return false;
}

private bool waitForLock(string lower, TimeSpan waitTime)
{
var stat = zk.Exists(root + "/" + lower, true);
//判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
if (stat != null)
{
Console.WriteLine("Thread " + System.Threading.Thread.CurrentThread.Name + " waiting for " + root + "/" + lower);
autoevent = new AutoResetEvent(false);
bool r = autoevent.WaitOne(waitTime);
autoevent.Dispose();
autoevent = null;
return r;
}
else return true;
}

public virtual void unlock()
{
try {
Console.WriteLine("unlock " + myZnode);
zk.Delete(myZnode, -1);
myZnode = null;
zk.Dispose();
}
catch (KeeperException e)
{
throw e;
}
}




}

 

 

以上代码仅做参考,未压测。

代码粘贴有些问题,详细请下载开源包运行研究。

 

开源是一种态度,分享是一种精神,学习仍需坚持,进步仍需努力,.net生态圈因你我更加美好。



标签:lockresult,LockResult,架构,string,lock,redisserver,net,分布式
From: https://blog.51cto.com/chejiangyi/5844802

相关文章

  • .net 分布式架构之任务调度平台
    .net简单任务调度平台,用于.netdll,exe的任务的挂载,任务的隔离,调度执行,访问权限控制,监控,管理,日志,错误预警,性能分析等。.net任务调度平台用于.net......
  • .net 分布式架构之业务消息队列
    .net业务消息队列是应用于业务的解耦和分离,应具备分布式,高可靠性,高性能,高实时性,高稳定性,高扩展性等特性。大量的业务消息堆积能力;无单点故障及故障监控......
  • .net 任务调度平台
    .net简单任务调度平台,用于.netdll,exe的任务的挂载,任务的隔离,调度执行,访问权限控制,监控,管理,日志,错误预警,性能分析等。.net任务调度平台用于.net......
  • 搜索精准度优化架构方案
    搜索精准度优化架构方案概述实现公司对内容精准化搜索和用户精准化推送的目标。采用方案 搜索技术+数据挖掘+机器学习(未来)+人工审核(现在)人员配......
  • 乘风破浪,遇见新一代工业互联网(Industrial Internet)之自主移动机器人(AMR)、自动导航
    什么是自主移动机器人(AMR)从工厂车间的重复性工作到农业、物流、酒店等领域的活动,自主移动机器人的使用正在改变业务的运营方式。自主移动机器人(AutonomousMobile......
  • 超全面Redis分布式高可用方案:哨兵机制
    开发工作中对于分布式缓存高可用方案(搭建Redis缓存高可用方案),Redis主从架构下是如何保证高可用的呢?我们知道RedisSentinel是一个分布式系统,为Redis提供高可用性解决......
  • 关于架构设计的易变性,应该如何理解呢?
    hello,大家好,我是张张,「架构精进之路」公号作者。一、架构设计分层通常情况下,我们的架构设计图大概率会如下图这个样子了,首先声明一点,这其实并没有什么不妥的,这也是很典型的......
  • asp.net中updatepanel控件向外传值
    .aspx代码如下:<%@PageLanguage="C#"AutoEventWireup="true"CodeFile="UpdatePanel控件传值.aspx.cs"Inherits="UpdatePanel控件传值"%><!DOCTYPEhtmlPUBLIC"-//W3......
  • asp.net中FileUpload控件研究汇总
    .aspx代码如下:<%@PageLanguage="C#"AutoEventWireup="true"CodeFile="文件上传.aspx.cs"Inherits="文件上传"%><!DOCTYPEhtmlPUBLIC"-//W3C//DTDXHTML1.0Transi......
  • asp.net中MultiView和View
    小例子.aspx代码如下:<%@PageLanguage="C#"AutoEventWireup="true"CodeFile="2_MultiView.aspx.cs"Inherits="_2_MultiView"%><!DOCTYPEhtmlPUBLIC"-//W3C//DTDXHT......