发布于

Serverless架构实战指南:从函数计算到全栈应用

作者

Serverless架构实战指南:从函数计算到全栈应用

Serverless架构代表了云计算的新范式,让开发者专注于业务逻辑而无需管理服务器基础设施。本文将深入探讨Serverless的核心概念和实战应用。

Serverless基础概念

函数即服务(FaaS)

// AWS Lambda函数示例
// handler.js
exports.handler = async (event, context) => {
  try {
    // 解析请求
    const { httpMethod, path, queryStringParameters, body } = event
    const requestBody = body ? JSON.parse(body) : {}
    
    // 路由处理
    const router = new Router()
    const response = await router.handle(httpMethod, path, {
      query: queryStringParameters || {},
      body: requestBody,
      headers: event.headers,
      context
    })
    
    return {
      statusCode: response.statusCode || 200,
      headers: {
        'Content-Type': 'application/json',
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Headers': 'Content-Type,Authorization',
        'Access-Control-Allow-Methods': 'GET,POST,PUT,DELETE,OPTIONS',
        ...response.headers
      },
      body: JSON.stringify(response.body)
    }
  } catch (error) {
    console.error('Lambda error:', error)
    
    return {
      statusCode: 500,
      headers: {
        'Content-Type': 'application/json',
        'Access-Control-Allow-Origin': '*'
      },
      body: JSON.stringify({
        error: 'Internal Server Error',
        message: process.env.NODE_ENV === 'development' ? error.message : 'Something went wrong'
      })
    }
  }
}

// 路由器类
class Router {
  constructor() {
    this.routes = new Map()
    this.middleware = []
    this.setupRoutes()
  }
  
  setupRoutes() {
    // 用户相关路由
    this.get('/api/users', this.getUsers.bind(this))
    this.post('/api/users', this.createUser.bind(this))
    this.get('/api/users/:id', this.getUser.bind(this))
    this.put('/api/users/:id', this.updateUser.bind(this))
    this.delete('/api/users/:id', this.deleteUser.bind(this))
    
    // 认证路由
    this.post('/api/auth/login', this.login.bind(this))
    this.post('/api/auth/register', this.register.bind(this))
    this.post('/api/auth/refresh', this.refreshToken.bind(this))
    
    // 文件上传
    this.post('/api/upload', this.uploadFile.bind(this))
    
    // 健康检查
    this.get('/api/health', this.healthCheck.bind(this))
  }
  
  // 添加中间件
  use(middleware) {
    this.middleware.push(middleware)
  }
  
  // HTTP方法注册
  get(path, handler) {
    this.addRoute('GET', path, handler)
  }
  
  post(path, handler) {
    this.addRoute('POST', path, handler)
  }
  
  put(path, handler) {
    this.addRoute('PUT', path, handler)
  }
  
  delete(path, handler) {
    this.addRoute('DELETE', path, handler)
  }
  
  addRoute(method, path, handler) {
    const key = `${method}:${path}`
    this.routes.set(key, {
      method,
      path,
      handler,
      params: this.extractParams(path)
    })
  }
  
  // 提取路径参数
  extractParams(path) {
    const params = []
    const parts = path.split('/')
    
    parts.forEach((part, index) => {
      if (part.startsWith(':')) {
        params.push({
          name: part.slice(1),
          index
        })
      }
    })
    
    return params
  }
  
  // 处理请求
  async handle(method, path, request) {
    try {
      // 应用中间件
      for (const middleware of this.middleware) {
        const result = await middleware(request)
        if (result && result.statusCode) {
          return result
        }
      }
      
      // 查找匹配的路由
      const route = this.findRoute(method, path)
      if (!route) {
        return {
          statusCode: 404,
          body: { error: 'Not Found' }
        }
      }
      
      // 提取路径参数
      const params = this.extractPathParams(route, path)
      request.params = params
      
      // 执行处理器
      const result = await route.handler(request)
      return result
      
    } catch (error) {
      console.error('Router error:', error)
      return {
        statusCode: 500,
        body: { error: 'Internal Server Error' }
      }
    }
  }
  
  // 查找路由
  findRoute(method, path) {
    // 精确匹配
    const exactKey = `${method}:${path}`
    if (this.routes.has(exactKey)) {
      return this.routes.get(exactKey)
    }
    
    // 参数匹配
    for (const [key, route] of this.routes) {
      if (route.method === method && this.matchPath(route.path, path)) {
        return route
      }
    }
    
    return null
  }
  
  // 路径匹配
  matchPath(routePath, requestPath) {
    const routeParts = routePath.split('/')
    const requestParts = requestPath.split('/')
    
    if (routeParts.length !== requestParts.length) {
      return false
    }
    
    return routeParts.every((part, index) => {
      return part.startsWith(':') || part === requestParts[index]
    })
  }
  
