Go Map 并发安全:sync.Map 详解 (如果文章有相关内容)

Go 并发安全 Map:sync.Map 详解

在 Go 语言中,内置的 map 类型并不是并发安全的。这意味着,如果多个 goroutine 同时对同一个 map 进行读写操作,可能会导致数据竞争(data race)和其他不可预测的行为。为了解决这个问题,Go 1.9 引入了一个并发安全的 map 类型:sync.Map

sync.Map 专门为并发场景设计,它在某些特定的使用模式下可以提供比使用普通 map 加互斥锁(sync.Mutexsync.RWMutex)更好的性能。然而,sync.Map 并非万能,它有自己的适用场景,如果使用不当,反而可能导致性能下降。

本文将深入探讨 sync.Map 的内部实现、使用方法、适用场景、性能分析以及与普通 map 加锁的对比,帮助你全面理解并正确使用 sync.Map

1. sync.Map 的设计目标与特点

sync.Map 的主要设计目标是优化以下两种常见场景:

  1. 键值对一旦写入,很少修改,主要进行读取操作(缓存场景):在这种场景下,sync.Map 可以减少锁竞争,提高读取性能。
  2. 多个 goroutine 并发读写不同的键值对:在这种场景下,sync.Map 内部的机制可以降低锁的粒度,提高整体并发性能。

sync.Map 具有以下特点:

  • 并发安全:多个 goroutine 可以同时对 sync.Map 进行读写操作,无需额外的同步机制。
  • 无类型约束sync.Map 的键和值可以是任意类型(interface{})。
  • 无需初始化:可以直接声明一个 sync.Map 类型的变量并使用,无需像内置 map 一样使用 make 初始化。
  • 空间换时间sync.Map 内部使用了一些冗余的存储空间,以减少锁竞争,提高并发性能。
  • ** 零值可用**: sync.Map 的零值(Zero Value)就是一个可以直接使用的空 map。

2. sync.Map 的内部实现

sync.Map 的内部实现相对复杂,它主要使用了以下几种技术来保证并发安全和性能:

  • 读写分离sync.Map 内部维护了两个 mapreaddirtyread 是一个只读的 map,用于存储稳定的键值对,读取操作可以直接在 read 上进行,无需加锁。dirty 是一个可读写的 map,用于存储新写入或修改的键值对。
  • 原子操作sync.Map 使用原子操作(atomic 包)来更新 readdirty 指针,以及一些计数器,保证并发安全。
  • 延迟删除:当删除一个键值对时,sync.Map 并不立即从 readdirty 中删除,而是将其标记为 expunged(删除),等到合适的时机再进行物理删除,以减少锁竞争。
  • 双重检查:在读取键值对时,sync.Map 会先尝试从 read 中读取,如果读取失败,再尝试从 dirty 中读取。这种双重检查机制可以减少对 dirty 的访问,降低锁竞争。
  • 晋升机制:当 dirty 中的键值对数量达到一定阈值时,sync.Map 会将 dirty 晋升为 read,并将 dirty 清空,以减少 dirty 的大小,提高读取性能。

2.1 sync.Map 的数据结构

```go
type Map struct {
mu Mutex

// read 包含了 map 中稳定的部分,访问它无需持有锁 mu。
// read 的内容要么在 dirty 中有对应条目 (且未被删除), 要么是一个被标记删除(expunged)的条目。
read atomic.Value // readOnly

// dirty 包含了 map 中新写入的和待修改的部分.
// 如果要将 dirty 提升为 read, 必须先将其中未被删除的条目复制到 read。
dirty map[interface{}]*entry

// misses 记录了从 read 中读取失败的次数。
// 一旦 misses 达到一定阈值,dirty 会被提升为 read,并且 misses 会被重置为 0。
misses int

}

// readOnly 是一个不可变的结构,存储在 Map.read 字段中。
type readOnly struct {
m map[interface{}]*entry
amended bool // 如果 dirty 中有 read 中不存在的键,则为 true
}

// entry 是 map 中存储的每个键值对的槽位。
type entry struct {
// p == nil: 键值对已被删除,且 m.dirty == nil
// p == expunged: 键值对已被删除, m.dirty!=nil 且该键不在 m.dirty 中
// 否则, 键值对是有效的, 如果 m.dirty != nil 则记录在 m.dirty[key] 中,
// 如果 m.dirty == nil, 则下一次对 m.dirty 的写操作将会先复制 m.read.m
p unsafe.Pointer // *interface{}
}
```

