第26章 协程 Goroutine

第26章 协程 Goroutine

如果说线程是"单线程社恐",一次只能专心做一件事;那Goroutine就是"社交达人",可以同时和很多人聊天!

想象你是一位餐厅服务员:

  • 传统模式:你一次只能服务一桌客人(点菜、上菜、结账都得排队)
  • Go模式:你有了"分身术",可以同时服务几十桌客人!

这就是Goroutine的魅力——用极低的成本,实现超高并发。

26.1 协程概念

26.1.1 线程 vs 协程

为什么Go选择用Goroutine而不是线程?让我们对比一下:

特性传统线程Goroutine
栈大小固定1-2MB初始2KB,可增长至1GB
管理方操作系统内核Go运行时(用户态)
创建成本约1-2μs约200ns(快10倍!)
切换成本约1-2μs约200ns
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    // 查看当前系统的CPU核心数
    cpuNum := runtime.NumCPU()
    fmt.Printf("CPU核心数: %d\n", cpuNum) // CPU核心数: 8

    // GOMAXPROCS:控制有多少个"调度器"同时运行
    // 通常设置为CPU核心数
    gomaxprocs := runtime.GOMAXPROCS(0)
    fmt.Printf("GOMAXPROCS(调度P数量): %d\n", gomaxprocs) // GOMAXPROCS(调度P数量): 8

    const count = 100_000  // 创建10万个协程
    done := make(chan bool, count)

    start := time.Now()

    // 批量创建协程
    for i := 0; i < count; i++ {
        go func() {
            _ = i * 2  // 简单的计算任务
            done <- true
        }()
    }

    // 等待所有协程完成
    for i := 0; i < count; i++ {
        <-done
    }
    elapsed := time.Since(start)

    fmt.Printf("\n创建 %d 个协程总耗时: %v\n", count, elapsed) // 创建 100000 个协程总耗时: ~200ms
    fmt.Printf("平均每个协程创建成本: %v\n", elapsed/time.Duration(count)) // 平均每个协程创建成本: ~2000ns
}

26.1.2 M:N 调度模型

Go的调度器采用M:N模型:M个线程上调度N个Goroutine。

                         用户代码
                            ↓
    ┌───────────────────────────────────────────┐
    │              Goroutine (G)                │
    │   G1    G2    G3    G4    G5    G6        │
    │                                           │
    │  "我要干活!"  "我也要!"  "让我来!"      │
    └──────────────────┬────────────────────────┘
                       │
                       ↓
    ┌───────────────────────────────────────────┐
    │            Processor (P)                   │
    │         调度器,负责找活干                   │
    └──────────────────┬────────────────────────┘
                       │
    ┌──────────────────┼────────────────────────┐
    │                  │                        │
    ↓                  ↓                        ↓
 ┌──────┐        ┌──────┐                 ┌──────┐
 │  M1  │        │  M2  │                 │  M3  │   Machine (线程)
 │ OS线程│        │ OS线程│                 │ OS线程│
 └──────┘        └──────┘                 └──────┘

模型解释:

  • M(Machine):真正的OS线程
  • P(Processor):调度器,上有本地队列
  • G(Goroutine):任务,排队等调度

26.1.3 协程的生死

协程生命周期:

  • 创建go func() 启动
  • 运行:被调度器分配给P执行
  • 结束:函数返回或panic
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
    "fmt"
    "time"
)

// 永远运行的协程(模拟泄漏场景)
func leakyWorker(id int) {
    for {
        // 无限循环,每秒打印一次
        fmt.Printf("Worker %d: 我还在运行...\n", id)
        time.Sleep(time.Second)
    }
}

func main() {
    fmt.Println("主函数: 启动3个会永远运行的协程...")

    // 启动3个协程,它们永远不会结束
    for i := 1; i <= 3; i++ {
        go leakyWorker(i)
    }

    fmt.Println("主函数: 去喝杯咖啡...")
    time.Sleep(3 * time.Second)

    fmt.Println("主函数: 3秒后,程序仍在运行,协程还在跑!")
}

26.2 协程创建

26.2.1 go 关键字

go 关键字用于启动协程,语法:go 函数名(参数)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
    "fmt"
    "time"
)

// 普通函数,被协程调用
func say(msg string, times int) {
    for i := 0; i < times; i++ {
        fmt.Printf("[%s] 第%d次\n", msg, i+1)
        time.Sleep(10 * time.Millisecond)  // 模拟工作
    }
}

func main() {
    fmt.Println("主函数: 开始...")

    // 用go关键字启动协程
    // 注意:say函数还没执行完,主函数就继续往下走了
    go say("Hello", 3)  // 并发执行
    go say("World", 3)  // 另一个并发执行

    fmt.Println("主函数: 我不等了,先走一步...")
    time.Sleep(100 * time.Millisecond)  // 等待协程完成

    fmt.Println("主函数: 退出")
}