  // 提取路径参数值
  extractPathParams(route, path) {
    const params = {}
    const routeParts = route.path.split('/')
    const pathParts = path.split('/')
    
    routeParts.forEach((part, index) => {
      if (part.startsWith(':')) {
        const paramName = part.slice(1)
        params[paramName] = pathParts[index]
      }
    })
    
    return params
  }
  
  // 路由处理器
  async getUsers(request) {
    const { query } = request
    const page = parseInt(query.page) || 1
    const limit = parseInt(query.limit) || 10
    const search = query.search || ''
    
    try {
      const users = await UserService.getUsers({ page, limit, search })
      return {
        statusCode: 200,
        body: users
      }
    } catch (error) {
      return {
        statusCode: 500,
        body: { error: error.message }
      }
    }
  }
  
  async createUser(request) {
    const { body } = request
    
    try {
      // 验证输入
      const validation = UserValidator.validate(body)
      if (!validation.isValid) {
        return {
          statusCode: 400,
          body: { error: 'Validation failed', details: validation.errors }
        }
      }
      
      const user = await UserService.createUser(body)
      return {
        statusCode: 201,
        body: user
      }
    } catch (error) {
      return {
        statusCode: 500,
        body: { error: error.message }
      }
    }
  }
  
  async getUser(request) {
    const { params } = request
    const userId = params.id
    
    try {
      const user = await UserService.getUserById(userId)
      if (!user) {
        return {
          statusCode: 404,
          body: { error: 'User not found' }
        }
      }
      
      return {
        statusCode: 200,
        body: user
      }
    } catch (error) {
      return {
        statusCode: 500,
        body: { error: error.message }
      }
    }
  }
  
  async updateUser(request) {
    const { params, body } = request
    const userId = params.id
    
    try {
      const user = await UserService.updateUser(userId, body)
      if (!user) {
        return {
          statusCode: 404,
          body: { error: 'User not found' }
        }
      }
      
      return {
        statusCode: 200,
        body: user
      }
    } catch (error) {
      return {
        statusCode: 500,
        body: { error: error.message }
      }
    }
  }
  
  async deleteUser(request) {
    const { params } = request
    const userId = params.id
    
    try {
      const deleted = await UserService.deleteUser(userId)
      if (!deleted) {
        return {
          statusCode: 404,
          body: { error: 'User not found' }
        }
      }
      
      return {
        statusCode: 204,
        body: null
      }
    } catch (error) {
      return {
        statusCode: 500,
        body: { error: error.message }
      }
    }
  }
  
  async login(request) {
    const { body } = request
    const { email, password } = body
    
    try {
      const result = await AuthService.login(email, password)
      return {
        statusCode: 200,
        body: result
      }
    } catch (error) {
      return {
        statusCode: 401,
        body: { error: error.message }
      }
    }
  }
  
  async register(request) {
    const { body } = request
    
    try {
      const user = await AuthService.register(body)
      return {
        statusCode: 201,
        body: user
      }
    } catch (error) {
      return {
        statusCode: 400,
        body: { error: error.message }
      }
    }
  }
  
  async uploadFile(request) {
    const { body, headers } = request
    
    try {
      const result = await FileService.uploadFile(body, headers)
      return {
        statusCode: 200,
        body: result
      }
    } catch (error) {
      return {
        statusCode: 500,
        body: { error: error.message }
      }
    }
  }
  
  async healthCheck(request) {
    return {
      statusCode: 200,
      body: {
        status: 'healthy',
        timestamp: new Date().toISOString(),
        version: process.env.VERSION || '1.0.0'
      }
    }
  }
}

// 用户服务
class UserService {
  static async getUsers({ page = 1, limit = 10, search = '' }) {
    const db = await DatabaseConnection.getInstance()
    
    let query = 'SELECT * FROM users WHERE 1=1'
    const params = []
    
    if (search) {
      query += ' AND (name ILIKE $' + (params.length + 1) + ' OR email ILIKE $' + (params.length + 2) + ')'
      params.push(`%${search}%`, `%${search}%`)
    }
    
    query += ' ORDER BY created_at DESC LIMIT $' + (params.length + 1) + ' OFFSET $' + (params.length + 2)
    params.push(limit, (page - 1) * limit)
    
    const result = await db.query(query, params)
    
    // 获取总数
    let countQuery = 'SELECT COUNT(*) FROM users WHERE 1=1'
    const countParams = []
    
    if (search) {
      countQuery += ' AND (name ILIKE $1 OR email ILIKE $2)'
      countParams.push(`%${search}%`, `%${search}%`)
    }
    
    const countResult = await db.query(countQuery, countParams)
    const total = parseInt(countResult.rows[0].count)
    
    return {
      users: result.rows,
      pagination: {
        page,
        limit,
        total,
        pages: Math.ceil(total / limit)
      }
    }
  }
  
