第26章:同步原语——sync 包

第26章:同步原语——sync 包

如果说 goroutine 是 Go 的高并发精髓,那 sync 包就是让这股"并发之力"不至于变成"并发灾难"的定海神针。本章我们来聊聊那些让多个 goroutine 和平共处、共同完成大业的同步原语。


26.1 sync 包解决什么问题:多个 goroutine 共享数据时需要同步,否则会有数据竞争(data race)

想象一下,两个小学生同时在一张纸上写名字,一个写"小明",另一个写"小红"。结果纸上可能是"小红明"——这不是艺术,这是数据竞争(data race)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
    "fmt"
    "time"
)

// 全局变量,多个 goroutine 共享
var counter = 0

func increment() {
    // 模拟每个 goroutine 执行 1000 次自增
    for i := 0; i < 1000; i++ {
        counter++ // 读取、+1、写回,三步操作不是原子的!
        time.Sleep(time.Microsecond) // 故意加点料,让竞争更明显
    }
}

func main() {
    // 启动 10 个 goroutine 同时自增
    for i := 0; i < 10; i++ {
        go increment()
    }

    // 等待一会儿(虽然不是最佳实践,但这里为了演示)
    time.Sleep(2 * time.Second)

    fmt.Println("最终计数器值:", counter) // 很可能不是 10000!这就是 data race 的威力
    // 期望输出: 10000
    // 实际输出: 往往是 9000 多,因为 race 丢失了某些自增
}

运行这个程序,你会发现最终结果往往是 9000 多,而不是期望的 10000。这就是 data race 的威力——counter++ 在 CPU 层面是三条指令(读、改、写),多个 goroutine 同时执行时,会互相覆盖彼此的结果。

专业词汇解释:

  • 数据竞争(Data Race):两个或多个 goroutine 同时访问同一块内存,且至少有一个是写操作,这时候竞争就发生了
  • 原子操作(Atomic Operation):不可中断的操作,要么完全执行,要么完全不执行,不存在中间状态

温馨提示:可以用 go run -race main.go 来检测 data race,Go 内置的 race detector 超级好用!


26.2 sync 核心原理:锁、信号量、等待组、条件变量

sync 包的核心就是四大天王:

graph TB
    A["sync 包四大天王"] --> B["🔒 互斥锁 Mutex"]
    A --> C["📊 读写锁 RWMutex"]
    A --> D["⏳ 等待组 WaitGroup"]
    A --> E["🔔 条件变量 Cond"]
    
    B --> B1["Lock / Unlock"]
    C --> C1["RLock / RUnlock / Lock / Unlock"]
    D --> D1["Add / Done / Wait"]
    E --> E1["Wait / Signal / Broadcast"]
  • 锁(Mutex/RWMutex):保证同一时刻只有一个 goroutine 能访问共享资源
  • 信号量(Semaphore):控制并发数量的高级工具
  • 等待组(WaitGroup):等待一群 goroutine 全部完成
  • 条件变量(Cond):基于特定条件的等待和唤醒机制

26.3 sync.Mutex:互斥锁,Mutex.Lock、Mutex.Unlock

互斥锁是 sync 包最基础也最常用的组件。就像厕所的门锁——一个人进去后锁上,用完出来别人才能进。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
    "fmt"
    "sync"
)

var counter = 0
var mu sync.Mutex // 互斥锁

func increment() {
    mu.Lock()   // 加锁,其他 goroutine 只能在外面等
    counter++   // 临界区:同一时刻只有一个 goroutine 能执行这里
    mu.Unlock() // 解锁,让其他 goroutine 有机会进来
}

func main() {
    var wg sync.WaitGroup
    
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                increment()
            }
        }()
    }
    
    wg.Wait()
    fmt.Println("最终计数器值:", counter) // 稳定输出 10000
    // 输出: 最终计数器值: 10000
}

工作原理图:

sequenceDiagram
    participant G1 as Goroutine 1
    participant L as Mutex Lock
    participant G2 as Goroutine 2
    participant G3 as Goroutine 3
    
    G1->>L: Lock() - 获得锁
    L-->>G1: OK! 你可以进去了
    G2->>L: Lock() - 尝试获得锁
    L-->>G2: 等等,你先阻塞等着
    G1->>L: Unlock() - 释放锁
    L->>G2: 好了,你进来吧
    G3->>L: Lock() - 又来一个
    L-->>G3: 等等,你先阻塞等着

26.4 Mutex.TryLock:尝试加锁,不阻塞

有时候我们不想傻等,想"试试看"能不能拿到锁,能就干,不能就算了。TryLock 就是这么个"佛系"操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
    "fmt"
    "sync"
    "time"
)

var resource = ""
var mu sync.Mutex

func doSomething() {
    // 尝试获取锁
    if mu.TryLock() {
        fmt.Println("成功获取锁,正在处理资源...")
        time.Sleep(100 * time.Millisecond) // 模拟处理
        resource = "处理完成"
        mu.Unlock()
    } else {
        fmt.Println("没拿到锁,我先干点别的...")
        // 可以去做其他事情
    }
}

func main() {
    // 启动两个 goroutine,它们会竞争锁
    go doSomething()
    time.Sleep(10 * time.Millisecond) // 确保第一个先启动
    go doSomething()
    
    time.Sleep(200 * time.Millisecond)
    fmt.Println("最终资源:", resource)
    // 输出: 成功获取锁,正在处理资源...
    //       没拿到锁,我先干点别的...
    //       最终资源: 处理完成
}

专业词汇解释:

  • TryLock:尝试获取锁,如果锁不可用立即返回 false,不会阻塞等待

26.5 defer unlock:最佳实践

写代码最怕什么?忘记 Unlock!一个 defer 可以拯救你于水火之中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
    "fmt"
    "sync"
)

var counter = 0
var mu sync.Mutex

func increment() {
    mu.Lock()
    defer mu.Unlock() // 无论函数怎么返回,都会执行 Unlock
    counter++
    
    // 假设这里有很多代码...
    // 可能有多个 return
    // 可能有 panic
    // 都不怕!defer 保证 Unlock 一定会执行
}

func main() {
    var wg sync.WaitGroup
    
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            increment()
        }()
    }
    
    wg.Wait()
    fmt.Println("计数器:", counter) // 100,稳稳当当
    // 输出: 计数器: 100
}

