发布于

Node.js 性能优化实战:从入门到精通

作者

Node.js 性能优化实战:从入门到精通

Node.js 以其高性能和非阻塞 I/O 而闻名,但要真正发挥其潜力,我们需要深入理解其工作原理并采用正确的优化策略。本文将全面介绍 Node.js 性能优化的各个方面。

理解 Node.js 事件循环

事件循环的工作原理

// 事件循环阶段示例
console.log('开始')

// 1. Timer 阶段
setTimeout(() => {
  console.log('setTimeout')
}, 0)

// 2. I/O 回调阶段
setImmediate(() => {
  console.log('setImmediate')
})

// 3. 微任务队列
Promise.resolve().then(() => {
  console.log('Promise')
})

// 4. process.nextTick(优先级最高)
process.nextTick(() => {
  console.log('nextTick')
})

console.log('结束')

// 输出顺序:开始 -> 结束 -> nextTick -> Promise -> setTimeout -> setImmediate

避免阻塞事件循环

// ❌ 错误:阻塞事件循环
function heavyComputation() {
  let result = 0
  for (let i = 0; i < 10000000; i++) {
    result += Math.random()
  }
  return result
}

// ✅ 正确:使用 Worker Threads
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads')

if (isMainThread) {
  // 主线程
  function heavyComputationAsync(data) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, { workerData: data })
      worker.on('message', resolve)
      worker.on('error', reject)
    })
  }

  // 使用
  heavyComputationAsync({ iterations: 10000000 }).then((result) => console.log('计算结果:', result))
} else {
  // Worker 线程
  const { iterations } = workerData
  let result = 0
  for (let i = 0; i < iterations; i++) {
    result += Math.random()
  }
  parentPort.postMessage(result)
}

内存管理优化

1. 内存泄漏检测和预防

// 内存使用监控
function monitorMemory() {
  const used = process.memoryUsage()
  console.log({
    rss: `${Math.round(used.rss / 1024 / 1024)} MB`, // 常驻内存
    heapTotal: `${Math.round(used.heapTotal / 1024 / 1024)} MB`, // 堆总大小
    heapUsed: `${Math.round(used.heapUsed / 1024 / 1024)} MB`, // 已使用堆
    external: `${Math.round(used.external / 1024 / 1024)} MB`, // 外部内存
  })
}

// 定期监控内存使用
setInterval(monitorMemory, 5000)

// 避免内存泄漏的最佳实践
class EventEmitterManager {
  constructor() {
    this.listeners = new Map()
  }

  addListener(emitter, event, handler) {
    emitter.on(event, handler)

    // 记录监听器以便清理
    const key = `${emitter.constructor.name}-${event}`
    if (!this.listeners.has(key)) {
      this.listeners.set(key, [])
    }
    this.listeners.get(key).push({ emitter, handler })
  }

  cleanup() {
    // 清理所有监听器
    for (const [key, listeners] of this.listeners) {
      const [, event] = key.split('-')
      listeners.forEach(({ emitter, handler }) => {
        emitter.removeListener(event, handler)
      })
    }
    this.listeners.clear()
  }
}

2. 对象池模式

// 对象池减少 GC 压力
class ObjectPool {
  constructor(createFn, resetFn, initialSize = 10) {
    this.createFn = createFn
    this.resetFn = resetFn
    this.pool = []

    // 预创建对象
    for (let i = 0; i < initialSize; i++) {
      this.pool.push(this.createFn())
    }
  }

  acquire() {
    if (this.pool.length > 0) {
      return this.pool.pop()
    }
    return this.createFn()
  }

  release(obj) {
    this.resetFn(obj)
    this.pool.push(obj)
  }
}

// 使用示例
const bufferPool = new ObjectPool(
  () => Buffer.alloc(1024), // 创建函数
  (buffer) => buffer.fill(0), // 重置函数
  50 // 初始大小
)

function processData(data) {
  const buffer = bufferPool.acquire()
  try {
    // 使用 buffer 处理数据
    buffer.write(data)
    return buffer.toString()
  } finally {
    bufferPool.release(buffer)
  }
}

数据库优化

1. 连接池管理

const mysql = require('mysql2/promise')

// 创建连接池
const pool = mysql.createPool({
  host: 'localhost',
  user: 'root',
  password: 'password',
  database: 'mydb',
  waitForConnections: true,
  connectionLimit: 10, // 最大连接数
  queueLimit: 0,
  acquireTimeout: 60000, // 获取连接超时时间
  timeout: 60000, // 查询超时时间
  reconnect: true,
})

// 数据库操作封装
class DatabaseManager {
  static async query(sql, params = []) {
    const start = Date.now()
    try {
      const [rows] = await pool.execute(sql, params)
      const duration = Date.now() - start

      // 记录慢查询
      if (duration > 1000) {
        console.warn(`慢查询警告: ${sql} - ${duration}ms`)
      }

      return rows
    } catch (error) {
      console.error('数据库查询错误:', error)
      throw error
    }
  }

