发布于

Go语言并发编程实战:掌握Goroutine和Channel的强大威力

作者

Go语言并发编程实战:掌握Goroutine和Channel的强大威力

Go语言的并发模型是其最大的特色之一,通过goroutine和channel提供了简洁而强大的并发编程能力。本文将深入探讨Go并发编程的各个方面。

Go并发基础

项目结构和依赖

// go.mod
module concurrent-programming-demo

go 1.21

require (
    github.com/stretchr/testify v1.8.4
    golang.org/x/sync v0.3.0
    golang.org/x/time v0.3.0
    github.com/panjf2000/ants/v2 v2.8.1
    github.com/gammazero/workerpool v1.1.3
)

Goroutine基础使用

// concurrent/basics/goroutine.go - Goroutine基础示例
package basics

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// 基础Goroutine示例
func BasicGoroutineExample() {
    fmt.Println("=== 基础Goroutine示例 ===")
    
    // 启动多个goroutine
    var wg sync.WaitGroup
    
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Goroutine %d 开始执行\n", id)
            time.Sleep(time.Millisecond * time.Duration(100+id*50))
            fmt.Printf("Goroutine %d 执行完成\n", id)
        }(i)
    }
    
    wg.Wait()
    fmt.Println("所有goroutine执行完成")
}

// 匿名goroutine vs 命名函数
func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d 开始工作\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d 完成工作\n", id)
}

func NamedFunctionGoroutine() {
    fmt.Println("\n=== 命名函数Goroutine示例 ===")
    
    var wg sync.WaitGroup
    numWorkers := 3
    
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    
    wg.Wait()
    fmt.Println("所有worker完成")
}

// Goroutine泄漏示例和解决方案
func GoroutineLeakExample() {
    fmt.Println("\n=== Goroutine泄漏示例 ===")
    
    // 错误示例:可能导致goroutine泄漏
    fmt.Printf("启动前goroutine数量: %d\n", runtime.NumGoroutine())
    
    done := make(chan bool)
    
    // 启动一个可能永远不会结束的goroutine
    go func() {
        select {
        case <-done:
            fmt.Println("Goroutine正常退出")
            return
        case <-time.After(time.Second * 10):
            fmt.Println("Goroutine超时退出")
            return
        }
    }()
    
    // 模拟一些工作
    time.Sleep(time.Millisecond * 100)
    
    // 正确的做法:确保goroutine能够退出
    close(done)
    time.Sleep(time.Millisecond * 100)
    
    fmt.Printf("清理后goroutine数量: %d\n", runtime.NumGoroutine())
}

// 控制goroutine数量
func ControlledGoroutines() {
    fmt.Println("\n=== 控制Goroutine数量示例 ===")
    
    maxGoroutines := 3
    semaphore := make(chan struct{}, maxGoroutines)
    var wg sync.WaitGroup
    
    tasks := []string{"任务1", "任务2", "任务3", "任务4", "任务5", "任务6"}
    
    for _, task := range tasks {
        wg.Add(1)
        go func(taskName string) {
            defer wg.Done()
            
            // 获取信号量
            semaphore <- struct{}{}
            defer func() { <-semaphore }()
            
            fmt.Printf("开始执行 %s\n", taskName)
            time.Sleep(time.Second)
            fmt.Printf("完成执行 %s\n", taskName)
        }(task)
    }
    
    wg.Wait()
    fmt.Println("所有任务完成")
}

Channel通信机制

// concurrent/basics/channel.go - Channel基础示例
package basics

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// 基础Channel使用
func BasicChannelExample() {
    fmt.Println("=== 基础Channel示例 ===")
    
    // 无缓冲channel
    ch := make(chan string)
    
    go func() {
        time.Sleep(time.Second)
        ch <- "Hello from goroutine"
    }()
    
    message := <-ch
    fmt.Println("接收到消息:", message)
    
    // 缓冲channel
    bufferedCh := make(chan int, 3)
    bufferedCh <- 1
    bufferedCh <- 2
    bufferedCh <- 3
    
    fmt.Printf("缓冲channel长度: %d, 容量: %d\n", len(bufferedCh), cap(bufferedCh))
    
    for i := 0; i < 3; i++ {
        value := <-bufferedCh
        fmt.Printf("从缓冲channel接收: %d\n", value)
    }
}