为什么用 defer?

  1. 不会忘记 Unlock
  2. 即使代码提前 return 或 panic,锁也会释放
  3. 代码更清晰,锁的范围一目了然

最佳实践:永远使用 defer mu.Unlock(),除非你有极其特殊的理由。


26.6 Mutex 不可重入:同一个 goroutine 两次 Lock 会死锁

Go 的 Mutex 是**不可重入(non-reentrant)**的。这点和 Java 的 synchronized 不同哦!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
    "fmt"
    "sync"
)

var mu sync.Mutex

func doSomething() {
    mu.Lock()
    fmt.Println("第一层获取锁")
    doOtherThing() // 调用另一个函数,它也要获取同一把锁!
    mu.Unlock()
}

func doOtherThing() {
    mu.Lock() // 同一个 goroutine 再次 Lock,会死锁!
    fmt.Println("第二层获取锁")
    mu.Unlock()
}

func main() {
    // 运行这个程序,你会发现它卡住了(死锁)
    // 因为同一个 goroutine 不能重入同一个 Mutex
    
    go doSomething()
    
    // 等待一下看看会不会输出
    // 实际上会永远阻塞,因为 doOtherThing 永远拿不到锁
    <-make(chan struct{})
}

运行结果:程序会卡住,需要强制终止(Ctrl+C)

专业词汇解释:

  • 可重入锁(Reentrant Lock):同一线程可以多次获得同一把锁
  • 不可重入锁(Non-reentrant Lock):同一线程不能多次获得同一把锁,否则会死锁

为什么 Go 要设计成不可重入?主要是为了性能——不可重入锁实现更简单,开销更小。如果需要重入行为,可以用 sync.RWMutex 或者自己封装。


26.7 sync.RWMutex:读写锁,RLock/RUnlock(读锁)、Lock/Unlock(写锁)

读写锁是个聪明的设计:读操作可以并发进行,写操作才需要独占。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    data   string
    rwMu   sync.RWMutex
)

func readData() string {
    rwMu.RLock()         // 获取读锁
    defer rwMu.RUnlock()
    time.Sleep(10 * time.Millisecond) // 模拟读取耗时
    return data
}

func writeData(newData string) {
    rwMu.Lock()          // 获取写锁
    defer rwMu.Unlock()
    time.Sleep(10 * time.Millisecond) // 模拟写入耗时
    data = newData
}

func main() {
    var wg sync.WaitGroup
    
    // 同时启动 5 个读 goroutine
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            result := readData()
            fmt.Printf("Reader %d 读取到: %s\n", id, result)
        }(i)
    }
    
    // 启动 1 个写 goroutine
    wg.Add(1)
    go func() {
        defer wg.Done()
        writeData("Hello, World!")
        fmt.Println("Writer 写入完成")
    }()
    
    wg.Wait()
    // 读操作会并发执行(因为是 RLock)
    // 写操作会等所有读操作完成后再执行
}

工作原理图:

graph LR
    subgraph 读操作
        R1["Reader 1"] --> RL["RLock"]
        R2["Reader 2"] --> RL
        R3["Reader 3"] --> RL
    end
    
    subgraph 写操作
        W1["Writer"] --> WL["Lock"]
    end
    
    RL -->|"多个读可以并发"| RES1["✓ 并发读取"]
    WL -->|"写需要独占"| RES2["✗ 阻塞等待"]
    
    WL -.->|如果 Writer 在等| RES3["等所有 Reader 完成后"]

专业词汇解释:

  • RWMutex:读写锁,通过 RLock/RUnlock 管理读锁,Lock/Unlock 管理写锁
  • 读锁(Read Lock):也称为共享锁,多个读者可以同时持有
  • 写锁(Write Lock):也称为排他锁,写者持有时其他读者和写者都不能访问

26.8 写优先策略:防止写饥饿

读写锁的默认策略是**写优先(write-preferring)**的。这意味着当有写者在等待时,新的读者会被阻塞,防止写者饿死的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    data  = "初始值"
    rwMu  sync.RWMutex
)

func reader(id int) {
    rwMu.RLock()
    defer rwMu.RUnlock()
    fmt.Printf("Reader %d 正在读取: %s\n", id, data)
    time.Sleep(50 * time.Millisecond)
}

func writer(id int, value string) {
    rwMu.Lock()
    defer rwMu.Unlock()
    fmt.Printf("Writer %d 正在写入: %s\n", id, value)
    data = value
    time.Sleep(50 * time.Millisecond)
}

func main() {
    // 场景:2个读者正在读,这时来了1个写者,然后又来2个读者
    // 期望:写者优先,等前2个读者读完,写者写,然后新来的2个读者再读
    
    var wg sync.WaitGroup
    
    // 第一个读者开始读
    wg.Add(1)
    go func() {
        defer wg.Done()
        reader(1)
    }()
    
    time.Sleep(10 * time.Millisecond) // 确保第一个读者先拿到读锁
    
    // 写者来了
    wg.Add(1)
    go func() {
        defer wg.Done()
        writer(1, "写入的值")
    }()
    
    time.Sleep(10 * time.Millisecond) // 确保写者在第一个读者之后申请锁
    
    // 第二个读者来了
    wg.Add(1)
    go func() {
        defer wg.Done()
        reader(2)
    }()
    
    // 第三个读者来了
    wg.Add(1)
    go func() {
        defer wg.Done()
        reader(3)
    }()
    
    wg.Wait()
    
    fmt.Println("最终 data:", data)
}

执行顺序(理想情况):

  1. Reader 1 获取读锁,开始读取
  2. Writer 1 尝试获取写锁,但因为 Reader 1 持有读锁,被阻塞
  3. Reader 2、Reader 3 尝试获取读锁,但因为 Writer 1 在等待,被阻塞(写优先!)
  4. Reader 1 读完,释放读锁
  5. Writer 1 获得写锁,开始写入
  6. Writer 1 写完,释放写锁
  7. Reader 2、Reader 3 获得读锁,开始读取

注意:Go 的 RWMutex 写优先策略并不能 100% 保证写者不被饿死(starvation),但在大多数情况下能有效防止。


26.9 sync.WaitGroup:等待一组任务完成,Add/Done/Wait

