首页 > 其他分享 >atomic

atomic

时间:2023-02-04 10:35:32浏览次数:58  
标签:head return next tail atomic func

atomic

Mutex、RWMutex 等并发原语的实现,最底层是通过 atomic 包中的一些原子操作来实现的.

在很多场景中,使用并发原语实现起来比较复杂,而原子操作可以帮助我们更轻松地实现底层的优化.

atomic 原子操作的应用场景

  • 可以使用 atomic 实现自己定义的基本并发原语.

  • 如果想要不加锁, 可以考虑atomic. atomic是实现lock-free数据结构的基石. lock-free 的数据结构可以让我们不使用互斥锁,这样就不会让线程因为等待互斥锁而阻塞休眠,而是让线程保持继续处理的状态.

lock-free数据结构实现起来比较复杂, 这里是一位微软专家写的一篇经验分享:

Xbox 360 和 Microsof Windows 的无锁编程注意事项 - Win32 apps | Microsoft Learn

atomic 提供的方法和一个特殊类型--Value

atomic 为了支持 int32、int64、uint32、uint64、uintptr、Pointer(Add 方法不支持)类型,分别提供了 AddXXX、CompareAndSwapXXX、SwapXXX、LoadXXX、StoreXXX 等方法。不过,你也不要担心,你只要记住了一种数据类型的方法的意义,其它数据类型的方法也是一样的

关于 atomic,还有一个地方你一定要记住,atomic 操作的对象是一个地址,你需要把可寻址的变量的地址作为参数传递给方法,而不是把变量的值传递给方法.

Add

Add*** 以原子方式将 delta 添加到 *addr 并返回新值.

对于有符号的整数来说,delta 可以是一个负数,相当于减去一个值。对于无符号的整数和 uinptr 类型来说,怎么实现减去一个值呢?毕竟,atomic 并没有提供单独的减法操作.

Go 官方给出以下操作, 以Uint64为例: (你可以利用计算机补码的规则,把减法变成加法)

要从 x 中减去有符号的正常数值 c,请执行 AddUint64(&x, ^uint64(c-1)). 特别是,要将x减去1,请执行 AddUint64(&x, ^uint64(0)).

CompareAndSwap (CAS)

这个方法会比较当前 addr 地址里的值是不是 old,如果不等于 old,就返回 false;如果等于 old,就把此地址的值替换成 new 值,返回 true。这就相当于“判断相等才替换”。

如果使用伪代码来表示这个原子操作,代码如下:

if *addr == old {
  *addr = new
  return true
}
return false

Swap

如果不需要比较旧值,只是比较粗暴地替换的话,就可以使用 Swap 方法,它替换后还可以返回旧值,伪代码如下:

old = *addr
*addr = new
return old

Load

Load 方法会取出 addr 地址中的值,即使在多处理器、多核、有 CPU cache 的情况下,这个操作也能保证 Load 是一个原子操作。

Store

Store 方法会把一个值存入到指定的 addr 地址中,即使在多处理器、多核、有 CPU cache 的情况下,这个操作也能保证 Store 是一个原子操作。别的 goroutine 通过 Load 读取出来,不会看到存取了一半的值。

Value

atomic package - sync/atomic - Go Packages

刚刚说的都是一些比较常见的类型,其实,atomic 还提供了一个特殊的类型:Value。它可以原子地存取对象类型

下面的例子显示了如何使用Value进行定期的程序配置更新并将变化传播给工作程序。

type Config struct {
    NodeName string
    Addr     string
    Count    int32
}

func loadNewConfig() Config {
    return Config{
        NodeName: "北京",
        Addr:     "10.77.95.27",
        Count:    rand.Int31(),
    }
}
func main() {
    var config atomic.Value
    config.Store(loadNewConfig())
    var cond = sync.NewCond(&sync.Mutex{})

    // 设置新的config
    go func() {
        for {
            time.Sleep(time.Duration(5+rand.Int63n(5)) * time.Second)
            config.Store(loadNewConfig())
            cond.Broadcast() // 通知等待着配置已变更
        }
    }()

    go func() {
        for {
            cond.L.Lock()
            cond.Wait()                 // 等待变更信号
            c := config.Load().(Config) // 读取新的配置
            fmt.Printf("new config: %+v\n", c)
            cond.L.Unlock()
        }
    }()

    select {}
}

下面的例子显示了如何维护一个可扩展的频繁读取但不频繁更新的数据结构(使用copy-on-write习惯用法).

package main

import (
    "sync"
    "sync/atomic"
)

func main() {
    type Map map[string]string
    var m atomic.Value
    m.Store(make(Map))
    var mu sync.Mutex // used only by writers
    // read function can be used to read the data without further synchronization
    read := func(key string) (val string) {
        m1 := m.Load().(Map)
        return m1[key]
    }
    // insert function can be used to update the data without further synchronization
    insert := func(key, val string) {
        mu.Lock() // synchronize with other potential writers
        defer mu.Unlock()
        m1 := m.Load().(Map) // load current value of the data structure
        m2 := make(Map)      // create a new value
        for k, v := range m1 {
            m2[k] = v // copy all data from the current object to the new one
        }
        m2[key] = val // do the update that we need
        m.Store(m2)   // atomically replace the current object with the new one
        // At this point all new readers start working with the new version.
        // The old version will be garbage collected once the existing readers
        // (if any) are done with it.
    }
    _, _ = read, insert
}

使用 atomic 实现 lock-free queue

atomic 常常用来实现 Lock-Free 的数据结构,这次我会给你展示一个 Lock-Free queue 的实现。