  static async getUserById(id) {
    const db = await DatabaseConnection.getInstance()
    const result = await db.query('SELECT * FROM users WHERE id = $1', [id])
    return result.rows[0] || null
  }
  
  static async createUser(userData) {
    const db = await DatabaseConnection.getInstance()
    const { name, email, password } = userData
    
    // 检查邮箱是否已存在
    const existing = await db.query('SELECT id FROM users WHERE email = $1', [email])
    if (existing.rows.length > 0) {
      throw new Error('Email already exists')
    }
    
    // 哈希密码
    const hashedPassword = await bcrypt.hash(password, 10)
    
    const result = await db.query(
      'INSERT INTO users (name, email, password, created_at) VALUES ($1, $2, $3, NOW()) RETURNING *',
      [name, email, hashedPassword]
    )
    
    const user = result.rows[0]
    delete user.password // 不返回密码
    
    return user
  }
  
  static async updateUser(id, userData) {
    const db = await DatabaseConnection.getInstance()
    const { name, email } = userData
    
    const result = await db.query(
      'UPDATE users SET name = $1, email = $2, updated_at = NOW() WHERE id = $3 RETURNING *',
      [name, email, id]
    )
    
    if (result.rows.length === 0) {
      return null
    }
    
    const user = result.rows[0]
    delete user.password
    
    return user
  }
  
  static async deleteUser(id) {
    const db = await DatabaseConnection.getInstance()
    const result = await db.query('DELETE FROM users WHERE id = $1', [id])
    return result.rowCount > 0
  }
}

// 认证服务
class AuthService {
  static async login(email, password) {
    const db = await DatabaseConnection.getInstance()
    const result = await db.query('SELECT * FROM users WHERE email = $1', [email])
    
    if (result.rows.length === 0) {
      throw new Error('Invalid credentials')
    }
    
    const user = result.rows[0]
    const isValidPassword = await bcrypt.compare(password, user.password)
    
    if (!isValidPassword) {
      throw new Error('Invalid credentials')
    }
    
    // 生成JWT令牌
    const token = jwt.sign(
      { userId: user.id, email: user.email },
      process.env.JWT_SECRET,
      { expiresIn: '24h' }
    )
    
    const refreshToken = jwt.sign(
      { userId: user.id },
      process.env.JWT_REFRESH_SECRET,
      { expiresIn: '7d' }
    )
    
    delete user.password
    
    return {
      user,
      token,
      refreshToken
    }
  }
  
  static async register(userData) {
    return UserService.createUser(userData)
  }
  
  static async refreshToken(refreshToken) {
    try {
      const decoded = jwt.verify(refreshToken, process.env.JWT_REFRESH_SECRET)
      const user = await UserService.getUserById(decoded.userId)
      
      if (!user) {
        throw new Error('User not found')
      }
      
      const newToken = jwt.sign(
        { userId: user.id, email: user.email },
        process.env.JWT_SECRET,
        { expiresIn: '24h' }
      )
      
      return { token: newToken }
    } catch (error) {
      throw new Error('Invalid refresh token')
    }
  }
}

// 数据库连接
class DatabaseConnection {
  static instance = null
  
  static async getInstance() {
    if (!this.instance) {
      const { Pool } = require('pg')
      this.instance = new Pool({
        connectionString: process.env.DATABASE_URL,
        ssl: process.env.NODE_ENV === 'production' ? { rejectUnauthorized: false } : false
      })
    }
    return this.instance
  }
}

// 用户验证器
class UserValidator {
  static validate(userData) {
    const errors = []
    const { name, email, password } = userData
    
    if (!name || name.trim().length < 2) {
      errors.push('Name must be at least 2 characters long')
    }
    
    if (!email || !this.isValidEmail(email)) {
      errors.push('Valid email is required')
    }
    
    if (!password || password.length < 6) {
      errors.push('Password must be at least 6 characters long')
    }
    
    return {
      isValid: errors.length === 0,
      errors
    }
  }
  
  static isValidEmail(email) {
    const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/
    return emailRegex.test(email)
  }
}

// 文件服务
class FileService {
  static async uploadFile(fileData, headers) {
    const AWS = require('aws-sdk')
    const s3 = new AWS.S3()
    
    const contentType = headers['content-type'] || 'application/octet-stream'
    const fileName = `uploads/${Date.now()}-${Math.random().toString(36).substr(2, 9)}`
    
    const params = {
      Bucket: process.env.S3_BUCKET,
      Key: fileName,
      Body: Buffer.from(fileData, 'base64'),
      ContentType: contentType,
      ACL: 'public-read'
    }
    
    const result = await s3.upload(params).promise()
    
    return {
      url: result.Location,
      key: result.Key,
      bucket: result.Bucket
    }
  }
}