WaitGroup 就像一个计数器——每启动一个任务就加 1,任务完成就减 1,主 goroutine 可以等待计数器归零。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 任务完成,计数器减 1
    fmt.Printf("Worker %d 开始工作\n", id)
    time.Sleep(time.Duration(id) * 100 * time.Millisecond)
    fmt.Printf("Worker %d 完成工作\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    fmt.Println("主 goroutine 启动 5 个 worker...")
    
    // 启动 5 个 worker
    for i := 1; i <= 5; i++ {
        wg.Add(1) // 计数器加 1
        go worker(i, &wg)
    }
    
    fmt.Println("主 goroutine 等待所有 worker 完成...")
    wg.Wait() // 阻塞,直到计数器归零
    fmt.Println("所有 worker 完成!主 goroutine 继续执行")
    
    // 输出:
    // 主 goroutine 启动 5 个 worker...
    // 主 goroutine 等待所有 worker 完成...
    // Worker 1 开始工作
    // Worker 2 开始工作
    // Worker 3 开始工作
    // Worker 4 开始工作
    // Worker 5 开始工作
    // Worker 1 完成工作
    // Worker 2 完成工作
    // Worker 3 完成工作
    // Worker 4 完成工作
    // Worker 5 完成工作
    // 所有 worker 完成!主 goroutine 继续执行
}

工作流程图:

sequenceDiagram
    participant Main as 主 Goroutine
    participant WG as WaitGroup
    participant W1 as Worker 1
    participant W2 as Worker 2
    participant W3 as Worker 3
    
    Main->>WG: Add(3)
    Note over WG: 计数器 = 3
    Main->>W1: 启动 goroutine
    Main->>W2: 启动 goroutine
    Main->>W3: 启动 goroutine
    Main->>WG: Wait() - 阻塞
    W1->>WG: Done() - 计数器 = 2
    W2->>WG: Done() - 计数器 = 1
    W3->>WG: Done() - 计数器 = 0
    WG-->>Main: 解除阻塞,继续执行

专业词汇解释:

  • Add(delta int):增加计数器,可以是正数或负数
  • Done():将计数器减 1,相当于 Add(-1)
  • Wait():阻塞直到计数器归零

26.10 goroutine 泄漏:忘记 Done 会导致 Wait 永久阻塞

WaitGroup 的天敌是什么?忘记调用 Done()! 一旦忘记,你的程序会永远等下去,就像等一个永远不会回来的人。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
    "fmt"
    "sync"
    "time"
)

func leakyTask(wg *sync.WaitGroup) {
    // 哎呀,忘记调用 wg.Done() 了!
    fmt.Println("任务执行中...")
    time.Sleep(time.Second)
    fmt.Println("任务执行完毕,但忘了 Done()")
    // return 时没有调用 wg.Done()
    // 这会导致 Wait() 永久阻塞!
}

func main() {
    var wg sync.WaitGroup
    
    wg.Add(1)
    go leakyTask(&wg)
    
    fmt.Println("等待任务完成...")
    
    // 设置一个超时,防止真的永久等待
    done := make(chan struct{})
    go func() {
        wg.Wait()
        close(done)
    }()
    
    select {
    case <-done:
        fmt.Println("任务完成了!")
    case <-time.After(3 * time.Second):
        fmt.Println("超时了!任务可能泄漏了!")
    }
    
    // 实际运行你会看到:
    // 等待任务完成...
    // 任务执行中...
    // 任务执行完毕,但忘了 Done()
    // 超时了!任务可能泄漏了!
}

泄漏示意图:

graph TD
    A["Wait() 阻塞中..."] --> B{"计数器 == 0 ?"}
    B -->|否| A
    B -->|是| C["继续执行"]
    
    style A fill:#ff6b6b
    style B fill:#feca57
    style C fill:#5cd89a

最佳实践:使用 defer wg.Done(),就像 defer mu.Unlock() 一样,让编译器帮你擦屁股。


26.11 sync.Once:一次性初始化,Once.Do(f) 只执行一次

sync.Once 保证某个函数只会被执行一次,即使在多个 goroutine 同时调用的情况下。堪称"单实例模式"的最佳伴侣。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
    "fmt"
    "sync"
)

var (
    once     sync.Once
    resource string
)

func initResource() {
    fmt.Println("初始化资源...(只会执行一次)")
    resource = "已初始化的资源"
}

func getResource() string {
    once.Do(initResource) // 无论调用多少次,只执行一次
    return resource
}

func main() {
    var wg sync.WaitGroup
    
    // 启动 10 个 goroutine 同时获取资源
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            result := getResource()
            fmt.Printf("Goroutine %d 获取到: %s\n", id, result)
        }(i)
    }
    
    wg.Wait()
    
    // 输出:
    // 初始化资源...(只会执行一次)
    // Goroutine 0 获取到: 已初始化的资源
    // Goroutine 1 获取到: 已初始化的资源
    // ...(所有 goroutine 都获取到同一个资源)
}

工作原理图:

sequenceDiagram
    participant G1 as Goroutine 1
    participant G2 as Goroutine 2
    participant G3 as Goroutine 3
    participant O as sync.Once
    
    G1->>O: Do(f)
    O->>O: 执行 f()
    O-->>G1: 完成
    G2->>O: Do(f)
    O-->>G2: 直接返回(已执行过)
    G3->>O: Do(f)
    O-->>G3: 直接返回(已执行过)
    
    Note over O: 内部的"已执行"标志确保 f() 只运行一次

专业词汇解释:

  • Once:一个同步原语,保证给定函数只会执行一次
  • Do(f func()):执行函数 f,如果已经执行过则直接返回

经典应用场景:配置文件读取、数据库连接初始化、Logger 初始化等只需要执行一次的操作。


26.12 sync.OnceFunc、sync.OnceValue(Go 1.21+):更便捷的封装

Go 1.21 引入了更方便的封装,让 Once 的使用更加直观。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
    "fmt"
    "sync"
    "time"
)