Lock-Free queue 最出名的就是 Maged M. Michael 和 Michael L. Scott 1996 年发表的论文中的算法,算法比较简单,容易实现,伪代码的每一行都提供了注释,我就不在这里贴出伪代码了,因为我们使用 Go 实现这个数据结构的代码几乎和伪代码一样:

package queue

import (
  "sync/atomic"
  "unsafe"
)
// lock-free的queue
type LKQueue struct {
  head unsafe.Pointer
  tail unsafe.Pointer
}
// 通过链表实现,这个数据结构代表链表中的节点
type node struct {
  value interface{}
  next  unsafe.Pointer
}
func NewLKQueue() *LKQueue {
  n := unsafe.Pointer(&node{})
  return &LKQueue{head: n, tail: n}
}
// 入队
func (q *LKQueue) Enqueue(v interface{}) {
  n := &node{value: v}
  for {
    tail := load(&q.tail)
    next := load(&tail.next)
    if tail == load(&q.tail) { // 尾还是尾
      if next == nil { // 还没有新数据入队
        if cas(&tail.next, next, n) { //增加到队尾
          cas(&q.tail, tail, n) //入队成功,移动尾巴指针
          return
        }
      } else { // 已有新数据加到队列后面,需要移动尾指针
        cas(&q.tail, tail, next)
      }
    }
  }
}
// 出队,没有元素则返回nil
func (q *LKQueue) Dequeue() interface{} {
  for {
    head := load(&q.head)
    tail := load(&q.tail)
    next := load(&head.next)
    if head == load(&q.head) { // head还是那个head
      if head == tail { // head和tail一样
        if next == nil { // 说明是空队列
          return nil
        }
        // 只是尾指针还没有调整,尝试调整它指向下一个
        cas(&q.tail, tail, next)
      } else {
        // 读取出队的数据
        v := next.value
        // 既然要出队了,头指针移动到下一个
        if cas(&q.head, head, next) {
          return v // Dequeue is done.  return
        }
      }
    }
  }
}

// 将unsafe.Pointer原子加载转换成node
func load(p *unsafe.Pointer) (n *node) {
  return (*node)(atomic.LoadPointer(p))
}

// 封装CAS,避免直接将*node转换成unsafe.Pointer
func cas(p *unsafe.Pointer, old, new *node) (ok bool) {
  return atomic.CompareAndSwapPointer(
    p, unsafe.Pointer(old), unsafe.Pointer(new))
}

我来给你介绍下这里的主要逻辑。

这个 lock-free 的实现使用了一个辅助头指针(head),头指针不包含有意义的数据,只是一个辅助的节点,这样的话,出队入队中的节点会更简单。

入队的时候,通过 CAS 操作将一个元素添加到队尾,并且移动尾指针。

出队的时候移除一个节点,并通过 CAS 操作移动 head 指针,同时在必要的时候移动尾指针。

标签:head,return,next,tail,atomic,func
From: https://www.cnblogs.com/geraldkohn/p/17090979.html

相关文章

  • Java并发JUC——Atomic原子类
    什么是原子类原子是不可分割的最小单位,故原子类可以认为其操作都是不可分割一个操作时不可中断的,即便是在多线程的情况下也可以保证原子类的作用和锁类似,是为了保证并发......
  • atomic compare_exchange_weak函数
    compare_exchange_weak/compare_exchange_strong(是著名的CAS(compareandset))。参数传入期待值与新值,通过比较当前值与期待值的情况进行区别改变。a.compare_exchange_we......
  • 树莓派raspberry编译isc-dhcp遇到“undefined reference to `__atomic_fetch_add_8'”
    想在树莓派上修改dhclient,增加一些打印信息,需要编译isc-dhcp。但是在编译过程中遇到了一个错误,错误如下:1gcc-g-Wall-Werror-fno-strict-aliasing-I../includes......
  • What is an atomic instruction?
    LINK:https://www.quora.com/What-is-an-atomic-instruction     Thistermisappliedtoinstructionsthatexecutetwoormoreseparateaccessestomemo......
  • 【Kotlin 协程】协程并发安全问题 ( 使用 Atomic 并发安全类型 | 使用 Channel 通道 |
    文章目录​​一、协程不安全数据访问​​​​二、使用Atomic并发安全类型​​​​三、使用Channel通道​​​​四、使用Mutext轻量级锁​​​​五、使用Semaphore轻......
  • redis保存AtomicInteger对象踩坑及解决详解
    这篇文章主要介绍了redis保存AtomicInteger对象踩坑及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教redis保存AtomicInteger对象踩......
  • 原子操作类AtomicInteger详解
    为什么需要AtomicInteger原子操作类?对于Java中的运算操作,例如自增或自减,若没有进行额外的同步操作,在多线程环境下就是线程不安全的。num++解析为num=num+1,明显,这个操作不......
  • JAVA原子类 AtomicInteger
    JAVA原子类java原子类位于:JUC包(java.util.concurrent.atomic.Atomic*)中举例:classAtomicIntegerextendsNumberimplementsjava.io.Serializableimportjava.util.co......
  • <五>基于CAS操作的atomic原子类型
    C++11多线程类库中提供了include包含了很多原子类型原子操作若干汇编指令具有读-修改-写类型,也就是说它们访问存储器单元两次,第一次读原值,第二次写新值假定运行在两个c......
  • 使用 Atomic 减少互斥锁与Atomic.Value
    看到medium的文章:https://medium.com/a-journey-with-go/go-how-to-reduce-lock-contention-with-the-atomic-package-ba3b2664b549点开一看发现居然需要vip...于是就去......