- 发布于
Go语言并发编程实战:掌握Goroutine和Channel的强大威力
- 作者

- 姓名
- 全能波
- GitHub
- @weicracker
Go语言并发编程实战:掌握Goroutine和Channel的强大威力
Go语言的并发模型是其最大的特色之一,通过goroutine和channel提供了简洁而强大的并发编程能力。本文将深入探讨Go并发编程的各个方面。
Go并发基础
项目结构和依赖
// go.mod
module concurrent-programming-demo
go 1.21
require (
github.com/stretchr/testify v1.8.4
golang.org/x/sync v0.3.0
golang.org/x/time v0.3.0
github.com/panjf2000/ants/v2 v2.8.1
github.com/gammazero/workerpool v1.1.3
)
Goroutine基础使用
// concurrent/basics/goroutine.go - Goroutine基础示例
package basics
import (
"fmt"
"runtime"
"sync"
"time"
)
// 基础Goroutine示例
func BasicGoroutineExample() {
fmt.Println("=== 基础Goroutine示例 ===")
// 启动多个goroutine
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Goroutine %d 开始执行\n", id)
time.Sleep(time.Millisecond * time.Duration(100+id*50))
fmt.Printf("Goroutine %d 执行完成\n", id)
}(i)
}
wg.Wait()
fmt.Println("所有goroutine执行完成")
}
// 匿名goroutine vs 命名函数
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d 完成工作\n", id)
}
func NamedFunctionGoroutine() {
fmt.Println("\n=== 命名函数Goroutine示例 ===")
var wg sync.WaitGroup
numWorkers := 3
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("所有worker完成")
}
// Goroutine泄漏示例和解决方案
func GoroutineLeakExample() {
fmt.Println("\n=== Goroutine泄漏示例 ===")
// 错误示例:可能导致goroutine泄漏
fmt.Printf("启动前goroutine数量: %d\n", runtime.NumGoroutine())
done := make(chan bool)
// 启动一个可能永远不会结束的goroutine
go func() {
select {
case <-done:
fmt.Println("Goroutine正常退出")
return
case <-time.After(time.Second * 10):
fmt.Println("Goroutine超时退出")
return
}
}()
// 模拟一些工作
time.Sleep(time.Millisecond * 100)
// 正确的做法:确保goroutine能够退出
close(done)
time.Sleep(time.Millisecond * 100)
fmt.Printf("清理后goroutine数量: %d\n", runtime.NumGoroutine())
}
// 控制goroutine数量
func ControlledGoroutines() {
fmt.Println("\n=== 控制Goroutine数量示例 ===")
maxGoroutines := 3
semaphore := make(chan struct{}, maxGoroutines)
var wg sync.WaitGroup
tasks := []string{"任务1", "任务2", "任务3", "任务4", "任务5", "任务6"}
for _, task := range tasks {
wg.Add(1)
go func(taskName string) {
defer wg.Done()
// 获取信号量
semaphore <- struct{}{}
defer func() { <-semaphore }()
fmt.Printf("开始执行 %s\n", taskName)
time.Sleep(time.Second)
fmt.Printf("完成执行 %s\n", taskName)
}(task)
}
wg.Wait()
fmt.Println("所有任务完成")
}
Channel通信机制
// concurrent/basics/channel.go - Channel基础示例
package basics
import (
"fmt"
"math/rand"
"sync"
"time"
)
// 基础Channel使用
func BasicChannelExample() {
fmt.Println("=== 基础Channel示例 ===")
// 无缓冲channel
ch := make(chan string)
go func() {
time.Sleep(time.Second)
ch <- "Hello from goroutine"
}()
message := <-ch
fmt.Println("接收到消息:", message)
// 缓冲channel
bufferedCh := make(chan int, 3)
bufferedCh <- 1
bufferedCh <- 2
bufferedCh <- 3
fmt.Printf("缓冲channel长度: %d, 容量: %d\n", len(bufferedCh), cap(bufferedCh))
for i := 0; i < 3; i++ {
value := <-bufferedCh
fmt.Printf("从缓冲channel接收: %d\n", value)
}
}
// Channel方向性
func ChannelDirections() {
fmt.Println("\n=== Channel方向性示例 ===")
ch := make(chan string, 1)
// 只发送channel
go func(sendCh chan<- string) {
sendCh <- "单向发送"
}(ch)
// 只接收channel
go func(receiveCh <-chan string) {
message := <-receiveCh
fmt.Println("接收到:", message)
}(ch)
time.Sleep(time.Millisecond * 100)
}
// Select语句
func SelectExample() {
fmt.Println("\n=== Select语句示例 ===")
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(time.Millisecond * 100)
ch1 <- "来自channel 1"
}()
go func() {
time.Sleep(time.Millisecond * 200)
ch2 <- "来自channel 2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("接收到:", msg1)
case msg2 := <-ch2:
fmt.Println("接收到:", msg2)
case <-time.After(time.Millisecond * 300):
fmt.Println("超时")
}
}
}
// 非阻塞Channel操作
func NonBlockingChannelOps() {
fmt.Println("\n=== 非阻塞Channel操作 ===")
messages := make(chan string)
signals := make(chan bool)
// 非阻塞接收
select {
case msg := <-messages:
fmt.Println("接收到消息:", msg)
default:
fmt.Println("没有消息可接收")
}
// 非阻塞发送
msg := "hello"
select {
case messages <- msg:
fmt.Println("发送消息:", msg)
default:
fmt.Println("无法发送消息")
}
// 多路非阻塞select
select {
case msg := <-messages:
fmt.Println("接收到消息:", msg)
case sig := <-signals:
fmt.Println("接收到信号:", sig)
default:
fmt.Println("没有活动")
}
}
// Channel关闭和range
func ChannelCloseAndRange() {
fmt.Println("\n=== Channel关闭和Range示例 ===")
jobs := make(chan int, 5)
done := make(chan bool)
// 发送任务
go func() {
for j := 1; j <= 3; j++ {
jobs <- j
fmt.Printf("发送任务 %d\n", j)
}
close(jobs)
}()
// 接收任务
go func() {
for {
j, more := <-jobs
if more {
fmt.Printf("接收到任务 %d\n", j)
time.Sleep(time.Millisecond * 100)
} else {
fmt.Println("接收完所有任务")
done <- true
return
}
}
}()
<-done
// 使用range遍历channel
fmt.Println("\n使用range遍历channel:")
numbers := make(chan int, 3)
numbers <- 1
numbers <- 2
numbers <- 3
close(numbers)
for num := range numbers {
fmt.Printf("数字: %d\n", num)
}
}
高级并发模式
// concurrent/patterns/patterns.go - 并发模式
package patterns
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// Worker Pool模式
type Job struct {
ID int
Data string
Result chan string
}
type WorkerPool struct {
WorkerCount int
JobQueue chan Job
quit chan bool
}
func NewWorkerPool(workerCount int, queueSize int) *WorkerPool {
return &WorkerPool{
WorkerCount: workerCount,
JobQueue: make(chan Job, queueSize),
quit: make(chan bool),
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.WorkerCount; i++ {
go wp.worker(i)
}
}
func (wp *WorkerPool) worker(id int) {
for {
select {
case job := <-wp.JobQueue:
fmt.Printf("Worker %d 开始处理任务 %d\n", id, job.ID)
// 模拟工作
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
result := fmt.Sprintf("任务 %d 处理完成,数据: %s", job.ID, job.Data)
job.Result <- result
fmt.Printf("Worker %d 完成任务 %d\n", id, job.ID)
case <-wp.quit:
fmt.Printf("Worker %d 退出\n", id)
return
}
}
}
func (wp *WorkerPool) Submit(job Job) {
wp.JobQueue <- job
}
func (wp *WorkerPool) Stop() {
for i := 0; i < wp.WorkerCount; i++ {
wp.quit <- true
}
}
func WorkerPoolExample() {
fmt.Println("=== Worker Pool模式示例 ===")
pool := NewWorkerPool(3, 10)
pool.Start()
// 提交任务
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(jobID int) {
defer wg.Done()
job := Job{
ID: jobID,
Data: fmt.Sprintf("数据-%d", jobID),
Result: make(chan string, 1),
}
pool.Submit(job)
result := <-job.Result
fmt.Printf("收到结果: %s\n", result)
}(i)
}
wg.Wait()
pool.Stop()
time.Sleep(time.Millisecond * 100) // 等待worker退出
}
// Fan-out/Fan-in模式
func FanOutFanInExample() {
fmt.Println("\n=== Fan-out/Fan-in模式示例 ===")
// 输入数据
input := make(chan int, 10)
for i := 1; i <= 10; i++ {
input <- i
}
close(input)
// Fan-out: 启动多个worker处理数据
numWorkers := 3
outputs := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
output := make(chan int)
outputs[i] = output
go func(in <-chan int, out chan<- int, workerID int) {
defer close(out)
for n := range in {
result := n * n // 计算平方
fmt.Printf("Worker %d: %d^2 = %d\n", workerID, n, result)
time.Sleep(time.Millisecond * 100) // 模拟处理时间
out <- result
}
}(input, output, i)
}
// Fan-in: 合并所有worker的输出
merged := fanIn(outputs...)
// 收集结果
var results []int
for result := range merged {
results = append(results, result)
}
fmt.Printf("所有结果: %v\n", results)
}
func fanIn(inputs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(len(inputs))
for _, input := range inputs {
go func(ch <-chan int) {
defer wg.Done()
for n := range ch {
out <- n
}
}(input)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// Pipeline模式
func PipelineExample() {
fmt.Println("\n=== Pipeline模式示例 ===")
// 阶段1: 生成数字
numbers := generate(1, 2, 3, 4, 5)
// 阶段2: 计算平方
squares := square(numbers)
// 阶段3: 过滤偶数
evens := filterEven(squares)
// 消费结果
for result := range evens {
fmt.Printf("最终结果: %d\n", result)
}
}
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
result := n * n
fmt.Printf("计算平方: %d^2 = %d\n", n, result)
out <- result
}
}()
return out
}
func filterEven(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
fmt.Printf("过滤偶数: %d\n", n)
out <- n
}
}
}()
return out
}
// Context取消模式
func ContextCancellationExample() {
fmt.Println("\n=== Context取消模式示例 ===")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
result := make(chan string, 1)
go func() {
select {
case <-time.After(time.Second * 3):
result <- "工作完成"
case <-ctx.Done():
fmt.Printf("工作被取消: %v\n", ctx.Err())
result <- "工作被取消"
}
}()
select {
case res := <-result:
fmt.Printf("结果: %s\n", res)
case <-ctx.Done():
fmt.Printf("主程序超时: %v\n", ctx.Err())
}
}
// 生产者-消费者模式
func ProducerConsumerExample() {
fmt.Println("\n=== 生产者-消费者模式示例 ===")
buffer := make(chan string, 5) // 缓冲区
var wg sync.WaitGroup
// 启动生产者
wg.Add(1)
go func() {
defer wg.Done()
defer close(buffer)
for i := 1; i <= 10; i++ {
item := fmt.Sprintf("商品-%d", i)
buffer <- item
fmt.Printf("生产: %s\n", item)
time.Sleep(time.Millisecond * 100)
}
fmt.Println("生产者完成")
}()
// 启动多个消费者
numConsumers := 2
for i := 0; i < numConsumers; i++ {
wg.Add(1)
go func(consumerID int) {
defer wg.Done()
for item := range buffer {
fmt.Printf("消费者 %d 消费: %s\n", consumerID, item)
time.Sleep(time.Millisecond * 200)
}
fmt.Printf("消费者 %d 完成\n", consumerID)
}(i)
}
wg.Wait()
fmt.Println("所有生产者和消费者完成")
}
并发安全和同步
// concurrent/sync/synchronization.go - 同步机制
package sync
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// Mutex示例
type SafeCounter struct {
mu sync.Mutex
value int
}
func (c *SafeCounter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func MutexExample() {
fmt.Println("=== Mutex示例 ===")
counter := &SafeCounter{}
var wg sync.WaitGroup
// 启动多个goroutine并发增加计数器
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Inc()
}()
}
wg.Wait()
fmt.Printf("最终计数: %d\n", counter.Value())
}
// RWMutex示例
type SafeMap struct {
mu sync.RWMutex
data map[string]int
}
func NewSafeMap() *SafeMap {
return &SafeMap{
data: make(map[string]int),
}
}
func (sm *SafeMap) Set(key string, value int) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.data[key] = value
}
func (sm *SafeMap) Get(key string) (int, bool) {
sm.mu.RLock()
defer sm.mu.RUnlock()
value, ok := sm.data[key]
return value, ok
}
func (sm *SafeMap) Len() int {
sm.mu.RLock()
defer sm.mu.RUnlock()
return len(sm.data)
}
func RWMutexExample() {
fmt.Println("\n=== RWMutex示例 ===")
safeMap := NewSafeMap()
var wg sync.WaitGroup
// 写操作
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := fmt.Sprintf("key-%d", i)
safeMap.Set(key, i*10)
fmt.Printf("写入: %s = %d\n", key, i*10)
}(i)
}
// 读操作
for i := 0; i < 20; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := fmt.Sprintf("key-%d", i%10)
if value, ok := safeMap.Get(key); ok {
fmt.Printf("读取: %s = %d\n", key, value)
}
}(i)
}
wg.Wait()
fmt.Printf("Map大小: %d\n", safeMap.Len())
}
// 原子操作示例
func AtomicExample() {
fmt.Println("\n=== 原子操作示例 ===")
var counter int64
var wg sync.WaitGroup
// 使用原子操作进行并发计数
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}()
}
wg.Wait()
fmt.Printf("原子计数器最终值: %d\n", atomic.LoadInt64(&counter))
// 原子操作的其他用法
var flag int32
// 设置标志
atomic.StoreInt32(&flag, 1)
fmt.Printf("标志值: %d\n", atomic.LoadInt32(&flag))
// 比较并交换
swapped := atomic.CompareAndSwapInt32(&flag, 1, 2)
fmt.Printf("交换成功: %t, 新值: %d\n", swapped, atomic.LoadInt32(&flag))
}
// Once示例
func OnceExample() {
fmt.Println("\n=== Once示例 ===")
var once sync.Once
var initialized bool
initialize := func() {
fmt.Println("执行初始化...")
time.Sleep(time.Millisecond * 100)
initialized = true
fmt.Println("初始化完成")
}
var wg sync.WaitGroup
// 多个goroutine尝试初始化
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Goroutine %d 尝试初始化\n", id)
once.Do(initialize)
fmt.Printf("Goroutine %d 看到初始化状态: %t\n", id, initialized)
}(i)
}
wg.Wait()
}
// Cond示例
func CondExample() {
fmt.Println("\n=== Cond示例 ===")
var mu sync.Mutex
cond := sync.NewCond(&mu)
var ready bool
// 等待者
for i := 0; i < 3; i++ {
go func(id int) {
mu.Lock()
defer mu.Unlock()
for !ready {
fmt.Printf("等待者 %d 开始等待\n", id)
cond.Wait()
}
fmt.Printf("等待者 %d 被唤醒\n", id)
}(i)
}
// 等待一段时间后发送信号
time.Sleep(time.Millisecond * 100)
mu.Lock()
ready = true
fmt.Println("条件已满足,广播信号")
cond.Broadcast()
mu.Unlock()
time.Sleep(time.Millisecond * 100)
}
实际应用示例
// main.go - 完整的并发应用示例
package main
import (
"context"
"fmt"
"log"
"net/http"
"sync"
"time"
"concurrent-programming-demo/concurrent/basics"
"concurrent-programming-demo/concurrent/patterns"
syncpkg "concurrent-programming-demo/concurrent/sync"
)
// 并发HTTP客户端示例
type HTTPClient struct {
client *http.Client
timeout time.Duration
}
func NewHTTPClient(timeout time.Duration) *HTTPClient {
return &HTTPClient{
client: &http.Client{
Timeout: timeout,
},
timeout: timeout,
}
}
func (c *HTTPClient) FetchURLs(urls []string) map[string]string {
results := make(map[string]string)
var mu sync.Mutex
var wg sync.WaitGroup
// 限制并发数
semaphore := make(chan struct{}, 5)
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
// 获取信号量
semaphore <- struct{}{}
defer func() { <-semaphore }()
resp, err := c.client.Get(u)
if err != nil {
mu.Lock()
results[u] = fmt.Sprintf("Error: %v", err)
mu.Unlock()
return
}
defer resp.Body.Close()
mu.Lock()
results[u] = fmt.Sprintf("Status: %s", resp.Status)
mu.Unlock()
}(url)
}
wg.Wait()
return results
}
// 并发数据处理示例
type DataProcessor struct {
workerCount int
}
func NewDataProcessor(workerCount int) *DataProcessor {
return &DataProcessor{workerCount: workerCount}
}
func (dp *DataProcessor) ProcessData(data []int) []int {
input := make(chan int, len(data))
output := make(chan int, len(data))
// 发送数据到输入channel
go func() {
defer close(input)
for _, item := range data {
input <- item
}
}()
// 启动worker处理数据
var wg sync.WaitGroup
for i := 0; i < dp.workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for item := range input {
// 模拟复杂计算
result := item * item + item
output <- result
}
}()
}
// 关闭输出channel
go func() {
wg.Wait()
close(output)
}()
// 收集结果
var results []int
for result := range output {
results = append(results, result)
}
return results
}
// 实时数据流处理
type StreamProcessor struct {
input chan interface{}
output chan interface{}
ctx context.Context
cancel context.CancelFunc
}
func NewStreamProcessor() *StreamProcessor {
ctx, cancel := context.WithCancel(context.Background())
return &StreamProcessor{
input: make(chan interface{}, 100),
output: make(chan interface{}, 100),
ctx: ctx,
cancel: cancel,
}
}
func (sp *StreamProcessor) Start() {
go sp.process()
}
func (sp *StreamProcessor) process() {
for {
select {
case data := <-sp.input:
// 处理数据
processed := sp.processItem(data)
select {
case sp.output <- processed:
case <-sp.ctx.Done():
return
}
case <-sp.ctx.Done():
return
}
}
}
func (sp *StreamProcessor) processItem(data interface{}) interface{} {
// 模拟数据处理
time.Sleep(time.Millisecond * 10)
return fmt.Sprintf("Processed: %v", data)
}
func (sp *StreamProcessor) Send(data interface{}) {
select {
case sp.input <- data:
case <-sp.ctx.Done():
}
}
func (sp *StreamProcessor) Receive() interface{} {
select {
case data := <-sp.output:
return data
case <-sp.ctx.Done():
return nil
}
}
func (sp *StreamProcessor) Stop() {
sp.cancel()
}
// 并发缓存示例
type ConcurrentCache struct {
mu sync.RWMutex
cache map[string]interface{}
ttl map[string]time.Time
}
func NewConcurrentCache() *ConcurrentCache {
cache := &ConcurrentCache{
cache: make(map[string]interface{}),
ttl: make(map[string]time.Time),
}
// 启动清理goroutine
go cache.cleanup()
return cache
}
func (c *ConcurrentCache) Set(key string, value interface{}, duration time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.cache[key] = value
c.ttl[key] = time.Now().Add(duration)
}
func (c *ConcurrentCache) Get(key string) (interface{}, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
value, exists := c.cache[key]
if !exists {
return nil, false
}
// 检查是否过期
if expiry, exists := c.ttl[key]; exists && time.Now().After(expiry) {
return nil, false
}
return value, true
}
func (c *ConcurrentCache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.cache, key)
delete(c.ttl, key)
}
func (c *ConcurrentCache) cleanup() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
c.mu.Lock()
now := time.Now()
for key, expiry := range c.ttl {
if now.After(expiry) {
delete(c.cache, key)
delete(c.ttl, key)
}
}
c.mu.Unlock()
}
}
func main() {
fmt.Println("Go并发编程实战示例")
fmt.Println("==================")
// 基础示例
basics.BasicGoroutineExample()
basics.NamedFunctionGoroutine()
basics.GoroutineLeakExample()
basics.ControlledGoroutines()
basics.BasicChannelExample()
basics.ChannelDirections()
basics.SelectExample()
basics.NonBlockingChannelOps()
basics.ChannelCloseAndRange()
// 并发模式示例
patterns.WorkerPoolExample()
patterns.FanOutFanInExample()
patterns.PipelineExample()
patterns.ContextCancellationExample()
patterns.ProducerConsumerExample()
// 同步机制示例
syncpkg.MutexExample()
syncpkg.RWMutexExample()
syncpkg.AtomicExample()
syncpkg.OnceExample()
syncpkg.CondExample()
// 实际应用示例
fmt.Println("\n=== 实际应用示例 ===")
// HTTP客户端示例
fmt.Println("\n--- 并发HTTP请求 ---")
httpClient := NewHTTPClient(time.Second * 5)
urls := []string{
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/200",
}
start := time.Now()
results := httpClient.FetchURLs(urls)
duration := time.Since(start)
fmt.Printf("并发请求完成,耗时: %v\n", duration)
for url, result := range results {
fmt.Printf(" %s: %s\n", url, result)
}
// 数据处理示例
fmt.Println("\n--- 并发数据处理 ---")
processor := NewDataProcessor(3)
data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
start = time.Now()
processed := processor.ProcessData(data)
duration = time.Since(start)
fmt.Printf("数据处理完成,耗时: %v\n", duration)
fmt.Printf("原始数据: %v\n", data)
fmt.Printf("处理结果: %v\n", processed)
// 流处理示例
fmt.Println("\n--- 实时流处理 ---")
streamProcessor := NewStreamProcessor()
streamProcessor.Start()
// 发送数据
go func() {
for i := 0; i < 5; i++ {
streamProcessor.Send(fmt.Sprintf("数据-%d", i))
time.Sleep(time.Millisecond * 50)
}
}()
// 接收处理结果
for i := 0; i < 5; i++ {
result := streamProcessor.Receive()
if result != nil {
fmt.Printf("流处理结果: %v\n", result)
}
}
streamProcessor.Stop()
// 并发缓存示例
fmt.Println("\n--- 并发缓存 ---")
cache := NewConcurrentCache()
// 设置缓存
cache.Set("key1", "value1", time.Second*2)
cache.Set("key2", "value2", time.Second*1)
// 读取缓存
if value, ok := cache.Get("key1"); ok {
fmt.Printf("缓存命中: key1 = %v\n", value)
}
// 等待过期
time.Sleep(time.Second * 1.5)
if value, ok := cache.Get("key2"); ok {
fmt.Printf("缓存命中: key2 = %v\n", value)
} else {
fmt.Println("key2 已过期")
}
if value, ok := cache.Get("key1"); ok {
fmt.Printf("缓存命中: key1 = %v\n", value)
}
fmt.Println("\n所有示例执行完成!")
}
性能测试和基准测试
// concurrent/benchmark/benchmark_test.go - 性能测试
package benchmark
import (
"sync"
"sync/atomic"
"testing"
)
// 测试不同并发方式的性能
func BenchmarkMutexIncrement(b *testing.B) {
var mu sync.Mutex
var counter int
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mu.Lock()
counter++
mu.Unlock()
}
})
}
func BenchmarkAtomicIncrement(b *testing.B) {
var counter int64
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
atomic.AddInt64(&counter, 1)
}
})
}
func BenchmarkChannelCommunication(b *testing.B) {
ch := make(chan int, 1000)
go func() {
for i := 0; i < b.N; i++ {
ch <- i
}
close(ch)
}()
b.ResetTimer()
for range ch {
// 消费数据
}
}
func BenchmarkBufferedChannel(b *testing.B) {
ch := make(chan int, 1000)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
select {
case ch <- 1:
default:
}
}
})
}
func BenchmarkUnbufferedChannel(b *testing.B) {
ch := make(chan int)
go func() {
for i := 0; i < b.N; i++ {
<-ch
}
}()
b.ResetTimer()
for i := 0; i < b.N; i++ {
ch <- i
}
}
总结
Go语言并发编程的核心要点:
🎯 并发基础
- Goroutine:轻量级线程,高效并发执行
- Channel:类型安全的通信机制
- Select:多路复用通信操作
- 同步原语:Mutex、RWMutex、Once等
✅ 并发模式
- Worker Pool:任务分发和处理
- Fan-out/Fan-in:并行处理和结果合并
- Pipeline:流水线处理
- Producer-Consumer:生产者消费者
🚀 最佳实践
- 避免goroutine泄漏
- 合理使用缓冲channel
- 选择合适的同步机制
- Context控制生命周期
💡 性能优化
- 控制goroutine数量
- 使用原子操作
- 减少锁竞争
- 合理设计并发架构
🔧 实际应用
- 并发HTTP客户端
- 实时数据流处理
- 并发缓存系统
- 高性能服务器
掌握Go并发编程,构建高性能应用!
Go语言的并发模型简洁而强大,通过goroutine和channel提供了优雅的并发编程体验,是构建高性能服务的理想选择。