// Go 1.21+ 示例
func main() {
    // sync.OnceFunc: 返回一个只执行一次的函数
    getConfigOnce := sync.OnceFunc(func() string {
        fmt.Println("正在加载配置...")
        time.Sleep(100 * time.Millisecond)
        return "配置内容"
    })
    
    // sync.OnceValue: 返回一个函数,调用时返回Once执行的结果
    getDatabaseDSNOnce := sync.OnceValue(func() string {
        fmt.Println("正在连接数据库...")
        time.Sleep(100 * time.Millisecond)
        return "postgres://localhost:5432/mydb"
    })
    
    var wg sync.WaitGroup
    
    // 测试 OnceFunc
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            config := getConfigOnce()
            fmt.Printf("Goroutine %d 获取配置: %s\n", id, config)
        }(i)
    }
    
    // 测试 OnceValue
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            dsn := getDatabaseDSNOnce()
            fmt.Printf("Goroutine %d 获取 DSN: %s\n", id, dsn)
        }(i)
    }
    
    wg.Wait()
    
    // 输出(注意配置和数据库只初始化一次):
    // 正在加载配置...
    // Goroutine 0 获取配置: 配置内容
    // Goroutine 1 获取配置: 配置内容
    // Goroutine 2 获取配置: 配置内容
    // 正在连接数据库...
    // Goroutine 0 获取 DSN: postgres://localhost:5432/mydb
    // Goroutine 1 获取 DSN: postgres://localhost:5432/mydb
    // Goroutine 2 获取 DSN: postgres://localhost:5432/mydb
}

专业词汇解释:

  • OnceFunc(f func()) func():返回一个函数,该函数调用时执行 f 且仅执行一次
  • OnceValue(f func() T) func() T:返回一个函数,调用时返回 f() 的结果,且 f 仅执行一次

这两个函数是 Go 1.21 新增的,比传统的 sync.Once + 闭包更简洁!


26.13 sync.Cond:条件变量,Cond.Wait 自动释放锁并阻塞

Cond(Condition Variable)是 Go 中实现等待-通知模式的神器。Wait() 会自动释放锁并阻塞当前 goroutine,直到被唤醒。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    mu    sync.Mutex
    cond  = sync.NewCond(&mu)
    ready bool
)

func waiter(id int) {
    mu.Lock()
    for !ready { // 注意:必须用 for 循环,见 26.16
        fmt.Printf("Worker %d: 还没准备好,我等着...\n", id)
        cond.Wait() // 自动释放锁,并阻塞等待
        // 被唤醒后,Wait() 会重新获取锁,goroutine 醒来继续执行
    }
    fmt.Printf("Worker %d: 准备好了!开始工作!\n", id)
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    
    // 启动 3 个等待者
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            waiter(id)
        }(i)
    }
    
    time.Sleep(1 * time.Second) // 确保所有 waiter 都开始等待
    
    fmt.Println("主 goroutine: 准备好了,通知所有等待者")
    
    mu.Lock()
    ready = true
    mu.Unlock()
    
    cond.Broadcast() // 唤醒所有等待者
    
    time.Sleep(500 * time.Millisecond)
    wg.Wait()
    
    // 输出顺序可能不同,但大致是:
    // Worker 1: 还没准备好,我等着...
    // Worker 2: 还没准备好,我等着...
    // Worker 3: 还没准备好,我等着...
    // 主 goroutine: 准备好了,通知所有等待者
    // Worker 1: 准备好了!开始工作!
    // Worker 2: 准备好了!开始工作!
    // Worker 3: 准备好了!开始工作!
}

Wait() 的工作原理:

sequenceDiagram
    participant G as Goroutine
    participant M as Mutex
    participant C as Cond
    
    G->>M: Lock()
    G->>C: Wait()
    Note over G,M: 1. Wait() 自动调用 M.Unlock()<br/>2. Goroutine 进入等待状态
    G->>M: Unlock()(Wait 内部)
    Note over G: Goroutine 阻塞中...
    
    Note over C: 其他地方调用 Signal/Broadcast
    C->>G: 唤醒
    G->>M: Lock()(Wait 返回前重新 Lock)
    G->>M: Unlock()

专业词汇解释:

  • Cond:条件变量,基于某个条件进行等待和通知的同步原语
  • Wait():阻塞当前 goroutine,自动释放关联的锁,等待 Signal 或 Broadcast 唤醒

26.14 Cond.Signal:唤醒一个等待者

Signal 唤醒其中一个正在等待的 goroutine(通常是等待时间最长的那个)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    mu    sync.Mutex
    cond  = sync.NewCond(&mu)
    tasks = make(chan string, 10)
)

func worker(id int) {
    for {
        mu.Lock()
        for len(tasks) == 0 { // 队列为空,等待
            cond.Wait()
        }
        task := <-tasks
        mu.Unlock()
        
        fmt.Printf("Worker %d 收到任务: %s\n", id, task)
        time.Sleep(100 * time.Millisecond) // 模拟工作
    }
}

func main() {
    // 启动 3 个 worker
    for i := 1; i <= 3; i++ {
        go worker(i)
    }
    
    time.Sleep(100 * time.Millisecond) // 确保 worker 们都开始等待
    
    // 逐个添加任务并通知
    for i := 1; i <= 5; i++ {
        mu.Lock()
        tasks <- fmt.Sprintf("任务-%d", i)
        mu.Unlock()
        cond.Signal() // 只唤醒一个等待者
        
        time.Sleep(200 * time.Millisecond)
    }
    
    // 输出(每次 Signal 只唤醒一个 worker):
    // Worker 1 收到任务: 任务-1
    // Worker 2 收到任务: 任务-2
    // Worker 3 收到任务: 任务-3
    // Worker 1 收到任务: 任务-4
    // Worker 2 收到任务: 任务-5
}

Signal vs Broadcast:

graph LR
    subgraph Signal
        S1["等待者 1"] 
        S2["等待者 2"]
        S3["等待者 3"]
        SIG["Signal()"]
        SIG -->|"只唤醒一个"| S1
    end
    
    subgraph Broadcast
        B1["等待者 1"]
        B2["等待者 2"]
        B3["等待者 3"]
        BRO["Broadcast()"]
        BRO -->|"唤醒所有"| B1
        BRO -->|"唤醒所有"| B2
        BRO -->|"唤醒所有"| B3
    end

26.15 Cond.Broadcast:唤醒所有等待者

Broadcast 会唤醒所有正在等待的 goroutine。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    mu       sync.Mutex
    cond     = sync.NewCond(&mu)
    progress int
)

func monitor(id int) {
    mu.Lock()
    defer mu.Unlock()
    
    for progress < 100 {
        cond.Wait()
        fmt.Printf("Monitor %d: 收到通知,当前进度 %d%%\n", id, progress)
    }
}