2.2 sync.Map 的主要方法

sync.Map 提供了以下几个主要方法:

  • Load(key interface{}) (value interface{}, ok bool):读取指定键的值。如果键存在,返回对应的值和 true;否则,返回 nilfalse
  • Store(key, value interface{}):存储一个键值对。
  • LoadOrStore(key, value interface{}) (actual interface{}, loaded bool):如果键存在,返回已有的值和 true;否则,存储给定的键值对,并返回给定的值和 false
  • LoadAndDelete(key interface{}) (value interface{}, loaded bool): 读取并删除键值,如果键存在,则返回键值,loaded 为 true, 否则 value 为 nil, loaded 为 false.
  • Delete(key interface{}):删除指定的键值对。
  • Range(f func(key, value interface{}) bool):遍历 sync.Map 中的所有键值对。f 是一个回调函数,接收键和值作为参数,如果返回 true,则继续遍历;如果返回 false,则停止遍历。

2.3 方法实现细节 (以 Load, Store, Delete 为例)

2.3.1 Load 方法

```go
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// 避免在持有锁期间创建一个新的 read map
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// 无论键是否存在,都记录一次 miss,以便将 dirty 提升为 read。
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}

func (e entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return
(*interface{})(p), true
}
```

Load 方法的执行流程如下:

  1. 首先尝试从 read 中读取键值对,如果读取成功,直接返回结果。
  2. 如果从 read 中读取失败,并且 read.amendedtrue(表示 dirty 中有 read 中不存在的键),则加锁。
  3. 加锁后,再次尝试从 read 中读取(双重检查),如果仍然失败,则从 dirty 中读取。
  4. 无论是否从 dirty 中读取成功,都调用 m.missLocked() 方法,增加 misses 计数。
  5. 解锁。
  6. 如果最终没有读取到键值对,返回 nilfalse;否则,调用 e.load() 方法,通过原子操作读取 entry 中的值。

2.3.2 Store 方法

```go
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}

m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
    if e.unexpungeLocked() {
        // 键之前被标记为删除了,因此 dirty 中一定没有。
        m.dirty[key] = e
    }
    e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
    e.storeLocked(&value)
} else {
    if !read.amended {
        // 这是第一次向 dirty 中添加 read 中没有的键。
        // 确保将 read 复制到 dirty 中,并将 amended 标记设置为 true。
        m.dirtyLocked()
        m.read.Store(readOnly{m: read.m, amended: true})
    }
    m.dirty[key] = newEntry(value)
}
m.mu.Unlock()

}

func (e entry) tryStore(i interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}

func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

func (e entry) storeLocked(i interface{}) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
```

Store 方法的执行流程如下:

  1. 首先尝试从 read 中读取键值对,如果键存在且未被标记删除,则尝试通过原子操作直接更新 entry 中的值。
  2. 如果 read 中不存在该键,或者键被标记删除,则加锁。
  3. 加锁后,再次尝试从 read 中读取(双重检查):
    • 如果键存在且被标记删除,则调用 e.unexpungeLocked() 方法,将 entry 的状态从未删除改为正常,并将键值对添加到 dirty 中。
    • 如果键存在且未被标记删除,则直接更新 dirty 中对应 entry 的值。
    • 如果 read 中不存在该键,则检查 dirty 中是否存在:
      • 如果 dirty 中存在,则直接更新 dirty 中对应 entry 的值。
      • 如果 dirty 中也不存在,则:
        • 如果 read.amendedfalse,说明这是第一次向 dirty 中添加 read 中没有的键,需要调用 m.dirtyLocked() 方法,将 read 复制到 dirty 中,并将 read.amended 设置为 true
        • 创建一个新的 entry,并将键值对添加到 dirty 中。
  4. 解锁。

2.3.3 Delete 方法