Vercel Functions示例

// api/users/index.js - Vercel API路由
import { NextApiRequest, NextApiResponse } from 'next'
import { PrismaClient } from '@prisma/client'
import jwt from 'jsonwebtoken'

const prisma = new PrismaClient()

// 中间件:认证检查
function withAuth(handler) {
  return async (req, res) => {
    try {
      const token = req.headers.authorization?.replace('Bearer ', '')
      
      if (!token) {
        return res.status(401).json({ error: 'No token provided' })
      }
      
      const decoded = jwt.verify(token, process.env.JWT_SECRET)
      req.user = decoded
      
      return handler(req, res)
    } catch (error) {
      return res.status(401).json({ error: 'Invalid token' })
    }
  }
}

// 中间件:CORS
function withCORS(handler) {
  return async (req, res) => {
    res.setHeader('Access-Control-Allow-Origin', '*')
    res.setHeader('Access-Control-Allow-Methods', 'GET,POST,PUT,DELETE,OPTIONS')
    res.setHeader('Access-Control-Allow-Headers', 'Content-Type,Authorization')
    
    if (req.method === 'OPTIONS') {
      return res.status(200).end()
    }
    
    return handler(req, res)
  }
}

// 中间件:错误处理
function withErrorHandling(handler) {
  return async (req, res) => {
    try {
      return await handler(req, res)
    } catch (error) {
      console.error('API Error:', error)
      
      if (error.code === 'P2002') {
        return res.status(400).json({ error: 'Duplicate entry' })
      }
      
      return res.status(500).json({ 
        error: 'Internal Server Error',
        message: process.env.NODE_ENV === 'development' ? error.message : undefined
      })
    }
  }
}

// 组合中间件
function withMiddleware(handler, middlewares = []) {
  return middlewares.reduce((acc, middleware) => middleware(acc), handler)
}

// 用户API处理器
async function usersHandler(req, res) {
  const { method, query, body } = req
  
  switch (method) {
    case 'GET':
      return handleGetUsers(req, res)
    case 'POST':
      return handleCreateUser(req, res)
    default:
      res.setHeader('Allow', ['GET', 'POST'])
      return res.status(405).json({ error: `Method ${method} not allowed` })
  }
}

async function handleGetUsers(req, res) {
  const { page = 1, limit = 10, search = '' } = req.query
  
  const skip = (parseInt(page) - 1) * parseInt(limit)
  const take = parseInt(limit)
  
  const where = search ? {
    OR: [
      { name: { contains: search, mode: 'insensitive' } },
      { email: { contains: search, mode: 'insensitive' } }
    ]
  } : {}
  
  const [users, total] = await Promise.all([
    prisma.user.findMany({
      where,
      skip,
      take,
      select: {
        id: true,
        name: true,
        email: true,
        createdAt: true,
        updatedAt: true
      },
      orderBy: { createdAt: 'desc' }
    }),
    prisma.user.count({ where })
  ])
  
  return res.status(200).json({
    users,
    pagination: {
      page: parseInt(page),
      limit: parseInt(limit),
      total,
      pages: Math.ceil(total / parseInt(limit))
    }
  })
}

async function handleCreateUser(req, res) {
  const { name, email, password } = req.body
  
  // 验证输入
  if (!name || !email || !password) {
    return res.status(400).json({ error: 'Name, email, and password are required' })
  }
  
  if (password.length < 6) {
    return res.status(400).json({ error: 'Password must be at least 6 characters long' })
  }
  
  // 检查邮箱是否已存在
  const existingUser = await prisma.user.findUnique({
    where: { email }
  })
  
  if (existingUser) {
    return res.status(400).json({ error: 'Email already exists' })
  }
  
  // 创建用户
  const bcrypt = require('bcryptjs')
  const hashedPassword = await bcrypt.hash(password, 10)
  
  const user = await prisma.user.create({
    data: {
      name,
      email,
      password: hashedPassword
    },
    select: {
      id: true,
      name: true,
      email: true,
      createdAt: true
    }
  })
  
  return res.status(201).json(user)
}

// 导出处理器
export default withMiddleware(usersHandler, [
  withCORS,
  withErrorHandling,
  withAuth
])

// api/users/[id].js - 动态路由
export default withMiddleware(async (req, res) => {
  const { method, query } = req
  const { id } = query
  
  switch (method) {
    case 'GET':
      return handleGetUser(req, res, id)
    case 'PUT':
      return handleUpdateUser(req, res, id)
    case 'DELETE':
      return handleDeleteUser(req, res, id)
    default:
      res.setHeader('Allow', ['GET', 'PUT', 'DELETE'])
      return res.status(405).json({ error: `Method ${method} not allowed` })
  }
}, [withCORS, withErrorHandling, withAuth])