注意: 如果主函数退出太快,协程可能还没执行完!

26.2.2 匿名函数协程

协程也可以直接用匿名函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
    "fmt"
    "time"
)

func main() {
    // 匿名函数协程:带参数
    go func(name string) {
        for i := 0; i < 3; i++ {
            fmt.Printf("Hello from %s, round %d\n", name, i+1)
            time.Sleep(20 * time.Millisecond)
        }
    }("closure")

    // 匿名函数协程:捕获外部变量
    counter := 0
    go func() {
        for i := 0; i < 5; i++ {
            counter++  // 捕获counter变量
            time.Sleep(10 * time.Millisecond)
        }
    }()

    time.Sleep(100 * time.Millisecond)
    fmt.Printf("最终counter值: %d\n", counter) // 最终counter值: 5
}

26.3 协程调度

26.3.1 主动让出

runtime.Gosched() 让当前协程主动让出CPU,让其他协程有机会运行:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
    "fmt"
    "runtime"
)

func cooperative() {
    for i := 0; i < 3; i++ {
        fmt.Printf("协程A: 我在做事 %d\n", i)
        runtime.Gosched()  // 主动说:我先休息一下让别人干
    }
}

func greedy() {
    for i := 0; i < 3; i++ {
        fmt.Printf("协程B: 我也在做事 %d\n", i)
    }
}

func main() {
    runtime.GOMAXPROCS(1)  // 只用1个CPU核心,更容易看到调度效果

    done := make(chan bool, 2)

    go func() {
        cooperative()
        done <- true
    }()

    go func() {
        greedy()
        done <- true
    }()

    <-done
    <-done
}

26.3.2 阻塞与调度

当协程阻塞时(如等待channel),调度器会自动切到其他协程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
    "fmt"
    "time"
)

func blockingTask(id int, ch chan int) {
    fmt.Printf("Task %d: 开始,等待数据...\n", id)
    result := <-ch  // 阻塞等待数据
    fmt.Printf("Task %d: 收到数据 %d\n", id, result)
}

func main() {
    ch := make(chan int)

    // 启动3个协程,它们都会阻塞在channel上
    for i := 1; i <= 3; i++ {
        go blockingTask(i, ch)
    }

    time.Sleep(100 * time.Millisecond)  // 等协程们都开始等待

    fmt.Println("主函数: 发送数据...")

    // 按顺序发送数据,调度器会自动唤醒对应协程
    for i := 1; i <= 3; i++ {
        ch <- i * 100
        time.Sleep(50 * time.Millisecond)
    }
}

26.4 协程同步

26.4.1 sync.WaitGroup

WaitGroup 用于等待一组协程完成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()  // 完成后通知WaitGroup
    fmt.Printf("Worker %d: 开始工作\n", id)
    time.Sleep(time.Duration(100*id) * time.Millisecond)  // 模拟工作
    fmt.Printf("Worker %d: 完成!\n", id)
}

func main() {
    var wg sync.WaitGroup  // 等待组

    fmt.Println("主函数: 启动3个Worker...")

    for i := 1; i <= 3; i++ {
        wg.Add(1)      // 注册一个协程
        go worker(i, &wg)
    }

    fmt.Println("主函数: 等待Worker们完成...")
    wg.Wait()          // 阻塞,直到所有注册的协程都调用Done

    fmt.Println("主函数: 所有Worker已完成!")

    // 主函数: 启动3个Worker...
    // 主函数: 等待Worker们完成...
    // Worker 1: 开始工作
    // Worker 2: 开始工作
    // Worker 3: 开始工作
    // Worker 1: 完成!
    // Worker 2: 完成!
    // Worker 3: 完成!
    // 主函数: 所有Worker已完成!
}

26.4.2 sync.Mutex(重要!)

多个协程同时修改共享数据会造成数据竞争。Mutex可以加锁保护:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
    "fmt"
    "sync"
)

var (
    counter int  // 共享变量
    mu      sync.Mutex  // 互斥锁
)

// SafeIncrement 安全地增加计数器
func SafeIncrement() {
    mu.Lock()      // 加锁:其他协程要等这里释放才能进来
    defer mu.Unlock()
    counter++       // 访问共享数据
    // 函数结束自动解锁
}

func main() {
    var wg sync.WaitGroup

    fmt.Printf("初始counter: %d\n", counter)

    // 启动1000个协程同时修改counter
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            SafeIncrement()
        }()
    }

    wg.Wait()
    fmt.Printf("最终counter: %d (正确!)\n", counter)

    // 初始counter: 0
    // 最终counter: 1000 (正确!)
}

26.5 协程模式

26.5.1 工作池模式

预先创建一组Worker,从任务池取任务执行:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package main

import (
    "fmt"
    "sync"
    "time"
)

const (
    numWorkers = 3   // Worker数量
    numJobs    = 10  // 任务数量
)