// Channel方向性
func ChannelDirections() {
    fmt.Println("\n=== Channel方向性示例 ===")
    
    ch := make(chan string, 1)
    
    // 只发送channel
    go func(sendCh chan<- string) {
        sendCh <- "单向发送"
    }(ch)
    
    // 只接收channel
    go func(receiveCh <-chan string) {
        message := <-receiveCh
        fmt.Println("接收到:", message)
    }(ch)
    
    time.Sleep(time.Millisecond * 100)
}

// Select语句
func SelectExample() {
    fmt.Println("\n=== Select语句示例 ===")
    
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(time.Millisecond * 100)
        ch1 <- "来自channel 1"
    }()
    
    go func() {
        time.Sleep(time.Millisecond * 200)
        ch2 <- "来自channel 2"
    }()
    
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("接收到:", msg1)
        case msg2 := <-ch2:
            fmt.Println("接收到:", msg2)
        case <-time.After(time.Millisecond * 300):
            fmt.Println("超时")
        }
    }
}

// 非阻塞Channel操作
func NonBlockingChannelOps() {
    fmt.Println("\n=== 非阻塞Channel操作 ===")
    
    messages := make(chan string)
    signals := make(chan bool)
    
    // 非阻塞接收
    select {
    case msg := <-messages:
        fmt.Println("接收到消息:", msg)
    default:
        fmt.Println("没有消息可接收")
    }
    
    // 非阻塞发送
    msg := "hello"
    select {
    case messages <- msg:
        fmt.Println("发送消息:", msg)
    default:
        fmt.Println("无法发送消息")
    }
    
    // 多路非阻塞select
    select {
    case msg := <-messages:
        fmt.Println("接收到消息:", msg)
    case sig := <-signals:
        fmt.Println("接收到信号:", sig)
    default:
        fmt.Println("没有活动")
    }
}

// Channel关闭和range
func ChannelCloseAndRange() {
    fmt.Println("\n=== Channel关闭和Range示例 ===")
    
    jobs := make(chan int, 5)
    done := make(chan bool)
    
    // 发送任务
    go func() {
        for j := 1; j <= 3; j++ {
            jobs <- j
            fmt.Printf("发送任务 %d\n", j)
        }
        close(jobs)
    }()
    
    // 接收任务
    go func() {
        for {
            j, more := <-jobs
            if more {
                fmt.Printf("接收到任务 %d\n", j)
                time.Sleep(time.Millisecond * 100)
            } else {
                fmt.Println("接收完所有任务")
                done <- true
                return
            }
        }
    }()
    
    <-done
    
    // 使用range遍历channel
    fmt.Println("\n使用range遍历channel:")
    numbers := make(chan int, 3)
    numbers <- 1
    numbers <- 2
    numbers <- 3
    close(numbers)
    
    for num := range numbers {
        fmt.Printf("数字: %d\n", num)
    }
}

高级并发模式

// concurrent/patterns/patterns.go - 并发模式
package patterns

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Worker Pool模式
type Job struct {
    ID     int
    Data   string
    Result chan string
}

type WorkerPool struct {
    WorkerCount int
    JobQueue    chan Job
    quit        chan bool
}

func NewWorkerPool(workerCount int, queueSize int) *WorkerPool {
    return &WorkerPool{
        WorkerCount: workerCount,
        JobQueue:    make(chan Job, queueSize),
        quit:        make(chan bool),
    }
}

func (wp *WorkerPool) Start() {
    for i := 0; i < wp.WorkerCount; i++ {
        go wp.worker(i)
    }
}