async function handleGetUser(req, res, id) {
  const user = await prisma.user.findUnique({
    where: { id },
    select: {
      id: true,
      name: true,
      email: true,
      createdAt: true,
      updatedAt: true
    }
  })
  
  if (!user) {
    return res.status(404).json({ error: 'User not found' })
  }
  
  return res.status(200).json(user)
}

async function handleUpdateUser(req, res, id) {
  const { name, email } = req.body
  
  const user = await prisma.user.update({
    where: { id },
    data: { name, email },
    select: {
      id: true,
      name: true,
      email: true,
      updatedAt: true
    }
  })
  
  return res.status(200).json(user)
}

async function handleDeleteUser(req, res, id) {
  await prisma.user.delete({
    where: { id }
  })
  
  return res.status(204).end()
}

事件驱动架构

事件处理系统

// 事件驱动的Serverless架构
class EventProcessor {
  constructor() {
    this.handlers = new Map()
    this.middleware = []
    this.deadLetterQueue = []
  }
  
  // 注册事件处理器
  on(eventType, handler) {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, [])
    }
    this.handlers.get(eventType).push(handler)
  }
  
  // 添加中间件
  use(middleware) {
    this.middleware.push(middleware)
  }
  
  // 处理事件
  async process(event) {
    try {
      // 应用中间件
      let processedEvent = event
      for (const middleware of this.middleware) {
        processedEvent = await middleware(processedEvent)
      }
      
      const { eventType } = processedEvent
      const handlers = this.handlers.get(eventType) || []
      
      if (handlers.length === 0) {
        console.warn(`No handlers found for event type: ${eventType}`)
        return
      }
      
      // 并行处理所有处理器
      const results = await Promise.allSettled(
        handlers.map(handler => handler(processedEvent))
      )
      
      // 检查失败的处理器
      const failures = results.filter(result => result.status === 'rejected')
      if (failures.length > 0) {
        console.error('Some event handlers failed:', failures)
        await this.handleFailures(processedEvent, failures)
      }
      
      return results
    } catch (error) {
      console.error('Event processing error:', error)
      await this.sendToDeadLetterQueue(event, error)
      throw error
    }
  }
  
  // 处理失败
  async handleFailures(event, failures) {
    // 重试逻辑
    const retryableFailures = failures.filter(failure => 
      this.isRetryable(failure.reason)
    )
    
    if (retryableFailures.length > 0) {
      await this.scheduleRetry(event, retryableFailures)
    }
  }
  
  // 判断是否可重试
  isRetryable(error) {
    const retryableErrors = [
      'ECONNRESET',
      'ETIMEDOUT',
      'ENOTFOUND'
    ]
    
    return retryableErrors.some(code => 
      error.code === code || error.message.includes(code)
    )
  }
  
  // 调度重试
  async scheduleRetry(event, failures) {
    // 使用AWS SQS或其他消息队列进行重试
    const retryEvent = {
      ...event,
      retryCount: (event.retryCount || 0) + 1,
      retryAt: Date.now() + (Math.pow(2, event.retryCount || 0) * 1000) // 指数退避
    }
    
    if (retryEvent.retryCount <= 3) {
      await this.sendToRetryQueue(retryEvent)
    } else {
      await this.sendToDeadLetterQueue(event, new Error('Max retries exceeded'))
    }
  }
  
  // 发送到重试队列
  async sendToRetryQueue(event) {
    // 实现重试队列逻辑
    console.log('Sending to retry queue:', event)
  }
  
  // 发送到死信队列
  async sendToDeadLetterQueue(event, error) {
    this.deadLetterQueue.push({
      event,
      error: error.message,
      timestamp: new Date().toISOString()
    })
    
    console.error('Event sent to dead letter queue:', { event, error: error.message })
  }
}

// 用户事件处理器
class UserEventHandlers {
  static async handleUserCreated(event) {
    const { user } = event.data
    
    // 发送欢迎邮件
    await EmailService.sendWelcomeEmail(user)
    
    // 创建用户配置文件
    await ProfileService.createProfile(user.id)
    
    // 记录分析事件
    await AnalyticsService.track('user_created', {
      userId: user.id,
      email: user.email,
      timestamp: event.timestamp
    })
    
    console.log(`User created event processed for user: ${user.id}`)
  }
  
  static async handleUserUpdated(event) {
    const { user, changes } = event.data
    
    // 如果邮箱发生变化,发送确认邮件
    if (changes.email) {
      await EmailService.sendEmailChangeConfirmation(user)
    }
    
    // 更新搜索索引
    await SearchService.updateUserIndex(user)
    
    // 记录分析事件
    await AnalyticsService.track('user_updated', {
      userId: user.id,
      changes: Object.keys(changes),
      timestamp: event.timestamp
    })
    
    console.log(`User updated event processed for user: ${user.id}`)
  }
  