  static async transaction(callback) {
    const connection = await pool.getConnection()
    await connection.beginTransaction()

    try {
      const result = await callback(connection)
      await connection.commit()
      return result
    } catch (error) {
      await connection.rollback()
      throw error
    } finally {
      connection.release()
    }
  }
}

// 使用示例
async function getUserWithPosts(userId) {
  return await DatabaseManager.transaction(async (connection) => {
    const [user] = await connection.execute('SELECT * FROM users WHERE id = ?', [userId])

    const posts = await connection.execute('SELECT * FROM posts WHERE user_id = ?', [userId])

    return { user: user[0], posts }
  })
}

2. 查询优化

// 批量操作优化
class BatchProcessor {
  constructor(batchSize = 100, flushInterval = 1000) {
    this.batchSize = batchSize
    this.flushInterval = flushInterval
    this.queue = []
    this.timer = null
  }

  add(item) {
    this.queue.push(item)

    if (this.queue.length >= this.batchSize) {
      this.flush()
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.flushInterval)
    }
  }

  async flush() {
    if (this.queue.length === 0) return

    const batch = this.queue.splice(0)
    if (this.timer) {
      clearTimeout(this.timer)
      this.timer = null
    }

    try {
      await this.processBatch(batch)
    } catch (error) {
      console.error('批量处理失败:', error)
      // 可以选择重试或记录失败的项目
    }
  }

  async processBatch(items) {
    // 批量插入
    const values = items.map((item) => [item.name, item.email, item.age])
    const placeholders = items.map(() => '(?, ?, ?)').join(', ')
    const sql = `INSERT INTO users (name, email, age) VALUES ${placeholders}`
    const flatValues = values.flat()

    await DatabaseManager.query(sql, flatValues)
    console.log(`批量插入 ${items.length} 条记录`)
  }
}

const userBatchProcessor = new BatchProcessor(50, 2000)

// 使用批量处理器
function addUser(userData) {
  userBatchProcessor.add(userData)
}

缓存策略

1. 内存缓存

// LRU 缓存实现
class LRUCache {
  constructor(capacity) {
    this.capacity = capacity
    this.cache = new Map()
  }

  get(key) {
    if (this.cache.has(key)) {
      // 移动到最前面(最近使用)
      const value = this.cache.get(key)
      this.cache.delete(key)
      this.cache.set(key, value)
      return value
    }
    return null
  }

  set(key, value) {
    if (this.cache.has(key)) {
      this.cache.delete(key)
    } else if (this.cache.size >= this.capacity) {
      // 删除最久未使用的项目
      const firstKey = this.cache.keys().next().value
      this.cache.delete(firstKey)
    }
    this.cache.set(key, value)
  }

  clear() {
    this.cache.clear()
  }

  size() {
    return this.cache.size
  }
}

// 缓存装饰器
function cached(ttl = 300000) {
  // 默认 5 分钟
  const cache = new LRUCache(1000)

  return function (target, propertyName, descriptor) {
    const originalMethod = descriptor.value

    descriptor.value = async function (...args) {
      const key = `${propertyName}:${JSON.stringify(args)}`
      const cached = cache.get(key)

      if (cached && Date.now() - cached.timestamp < ttl) {
        return cached.value
      }

      const result = await originalMethod.apply(this, args)
      cache.set(key, {
        value: result,
        timestamp: Date.now(),
      })

      return result
    }

    return descriptor
  }
}

// 使用缓存装饰器
class UserService {
  @cached(600000) // 10 分钟缓存
  async getUserById(id) {
    return await DatabaseManager.query('SELECT * FROM users WHERE id = ?', [id])
  }
}

2. Redis 缓存

const redis = require('redis')

class RedisCache {
  constructor() {
    this.client = redis.createClient({
      host: 'localhost',
      port: 6379,
      retry_strategy: (options) => {
        if (options.error && options.error.code === 'ECONNREFUSED') {
          return new Error('Redis 服务器拒绝连接')
        }
        if (options.total_retry_time > 1000 * 60 * 60) {
          return new Error('重试时间已用尽')
        }
        if (options.attempt > 10) {
          return undefined
        }
        return Math.min(options.attempt * 100, 3000)
      },
    })

    this.client.on('error', (err) => {
      console.error('Redis 错误:', err)
    })
  }

  async get(key) {
    try {
      const value = await this.client.get(key)
      return value ? JSON.parse(value) : null
    } catch (error) {
      console.error('Redis GET 错误:', error)
      return null
    }
  }

  async set(key, value, ttl = 3600) {
    try {
      await this.client.setex(key, ttl, JSON.stringify(value))
      return true
    } catch (error) {
      console.error('Redis SET 错误:', error)
      return false
    }
  }

  async del(key) {
    try {
      await this.client.del(key)
      return true
    } catch (error) {
      console.error('Redis DEL 错误:', error)
      return false
    }
  }

  async mget(keys) {
    try {
      const values = await this.client.mget(keys)
      return values.map((value) => (value ? JSON.parse(value) : null))
    } catch (error) {
      console.error('Redis MGET 错误:', error)
      return new Array(keys.length).fill(null)
    }
  }
}