func (wp *WorkerPool) worker(id int) {
    for {
        select {
        case job := <-wp.JobQueue:
            fmt.Printf("Worker %d 开始处理任务 %d\n", id, job.ID)
            
            // 模拟工作
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
            result := fmt.Sprintf("任务 %d 处理完成,数据: %s", job.ID, job.Data)
            
            job.Result <- result
            fmt.Printf("Worker %d 完成任务 %d\n", id, job.ID)
            
        case <-wp.quit:
            fmt.Printf("Worker %d 退出\n", id)
            return
        }
    }
}

func (wp *WorkerPool) Submit(job Job) {
    wp.JobQueue <- job
}

func (wp *WorkerPool) Stop() {
    for i := 0; i < wp.WorkerCount; i++ {
        wp.quit <- true
    }
}

func WorkerPoolExample() {
    fmt.Println("=== Worker Pool模式示例 ===")
    
    pool := NewWorkerPool(3, 10)
    pool.Start()
    
    // 提交任务
    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go func(jobID int) {
            defer wg.Done()
            
            job := Job{
                ID:     jobID,
                Data:   fmt.Sprintf("数据-%d", jobID),
                Result: make(chan string, 1),
            }
            
            pool.Submit(job)
            result := <-job.Result
            fmt.Printf("收到结果: %s\n", result)
        }(i)
    }
    
    wg.Wait()
    pool.Stop()
    time.Sleep(time.Millisecond * 100) // 等待worker退出
}

// Fan-out/Fan-in模式
func FanOutFanInExample() {
    fmt.Println("\n=== Fan-out/Fan-in模式示例 ===")
    
    // 输入数据
    input := make(chan int, 10)
    for i := 1; i <= 10; i++ {
        input <- i
    }
    close(input)
    
    // Fan-out: 启动多个worker处理数据
    numWorkers := 3
    outputs := make([]<-chan int, numWorkers)
    
    for i := 0; i < numWorkers; i++ {
        output := make(chan int)
        outputs[i] = output
        
        go func(in <-chan int, out chan<- int, workerID int) {
            defer close(out)
            for n := range in {
                result := n * n // 计算平方
                fmt.Printf("Worker %d: %d^2 = %d\n", workerID, n, result)
                time.Sleep(time.Millisecond * 100) // 模拟处理时间
                out <- result
            }
        }(input, output, i)
    }
    
    // Fan-in: 合并所有worker的输出
    merged := fanIn(outputs...)
    
    // 收集结果
    var results []int
    for result := range merged {
        results = append(results, result)
    }
    
    fmt.Printf("所有结果: %v\n", results)
}

func fanIn(inputs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    
    wg.Add(len(inputs))
    for _, input := range inputs {
        go func(ch <-chan int) {
            defer wg.Done()
            for n := range ch {
                out <- n
            }
        }(input)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

// Pipeline模式
func PipelineExample() {
    fmt.Println("\n=== Pipeline模式示例 ===")
    
    // 阶段1: 生成数字
    numbers := generate(1, 2, 3, 4, 5)
    
    // 阶段2: 计算平方
    squares := square(numbers)
    
    // 阶段3: 过滤偶数
    evens := filterEven(squares)
    
    // 消费结果
    for result := range evens {
        fmt.Printf("最终结果: %d\n", result)
    }
}

func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            result := n * n
            fmt.Printf("计算平方: %d^2 = %d\n", n, result)
            out <- result
        }
    }()
    return out
}

func filterEven(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if n%2 == 0 {
                fmt.Printf("过滤偶数: %d\n", n)
                out <- n
            }
        }
    }()
    return out
}

// Context取消模式
func ContextCancellationExample() {
    fmt.Println("\n=== Context取消模式示例 ===")
    
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
    defer cancel()
    
    result := make(chan string, 1)
    
    go func() {
        select {
        case <-time.After(time.Second * 3):
            result <- "工作完成"
        case <-ctx.Done():
            fmt.Printf("工作被取消: %v\n", ctx.Err())
            result <- "工作被取消"
        }
    }()
    
    select {
    case res := <-result:
        fmt.Printf("结果: %s\n", res)
    case <-ctx.Done():
        fmt.Printf("主程序超时: %v\n", ctx.Err())
    }
}

