首页 > 其他分享 >etcd之etcd分布式锁及事务(四)

etcd之etcd分布式锁及事务(四)

时间:2024-10-24 20:20:23浏览次数:8  
标签:clientv3 context cli err nil 锁及 etcd key 分布式

1、etcd分布式锁及事务

1.1 前言

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如

果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥

来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

1.2 etcd分布式锁设计

1、排他性:任意时刻,只能有一个机器的一个线程能获取到锁。

通过在etcd中存入key值来实现上锁,删除key实现解锁,参考下面伪代码:

func Lock(key string, cli *clientv3.Client) error {
    //获取key,判断是否存在锁
	resp, err := cli.Get(context.Background(), key)
	if err != nil {
		return err
	}
	//锁存在,返回上锁失败
	if len(resp.Kvs) > 0 {
		return errors.New("lock fail")
	}
	_, err = cli.Put(context.Background(), key, "lock")
	if err != nil {
		return err
	}
	return nil
}
//删除key,解锁
func UnLock(key string, cli *clientv3.Client) error {
	_, err := cli.Delete(context.Background(), key)
	return err
}

当发现已上锁时,直接返回lock fail。也可以处理成等待解锁,解锁后竞争锁。

//等待key删除后再竞争锁
func waitDelete(key string, cli *clientv3.Client) {
	rch := cli.Watch(context.Background(), key)
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.DELETE: //删除
				return
			}
		}
	}
}

2、容错性:只要分布式锁服务集群节点大部分存活,client就可以进行加锁解锁操作。

etcd基于Raft算法,确保集群中数据一致性。

3、避免死锁:分布式锁一定能得到释放,即使client在释放之前崩溃。

上面分布式锁设计有缺陷,假如client获取到锁后程序直接崩了,没有解锁,那其他线程也无法拿到锁,导致死锁

出现。

通过给key设定leases来避免死锁,但是leases过期时间设多长呢?假如设了30秒,而上锁后的操作比30秒

大,会导致以下问题:

  • 操作没完成,锁被别人占用了,不安全

  • 操作完成后,进行解锁,这时候把别人占用的锁解开了

解决方案:给key添加过期时间后,以Keep leases alive方式延续leases,当client正常持有锁时,锁不会过

期;当client程序崩掉后,程序不能执行Keep leases alive,从而让锁过期,避免死锁。看以下伪代码:

//上锁
func Lock(key string, cli *clientv3.Client) error {
    //获取key,判断是否存在锁
	resp, err := cli.Get(context.Background(), key)
	if err != nil {
		return err
	}
	//锁存在,等待解锁后再竞争锁
	if len(resp.Kvs) > 0 {
		waitDelete(key, cli)
		return Lock(key)
	}
    //设置key过期时间
	resp, err := cli.Grant(context.TODO(), 30)
	if err != nil {
		return err
	}
	//设置key并绑定过期时间
	_, err = cli.Put(context.Background(), key, "lock", clientv3.WithLease(resp.ID))
	if err != nil {
		return err
	}
	//延续key的过期时间
	_, err = cli.KeepAlive(context.TODO(), resp.ID)
	if err != nil {
		return err
	}
	return nil
}
//通过让key值过期来解锁
func UnLock(resp *clientv3.LeaseGrantResponse, cli *clientv3.Client) error {
	_, err := cli.Revoke(context.TODO(), resp.ID)
	return err
}

经过以上步骤,我们初步完成了分布式锁设计。其实官方已经实现了分布式锁,它大致原理和上述有出入,接下来

我们看下如何使用官方的分布式锁。

1.3 etcd分布式锁使用

package main

import (
	"context"
	"fmt"
	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
	"log"
)

var endpoints = []string{"localhost:2379"}

func ExampleMutex_Lock() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// create two separate sessions for lock competition
	s1, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s1.Close()
	m1 := concurrency.NewMutex(s1, "/my-lock/")

	s2, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s2.Close()
	m2 := concurrency.NewMutex(s2, "/my-lock/")

	// acquire lock for s1
	if err := m1.Lock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("acquired lock for s1")

	m2Locked := make(chan struct{})
	go func() {
		defer close(m2Locked)
		// wait until s1 is locks /my-lock/
		if err := m2.Lock(context.TODO()); err != nil {
			log.Fatal(err)
		}
	}()

	if err := m1.Unlock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("released lock for s1")

	<-m2Locked
	fmt.Println("acquired lock for s2")
}

func main() {
	ExampleMutex_Lock()
}
# 输出
acquired lock for s1
released lock for s1
acquired lock for s2

此代码来源于官方文档,etcd分布式锁使用起来很方便。

1.4 etcd事务

顺便介绍一下etcd事务,先看这段伪代码:

Txn(context.TODO()).If(//如果以下判断条件成立
	Compare(Value(k1), "<", v1),
	Compare(Version(k1), "=", 2)
).Then(//则执行Then代码段
	OpPut(k2,v2), OpPut(k3,v3)
).Else(//否则执行Else代码段
	OpPut(k4,v4), OpPut(k5,v5)
).Commit()//最后提交事务
package main

import (
	"context"
	"fmt"
	"github.com/coreos/etcd/clientv3"
	"log"
	"time"
)

var endpoints = []string{"localhost:2379"}