func main() {
    // 启动 3 个监控 goroutine
    for i := 1; i <= 3; i++ {
        go monitor(i)
    }
    
    time.Sleep(100 * time.Millisecond)
    
    // 主 goroutine 更新进度并通知所有监控者
    for i := 10; i <= 100; i += 10 {
        mu.Lock()
        progress = i
        mu.Unlock()
        
        fmt.Printf("主 goroutine: 更新进度到 %d%%,广播通知\n", i)
        cond.Broadcast() // 唤醒所有等待者
        
        time.Sleep(200 * time.Millisecond)
    }
    
    // 输出:
    // 主 goroutine: 更新进度到 10%,广播通知
    // Monitor 1: 收到通知,当前进度 10%
    // Monitor 2: 收到通知,当前进度 10%
    // Monitor 3: 收到通知,当前进度 10%
    // 主 goroutine: 更新进度到 20%,广播通知
    // Monitor 1: 收到通知,当前进度 20%
    // ...
}

26.16 虚假唤醒:必须用 for 循环而不是 if 判断条件

这是 sync.Cond 最容易踩的坑!Go 的实现允许虚假唤醒(spurious wakeup),即 Wait() 可能在没有调用 Signal/Broadcast 的情况下返回。所以必须用 for 循环 而不是 if 来判断条件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package main

import (
    "fmt"
    "sync"
    "time"
)

// 错误示范:使用 if 判断
func wrongWay() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    ready := false
    
    go func() {
        time.Sleep(2 * time.Second)
        mu.Lock()
        ready = true
        mu.Unlock()
        cond.Signal() // 只唤醒一个
    }()
    
    mu.Lock()
    if !ready { // ❌ 错误!应该用 for
        fmt.Println("错误方式: 准备等待...")
        cond.Wait()
    }
    fmt.Println("错误方式: 继续执行")
    mu.Unlock()
}

// 正确示范:使用 for 循环
func rightWay() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    ready := false
    
    go func() {
        time.Sleep(2 * time.Second)
        mu.Lock()
        ready = true
        mu.Unlock()
        cond.Broadcast() // 唤醒所有
    }()
    
    mu.Lock()
    for !ready { // ✅ 正确!循环检查条件
        fmt.Println("正确方式: 准备等待...")
        cond.Wait()
    }
    fmt.Println("正确方式: 继续执行")
    mu.Unlock()
}

func main() {
    fmt.Println("=== 测试错误方式 ===")
    wrongWay()
    
    time.Sleep(500 * time.Millisecond)
    
    fmt.Println("\n=== 测试正确方式 ===")
    rightWay()
}

为什么需要 for 循环?

graph TD
    A["Wait() 返回"] --> B{"条件真的满足了吗?"}
    B -->|是| C["继续执行"]
    B -->|否(虚假唤醒)| D["继续 Wait()"]
    D --> A
    
    style B fill:#feca57
    style C fill:#5cd89a
    style D fill:#ff6b6b

记住:永远用 for 而不是 if 来检查条件,即使你确定不会有虚假唤醒。这也是防御性编程的体现。


26.17 sync.Map:并发安全 Map,read + dirty 双层结构

sync.Map 是 Go 专门为并发场景设计的 Map,采用了 read + dirty 双层结构 来优化读性能。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
    "fmt"
    "sync"
)

func main() {
    var m sync.Map
    
    // Store: 存储键值对
    m.Store("name", "小明")
    m.Store("age", 18)
    m.Store("city", "北京")
    
    // Load: 读取值
    if value, ok := m.Load("name"); ok {
        fmt.Printf("读取到 name: %v\n", value)
    }
    
    // LoadOrStore: 读取或存储(如果不存在)
    if value, loaded := m.LoadOrStore("country", "中国"); loaded {
        fmt.Printf("country 已存在: %v\n", value)
    } else {
        fmt.Println("country 是新插入的")
    }
    
    // Delete: 删除
    m.Delete("city")
    
    // Range: 遍历
    fmt.Println("\n遍历所有键值对:")
    m.Range(func(key, value interface{}) bool {
        fmt.Printf("  %s = %v\n", key, value)
        return true // 返回 true 继续遍历,false 停止遍历
    })
    
    // 输出:
    // 读取到 name: 小明
    // country 是新插入的
    // 遍历所有键值对:
    //   name = 小明
    //   age = 18
    //   country = 中国
}

sync.Map 内部结构:

graph TB
    subgraph sync.Map 内部结构
        R["read map<br/>(只读的快照)"]
        D["dirty map<br/>(可写的)"]
        P["epilogue<br/>(miss 计数)"]
    end
    
    R -->|"未命中时"| D
    D -->|"升级时"| R

专业词汇解释:

  • read map:存储所有数据(大部分是只读的),读取不需要加锁
  • dirty map:存储新写入的、尚未同步到 read 的数据
  • miss 计数:当 read 中找不到时,会到 dirty 中找,找不到则 miss + 1

26.18 Map.Store、Map.Load、Map.Delete、Map.Range:基本操作

sync.Map 提供了与普通 Map 类似的操作,但都是并发安全的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package main

import (
    "fmt"
    "sync"
)

func main() {
    var m sync.Map
    
    // ========== Store ==========
    // 存储键值对,类似 map[key] = value
    m.Store("apple", "苹果")
    m.Store("banana", "香蕉")
    m.Store(123, "数字键也行")
    
    // ========== Load ==========
    // 读取指定 key 的值
    value, ok := m.Load("apple")
    if ok {
        fmt.Printf("Load 苹果: %s\n", value)
    }
    
    // 不存在的 key
    _, ok = m.Load("grape")
    if !ok {
        fmt.Println("Load 葡萄: 不存在")
    }
    
    // ========== Delete ==========
    // 删除指定 key
    m.Store("temp", "临时数据")
    fmt.Printf("Delete 前: ")
    if v, _ := m.Load("temp"); v != nil {
        fmt.Printf("temp = %s\n", v)
    }
    
    m.Delete("temp")
    fmt.Printf("Delete 后: ")
    if _, ok := m.Load("temp"); !ok {
        fmt.Println("temp 已被删除")
    }
    
    // ========== Range ==========
    // 遍历所有键值对
    fmt.Println("\nRange 遍历:")
    m.Store("x", 1)
    m.Store("y", 2)
    m.Store("z", 3)
    m.Range(func(key, value interface{}) bool {
        fmt.Printf("  %v -> %v\n", key, value)
        return true
    })
    
    // 输出:
    // Load 苹果: 苹果
    // Load 葡萄: 不存在
    // Delete 前: temp = 临时数据
    // Delete 后: temp 已被删除
    // Range 遍历:
    //   apple -> 苹果
    //   banana -> 香蕉
    //   123 -> 数字键也行
    //   x -> 1
    //   y -> 2
    //   z -> 3
}