// 生产者-消费者模式
func ProducerConsumerExample() {
    fmt.Println("\n=== 生产者-消费者模式示例 ===")
    
    buffer := make(chan string, 5) // 缓冲区
    var wg sync.WaitGroup
    
    // 启动生产者
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer close(buffer)
        
        for i := 1; i <= 10; i++ {
            item := fmt.Sprintf("商品-%d", i)
            buffer <- item
            fmt.Printf("生产: %s\n", item)
            time.Sleep(time.Millisecond * 100)
        }
        fmt.Println("生产者完成")
    }()
    
    // 启动多个消费者
    numConsumers := 2
    for i := 0; i < numConsumers; i++ {
        wg.Add(1)
        go func(consumerID int) {
            defer wg.Done()
            
            for item := range buffer {
                fmt.Printf("消费者 %d 消费: %s\n", consumerID, item)
                time.Sleep(time.Millisecond * 200)
            }
            fmt.Printf("消费者 %d 完成\n", consumerID)
        }(i)
    }
    
    wg.Wait()
    fmt.Println("所有生产者和消费者完成")
}

并发安全和同步

// concurrent/sync/synchronization.go - 同步机制
package sync

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// Mutex示例
type SafeCounter struct {
    mu    sync.Mutex
    value int
}

func (c *SafeCounter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func MutexExample() {
    fmt.Println("=== Mutex示例 ===")
    
    counter := &SafeCounter{}
    var wg sync.WaitGroup
    
    // 启动多个goroutine并发增加计数器
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Inc()
        }()
    }
    
    wg.Wait()
    fmt.Printf("最终计数: %d\n", counter.Value())
}

// RWMutex示例
type SafeMap struct {
    mu   sync.RWMutex
    data map[string]int
}

func NewSafeMap() *SafeMap {
    return &SafeMap{
        data: make(map[string]int),
    }
}

func (sm *SafeMap) Set(key string, value int) {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    sm.data[key] = value
}

func (sm *SafeMap) Get(key string) (int, bool) {
    sm.mu.RLock()
    defer sm.mu.RUnlock()
    value, ok := sm.data[key]
    return value, ok
}

func (sm *SafeMap) Len() int {
    sm.mu.RLock()
    defer sm.mu.RUnlock()
    return len(sm.data)
}

func RWMutexExample() {
    fmt.Println("\n=== RWMutex示例 ===")
    
    safeMap := NewSafeMap()
    var wg sync.WaitGroup
    
    // 写操作
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            key := fmt.Sprintf("key-%d", i)
            safeMap.Set(key, i*10)
            fmt.Printf("写入: %s = %d\n", key, i*10)
        }(i)
    }
    
    // 读操作
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            key := fmt.Sprintf("key-%d", i%10)
            if value, ok := safeMap.Get(key); ok {
                fmt.Printf("读取: %s = %d\n", key, value)
            }
        }(i)
    }
    
    wg.Wait()
    fmt.Printf("Map大小: %d\n", safeMap.Len())
}

// 原子操作示例
func AtomicExample() {
    fmt.Println("\n=== 原子操作示例 ===")
    
    var counter int64
    var wg sync.WaitGroup
    
    // 使用原子操作进行并发计数
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&counter, 1)
        }()
    }
    
    wg.Wait()
    fmt.Printf("原子计数器最终值: %d\n", atomic.LoadInt64(&counter))
    
    // 原子操作的其他用法
    var flag int32
    
    // 设置标志
    atomic.StoreInt32(&flag, 1)
    fmt.Printf("标志值: %d\n", atomic.LoadInt32(&flag))
    
    // 比较并交换
    swapped := atomic.CompareAndSwapInt32(&flag, 1, 2)
    fmt.Printf("交换成功: %t, 新值: %d\n", swapped, atomic.LoadInt32(&flag))
}