func ExampleKV_txn() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	kvc := clientv3.NewKV(cli)

	_, err = kvc.Put(context.TODO(), "key", "xyz")
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	_, err = kvc.Txn(ctx).
		// txn value comparisons are lexical
		If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
		// the "Then" runs, since "xyz" > "abc"
		Then(clientv3.OpPut("key", "XYZ")).
		// the "Else" does not run
		Else(clientv3.OpPut("key", "ABC")).
		Commit()
	cancel()
	if err != nil {
		log.Fatal(err)
	}

	gresp, err := kvc.Get(context.TODO(), "key")
	cancel()
	if err != nil {
		log.Fatal(err)
	}
	for _, ev := range gresp.Kvs {
		fmt.Printf("%s : %s\n", ev.Key, ev.Value)
	}
}

func main() {
	ExampleKV_txn()
}
# 输出
key : XYZ

上面的使用例子,代码来自官方文档。

1.5 总结

如果发展到分布式服务阶段,且对数据的可靠性要求很高,选etcd实现分布式锁不会错。一般的Redis分布式

锁,可能出现锁丢失的情况(如果你是Java开发者,可以使用Redisson客户端实现分布式锁,据说不会出现锁丢

失的情况)。

标签:clientv3,context,cli,err,nil,锁及,etcd,key,分布式
From: https://blog.csdn.net/qq_30614345/article/details/142316528

相关文章

  • 云渲染分布式渲染什么意思?一文详解
    渲染和分布式渲染是现代计算机图形学中的重要技术,它们通过将渲染任务分散到多个服务器或计算节点上,显著提高了渲染效率和处理大规模数据的能力。这项技术在动画制作、游戏开发和电影特效等领域发挥着关键作用,为创作者提供了更快速、更灵活的渲染解决方案。分布式渲染是什么意思?......
  • 事务回顾及使用Seata解决分布式事务
    目录一、事务回顾1、什么是事务2、事务的作用3、事务ACID四大特性4、事务的并发二、Seata之原理简介1、TC、TM、RM三大组件2、分布式事务的执行流程3、AT模式如何做到对业务的无侵入3.1、一阶段加载3.2、二阶段提交3.3、二阶段回滚三、使用SEata添加分布式事务1......
  • 大模型分布式计算的优化方法思考
    转发请附原文链接:https://mp.weixin.qq.com/s/2Dox8nG2hKFQUfGoTvkNCQ一‍引言近年来,以ChatGPT、Sora为代表的通用生成式大模型的研究取得了显著进展。生成式大模型的参数规模已实现了从千万级别到万亿级别的飞跃,并朝着十万亿级别前进。由于大模型的参数规模巨大,单块GPU无法装......
  • 鸿蒙Next数据同步艺术:分布式数据对象的创建与管理
    本文旨在深入探讨华为鸿蒙HarmonyOSNext系统(截止目前API12)的技术细节,基于实际开发实践进行总结。主要作为技术分享与交流载体,难免错漏,欢迎各位同仁提出宝贵意见和问题,以便共同进步。本文为原创内容,任何形式的转载必须注明出处及原作者。引言在多设备协同的智能生态中,华为鸿蒙......
  • 《使用Gin框架构建分布式应用》阅读笔记:p127-p142
    《用Gin框架构建分布式应用》学习第9天,p127-p142总结,总计16页。一、技术总结1.Authentication方式汇总(1)APIkeysAPIkeys认证方式示例:func(handler*RecipesHandler)NewRecipeHandler(c*gin.Context){ //API-keys认证 value:=os.Getenv("X-API-KEY") log.Print......
  • 08.Sleuth(Micrometer)+ZipKin分布式链路追逐
    1.Sleuth进入维护模式替代方案->MicrometerTracing2.分布式链路追踪概述2.1出现背景在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的服务节点调用来协调产生最后的结果,每一个前端请求都会形成一条复杂的分布式服务调用链路,链路中的任何一环出现高延时......
  • 在分布式系统中使用异步管道创建实体
    背景   在分布式系统中异步创建实体既是挑战也是优势,尤其是对于追求可扩展性、容错性和高效用户体验的大型企业而言。用于创建实体的异步(async)管道可以解耦服务、优雅地处理故障并最大限度地减少延迟。这些特性使企业能够在扩展过程中保持灵活、高性能的系统。让我们深入探讨......
  • 鸿蒙Next之数据同步艺术之四:必要不充分理解分布式数据对象同步
    本文旨在深入探讨华为鸿蒙HarmonyOSNext系统(截止目前API12)的技术细节,基于实际开发实践进行总结。主要作为技术分享与交流载体,难免错漏,欢迎各位同仁提出宝贵意见和问题,以便共同进步。本文为原创内容,任何形式的转载必须注明出处及原作者。分布式数据对象的生命周期分布式数据对......
  • 《使用Gin框架构建分布式应用》阅读笔记:p108-p126
    《用Gin框架构建分布式应用》学习第8天,p108-p126总结,总计18页。一、技术总结1.Redisevictionpolicy(1)什么是evictionpolicy?Theevictionpolicydetermineswhathappenswhenadatabasereachesitsmemorylimit.(2)配置示例在redis.conf中配置。maxmemory-policy......
  • 【Elasticsearch】分布式搜索引擎技术学习[上]
    目录一.认识与了解搜索引擎1.介绍2.安装二.初步了解Elasticsearch1.倒排索引2.IK分词器3.基础概念三.Elasticsearch基础操作1.索引库操作1.1.常见映射属性1.2.索引库的·CRUD操作2.文档操作1.1.文档的CRUD操作1.2.批量处理四.ES的Java客户端1.客户端的......