26.19 Map.LoadOrStore、Map.LoadAndDelete、Map.Swap:原子操作

这些是 sync.Map 提供的高级原子操作,可以让读取、删除、替换操作一步完成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
    "fmt"
    "sync"
)

func main() {
    var m sync.Map
    
    // ========== LoadOrStore ==========
    // 如果 key 存在,返回现有值;不存在则存入并返回
    value, loaded := m.LoadOrStore("name", "小明")
    fmt.Printf("第一次 LoadOrStore: value=%s, loaded=%v\n", value, loaded)
    // loaded = false,因为是新插入
    
    value, loaded = m.LoadOrStore("name", "小红")
    fmt.Printf("第二次 LoadOrStore: value=%s, loaded=%v\n", value, loaded)
    // loaded = true,返回的是原来的"小明"
    
    // ========== LoadAndDelete ==========
    // 读取并删除,如果 key 存在返回原值,不存在返回 nil
    m.Store("toDelete", "待删除的值")
    
    value, loaded = m.LoadAndDelete("toDelete")
    fmt.Printf("LoadAndDelete: value=%s, loaded=%v\n", value, loaded)
    
    _, loaded = m.Load("toDelete")
    fmt.Printf("确认删除: loaded=%v\n", loaded)
    
    // ========== Swap ==========
    // 交换新的值,并返回旧的值
    m.Store("swapKey", "旧值")
    
    oldValue, loaded := m.Swap("swapKey", "新值")
    fmt.Printf("Swap: oldValue=%s, loaded=%v\n", oldValue, loaded)
    
    newValue, _ := m.Load("swapKey")
    fmt.Printf("Swap 后新值: %s\n", newValue)
    
    // Swap 一个不存在的 key
    oldValue, loaded = m.Swap("newKey", "全新值")
    fmt.Printf("Swap 新 key: oldValue=%v, loaded=%v\n", oldValue, loaded)
    
    // 输出:
    // 第一次 LoadOrStore: value=小明, loaded=false
    // 第二次 LoadOrStore: value=小明, loaded=true
    // LoadAndDelete: value=待删除的值, loaded=true
    // 确认删除: loaded=false
    // Swap: oldValue=旧值, loaded=true
    // Swap 后新值: 新值
    // Swap 新 key: oldValue=<nil>, loaded=false
}

专业词汇解释:

  • LoadOrStore:原子地加载或存储,返回是否已存在
  • LoadAndDelete:原子地加载并删除(也叫"pop"操作)
  • Swap:原子地交换新值并返回旧值

26.20 sync.Map vs sync.RWMutex + map:Map 适合"写少读多且 key 集合相对稳定"的场景

不是所有场景都需要 sync.Map!普通 map + RWMutex 在某些场景下可能更高效。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package main

import (
    "fmt"
    "sync"
    "time"
)

// 场景对比:10 个读 goroutine,1 个写 goroutine
func benchmark(name string, fn func()) {
    start := time.Now()
    fn()
    fmt.Printf("%s 耗时: %v\n", name, time.Since(start))
}

func main() {
    const goroutines = 10
    const iterations = 100000
    
    // ========== sync.Map 版本 ==========
    var syncMap sync.Map
    for i := 0; i < 100; i++ {
        syncMap.Store(i, i*i)
    }
    
    benchmark("sync.Map", func() {
        var wg sync.WaitGroup
        wg.Add(goroutines)
        
        for g := 0; g < goroutines; g++ {
            go func(id int) {
                defer wg.Done()
                for i := 0; i < iterations; i++ {
                    syncMap.Load(id % 100)
                }
            }(g)
        }
        
        wg.Wait()
    })
    
    // ========== RWMutex + map 版本 ==========
    var (
        rwMu sync.RWMutex
        m    = make(map[int]int)
    )
    for i := 0; i < 100; i++ {
        m[i] = i*i
    }
    
    benchmark("RWMutex+map", func() {
        var wg sync.WaitGroup
        wg.Add(goroutines)
        
        for g := 0; g < goroutines; g++ {
            go func(id int) {
                defer wg.Done()
                for i := 0; i < iterations; i++ {
                    rwMu.RLock()
                    _ = m[id%100]
                    rwMu.RUnlock()
                }
            }(g)
        }
        
        wg.Wait()
    })
    
    fmt.Println("\n何时用 sync.Map?")
    fmt.Println("✅ 写少读多(读多写少时 sync.Map 的 read 缓存命中率高)")
    fmt.Println("✅ key 集合相对稳定(不会频繁增删)")
    fmt.Println("✅ 多个 goroutine 独立读写不同的 key(减少竞争)")
    fmt.Println()
    fmt.Println("何时用 RWMutex + map?")
    fmt.Println("✅ 需要对 map 进行复杂操作(计数、过滤等)")
    fmt.Println("✅ key 集合经常变化")
    fmt.Println("✅ 需要 Map 的所有功能(如 len())")
}

选择决策图:

graph TD
    A["选择并发 Map 方案"] --> B{"读多还是写多?"}
    B -->|写多| C["用 RWMutex + map"]
    B -->|读多| D{"key 集合稳定吗?"}
    D -->|稳定| E["sync.Map"]
    D -->|不稳定| F{"需要复杂操作?"}
    F -->|是| C
    F -->|否| E

26.21 sync.Pool:对象池,Pool.Get 可能返回 nil

sync.Pool 是 Go 的临时对象池,用于缓存使用频繁但创建成本高的对象。Get() 返回的对象可能是之前 Put() 进去的,也可能是 nil(需要你新建)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

import (
    "fmt"
    "sync"
)

var (
    // 创建成本较高的对象(比如数据库连接)
    pool = &sync.Pool{
        New: func() interface{} {
            fmt.Println("创建新对象(成本高)...")
            return &MyObject{name: "新对象"}
        },
    }
)