// Once示例
func OnceExample() {
    fmt.Println("\n=== Once示例 ===")
    
    var once sync.Once
    var initialized bool
    
    initialize := func() {
        fmt.Println("执行初始化...")
        time.Sleep(time.Millisecond * 100)
        initialized = true
        fmt.Println("初始化完成")
    }
    
    var wg sync.WaitGroup
    
    // 多个goroutine尝试初始化
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Goroutine %d 尝试初始化\n", id)
            once.Do(initialize)
            fmt.Printf("Goroutine %d 看到初始化状态: %t\n", id, initialized)
        }(i)
    }
    
    wg.Wait()
}

// Cond示例
func CondExample() {
    fmt.Println("\n=== Cond示例 ===")
    
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    var ready bool
    
    // 等待者
    for i := 0; i < 3; i++ {
        go func(id int) {
            mu.Lock()
            defer mu.Unlock()
            
            for !ready {
                fmt.Printf("等待者 %d 开始等待\n", id)
                cond.Wait()
            }
            fmt.Printf("等待者 %d 被唤醒\n", id)
        }(i)
    }
    
    // 等待一段时间后发送信号
    time.Sleep(time.Millisecond * 100)
    
    mu.Lock()
    ready = true
    fmt.Println("条件已满足,广播信号")
    cond.Broadcast()
    mu.Unlock()
    
    time.Sleep(time.Millisecond * 100)
}

实际应用示例

// main.go - 完整的并发应用示例
package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"

    "concurrent-programming-demo/concurrent/basics"
    "concurrent-programming-demo/concurrent/patterns"
    syncpkg "concurrent-programming-demo/concurrent/sync"
)

// 并发HTTP客户端示例
type HTTPClient struct {
    client  *http.Client
    timeout time.Duration
}

func NewHTTPClient(timeout time.Duration) *HTTPClient {
    return &HTTPClient{
        client: &http.Client{
            Timeout: timeout,
        },
        timeout: timeout,
    }
}

func (c *HTTPClient) FetchURLs(urls []string) map[string]string {
    results := make(map[string]string)
    var mu sync.Mutex
    var wg sync.WaitGroup

    // 限制并发数
    semaphore := make(chan struct{}, 5)

    for _, url := range urls {
        wg.Add(1)
        go func(u string) {
            defer wg.Done()

            // 获取信号量
            semaphore <- struct{}{}
            defer func() { <-semaphore }()

            resp, err := c.client.Get(u)
            if err != nil {
                mu.Lock()
                results[u] = fmt.Sprintf("Error: %v", err)
                mu.Unlock()
                return
            }
            defer resp.Body.Close()

            mu.Lock()
            results[u] = fmt.Sprintf("Status: %s", resp.Status)
            mu.Unlock()
        }(url)
    }

    wg.Wait()
    return results
}

// 并发数据处理示例
type DataProcessor struct {
    workerCount int
}

func NewDataProcessor(workerCount int) *DataProcessor {
    return &DataProcessor{workerCount: workerCount}
}

func (dp *DataProcessor) ProcessData(data []int) []int {
    input := make(chan int, len(data))
    output := make(chan int, len(data))

    // 发送数据到输入channel
    go func() {
        defer close(input)
        for _, item := range data {
            input <- item
        }
    }()

    // 启动worker处理数据
    var wg sync.WaitGroup
    for i := 0; i < dp.workerCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range input {
                // 模拟复杂计算
                result := item * item + item
                output <- result
            }
        }()
    }

    // 关闭输出channel
    go func() {
        wg.Wait()
        close(output)
    }()

    // 收集结果
    var results []int
    for result := range output {
        results = append(results, result)
    }

    return results
}

// 实时数据流处理
type StreamProcessor struct {
    input  chan interface{}
    output chan interface{}
    ctx    context.Context
    cancel context.CancelFunc
}

func NewStreamProcessor() *StreamProcessor {
    ctx, cancel := context.WithCancel(context.Background())
    return &StreamProcessor{
        input:  make(chan interface{}, 100),
        output: make(chan interface{}, 100),
        ctx:    ctx,
        cancel: cancel,
    }
}