  static async handleUserDeleted(event) {
    const { userId } = event.data
    
    // 清理用户相关数据
    await ProfileService.deleteProfile(userId)
    await FileService.deleteUserFiles(userId)
    await SearchService.removeFromIndex(userId)
    
    // 记录分析事件
    await AnalyticsService.track('user_deleted', {
      userId,
      timestamp: event.timestamp
    })
    
    console.log(`User deleted event processed for user: ${userId}`)
  }
}

// 订单事件处理器
class OrderEventHandlers {
  static async handleOrderCreated(event) {
    const { order } = event.data
    
    // 发送订单确认邮件
    await EmailService.sendOrderConfirmation(order)
    
    // 更新库存
    await InventoryService.reserveItems(order.items)
    
    // 创建发货任务
    await ShippingService.createShippingTask(order)
    
    // 记录分析事件
    await AnalyticsService.track('order_created', {
      orderId: order.id,
      userId: order.userId,
      amount: order.total,
      timestamp: event.timestamp
    })
  }
  
  static async handleOrderPaid(event) {
    const { order, payment } = event.data
    
    // 发送支付确认
    await EmailService.sendPaymentConfirmation(order, payment)
    
    // 触发发货流程
    await ShippingService.processShipping(order.id)
    
    // 更新订单状态
    await OrderService.updateStatus(order.id, 'paid')
    
    // 记录分析事件
    await AnalyticsService.track('order_paid', {
      orderId: order.id,
      paymentId: payment.id,
      amount: payment.amount,
      timestamp: event.timestamp
    })
  }
  
  static async handleOrderShipped(event) {
    const { order, tracking } = event.data
    
    // 发送发货通知
    await EmailService.sendShippingNotification(order, tracking)
    
    // 发送短信通知
    await SMSService.sendShippingNotification(order.phone, tracking.number)
    
    // 更新订单状态
    await OrderService.updateStatus(order.id, 'shipped')
    
    // 记录分析事件
    await AnalyticsService.track('order_shipped', {
      orderId: order.id,
      trackingNumber: tracking.number,
      timestamp: event.timestamp
    })
  }
}

// 事件中间件
const loggingMiddleware = async (event) => {
  console.log('Processing event:', {
    type: event.eventType,
    id: event.id,
    timestamp: event.timestamp
  })
  return event
}

const validationMiddleware = async (event) => {
  if (!event.eventType) {
    throw new Error('Event type is required')
  }
  
  if (!event.id) {
    event.id = generateEventId()
  }
  
  if (!event.timestamp) {
    event.timestamp = new Date().toISOString()
  }
  
  return event
}

const enrichmentMiddleware = async (event) => {
  // 添加元数据
  event.metadata = {
    source: 'serverless-function',
    version: '1.0.0',
    environment: process.env.NODE_ENV || 'development'
  }
  
  return event
}

// 初始化事件处理器
const eventProcessor = new EventProcessor()

// 添加中间件
eventProcessor.use(validationMiddleware)
eventProcessor.use(loggingMiddleware)
eventProcessor.use(enrichmentMiddleware)

// 注册事件处理器
eventProcessor.on('user.created', UserEventHandlers.handleUserCreated)
eventProcessor.on('user.updated', UserEventHandlers.handleUserUpdated)
eventProcessor.on('user.deleted', UserEventHandlers.handleUserDeleted)

eventProcessor.on('order.created', OrderEventHandlers.handleOrderCreated)
eventProcessor.on('order.paid', OrderEventHandlers.handleOrderPaid)
eventProcessor.on('order.shipped', OrderEventHandlers.handleOrderShipped)

// Lambda处理器
exports.eventHandler = async (event, context) => {
  try {
    // 处理SQS消息
    if (event.Records) {
      const results = await Promise.allSettled(
        event.Records.map(record => {
          const eventData = JSON.parse(record.body)
          return eventProcessor.process(eventData)
        })
      )
      
      return {
        statusCode: 200,
        body: JSON.stringify({
          processed: results.length,
          successful: results.filter(r => r.status === 'fulfilled').length,
          failed: results.filter(r => r.status === 'rejected').length
        })
      }
    }
    
    // 处理直接事件
    await eventProcessor.process(event)
    
    return {
      statusCode: 200,
      body: JSON.stringify({ message: 'Event processed successfully' })
    }
  } catch (error) {
    console.error('Event handler error:', error)
    
    return {
      statusCode: 500,
      body: JSON.stringify({ error: error.message })
    }
  }
}