```go
func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}

func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
// 无论键是否存在,都记录一次 miss,以便将 dirty 提升为 read。
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}

func (e entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return
(*interface{})(p), true
}
}
}

``Delete方法实际上通过调用LoadAndDelete` 方法实现。

LoadAndDelete 方法的执行流程如下:

  1. 首先尝试从 read 中读取键值对,如果读取成功,直接调用 entry.delete() 进行删除。
  2. 如果从 read 中读取失败,并且 read.amendedtrue(表示 dirty 中有 read 中不存在的键),则加锁。
  3. 加锁后,再次尝试从 read 中读取(双重检查),如果仍然失败,则从 dirty 中读取并删除。 同时调用 m.missLocked() 增加 misses 计数。
  4. 解锁。
  5. 如果最终从 read 中读取到键值对,调用 e.delete() 方法,通过原子操作将 entry 的值设置为 nil。 注意此处并未直接删除 read map 中的数据。

2.4 missLockeddirtyLocked 方法

```go
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty}) // 晋升 dirty
m.dirty = nil
m.misses = 0
}

func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}

read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
    if !e.tryExpungeLocked() {
        m.dirty[k] = e
    }
}

}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
```

  • missLocked:每次从 read 中读取失败时,都会调用 missLocked 方法,增加 misses 计数。当 misses 计数达到 dirty 的长度时,说明 dirty 已经积累了足够多的修改,需要将其晋升为 read,并将 dirty 清空。
  • dirtyLocked:当第一次向 dirty 中添加 read 中没有的键时,会调用 dirtyLocked 方法。该方法会创建一个新的 dirty,并将 read 中未被标记删除的键值对复制到 dirty 中。

3. sync.Map 的使用方法

```go
package main

import (
"fmt"
"sync"
)

func main() {
var m sync.Map

// 存储键值对
m.Store("name", "John Doe")
m.Store("age", 30)

// 读取键值对
name, ok := m.Load("name")
if ok {
    fmt.Println("Name:", name) // 输出:Name: John Doe
}

// 读取或存储键值对
actual, loaded := m.LoadOrStore("city", "New York")
if loaded {
    fmt.Println("City already exists:", actual)
} else {
    fmt.Println("City stored:", actual) // 输出:City stored: New York
}

// 删除键值对
m.Delete("age")
// 读取并删除
age, loaded := m.LoadAndDelete("age")
if loaded{
    fmt.Println("age:", age) // 不会输出, age 已经被删除
}

// 遍历键值对
m.Range(func(key, value interface{}) bool {
    fmt.Println(key, ":", value)
    return true // 继续遍历
})
// 可能的输出:
// name : John Doe
// city : New York

}
```

4. sync.Map 的适用场景

sync.Map 适用于以下场景:

  • 缓存:当键值对一旦写入,很少修改,主要进行读取操作时,sync.Map 可以提供很好的性能。例如,可以将一些不经常变化的数据(如配置信息、用户信息等)缓存在 sync.Map 中,以减少对数据库或其他数据源的访问。
  • 多 goroutine 并发读写不同的键:当多个 goroutine 并发读写不同的键时,sync.Map 可以降低锁的粒度,提高整体并发性能。例如,可以用 sync.Map 来实现一个并发安全的计数器,每个 goroutine 负责更新不同的计数器。

sync.Map 不适用于以下场景:

  • 频繁更新相同的键:如果多个 goroutine 频繁更新相同的键,sync.Map 的性能可能不如普通 map 加互斥锁。因为 sync.Map 的写操作需要维护 readdirty 两个 map,并且可能涉及原子操作和锁竞争,开销较大。
  • 需要 range 操作且对顺序敏感: range 操作的顺序是不确定的, 如果需要保证顺序, 不应该使用 sync.Map.
  • 大量的删除操作: sync.Map 的删除操作并非立即执行, 而是标记删除, 这可能在大量删除的情况下导致内存占用过高。

5. sync.Map 的性能分析

sync.Map 的性能优势主要体现在读多写少,且多个 goroutine 并发读写不同键的场景。在这种场景下,sync.Map 可以减少锁竞争,提高读取性能。