func (sp *StreamProcessor) Start() {
    go sp.process()
}

func (sp *StreamProcessor) process() {
    for {
        select {
        case data := <-sp.input:
            // 处理数据
            processed := sp.processItem(data)

            select {
            case sp.output <- processed:
            case <-sp.ctx.Done():
                return
            }

        case <-sp.ctx.Done():
            return
        }
    }
}

func (sp *StreamProcessor) processItem(data interface{}) interface{} {
    // 模拟数据处理
    time.Sleep(time.Millisecond * 10)
    return fmt.Sprintf("Processed: %v", data)
}

func (sp *StreamProcessor) Send(data interface{}) {
    select {
    case sp.input <- data:
    case <-sp.ctx.Done():
    }
}

func (sp *StreamProcessor) Receive() interface{} {
    select {
    case data := <-sp.output:
        return data
    case <-sp.ctx.Done():
        return nil
    }
}

func (sp *StreamProcessor) Stop() {
    sp.cancel()
}

// 并发缓存示例
type ConcurrentCache struct {
    mu    sync.RWMutex
    cache map[string]interface{}
    ttl   map[string]time.Time
}

func NewConcurrentCache() *ConcurrentCache {
    cache := &ConcurrentCache{
        cache: make(map[string]interface{}),
        ttl:   make(map[string]time.Time),
    }

    // 启动清理goroutine
    go cache.cleanup()

    return cache
}

func (c *ConcurrentCache) Set(key string, value interface{}, duration time.Duration) {
    c.mu.Lock()
    defer c.mu.Unlock()

    c.cache[key] = value
    c.ttl[key] = time.Now().Add(duration)
}

func (c *ConcurrentCache) Get(key string) (interface{}, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()

    value, exists := c.cache[key]
    if !exists {
        return nil, false
    }

    // 检查是否过期
    if expiry, exists := c.ttl[key]; exists && time.Now().After(expiry) {
        return nil, false
    }

    return value, true
}

func (c *ConcurrentCache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()

    delete(c.cache, key)
    delete(c.ttl, key)
}

func (c *ConcurrentCache) cleanup() {
    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()

    for range ticker.C {
        c.mu.Lock()
        now := time.Now()

        for key, expiry := range c.ttl {
            if now.After(expiry) {
                delete(c.cache, key)
                delete(c.ttl, key)
            }
        }
        c.mu.Unlock()
    }
}

