引言
在并发编程的世界里,数据的一致性和线程安全是永恒的话题。Go语言以其独特的并发模型——goroutine和channel,简化了并发编程的复杂性。然而,在某些场景下,我们仍然需要一种机制来保证操作的原子性。这就是sync/atomic.Value
发挥作用的地方。
原子性:并发编程的基石
原子性(atomicity) 是指一个或多个操作在执行过程中不会被中断的特性。这些操作要么全部完成,要么全部不执行,从而避免了中间状态的暴露。在Go中,sync/atomic
包提供了一组原子操作,而Value
类型则是一种特殊的原子操作,用于存储和读取单个值。
适用场景:读多写少的优化
sync/atomic.Value
利用了写时复制(Copy-On-Write,COW)技术,这使得它在读多写少的场景下表现卓越。由于COW的特性,频繁的读操作不需要加锁,而写操作则会产生一个新的副本,这在内存使用上可能不是最经济的,尤其是在内存较大且写操作频繁的情况下。
官方案例:配置信息的动态更新
让我们通过一个官方示例来了解Value
的使用。这个示例展示了如何使用Value
来动态更新和读取服务器配置:
package main
import (
"sync/atomic"
"time"
)
func loadConfig() map[string]string {
return make(map[string]string)
}
func requests() chan int {
return make(chan int)
}
func main() {
var config atomic.Value // holds current server configuration
// Create initial config value and store into config.
config.Store(loadConfig())
go func() {
// Reload config every 10 seconds
// and update config value with the new version.
for {
time.Sleep(10 * time.Second)
config.Store(loadConfig())
}
}()
// Create worker goroutines that handle incoming requests
// using the latest config value.
for i := 0; i < 10; i++ {
go func() {
for r := range requests() {
c := config.Load()
// Handle request r using config c.
_, _ = r, c
}
}()
}
}
原理解析:Value的内部机制
Value
的内部实现基于Go的interface{}
类型,通过unsafe
包来实现原子操作。下面是Value
的定义和写入操作的核心逻辑:
// A Value provides an atomic load and store of a consistently typed value.
// The zero value for a Value returns nil from Load.
// Once Store has been called, a Value must not be copied.
//
// A Value must not be copied after first use.
type Value struct {
v any
}
Value 的底层是一个 intreface 结构体类型,包含一个 interface 类型 v
// efaceWords is interface{} internal representation.
type efaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}
efaceWords 是 interface 类型的内部实现,包含类型和值
写入操作
写入操作的关键在于确保类型一致性和原子性。首次写入时,会禁用抢占,确保写入过程不会被中断。后续写入则会检查类型一致性,并原子性地更新数据。
var firstStoreInProgress byte
// Store sets the value of the Value v to val.
// All calls to Store for a given Value must use values of the same concrete type.
// Store of an inconsistent type panics, as does Store(nil).
func (v *Value) Store(val any) {
if val == nil {
panic("sync/atomic: store of nil value into Value")
}
vp := (*efaceWords)(unsafe.Pointer(v))
vlp := (*efaceWords)(unsafe.Pointer(&val))
for {
typ := LoadPointer(&vp.typ)
if typ == nil {
// Attempt to start first store.
// Disable preemption so that other goroutines can use
// active spin wait to wait for completion.
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
runtime_procUnpin()
continue
}
// Complete first store.
StorePointer(&vp.data, vlp.data)
StorePointer(&vp.typ, vlp.typ)
runtime_procUnpin()
return
}
if typ == unsafe.Pointer(&firstStoreInProgress) {
// First store in progress. Wait.
// Since we disable preemption around the first store,
// we can wait with active spinning.
continue
}
// First store completed. Check type and overwrite data.
if typ != vlp.typ {
panic("sync/atomic: store of inconsistently typed value into Value")
}
StorePointer(&vp.data, vlp.data)
return
}
}
流程:
-
如果传入的值为空会产生一个 panic
-
通过 unsafe.Pointer 将old value 和 new value 转化成 efaceWords 类型
-
进入 for 循环
-
如果 typ == nil 说明是第一次写入值,那么进入到第一次赋值的流程
- runtime_procPin() 禁止抢占,标记当前G在M上不会被抢占
- 使用 CompareAndSwapPointer 先尝试将
typ
设置为^uintptr(0)
这个中间状态。如果失败,则证明已经有别的线程抢先完成了赋值操作,那它就解除抢占锁,然后重新回到 for 循环第一步。 - 如果设置成功则进入赋值阶段,注意这里是 先赋值 data,再赋值 typ,因为我们是根据 typ 是否等于 nil 判断 对象是否被初始化,所以最后赋值 typ 才能确保对象完成了初始化。
-
如果 typ 不等于 nil
-
typ == unsafe.Pointer(&firstStoreInProgress) 判断初始化是否完成,未完成则回到 for 循环起始处
-
如果初始化对象完成,判断 typ != vlp.typ ,如果新写入的值不等于旧值则panic
-
StorePointer(&vp.data, vlp.data) 把 old value 原子性替换成 new value
-
// StorePointer atomically stores val into *addr. // Consider using the more ergonomic and less error-prone [Pointer.Store] instead. func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
-
读取操作
读取操作相对简单,它会原子性地获取当前值。如果值尚未初始化,将返回nil
。
// Load returns the value set by the most recent Store.
// It returns nil if there has been no call to Store for this Value.
func (v *Value) Load() (val any) {
vp := (*efaceWords)(unsafe.Pointer(v))
typ := LoadPointer(&vp.typ)
if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) {
// First store not yet completed.
return nil
}
data := LoadPointer(&vp.data)
vlp := (*efaceWords)(unsafe.Pointer(&val))
vlp.typ = typ
vlp.data = data
return
}
流程:
-
首先载入 value
-
if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) 判断写入过程是否初始化完成
-
data := LoadPointer(&vp.data) 原子性载入 old value
// LoadPointer atomically loads *addr. // Consider using the more ergonomic and less error-prone [Pointer.Load] instead. func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
- 定义一个新的值 vlp := (*efaceWords)(unsafe.Pointer(&val))
- 然后将 old value 赋值给 new value (COW 思想)
- 返回新的 value
1. 所以每次调用 Load 我们都是获取到了一个副本,所以可以保证在并发读写时候的线程安全
总结与最佳实践
sync/atomic.Value
是一个强大的工具,适用于需要高并发读取的场景。然而,它也有其局限性,特别是在内存使用和写入操作的频率上。在使用Value
时,应当考虑以下几点:
- 读多写少:
Value
最适合的场景是读操作远多于写操作。 - 内存效率:频繁的写入可能会因为COW机制导致内存使用增加。
- 类型安全:写入操作要求类型一致性,否则会引发panic