// 工具函数
function generateEventId() {
  return `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
}

数据库集成

Serverless数据库模式

// Serverless数据库连接管理
class ServerlessDatabase {
  constructor(config) {
    this.config = config
    this.pool = null
    this.connectionCount = 0
    this.maxConnections = config.maxConnections || 5
  }
  
  async getConnection() {
    if (!this.pool) {
      await this.createPool()
    }
    
    if (this.connectionCount >= this.maxConnections) {
      throw new Error('Connection pool exhausted')
    }
    
    this.connectionCount++
    return this.pool
  }
  
  async createPool() {
    const { Pool } = require('pg')
    
    this.pool = new Pool({
      connectionString: this.config.connectionString,
      ssl: this.config.ssl,
      max: this.maxConnections,
      idleTimeoutMillis: 30000,
      connectionTimeoutMillis: 2000,
    })
    
    // 连接事件监听
    this.pool.on('connect', () => {
      console.log('Database connected')
    })
    
    this.pool.on('error', (err) => {
      console.error('Database error:', err)
    })
  }
  
  async query(text, params) {
    const client = await this.getConnection()
    
    try {
      const start = Date.now()
      const result = await client.query(text, params)
      const duration = Date.now() - start
      
      console.log('Query executed:', { text, duration, rows: result.rowCount })
      return result
    } finally {
      this.connectionCount--
    }
  }
  
  async transaction(callback) {
    const client = await this.pool.connect()
    
    try {
      await client.query('BEGIN')
      const result = await callback(client)
      await client.query('COMMIT')
      return result
    } catch (error) {
      await client.query('ROLLBACK')
      throw error
    } finally {
      client.release()
    }
  }
  
  async close() {
    if (this.pool) {
      await this.pool.end()
      this.pool = null
    }
  }
}

// 数据访问层
class UserRepository {
  constructor(database) {
    this.db = database
  }
  
  async findById(id) {
    const result = await this.db.query(
      'SELECT * FROM users WHERE id = $1',
      [id]
    )
    return result.rows[0] || null
  }
  
  async findByEmail(email) {
    const result = await this.db.query(
      'SELECT * FROM users WHERE email = $1',
      [email]
    )
    return result.rows[0] || null
  }
  
  async create(userData) {
    const { name, email, password } = userData
    
    const result = await this.db.query(
      `INSERT INTO users (name, email, password, created_at, updated_at) 
       VALUES ($1, $2, $3, NOW(), NOW()) 
       RETURNING *`,
      [name, email, password]
    )
    
    return result.rows[0]
  }
  
  async update(id, userData) {
    const fields = []
    const values = []
    let paramCount = 1
    
    Object.entries(userData).forEach(([key, value]) => {
      if (value !== undefined) {
        fields.push(`${key} = $${paramCount}`)
        values.push(value)
        paramCount++
      }
    })
    
    if (fields.length === 0) {
      throw new Error('No fields to update')
    }
    
    fields.push(`updated_at = NOW()`)
    values.push(id)
    
    const query = `
      UPDATE users 
      SET ${fields.join(', ')} 
      WHERE id = $${paramCount} 
      RETURNING *
    `
    
    const result = await this.db.query(query, values)
    return result.rows[0] || null
  }
  
  async delete(id) {
    const result = await this.db.query(
      'DELETE FROM users WHERE id = $1',
      [id]
    )
    return result.rowCount > 0
  }
  
  async findMany(options = {}) {
    const { 
      limit = 10, 
      offset = 0, 
      search = '', 
      orderBy = 'created_at',
      orderDirection = 'DESC' 
    } = options
    
    let query = 'SELECT * FROM users WHERE 1=1'
    const params = []
    let paramCount = 1
    
    if (search) {
      query += ` AND (name ILIKE $${paramCount} OR email ILIKE $${paramCount + 1})`
      params.push(`%${search}%`, `%${search}%`)
      paramCount += 2
    }
    
    query += ` ORDER BY ${orderBy} ${orderDirection}`
    query += ` LIMIT $${paramCount} OFFSET $${paramCount + 1}`
    params.push(limit, offset)
    
    const result = await this.db.query(query, params)
    return result.rows
  }
  
  async count(options = {}) {
    const { search = '' } = options
    
    let query = 'SELECT COUNT(*) FROM users WHERE 1=1'
    const params = []
    
    if (search) {
      query += ' AND (name ILIKE $1 OR email ILIKE $2)'
      params.push(`%${search}%`, `%${search}%`)
    }
    
    const result = await this.db.query(query, params)
    return parseInt(result.rows[0].count)
  }
}

// 缓存层
class CacheManager {
  constructor(config = {}) {
    this.config = {
      ttl: config.ttl || 300, // 5分钟默认TTL
      prefix: config.prefix || 'cache:',
      ...config
    }
    this.cache = new Map()
    this.timers = new Map()
  }
  
  generateKey(key) {
    return `${this.config.prefix}${key}`
  }
  
  set(key, value, ttl = this.config.ttl) {
    const cacheKey = this.generateKey(key)
    
    // 清除现有定时器
    if (this.timers.has(cacheKey)) {
      clearTimeout(this.timers.get(cacheKey))
    }
    
    // 设置缓存值
    this.cache.set(cacheKey, {
      value,
      createdAt: Date.now(),
      ttl
    })
    
    // 设置过期定时器
    const timer = setTimeout(() => {
      this.delete(key)
    }, ttl * 1000)
    
    this.timers.set(cacheKey, timer)
  }
  
  get(key) {
    const cacheKey = this.generateKey(key)
    const item = this.cache.get(cacheKey)
    
    if (!item) {
      return null
    }
    
    // 检查是否过期
    const now = Date.now()
    if (now - item.createdAt > item.ttl * 1000) {
      this.delete(key)
      return null
    }
    
    return item.value
  }
  
  delete(key) {
    const cacheKey = this.generateKey(key)
    
    // 清除定时器
    if (this.timers.has(cacheKey)) {
      clearTimeout(this.timers.get(cacheKey))
      this.timers.delete(cacheKey)
    }
    
    // 删除缓存项
    return this.cache.delete(cacheKey)
  }
  
  clear() {
    // 清除所有定时器
    this.timers.forEach(timer => clearTimeout(timer))
    this.timers.clear()
    
    // 清除所有缓存
    this.cache.clear()
  }
  
  async getOrSet(key, factory, ttl) {
    let value = this.get(key)
    
    if (value === null) {
      value = await factory()
      this.set(key, value, ttl)
    }
    
    return value
  }
}

// 服务层
class UserService {
  constructor(repository, cache) {
    this.repository = repository
    this.cache = cache
  }
  
  async getUserById(id) {
    const cacheKey = `user:${id}`
    
    return this.cache.getOrSet(cacheKey, async () => {
      const user = await this.repository.findById(id)
      if (user) {
        delete user.password // 不缓存密码
      }
      return user
    }, 300) // 5分钟缓存
  }
  
  async createUser(userData) {
    const user = await this.repository.create(userData)
    
    // 缓存新用户
    const cacheKey = `user:${user.id}`
    const userWithoutPassword = { ...user }
    delete userWithoutPassword.password
    this.cache.set(cacheKey, userWithoutPassword)
    
    return userWithoutPassword
  }
  
  async updateUser(id, userData) {
    const user = await this.repository.update(id, userData)
    
    if (user) {
      // 更新缓存
      const cacheKey = `user:${id}`
      const userWithoutPassword = { ...user }
      delete userWithoutPassword.password
      this.cache.set(cacheKey, userWithoutPassword)
    }
    
    return user
  }
  
  async deleteUser(id) {
    const deleted = await this.repository.delete(id)
    
    if (deleted) {
      // 清除缓存
      this.cache.delete(`user:${id}`)
    }
    
    return deleted
  }
  
  async getUsers(options) {
    const cacheKey = `users:${JSON.stringify(options)}`
    
    return this.cache.getOrSet(cacheKey, async () => {
      const [users, total] = await Promise.all([
        this.repository.findMany(options),
        this.repository.count(options)
      ])
      
      // 移除密码字段
      const safeUsers = users.map(user => {
        const { password, ...safeUser } = user
        return safeUser
      })
      
      return {
        users: safeUsers,
        total,
        pagination: {
          limit: options.limit || 10,
          offset: options.offset || 0,
          total
        }
      }
    }, 60) // 1分钟缓存
  }
}

// 初始化
const database = new ServerlessDatabase({
  connectionString: process.env.DATABASE_URL,
  ssl: process.env.NODE_ENV === 'production' ? { rejectUnauthorized: false } : false,
  maxConnections: 5
})

const cache = new CacheManager({
  ttl: 300,
  prefix: 'myapp:'
})

const userRepository = new UserRepository(database)
const userService = new UserService(userRepository, cache)

// 导出服务
module.exports = {
  database,
  cache,
  userService,
  UserService,
  UserRepository,
  CacheManager,
  ServerlessDatabase
}

总结

Serverless架构的核心要点:

  1. 函数即服务:无服务器计算、事件驱动、按需扩展
  2. 事件驱动:异步处理、消息队列、重试机制
  3. 数据库集成:连接池管理、缓存策略、事务处理
  4. 成本优化:按使用付费、冷启动优化、资源管理
  5. 监控运维:日志聚合、性能监控、错误追踪

Serverless架构为现代应用开发提供了新的可能性,通过合理的设计和实践,可以构建出高可用、高性能、低成本的云原生应用。