在写多读少,或者多个 goroutine 频繁更新相同键的场景下,sync.Map 的性能可能不如普通 map 加互斥锁。因为 sync.Map 的写操作需要维护 readdirty 两个 map,并且可能涉及原子操作和锁竞争,开销较大。

为了更直观地对比 sync.Map 和普通 map 加锁的性能差异,我们可以进行基准测试(benchmark)。

```go
package main

import (
"sync"
"testing"
)

// 使用 sync.Map 的基准测试
func BenchmarkSyncMap(b *testing.B) {
var m sync.Map
for i := 0; i < b.N; i++ {
m.Store(i, i)
_, _ = m.Load(i)
}
}

// 使用普通 map 加互斥锁的基准测试
func BenchmarkMapWithMutex(b *testing.B) {
var m = make(map[int]int)
var mu sync.Mutex
for i := 0; i < b.N; i++ {
mu.Lock()
m[i] = i
_ = m[i]
mu.Unlock()
}
}

// 使用普通 map 加读写锁的基准测试
func BenchmarkMapWithRWMutex(b *testing.B) {
var m = make(map[int]int)
var mu sync.RWMutex
for i := 0; i < b.N; i++ {
mu.Lock()
m[i] = i
mu.Unlock()
mu.RLock()
_ = m[i]
mu.RUnlock()
}
}
并发测试:go
package main

import (
"sync"
"testing"
)

func benchmarkSyncMapParallel(b testing.B, concurrency int) {
var m sync.Map
b.SetParallelism(concurrency) // 设置并行度, 模拟多核 CPU
b.RunParallel(func(pb
testing.PB) {
for pb.Next() {
key := 1 // 模拟固定 key
m.Store(key, 1)
_, _ = m.Load(key)
}
})
}
func BenchmarkSyncMapParallel_1(b testing.B){ benchmarkSyncMapParallel(b, 1)}
func BenchmarkSyncMapParallel_10(b
testing.B){ benchmarkSyncMapParallel(b, 10)}
func BenchmarkSyncMapParallel_100(b *testing.B){ benchmarkSyncMapParallel(b, 100)}

func benchmarkMapWithMutexParallel(b testing.B, concurrency int) {
var m = make(map[int]int)
var mu sync.Mutex
b.SetParallelism(concurrency)
b.RunParallel(func(pb
testing.PB) {
for pb.Next() {
key := 1 // 模拟固定 key
mu.Lock()
m[key] = 1
_ = m[key]
mu.Unlock()
}
})
}

func BenchmarkMapWithMutexParallel_1(b testing.B){ benchmarkMapWithMutexParallel(b, 1) }
func BenchmarkMapWithMutexParallel_10(b
testing.B){ benchmarkMapWithMutexParallel(b, 10) }
func BenchmarkMapWithMutexParallel_100(b *testing.B){ benchmarkMapWithMutexParallel(b, 100) }
```

通过运行这些基准测试,我们可以看到:

  • 在简单的读写场景下,sync.Map 的性能可能不如普通 map 加互斥锁。
  • 在并发读写, 尤其是读多写少的场景下,如果竞争不激烈(例如,不同的 goroutine 访问不同的 key),sync.Map 的性能通常优于普通 map 加锁。
  • 在并发读写, 且竞争激烈 (例如, 多个 goroutine 访问相同的 key), sync.Map 性能不如普通 map 加锁。

因此,在使用 sync.Map 之前,最好先进行基准测试,评估其在你的具体场景下的性能表现。

6. 总结

sync.Map 是 Go 语言提供的一个并发安全的 map 类型,它在某些特定的使用模式下可以提供比普通 map 加锁更好的性能。sync.Map 的内部实现比较复杂,它使用了读写分离、原子操作、延迟删除、双重检查和晋升机制等多种技术来保证并发安全和性能。

sync.Map 适用于缓存和多 goroutine 并发读写不同键的场景,但不适用于频繁更新相同键的场景。在使用 sync.Map 之前,最好先了解其适用场景和性能特点,并进行基准测试,以确保其能够满足你的需求。

希望本文能够帮助你深入理解 sync.Map,并在实际开发中正确使用它。

THE END