// Worker 从jobs取任务,处理后发送到results
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()

    for job := range jobs {  // 从jobs取任务,nil时会阻塞
        fmt.Printf("Worker %d: 接到任务 %d\n", id, job)
        time.Sleep(50 * time.Millisecond)  // 模拟处理
        results <- job * 2                 // 发送结果
        fmt.Printf("Worker %d: 完成任务 %d -> %d\n", id, job, job*2)
    }
}

func main() {
    // 创建通道
    jobs := make(chan int, numJobs)    // 任务池
    results := make(chan int, numJobs)  // 结果池

    var wg sync.WaitGroup

    // 启动Worker们
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }

    // 发送任务
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)  // 关闭任务通道,表示没有更多任务

    // 等待所有Worker完成,然后关闭结果通道
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集结果
    count := 0
    for result := range results {
        fmt.Printf("  主函数: 收到结果 %d\n", result)
        count++
    }

    fmt.Printf("\n共处理 %d 个任务\n", count)
}

26.5.2 扇出模式

一个生产者,多个消费者并行处理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main

import (
    "fmt"
    "sync"
)

// 生产者:产生数据
func producer(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)  // 数据发完,关闭通道
    }()
    return out
}

// 消费者:处理数据
func processor(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for v := range in {
        result := v * v  // 平方运算
        fmt.Printf("Processor %d: %d² = %d\n", id, v, result)
        out <- result
    }
}

func main() {
    data := []int{1, 2, 3, 4, 5}
    input := producer(data...)  // 创建生产者

    const numProcessors = 2     // 2个消费者
    results := make(chan int, len(data))

    var wg sync.WaitGroup

    // 启动消费者
    for i := 1; i <= numProcessors; i++ {
        wg.Add(1)
        go processor(i, input, results, &wg)
    }

    // 等待完成后关闭结果通道
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集结果
    sum := 0
    for r := range results {
        sum += r
    }
    fmt.Printf("\n平方和: %d\n", sum) // 1+4+9+16+25 = 55
}

26.6 协程与通道

协程和通道是好搭档:协程负责干活,通道负责传递数据:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
    "fmt"
    "time"
)

// sender 协程:发送数据
func sender(ch chan<- string) {
    messages := []string{"Hello", "World", "Go", "Routines"}
    for _, msg := range messages {
        fmt.Printf("[发送者] 发送: '%s'\n", msg)
        ch <- msg  // 发送数据,阻塞直到接收者准备好
        time.Sleep(30 * time.Millisecond)
    }
    close(ch)  // 发送完毕,关闭通道
}

func main() {
    ch := make(chan string)  // 创建通道

    go sender(ch)  // 启动发送协程

    fmt.Println("[主函数] 开始接收...")

    // range channel:自动等待通道关闭
    for msg := range ch {
        fmt.Printf("[主函数] 收到: '%s'\n", msg)
    }

    fmt.Println("[主函数] 通道已关闭,接收完毕")

    // [主函数] 开始接收...
    // [发送者] 发送: 'Hello'
    // [主函数] 收到: 'Hello'
    // [发送者] 发送: 'World'
    // [主函数] 收到: 'World'
    // ...
    // [主函数] 通道已关闭,接收完毕
}

26.7 常见问题

26.7.1 主协程退出

问题: 如果主协程(main函数)退出,其他协程会被直接终止!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
    "fmt"
    "time"
)

func lazyWorker() {
    fmt.Println("Worker: 我刚开始...")
    time.Sleep(100 * time.Millisecond)
    fmt.Println("Worker: 我终于干完了!")
}

func main() {
    go lazyWorker()  // 启动协程

    fmt.Println("主函数: 我不等了,再见!")
    time.Sleep(10 * time.Millisecond)  // 只等10ms

    fmt.Println("主函数: 退出!")
    // 此时Worker还没执行完,但程序已经退出了
    // Worker的结果不会被看到
}

26.7.2 数据竞争

问题: 多个协程同时修改同一个变量:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
    "fmt"
    "sync"
)

var counter int

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++  // 数据竞争!
        }()
    }

    wg.Wait()
    fmt.Printf("counter: %d (可能是978或1023等,而不是1000)\n", counter)
}

解决: 使用Mutex或atomic操作保护共享变量。


本章小结

本章我们学会了Goroutine:

核心概念:

  • Goroutine由Go运行时管理,初始栈仅2KB(vs线程1-2MB)
  • M:N调度模型:M个线程调度N个协程
  • GOMAXPROCS控制并发数,通常=CPU核心数

创建与同步:

  • go func() 创建协程
  • sync.WaitGroup 等待协程完成
  • sync.Mutex 保护共享资源

通道模式:

  • 工作池:预先创建Worker,消费任务池
  • 扇出:一个生产者,多个消费者并行处理

黄金法则:

主协程退出 = 所有协程强制终止!一定要等所有协程完成再退出。

最后修改 March 20, 2026: 新增Go基础部分 (8cce995)