type MyObject struct {
    name string
    data [1024]byte // 模拟一些内存占用
}

func main() {
    fmt.Println("=== 第一次 Get ===")
    obj := pool.Get()
    fmt.Printf("获取到对象: %v\n", obj)
    
    fmt.Println("\n=== 放回去 ===")
    pool.Put(obj)
    fmt.Println("对象已放回池中")
    
    fmt.Println("\n=== 第二次 Get(应该复用) ===")
    obj2 := pool.Get()
    fmt.Printf("获取到对象: %v\n", obj2)
    
    fmt.Println("\n=== 第三次 Get(池已空) ===")
    obj3 := pool.Get()
    fmt.Printf("获取到对象: %v\n", obj3)
    
    // 输出:
    // === 第一次 Get ===
    // 创建新对象(成本高)...
    // 获取到对象: &{新对象}
    // === 放回去 ===
    // 对象已放回池中
    // === 第二次 Get(应该复用) ===
    // 获取到对象: &{新对象}(没有打印"创建新对象",说明复用了)
    // === 第三次 Get(池已空) ===
    // 创建新对象(成本高)...
    // 获取到对象: &{新对象}
}

专业词汇解释:

  • Pool:临时对象池,用于减少 GC 压力和对象分配开销
  • Get():从池中获取对象,如果池为空则调用 New() 创建
  • Put(x):将对象放回池中,供后续 Get() 使用

26.22 Pool.New:对象创建函数,当 Pool 为空时调用

New 是 Pool 的创建函数,当调用 Get() 但池为空时会调用它来创建新对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main

import (
    "fmt"
    "sync"
)

// 模拟一个创建成本较高的对象
type ExpensiveObject struct {
    ID   int
    data []byte
}

func main() {
    createCount := 0
    
    pool := &sync.Pool{
        New: func() interface{} {
            createCount++
            id := createCount
            fmt.Printf("    [创建 #%d] 创建新对象,ID=%d\n", id, id)
            return &ExpensiveObject{
                ID:   id,
                data: make([]byte, 1024), // 模拟内存分配
            }
        },
    }
    
    fmt.Println("=== 第一次 Get(池空,创建 #1)===")
    obj1 := pool.Get()
    fmt.Printf("获取到对象 ID=%d\n", obj1.(*ExpensiveObject).ID)
    
    fmt.Println("\n=== 放回去 ===")
    pool.Put(obj1)
    
    fmt.Println("\n=== 第二次 Get(复用对象)===")
    obj2 := pool.Get()
    fmt.Printf("获取到对象 ID=%d\n", obj2.(*ExpensiveObject).ID)
    
    fmt.Println("\n=== 第三次 Get(池空了,创建 #2)===")
    obj3 := pool.Get()
    fmt.Printf("获取到对象 ID=%d\n", obj3.(*ExpensiveObject).ID)
    
    fmt.Printf("\n总共创建了 %d 个对象\n", createCount)
    
    // 输出:
    // === 第一次 Get(池空,创建 #1)===
    //     [创建 #1] 创建新对象,ID=1
    // 获取到对象 ID=1
    // === 放回去 ===
    // === 第二次 Get(复用对象)===
    // 获取到对象 ID=1(没有创建新对象!)
    // === 第三次 Get(池空了,创建 #2)===
    //     [创建 #2] 创建新对象,ID=2
    // 获取到对象 ID=2
    // 总共创建了 2 个对象
}

26.23 Pool 的 GC 行为:Pool 在 GC 之前会清空,不能用 Pool 保存持久数据

sync.Pool 的一个重要特性:它在 GC 时会被清空!这是设计上的权衡,意味着 Pool 不能用来存储持久数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func main() {
    pool := &sync.Pool{
        New: func() interface{} {
            return "新对象"
        },
    }
    
    // 放入一个对象
    pool.Put("持久数据-来自用户")
    
    fmt.Println("=== GC 前 ===")
    if v := pool.Get(); v != nil {
        fmt.Printf("Get: %v\n", v)
    }
    
    // 强制 GC
    fmt.Println("\n=== 强制 GC ===")
    runtime.GC()
    time.Sleep(100 * time.Millisecond) // 给 GC 一点时间
    
    fmt.Println("\n=== GC 后 ===")
    // GC 后池被清空,Get 会调用 New 创建新对象
    if v := pool.Get(); v != nil {
        fmt.Printf("Get: %v(不再是'持久数据'了)\n", v)
    }
    
    // 输出:
    // === GC 前 ===
    // Get: 持久数据-来自用户
    // === 强制 GC ===
    // === GC 后 ===
    // Get: 新对象(不再是'持久数据'了)
    
    fmt.Println("\n⚠️ Pool 不适合存储持久数据!")
    fmt.Println("Pool 的设计目标:")
    fmt.Println("1. 减少内存分配和 GC 压力")
    fmt.Println("2. 临时对象复用")
    fmt.Println("3. 不保证数据一定会被复用(GC 会清空)")
}

Pool 的生命周期图:

graph LR
    subgraph Pool 生命周期
        P1["Put(obj)"] -->|"GC 发生"| C["清空 Pool"]
        C -->|"Get() 时"| P2["New() 创建"]
        P2 --> P1
    end
    
    style C fill:#ff6b6b
    Note right of C: GC 会清空所有 Pool<br/>数据可能丢失!

记住:sync.Pool 是用来复用的,不是用来存储的。每次 GC 后,池里的东西可能全没了。


26.24 sync.SeqLock(Go 1.19+):序列锁,读多写少的无锁同步

SeqLock(Sequence Lock)是一种特殊的多读者、单写者的无锁同步机制。通过递增的序列号来检测冲突。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type SeqLock struct {
    seq uint64
}

func (sl *SeqLock) Lock() {
    atomic.AddUint64(&sl.seq, 1) // 写入前序列号 +1(变成奇数)
}

func (sl *SeqLock) Unlock() {
    atomic.AddUint64(&sl.seq, 1) // 写入后序列号再 +1(变成偶数)
}

// 读开始:返回读取时的序列号
func (sl *SeqLock) RLock() uint64 {
    return atomic.LoadUint64(&sl.seq)
}

// 读结束:检查序列号是否变化
func (sl *SeqLock) RUnlock(startSeq uint64) bool {
    return atomic.LoadUint64(&sl.seq) == startSeq
}