const cache = new RedisCache()

// 缓存中间件
function cacheMiddleware(ttl = 3600) {
  return async (req, res, next) => {
    const key = `cache:${req.method}:${req.originalUrl}`

    try {
      const cached = await cache.get(key)
      if (cached) {
        return res.json(cached)
      }

      // 重写 res.json 以缓存响应
      const originalJson = res.json
      res.json = function (data) {
        cache.set(key, data, ttl)
        return originalJson.call(this, data)
      }

      next()
    } catch (error) {
      console.error('缓存中间件错误:', error)
      next()
    }
  }
}

HTTP 性能优化

1. 压缩和缓存

const express = require('express')
const compression = require('compression')
const helmet = require('helmet')

const app = express()

// 启用 gzip 压缩
app.use(
  compression({
    filter: (req, res) => {
      if (req.headers['x-no-compression']) {
        return false
      }
      return compression.filter(req, res)
    },
    level: 6, // 压缩级别 1-9
    threshold: 1024, // 只压缩大于 1KB 的响应
  })
)

// 安全头部
app.use(helmet())

// 静态文件缓存
app.use(
  '/static',
  express.static('public', {
    maxAge: '1y', // 缓存一年
    etag: true,
    lastModified: true,
  })
)

// API 响应缓存
app.get('/api/users', cacheMiddleware(300), async (req, res) => {
  const users = await UserService.getAllUsers()
  res.json(users)
})

2. 请求限流

const rateLimit = require('express-rate-limit')

// 基础限流
const limiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 分钟
  max: 100, // 最多 100 个请求
  message: '请求过于频繁,请稍后再试',
  standardHeaders: true,
  legacyHeaders: false,
})

// 高级限流(基于用户)
const createUserLimiter = rateLimit({
  windowMs: 60 * 60 * 1000, // 1 小时
  max: 5, // 每小时最多创建 5 个用户
  keyGenerator: (req) => req.user?.id || req.ip,
  skip: (req) => req.user?.role === 'admin',
})

app.use('/api/', limiter)
app.post('/api/users', createUserLimiter, async (req, res) => {
  // 创建用户逻辑
})

监控和诊断

1. 性能监控

// 性能指标收集
class PerformanceMonitor {
  constructor() {
    this.metrics = {
      requests: 0,
      errors: 0,
      responseTime: [],
      memoryUsage: [],
    }

    // 定期收集内存使用情况
    setInterval(() => {
      const usage = process.memoryUsage()
      this.metrics.memoryUsage.push({
        timestamp: Date.now(),
        heapUsed: usage.heapUsed,
        heapTotal: usage.heapTotal,
        rss: usage.rss,
      })

      // 只保留最近 1000 条记录
      if (this.metrics.memoryUsage.length > 1000) {
        this.metrics.memoryUsage.shift()
      }
    }, 5000)
  }

  recordRequest(duration, error = false) {
    this.metrics.requests++
    if (error) this.metrics.errors++

    this.metrics.responseTime.push({
      timestamp: Date.now(),
      duration,
    })

    // 只保留最近 1000 条记录
    if (this.metrics.responseTime.length > 1000) {
      this.metrics.responseTime.shift()
    }
  }

  getStats() {
    const recentResponseTimes = this.metrics.responseTime
      .filter((r) => Date.now() - r.timestamp < 300000) // 最近 5 分钟
      .map((r) => r.duration)

    return {
      totalRequests: this.metrics.requests,
      totalErrors: this.metrics.errors,
      errorRate: this.metrics.errors / this.metrics.requests,
      avgResponseTime: recentResponseTimes.reduce((a, b) => a + b, 0) / recentResponseTimes.length,
      p95ResponseTime: this.percentile(recentResponseTimes, 0.95),
      currentMemoryUsage: process.memoryUsage(),
    }
  }

  percentile(arr, p) {
    const sorted = arr.sort((a, b) => a - b)
    const index = Math.ceil(sorted.length * p) - 1
    return sorted[index]
  }
}

const monitor = new PerformanceMonitor()

// 监控中间件
app.use((req, res, next) => {
  const start = Date.now()

  res.on('finish', () => {
    const duration = Date.now() - start
    const error = res.statusCode >= 400
    monitor.recordRequest(duration, error)
  })

  next()
})

// 监控端点
app.get('/metrics', (req, res) => {
  res.json(monitor.getStats())
})

总结

Node.js 性能优化是一个系统性工程,需要从多个维度进行考虑:

  1. 事件循环优化:避免阻塞操作,合理使用 Worker Threads
  2. 内存管理:监控内存使用,防止内存泄漏,使用对象池
  3. 数据库优化:连接池管理,批量操作,查询优化
  4. 缓存策略:多层缓存,合理的 TTL 设置
  5. HTTP 优化:压缩、缓存、限流
  6. 监控诊断:实时监控,性能指标收集

通过这些优化策略,你可以构建出高性能、可扩展的 Node.js 应用!


性能优化是一个持续的过程,需要根据实际业务场景进行调整和优化。