func main() {
    fmt.Println("Go并发编程实战示例")
    fmt.Println("==================")

    // 基础示例
    basics.BasicGoroutineExample()
    basics.NamedFunctionGoroutine()
    basics.GoroutineLeakExample()
    basics.ControlledGoroutines()

    basics.BasicChannelExample()
    basics.ChannelDirections()
    basics.SelectExample()
    basics.NonBlockingChannelOps()
    basics.ChannelCloseAndRange()

    // 并发模式示例
    patterns.WorkerPoolExample()
    patterns.FanOutFanInExample()
    patterns.PipelineExample()
    patterns.ContextCancellationExample()
    patterns.ProducerConsumerExample()

    // 同步机制示例
    syncpkg.MutexExample()
    syncpkg.RWMutexExample()
    syncpkg.AtomicExample()
    syncpkg.OnceExample()
    syncpkg.CondExample()

    // 实际应用示例
    fmt.Println("\n=== 实际应用示例 ===")

    // HTTP客户端示例
    fmt.Println("\n--- 并发HTTP请求 ---")
    httpClient := NewHTTPClient(time.Second * 5)
    urls := []string{
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/200",
    }

    start := time.Now()
    results := httpClient.FetchURLs(urls)
    duration := time.Since(start)

    fmt.Printf("并发请求完成,耗时: %v\n", duration)
    for url, result := range results {
        fmt.Printf("  %s: %s\n", url, result)
    }

    // 数据处理示例
    fmt.Println("\n--- 并发数据处理 ---")
    processor := NewDataProcessor(3)
    data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

    start = time.Now()
    processed := processor.ProcessData(data)
    duration = time.Since(start)

    fmt.Printf("数据处理完成,耗时: %v\n", duration)
    fmt.Printf("原始数据: %v\n", data)
    fmt.Printf("处理结果: %v\n", processed)

    // 流处理示例
    fmt.Println("\n--- 实时流处理 ---")
    streamProcessor := NewStreamProcessor()
    streamProcessor.Start()

    // 发送数据
    go func() {
        for i := 0; i < 5; i++ {
            streamProcessor.Send(fmt.Sprintf("数据-%d", i))
            time.Sleep(time.Millisecond * 50)
        }
    }()

    // 接收处理结果
    for i := 0; i < 5; i++ {
        result := streamProcessor.Receive()
        if result != nil {
            fmt.Printf("流处理结果: %v\n", result)
        }
    }

    streamProcessor.Stop()

    // 并发缓存示例
    fmt.Println("\n--- 并发缓存 ---")
    cache := NewConcurrentCache()

    // 设置缓存
    cache.Set("key1", "value1", time.Second*2)
    cache.Set("key2", "value2", time.Second*1)

    // 读取缓存
    if value, ok := cache.Get("key1"); ok {
        fmt.Printf("缓存命中: key1 = %v\n", value)
    }

    // 等待过期
    time.Sleep(time.Second * 1.5)

    if value, ok := cache.Get("key2"); ok {
        fmt.Printf("缓存命中: key2 = %v\n", value)
    } else {
        fmt.Println("key2 已过期")
    }

    if value, ok := cache.Get("key1"); ok {
        fmt.Printf("缓存命中: key1 = %v\n", value)
    }

    fmt.Println("\n所有示例执行完成!")
}

性能测试和基准测试

// concurrent/benchmark/benchmark_test.go - 性能测试
package benchmark

import (
    "sync"
    "sync/atomic"
    "testing"
)

// 测试不同并发方式的性能
func BenchmarkMutexIncrement(b *testing.B) {
    var mu sync.Mutex
    var counter int

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu.Lock()
            counter++
            mu.Unlock()
        }
    })
}

func BenchmarkAtomicIncrement(b *testing.B) {
    var counter int64

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            atomic.AddInt64(&counter, 1)
        }
    })
}

func BenchmarkChannelCommunication(b *testing.B) {
    ch := make(chan int, 1000)

    go func() {
        for i := 0; i < b.N; i++ {
            ch <- i
        }
        close(ch)
    }()

    b.ResetTimer()
    for range ch {
        // 消费数据
    }
}

func BenchmarkBufferedChannel(b *testing.B) {
    ch := make(chan int, 1000)

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            select {
            case ch <- 1:
            default:
            }
        }
    })
}

func BenchmarkUnbufferedChannel(b *testing.B) {
    ch := make(chan int)

    go func() {
        for i := 0; i < b.N; i++ {
            <-ch
        }
    }()

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        ch <- i
    }
}

总结

Go语言并发编程的核心要点:

🎯 并发基础

  1. Goroutine:轻量级线程,高效并发执行
  2. Channel:类型安全的通信机制
  3. Select:多路复用通信操作
  4. 同步原语:Mutex、RWMutex、Once等

✅ 并发模式

  • Worker Pool:任务分发和处理
  • Fan-out/Fan-in:并行处理和结果合并
  • Pipeline:流水线处理
  • Producer-Consumer:生产者消费者

🚀 最佳实践

  • 避免goroutine泄漏
  • 合理使用缓冲channel
  • 选择合适的同步机制
  • Context控制生命周期

💡 性能优化

  • 控制goroutine数量
  • 使用原子操作
  • 减少锁竞争
  • 合理设计并发架构

🔧 实际应用

  • 并发HTTP客户端
  • 实时数据流处理
  • 并发缓存系统
  • 高性能服务器

掌握Go并发编程,构建高性能应用!


Go语言的并发模型简洁而强大,通过goroutine和channel提供了优雅的并发编程体验,是构建高性能服务的理想选择。