var (
    seqlock SeqLock
    counter int64
)

func writer() {
    for i := 0; i < 1000; i++ {
        seqlock.Lock()
        counter++
        seqlock.Unlock()
    }
}

func reader() int64 {
    var sum int64
    for i := 0; i < 1000; i++ {
        startSeq := seqlock.RLock()
        sum += counter
        if !seqlock.RUnlock(startSeq) {
            // 序列号变了,说明读取期间有写入,重试
            i-- // 这次不算
        }
    }
    return sum
}

func main() {
    var wg sync.WaitGroup
    
    // 5 个写 goroutine
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            writer()
        }()
    }
    
    // 5 个读 goroutine
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            sum := reader()
            fmt.Printf("Reader %d sum: %d\n", id, sum)
        }(i)
    }
    
    wg.Wait()
    fmt.Printf("最终 counter: %d (期望: 5000)\n", counter)
}

SeqLock 原理图:

graph LR
    subgraph 序列号状态
        E1["偶数 = 稳定状态"]
        O1["奇数 = 写入中"]
    end
    
    W["Writer"] -->|"写入前"| O1
    O1 -->|"写入后"| E1
    
    R1["Reader 1"] -->|"读取前"| E1
    E1 -->|"读取后再检查"| E2{"序列号一致?"}
    E2 -->|"是| OK
    E2 -->|"否| R1
    
    style O1 fill:#feca57
    style E1 fill:#5cd89a

专业词汇解释:

  • SeqLock:序列锁,通过递增的序列号来检测读写冲突
  • 写锁:写入前后序列号各 +1,保证序列号为奇数时表示正在写入
  • 读锁:读取前后检查序列号,如果一致说明没有写入冲突

SeqLock 适合读多写少的场景,写操作不会阻塞读操作(除非检测到冲突需要重试)。


26.25 sync.Map、sync.WaitGroup、sync.Once:不可复制,使用指针传递

Go 的 sync 包中有些类型不能被复制!这是因为它们内部包含了隐藏的同步状态,复制后会导致奇怪的行为。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
    "fmt"
    "sync"
)

// 这个函数错误地按值传递了 sync.WaitGroup
func wrongFunction(wg sync.WaitGroup) {
    wg.Add(1)
    // ...
    wg.Done()
}

// 正确做法:使用指针传递
func correctFunction(wg *sync.WaitGroup) {
    wg.Add(1)
    // ...
    wg.Done()
}

func main() {
    var wg sync.WaitGroup
    
    // 错误做法会编译警告(或错误,取决于 Go 版本)
    // go vet 会检测出这个问题
    // wrongFunction(wg) // 不要这样做!
    
    // 正确做法
    correctFunction(&wg)
    
    wg.Wait()
    fmt.Println("完成")
    
    // ========== 同样问题也适用于其他 sync 类型 ==========
    // 错误
    // m1 := sync.Map{}
    // m2 := m1 // 复制!
    
    // 正确
    // m1 := &sync.Map{} // 使用指针
}

为什么不能复制?

graph TD
    subgraph 复制后的 WaitGroup
        W1["原始 WaitGroup<br/>计数器=0"]
        W2["复制的 WaitGroup<br/>计数器=0"]
    end
    
    subgraph 问题
        A["原始调用 Done()"] -->|"计数器=0-1=-1"| E["恐慌!"]
    end
    
    style E fill:#ff6b6b
类型能否复制正确用法
sync.Mutexvar mu sync.Mutexmu := &sync.Mutex{}
sync.RWMutex使用指针
sync.WaitGroup使用指针 &wg
sync.Once使用指针
sync.Map使用指针
sync.Cond使用指针
sync.Pool使用指针

重要:Go 1.21+,go vet 默认会检测这些问题。强烈建议使用 go vet ./... 检查代码。


本章小结

sync 包是 Go 并发编程的基础设施,本章我们详细介绍了以下内容:

核心概念

  • 数据竞争(Data Race):多个 goroutine 同时访问共享数据,且至少有一个是写操作
  • 同步原语:用于协调多个 goroutine 执行顺序的工具

锁家族

类型用途特点
sync.Mutex互斥锁最基础,保证同一时刻只有一个 goroutine 进入临界区
sync.RWMutex读写锁读多写少场景,读操作可并发,写操作独占
Mutex.TryLock尝试锁不阻塞,立即返回是否成功获取锁

等待与通知

  • sync.WaitGroup:等待一组 goroutine 完成,Add/Done/Wait 三板斧
  • sync.Cond:条件变量,Wait/Signal/Broadcast 实现等待-通知模式

一次性执行

  • sync.Once:保证函数只执行一次
  • sync.OnceFunc/OnceValue(Go 1.21+):更便捷的封装

并发安全容器

  • sync.Map:read + dirty 双层结构,适合写少读多、key 集合稳定的场景
  • sync.Pool:临时对象池,注意 GC 会清空,不能存储持久数据

无锁同步

  • sync.SeqLock(Go 1.19+):序列锁,适合读多写少的无锁场景

最佳实践

  1. 永远使用 defer mu.Unlock()——防止忘记释放锁
  2. 永远使用 defer wg.Done()——防止 WaitGroup 泄漏
  3. 永远用 for 而不是 if 检查 Cond 条件——防止虚假唤醒
  4. sync 类型使用指针传递——防止复制导致状态丢失
  5. 使用 go run -race——检测数据竞争

何时选择何种工具?

graph TD
    A["并发场景"] --> B{"需要保护什么?"}
    B -->|单个值/资源| C["Mutex"]
    B -->|读多写少| D["RWMutex"]
    B -->|等待一组完成| E["WaitGroup"]
    B -->|"等待+通知"| F["Cond"]
    B -->|只执行一次| G["Once"]
    B -->|"并发 Map"| H{"key 稳定吗?"}
    H -->|是| I["sync.Map"]
    H -->|否| J["RWMutex + map"]
    B -->|临时对象复用| K["Pool"]
    B -->|"读多写少无锁"| L["SeqLock"]

掌握 sync 包,就掌握了 Go 并发编程的"内功心法"。下一章我们将介绍 context 包,学习如何优雅地取消 goroutine 和传递请求作用域的数据。

最后修改 March 30, 2026: 新增 Go 标准库基础 教程 (acbc3f6)