- 发布于
Serverless架构实战指南:从函数计算到全栈应用
- 作者

- 姓名
- 全能波
- GitHub
- @weicracker
Serverless架构实战指南:从函数计算到全栈应用
Serverless 架构代表了云计算的新范式,让开发者专注于业务逻辑,而无需管理服务器基础设施。本文将结合 AWS Lambda 与 Vercel Functions 的示例,深入探讨 Serverless 的核心概念和实战应用。
Serverless基础概念
函数即服务(FaaS)
// AWS Lambda函数示例
// handler.js
exports.handler = async (event, context) => {
try {
// 解析请求
const { httpMethod, path, queryStringParameters, body } = event
const requestBody = body ? JSON.parse(body) : {}
// 路由处理
const router = new Router()
const response = await router.handle(httpMethod, path, {
query: queryStringParameters || {},
body: requestBody,
headers: event.headers,
context
})
return {
statusCode: response.statusCode || 200,
headers: {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type,Authorization',
'Access-Control-Allow-Methods': 'GET,POST,PUT,DELETE,OPTIONS',
...response.headers
},
body: JSON.stringify(response.body)
}
} catch (error) {
console.error('Lambda error:', error)
return {
statusCode: 500,
headers: {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
body: JSON.stringify({
error: 'Internal Server Error',
message: process.env.NODE_ENV === 'development' ? error.message : 'Something went wrong'
})
}
}
}
// Router class
/**
 * Minimal in-memory HTTP router for the Lambda handler.
 *
 * Routes are stored in a Map keyed by `METHOD:path`; path segments starting
 * with `:` are treated as named parameters. Route handlers receive a request
 * object (`query`, `body`, `headers`, `context`, `params`) and return
 * `{ statusCode, body, headers? }`.
 */
class Router {
  constructor() {
    this.routes = new Map()
    this.middleware = []
    this.setupRoutes()
  }

  /** Registers every route served by this function. */
  setupRoutes() {
    // User routes
    this.get('/api/users', this.getUsers.bind(this))
    this.post('/api/users', this.createUser.bind(this))
    this.get('/api/users/:id', this.getUser.bind(this))
    this.put('/api/users/:id', this.updateUser.bind(this))
    this.delete('/api/users/:id', this.deleteUser.bind(this))
    // Authentication routes
    this.post('/api/auth/login', this.login.bind(this))
    this.post('/api/auth/register', this.register.bind(this))
    this.post('/api/auth/refresh', this.refreshToken.bind(this))
    // File upload
    this.post('/api/upload', this.uploadFile.bind(this))
    // Health check
    this.get('/api/health', this.healthCheck.bind(this))
  }

  /**
   * Adds a middleware. Middleware run in registration order before routing;
   * one that returns an object with a `statusCode` short-circuits the request.
   * @param {(request: object) => any} middleware
   */
  use(middleware) {
    this.middleware.push(middleware)
  }

  // HTTP method registration helpers
  get(path, handler) {
    this.addRoute('GET', path, handler)
  }

  post(path, handler) {
    this.addRoute('POST', path, handler)
  }

  put(path, handler) {
    this.addRoute('PUT', path, handler)
  }

  delete(path, handler) {
    this.addRoute('DELETE', path, handler)
  }

  /**
   * Stores a route under the key `METHOD:path`.
   * @param {string} method
   * @param {string} path
   * @param {(request: object) => Promise<object>} handler
   */
  addRoute(method, path, handler) {
    this.routes.set(`${method}:${path}`, {
      method,
      path,
      handler,
      params: this.extractParams(path)
    })
  }

  /**
   * Lists the `:name` segments of a route pattern.
   * @param {string} path
   * @returns {Array<{name: string, index: number}>} index is the segment
   *   position after splitting on `/`.
   */
  extractParams(path) {
    const params = []
    path.split('/').forEach((part, index) => {
      if (part.startsWith(':')) {
        params.push({ name: part.slice(1), index })
      }
    })
    return params
  }

  /**
   * Runs the middleware chain, resolves the route and invokes its handler.
   * @param {string} method
   * @param {string} path
   * @param {object} request - mutated: `params` is set before the handler runs.
   * @returns {Promise<object>} the handler result, a 404 when nothing matches,
   *   or a 500 when middleware or the handler throws.
   */
  async handle(method, path, request) {
    try {
      // Apply middleware
      for (const middleware of this.middleware) {
        const result = await middleware(request)
        if (result && result.statusCode) {
          return result
        }
      }

      // Find the matching route
      const route = this.findRoute(method, path)
      if (!route) {
        return {
          statusCode: 404,
          body: { error: 'Not Found' }
        }
      }

      // Extract path parameters
      request.params = this.extractPathParams(route, path)

      // Run the handler
      return await route.handler(request)
    } catch (error) {
      console.error('Router error:', error)
      return {
        statusCode: 500,
        body: { error: 'Internal Server Error' }
      }
    }
  }

  /**
   * Looks up a route: exact `METHOD:path` match first, then the first
   * registered parameterized pattern that matches.
   * @returns {object|null}
   */
  findRoute(method, path) {
    const exact = this.routes.get(`${method}:${path}`)
    if (exact) {
      return exact
    }
    for (const route of this.routes.values()) {
      if (route.method === method && this.matchPath(route.path, path)) {
        return route
      }
    }
    return null
  }

  /**
   * Segment-wise comparison; `:param` segments match any value.
   * @returns {boolean}
   */
  matchPath(routePath, requestPath) {
    const routeParts = routePath.split('/')
    const requestParts = requestPath.split('/')
    if (routeParts.length !== requestParts.length) {
      return false
    }
    return routeParts.every((part, index) => {
      return part.startsWith(':') || part === requestParts[index]
    })
  }

  /**
   * Builds `{ name: value }` from the `:name` segments of the matched route.
   * @returns {object}
   */
  extractPathParams(route, path) {
    const params = {}
    const pathParts = path.split('/')
    route.path.split('/').forEach((part, index) => {
      if (part.startsWith(':')) {
        params[part.slice(1)] = pathParts[index]
      }
    })
    return params
  }

  // Route handlers

  /** GET /api/users — paginated, optionally filtered user list. */
  async getUsers(request) {
    const { query = {} } = request
    // Fall back to defaults for missing or non-numeric values and never let
    // page/limit drop below 1 (a negative OFFSET/LIMIT is an SQL error).
    const page = Math.max(1, Number.parseInt(query.page, 10) || 1)
    const limit = Math.max(1, Number.parseInt(query.limit, 10) || 10)
    const search = query.search || ''
    try {
      const users = await UserService.getUsers({ page, limit, search })
      return {
        statusCode: 200,
        body: users
      }
    } catch (error) {
      return {
        statusCode: 500,
        body: { error: error.message }
      }
    }
  }

  /** POST /api/users — validates the payload and creates a user. */
  async createUser(request) {
    const { body } = request
    try {
      // Validate input
      const validation = UserValidator.validate(body)
      if (!validation.isValid) {
        return {
          statusCode: 400,
          body: { error: 'Validation failed', details: validation.errors }
        }
      }
      const user = await UserService.createUser(body)
      return {
        statusCode: 201,
        body: user
      }
    } catch (error) {
      return {
        statusCode: 500,
        body: { error: error.message }
      }
    }
  }

  /** GET /api/users/:id */
  async getUser(request) {
    const userId = request.params.id
    try {
      const user = await UserService.getUserById(userId)
      if (!user) {
        return {
          statusCode: 404,
          body: { error: 'User not found' }
        }
      }
      return {
        statusCode: 200,
        body: user
      }
    } catch (error) {
      return {
        statusCode: 500,
        body: { error: error.message }
      }
    }
  }

  /** PUT /api/users/:id */
  async updateUser(request) {
    const { params, body } = request
    const userId = params.id
    try {
      const user = await UserService.updateUser(userId, body)
      if (!user) {
        return {
          statusCode: 404,
          body: { error: 'User not found' }
        }
      }
      return {
        statusCode: 200,
        body: user
      }
    } catch (error) {
      return {
        statusCode: 500,
        body: { error: error.message }
      }
    }
  }

  /** DELETE /api/users/:id */
  async deleteUser(request) {
    const userId = request.params.id
    try {
      const deleted = await UserService.deleteUser(userId)
      if (!deleted) {
        return {
          statusCode: 404,
          body: { error: 'User not found' }
        }
      }
      return {
        statusCode: 204,
        body: null
      }
    } catch (error) {
      return {
        statusCode: 500,
        body: { error: error.message }
      }
    }
  }

  /** POST /api/auth/login */
  async login(request) {
    const { email, password } = request.body || {}
    // Reject incomplete credentials up front instead of letting the auth
    // service fail on undefined values.
    if (!email || !password) {
      return {
        statusCode: 400,
        body: { error: 'Email and password are required' }
      }
    }
    try {
      const result = await AuthService.login(email, password)
      return {
        statusCode: 200,
        body: result
      }
    } catch (error) {
      return {
        statusCode: 401,
        body: { error: error.message }
      }
    }
  }

  /** POST /api/auth/register */
  async register(request) {
    const { body } = request
    try {
      const user = await AuthService.register(body)
      return {
        statusCode: 201,
        body: user
      }
    } catch (error) {
      return {
        statusCode: 400,
        body: { error: error.message }
      }
    }
  }

  /**
   * POST /api/auth/refresh — exchanges a refresh token for a new access token.
   * setupRoutes binds this handler; without it the constructor throws.
   */
  async refreshToken(request) {
    const { refreshToken } = request.body || {}
    if (!refreshToken) {
      return {
        statusCode: 400,
        body: { error: 'Refresh token is required' }
      }
    }
    try {
      const result = await AuthService.refreshToken(refreshToken)
      return {
        statusCode: 200,
        body: result
      }
    } catch (error) {
      return {
        statusCode: 401,
        body: { error: error.message }
      }
    }
  }

  /** POST /api/upload */
  async uploadFile(request) {
    const { body, headers } = request
    try {
      const result = await FileService.uploadFile(body, headers)
      return {
        statusCode: 200,
        body: result
      }
    } catch (error) {
      return {
        statusCode: 500,
        body: { error: error.message }
      }
    }
  }

  /** GET /api/health — liveness probe; touches no external service. */
  async healthCheck(request) {
    return {
      statusCode: 200,
      body: {
        status: 'healthy',
        timestamp: new Date().toISOString(),
        version: process.env.VERSION || '1.0.0'
      }
    }
  }
}
// User service
/**
 * Static data-access helpers for the `users` table, backed by the shared
 * pool from DatabaseConnection. No method returns the stored password hash.
 */
class UserService {
  /** Returns a copy of a user row without its `password` column. */
  static #withoutPassword(row) {
    const { password, ...safeUser } = row
    return safeUser
  }

  /**
   * Lists users, newest first, with optional name/email search.
   * @param {{page?: number, limit?: number, search?: string}} [options]
   * @returns {Promise<{users: object[], pagination: {page: number, limit: number, total: number, pages: number}}>}
   */
  static async getUsers({ page = 1, limit = 10, search = '' } = {}) {
    const db = await DatabaseConnection.getInstance()

    // The list query and the count query share the same filter.
    let where = 'WHERE 1=1'
    const filterParams = []
    if (search) {
      where += ' AND (name ILIKE $1 OR email ILIKE $2)'
      filterParams.push(`%${search}%`, `%${search}%`)
    }

    const limitIndex = filterParams.length + 1
    const listQuery = `SELECT * FROM users ${where} ORDER BY created_at DESC LIMIT $${limitIndex} OFFSET $${limitIndex + 1}`
    const result = await db.query(listQuery, [...filterParams, limit, (page - 1) * limit])

    // Fetch the total count
    const countResult = await db.query(`SELECT COUNT(*) FROM users ${where}`, filterParams)
    const total = Number.parseInt(countResult.rows[0].count, 10)

    return {
      // SELECT * includes the password column; strip it before it can reach a response
      users: result.rows.map((row) => UserService.#withoutPassword(row)),
      pagination: {
        page,
        limit,
        total,
        pages: Math.ceil(total / limit)
      }
    }
  }

  /**
   * @param {string|number} id
   * @returns {Promise<object|null>} the user without its password hash, or null.
   */
  static async getUserById(id) {
    const db = await DatabaseConnection.getInstance()
    const result = await db.query('SELECT * FROM users WHERE id = $1', [id])
    const row = result.rows[0]
    return row ? UserService.#withoutPassword(row) : null
  }

  /**
   * Creates a user with a bcrypt-hashed password.
   * @param {{name: string, email: string, password: string}} userData
   * @returns {Promise<object>} the created row without its password hash.
   * @throws {Error} 'Email already exists' when the email is taken.
   */
  static async createUser(userData) {
    const db = await DatabaseConnection.getInstance()
    const { name, email, password } = userData

    // Check whether the email is already registered
    const existing = await db.query('SELECT id FROM users WHERE email = $1', [email])
    if (existing.rows.length > 0) {
      throw new Error('Email already exists')
    }

    // Hash the password
    const hashedPassword = await bcrypt.hash(password, 10)
    const result = await db.query(
      'INSERT INTO users (name, email, password, created_at) VALUES ($1, $2, $3, NOW()) RETURNING *',
      [name, email, hashedPassword]
    )
    return UserService.#withoutPassword(result.rows[0])
  }

  /**
   * Updates name and/or email; a field omitted from `userData` keeps its value.
   * @param {string|number} id
   * @param {{name?: string, email?: string}} userData
   * @returns {Promise<object|null>} the updated row without its password hash,
   *   or null when no user has that id.
   */
  static async updateUser(id, userData) {
    const db = await DatabaseConnection.getInstance()
    const { name, email } = userData || {}
    // An omitted field would otherwise be sent as NULL and wipe the column;
    // COALESCE keeps the stored value instead.
    const result = await db.query(
      'UPDATE users SET name = COALESCE($1, name), email = COALESCE($2, email), updated_at = NOW() WHERE id = $3 RETURNING *',
      [name ?? null, email ?? null, id]
    )
    if (result.rows.length === 0) {
      return null
    }
    return UserService.#withoutPassword(result.rows[0])
  }

  /**
   * @param {string|number} id
   * @returns {Promise<boolean>} true when a row was deleted.
   */
  static async deleteUser(id) {
    const db = await DatabaseConnection.getInstance()
    const result = await db.query('DELETE FROM users WHERE id = $1', [id])
    return result.rowCount > 0
  }
}
// Authentication service
/**
 * Credential checks and JWT issuing on top of the users table.
 */
class AuthService {
  /** Signs the 24-hour access token carrying the user's id and email. */
  static #issueAccessToken(account) {
    return jwt.sign(
      { userId: account.id, email: account.email },
      process.env.JWT_SECRET,
      { expiresIn: '24h' }
    )
  }

  /**
   * Verifies an email/password pair and issues an access and a refresh token.
   * @param {string} email
   * @param {string} password - plain-text password, compared with bcrypt.
   * @returns {Promise<{user: object, token: string, refreshToken: string}>}
   *   `user` is the database row with its `password` property removed.
   * @throws {Error} 'Invalid credentials' for an unknown email or a wrong password.
   */
  static async login(email, password) {
    const db = await DatabaseConnection.getInstance()
    const { rows } = await db.query('SELECT * FROM users WHERE email = $1', [email])
    if (rows.length === 0) {
      throw new Error('Invalid credentials')
    }

    const user = rows[0]
    const passwordMatches = await bcrypt.compare(password, user.password)
    if (!passwordMatches) {
      throw new Error('Invalid credentials')
    }

    // Issue the JWTs: access token first, then the 7-day refresh token
    const token = AuthService.#issueAccessToken(user)
    const refreshToken = jwt.sign(
      { userId: user.id },
      process.env.JWT_REFRESH_SECRET,
      { expiresIn: '7d' }
    )

    delete user.password
    return { user, token, refreshToken }
  }

  /**
   * Registers a new account by delegating to UserService.createUser.
   * @param {object} userData
   */
  static async register(userData) {
    return UserService.createUser(userData)
  }

  /**
   * Exchanges a refresh token for a fresh access token.
   * @param {string} refreshToken
   * @returns {Promise<{token: string}>}
   * @throws {Error} 'Invalid refresh token' for any failure, including a
   *   token that verifies but whose user no longer exists.
   */
  static async refreshToken(refreshToken) {
    try {
      const { userId } = jwt.verify(refreshToken, process.env.JWT_REFRESH_SECRET)
      const user = await UserService.getUserById(userId)
      if (!user) {
        throw new Error('User not found')
      }
      return { token: AuthService.#issueAccessToken(user) }
    } catch (error) {
      throw new Error('Invalid refresh token')
    }
  }
}
// 数据库连接
class DatabaseConnection {
static instance = null
static async getInstance() {
if (!this.instance) {
const { Pool } = require('pg')
this.instance = new Pool({
connectionString: process.env.DATABASE_URL,
ssl: process.env.NODE_ENV === 'production' ? { rejectUnauthorized: false } : false
})
}
return this.instance
}
}
// User validator
/**
 * Validates user-creation payloads. The payload comes straight from the
 * request body, so every field is type-checked before it is inspected.
 */
class UserValidator {
  /**
   * @param {object} userData - expected shape: { name, email, password }.
   * @returns {{isValid: boolean, errors: string[]}} never throws; a missing
   *   payload or non-string fields are reported as validation errors.
   */
  static validate(userData) {
    const errors = []
    const { name, email, password } = userData ?? {}

    if (typeof name !== 'string' || name.trim().length < 2) {
      errors.push('Name must be at least 2 characters long')
    }
    if (typeof email !== 'string' || !this.isValidEmail(email)) {
      errors.push('Valid email is required')
    }
    if (typeof password !== 'string' || password.length < 6) {
      errors.push('Password must be at least 6 characters long')
    }

    return {
      isValid: errors.length === 0,
      errors
    }
  }

  /**
   * Loose `local@domain.tld` shape check (no whitespace, a single `@`).
   * @param {unknown} email
   * @returns {boolean} false for anything that is not a string.
   */
  static isValidEmail(email) {
    if (typeof email !== 'string') {
      return false
    }
    const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/
    return emailRegex.test(email)
  }
}
// 文件服务
class FileService {
static async uploadFile(fileData, headers) {
const AWS = require('aws-sdk')
const s3 = new AWS.S3()
const contentType = headers['content-type'] || 'application/octet-stream'
const fileName = `uploads/${Date.now()}-${Math.random().toString(36).substr(2, 9)}`
const params = {
Bucket: process.env.S3_BUCKET,
Key: fileName,
Body: Buffer.from(fileData, 'base64'),
ContentType: contentType,
ACL: 'public-read'
}
const result = await s3.upload(params).promise()
return {
url: result.Location,
key: result.Key,
bucket: result.Bucket
}
}
}
Vercel Functions示例
// api/users/index.js - Vercel API路由
import { NextApiRequest, NextApiResponse } from 'next'
import { PrismaClient } from '@prisma/client'
import jwt from 'jsonwebtoken'
const prisma = new PrismaClient()
// Middleware: authentication check
/**
 * Wraps an API handler so that it only runs for requests carrying a valid
 * JWT in the Authorization header. The decoded payload is stored on
 * `req.user`.
 *
 * Only token verification is guarded: an error thrown by the wrapped handler
 * propagates to the caller instead of being reported as a 401.
 *
 * @param {(req: object, res: object) => any} handler
 * @returns {(req: object, res: object) => Promise<any>}
 */
function withAuth(handler) {
  return async (req, res) => {
    const header = req.headers?.authorization
    // The "Bearer" scheme name is case-insensitive; a bare token is still accepted.
    const token = typeof header === 'string' ? header.replace(/^Bearer\s+/i, '').trim() : ''
    if (!token) {
      return res.status(401).json({ error: 'No token provided' })
    }

    let decoded
    try {
      decoded = jwt.verify(token, process.env.JWT_SECRET)
    } catch (error) {
      return res.status(401).json({ error: 'Invalid token' })
    }

    req.user = decoded
    return handler(req, res)
  }
}
// Middleware: CORS
/**
 * Wraps an API handler so every response carries permissive CORS headers.
 * OPTIONS requests are answered with an empty 200 and never reach the handler.
 *
 * @param {(req: object, res: object) => any} handler
 * @returns {(req: object, res: object) => Promise<any>}
 */
function withCORS(handler) {
  const corsHeaders = [
    ['Access-Control-Allow-Origin', '*'],
    ['Access-Control-Allow-Methods', 'GET,POST,PUT,DELETE,OPTIONS'],
    ['Access-Control-Allow-Headers', 'Content-Type,Authorization']
  ]

  return async (req, res) => {
    for (const [headerName, headerValue] of corsHeaders) {
      res.setHeader(headerName, headerValue)
    }

    const isPreflight = req.method === 'OPTIONS'
    if (!isPreflight) {
      return handler(req, res)
    }
    return res.status(200).end()
  }
}
// Middleware: error handling
/**
 * Wraps an API handler and converts anything it throws into a JSON response:
 * an error whose `code` is 'P2002' becomes a 400 "Duplicate entry", anything
 * else a 500. The error message is only exposed when NODE_ENV is 'development'.
 *
 * @param {(req: object, res: object) => any} handler
 * @returns {(req: object, res: object) => Promise<any>}
 */
function withErrorHandling(handler) {
  return async (req, res) => {
    try {
      const outcome = await handler(req, res)
      return outcome
    } catch (error) {
      console.error('API Error:', error)

      const isDuplicate = error.code === 'P2002'
      if (isDuplicate) {
        return res.status(400).json({ error: 'Duplicate entry' })
      }

      const exposeDetails = process.env.NODE_ENV === 'development'
      const payload = {
        error: 'Internal Server Error',
        message: exposeDetails ? error.message : undefined
      }
      return res.status(500).json(payload)
    }
  }
}
// Compose middleware
/**
 * Wraps `handler` with the given middleware so that they run in the order
 * listed: the first entry is the outermost wrapper and sees the request
 * first, the last entry sits directly around the handler.
 *
 * A left-to-right `reduce` would make the LAST entry outermost, so for
 * `[withCORS, withErrorHandling, withAuth]` authentication would run before
 * the CORS headers are set and outside the error handler.
 *
 * @param {Function} handler - the innermost request handler.
 * @param {Array<(next: Function) => Function>} [middlewares]
 * @returns {Function} the composed handler (the handler itself when the list is empty).
 */
function withMiddleware(handler, middlewares = []) {
  return middlewares.reduceRight((next, middleware) => middleware(next), handler)
}
// Users API handler
/**
 * Dispatches /api/users requests by HTTP method: GET lists users, POST
 * creates one. Any other method is answered with 405 and an Allow header.
 *
 * @param {object} req
 * @param {object} res
 */
async function usersHandler(req, res) {
  const { method } = req

  if (method === 'GET') {
    return handleGetUsers(req, res)
  }
  if (method === 'POST') {
    return handleCreateUser(req, res)
  }

  res.setHeader('Allow', ['GET', 'POST'])
  return res.status(405).json({ error: `Method ${method} not allowed` })
}
/**
 * GET /api/users — paginated user list with an optional case-insensitive
 * search on name and email.
 *
 * `page` and `limit` arrive as query strings; anything that is not a positive
 * integer falls back to the default (1 and 10), so `skip`/`take` never become
 * NaN, zero or negative and the page count stays finite.
 *
 * @param {object} req - `req.query` may contain `page`, `limit`, `search`.
 * @param {object} res
 */
async function handleGetUsers(req, res) {
  const { page = 1, limit = 10, search = '' } = req.query

  const toPositiveInt = (raw, fallback) => {
    const parsed = Number.parseInt(raw, 10)
    return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback
  }
  const pageNumber = toPositiveInt(page, 1)
  const pageSize = toPositiveInt(limit, 10)
  // A repeated ?search= parameter arrives as an array; use its first value.
  const term = Array.isArray(search) ? search[0] : search

  const where = term ? {
    OR: [
      { name: { contains: term, mode: 'insensitive' } },
      { email: { contains: term, mode: 'insensitive' } }
    ]
  } : {}

  // The page and the total count are independent, so fetch them in parallel.
  const [users, total] = await Promise.all([
    prisma.user.findMany({
      where,
      skip: (pageNumber - 1) * pageSize,
      take: pageSize,
      // Explicit select keeps the password hash out of the response
      select: {
        id: true,
        name: true,
        email: true,
        createdAt: true,
        updatedAt: true
      },
      orderBy: { createdAt: 'desc' }
    }),
    prisma.user.count({ where })
  ])

  return res.status(200).json({
    users,
    pagination: {
      page: pageNumber,
      limit: pageSize,
      total,
      pages: Math.ceil(total / pageSize)
    }
  })
}
async function handleCreateUser(req, res) {
const { name, email, password } = req.body
// 验证输入
if (!name || !email || !password) {
return res.status(400).json({ error: 'Name, email, and password are required' })
}
if (password.length < 6) {
return res.status(400).json({ error: 'Password must be at least 6 characters long' })
}
// 检查邮箱是否已存在
const existingUser = await prisma.user.findUnique({
where: { email }
})
if (existingUser) {
return res.status(400).json({ error: 'Email already exists' })
}
// 创建用户
const bcrypt = require('bcryptjs')
const hashedPassword = await bcrypt.hash(password, 10)
const user = await prisma.user.create({
data: {
name,
email,
password: hashedPassword
},
select: {
id: true,
name: true,
email: true,
createdAt: true
}
})
return res.status(201).json(user)
}
// Export the handler
// The default export of api/users/index.js: usersHandler wrapped, via
// withMiddleware, with the CORS, error-handling and authentication middleware.
export default withMiddleware(usersHandler, [
withCORS,
withErrorHandling,
withAuth
])
// api/users/[id].js - dynamic route
/**
 * Default export for /api/users/[id]: dispatches by HTTP method to the
 * single-user handlers, passing along the `id` taken from `req.query`.
 * GET reads, PUT updates, DELETE removes; any other method gets a 405 with an
 * Allow header.
 */
export default withMiddleware(async (req, res) => {
  const { method } = req
  const { id } = req.query

  if (method === 'GET') {
    return handleGetUser(req, res, id)
  }
  if (method === 'PUT') {
    return handleUpdateUser(req, res, id)
  }
  if (method === 'DELETE') {
    return handleDeleteUser(req, res, id)
  }

  res.setHeader('Allow', ['GET', 'PUT', 'DELETE'])
  return res.status(405).json({ error: `Method ${method} not allowed` })
}, [withCORS, withErrorHandling, withAuth])
/**
 * GET /api/users/[id] — responds with the user's public fields, or 404 when
 * no user has that id.
 *
 * @param {object} req
 * @param {object} res
 * @param {string} id - the id taken from the route.
 */
async function handleGetUser(req, res, id) {
  const publicFields = {
    id: true,
    name: true,
    email: true,
    createdAt: true,
    updatedAt: true
  }
  const found = await prisma.user.findUnique({ where: { id }, select: publicFields })

  return found
    ? res.status(200).json(found)
    : res.status(404).json({ error: 'User not found' })
}
/**
 * PUT /api/users/[id] — updates name and email and responds with the
 * updated public fields.
 *
 * Prisma rejects `update` with error code P2025 when the record does not
 * exist; that case is answered with 404 instead of bubbling up as a 500.
 * Every other error is rethrown unchanged.
 *
 * @param {object} req - `req.body` carries `name` and `email`.
 * @param {object} res
 * @param {string} id - the id taken from the route.
 */
async function handleUpdateUser(req, res, id) {
  const { name, email } = req.body
  let user
  try {
    user = await prisma.user.update({
      where: { id },
      data: { name, email },
      select: {
        id: true,
        name: true,
        email: true,
        updatedAt: true
      }
    })
  } catch (error) {
    if (error?.code === 'P2025') {
      return res.status(404).json({ error: 'User not found' })
    }
    throw error
  }
  return res.status(200).json(user)
}
/**
 * DELETE /api/users/[id] — removes the user and responds 204 with no body.
 *
 * Prisma rejects `delete` with error code P2025 when the record does not
 * exist; that case is answered with 404 instead of bubbling up as a 500.
 * Every other error is rethrown unchanged.
 *
 * @param {object} req
 * @param {object} res
 * @param {string} id - the id taken from the route.
 */
async function handleDeleteUser(req, res, id) {
  try {
    await prisma.user.delete({
      where: { id }
    })
  } catch (error) {
    if (error?.code === 'P2025') {
      return res.status(404).json({ error: 'User not found' })
    }
    throw error
  }
  return res.status(204).end()
}
事件驱动架构
事件处理系统
// Event-driven serverless architecture
/**
 * In-process event dispatcher: events pass through a middleware chain and
 * are then delivered to every handler registered for their `eventType`.
 * Handler failures are collected rather than thrown; retryable ones are
 * rescheduled with exponential backoff, and events that exhaust their
 * retries (or fail in middleware) are recorded in `deadLetterQueue`.
 */
class EventProcessor {
  constructor() {
    this.handlers = new Map()
    this.middleware = []
    this.deadLetterQueue = []
  }

  /**
   * Registers a handler for an event type; several handlers may share a type.
   * @param {string} eventType
   * @param {(event: object) => any} handler
   */
  on(eventType, handler) {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, [])
    }
    this.handlers.get(eventType).push(handler)
  }

  /**
   * Adds a middleware. Each middleware receives the event returned by the
   * previous one and must return the event to pass on.
   * @param {(event: object) => object|Promise<object>} middleware
   */
  use(middleware) {
    this.middleware.push(middleware)
  }

  /**
   * Runs the middleware chain and then all handlers for the event in parallel.
   * @param {object} event
   * @returns {Promise<Array<{status: string}>|undefined>} the
   *   Promise.allSettled results, one per handler, or undefined when no
   *   handler is registered for the event type.
   * @throws rethrows middleware or failure-handling errors after recording
   *   the event in the dead letter queue.
   */
  async process(event) {
    try {
      // Apply middleware
      let processedEvent = event
      for (const middleware of this.middleware) {
        processedEvent = await middleware(processedEvent)
      }

      const { eventType } = processedEvent
      const handlers = this.handlers.get(eventType) || []
      if (handlers.length === 0) {
        console.warn(`No handlers found for event type: ${eventType}`)
        return
      }

      // Run all handlers in parallel. The async wrapper turns a synchronous
      // throw into a rejection so one bad handler cannot abort its siblings.
      const results = await Promise.allSettled(
        handlers.map(async (handler) => handler(processedEvent))
      )

      // Check for failed handlers
      const failures = results.filter(result => result.status === 'rejected')
      if (failures.length > 0) {
        console.error('Some event handlers failed:', failures)
        await this.handleFailures(processedEvent, failures)
      }
      return results
    } catch (error) {
      console.error('Event processing error:', error)
      await this.sendToDeadLetterQueue(event, error)
      throw error
    }
  }

  /**
   * Schedules a retry when at least one failure is retryable.
   * @param {object} event
   * @param {Array<{reason: any}>} failures - rejected allSettled results.
   */
  async handleFailures(event, failures) {
    const retryableFailures = failures.filter(failure =>
      this.isRetryable(failure.reason)
    )
    if (retryableFailures.length > 0) {
      await this.scheduleRetry(event, retryableFailures)
    }
  }

  /**
   * Whether a failure looks like a transient network error, judged by its
   * `code` or by the code appearing in its message.
   * @param {any} error - a rejection reason; may be an Error, a string,
   *   or nothing at all.
   * @returns {boolean}
   */
  isRetryable(error) {
    const retryableErrors = [
      'ECONNRESET',
      'ETIMEDOUT',
      'ENOTFOUND'
    ]
    // Rejection reasons are not guaranteed to be Error objects.
    const errorCode = error?.code
    const message = typeof error === 'string' ? error : String(error?.message ?? '')
    return retryableErrors.some(code =>
      errorCode === code || message.includes(code)
    )
  }

  /**
   * Re-queues the event with an incremented retryCount and an exponential
   * backoff (1s, 2s, 4s); after three retries the event is dead-lettered.
   * @param {object} event
   * @param {Array} failures - currently unused.
   */
  async scheduleRetry(event, failures) {
    // Intended to be backed by AWS SQS or another message queue
    const retryEvent = {
      ...event,
      retryCount: (event.retryCount || 0) + 1,
      retryAt: Date.now() + (Math.pow(2, event.retryCount || 0) * 1000) // exponential backoff
    }
    if (retryEvent.retryCount <= 3) {
      await this.sendToRetryQueue(retryEvent)
    } else {
      await this.sendToDeadLetterQueue(event, new Error('Max retries exceeded'))
    }
  }

  /** Placeholder for the retry queue: only logs the event. */
  async sendToRetryQueue(event) {
    console.log('Sending to retry queue:', event)
  }

  /**
   * Records the event and the failure message in the in-memory dead letter queue.
   * @param {object} event
   * @param {any} error - an Error, or any other thrown value.
   */
  async sendToDeadLetterQueue(event, error) {
    const message = error?.message ?? String(error)
    this.deadLetterQueue.push({
      event,
      error: message,
      timestamp: new Date().toISOString()
    })
    console.error('Event sent to dead letter queue:', { event, error: message })
  }
}
// User event handlers
/**
 * Side effects triggered by user lifecycle events. Each handler runs its
 * steps one after another and logs a completion line at the end.
 */
class UserEventHandlers {
  /**
   * user.created: welcome email, profile creation, analytics.
   * @param {{data: {user: object}, timestamp: string}} event
   */
  static async handleUserCreated(event) {
    const newUser = event.data.user

    // Send the welcome email
    await EmailService.sendWelcomeEmail(newUser)
    // Create the user's profile
    await ProfileService.createProfile(newUser.id)
    // Record the analytics event
    const analyticsPayload = {
      userId: newUser.id,
      email: newUser.email,
      timestamp: event.timestamp
    }
    await AnalyticsService.track('user_created', analyticsPayload)

    console.log(`User created event processed for user: ${newUser.id}`)
  }

  /**
   * user.updated: optional email-change confirmation, search re-index, analytics.
   * @param {{data: {user: object, changes: object}, timestamp: string}} event
   */
  static async handleUserUpdated(event) {
    const { data, timestamp } = event
    const { user, changes } = data

    // A changed email address needs to be confirmed by the user
    if (changes.email) {
      await EmailService.sendEmailChangeConfirmation(user)
    }
    // Refresh the search index
    await SearchService.updateUserIndex(user)
    // Record the analytics event
    const changedFields = Object.keys(changes)
    await AnalyticsService.track('user_updated', {
      userId: user.id,
      changes: changedFields,
      timestamp
    })

    console.log(`User updated event processed for user: ${user.id}`)
  }

  /**
   * user.deleted: removes profile, files and search entry, then records analytics.
   * @param {{data: {userId: string|number}, timestamp: string}} event
   */
  static async handleUserDeleted(event) {
    const removedId = event.data.userId

    // Clean up data that belongs to the user
    await ProfileService.deleteProfile(removedId)
    await FileService.deleteUserFiles(removedId)
    await SearchService.removeFromIndex(removedId)
    // Record the analytics event
    await AnalyticsService.track('user_deleted', {
      userId: removedId,
      timestamp: event.timestamp
    })

    console.log(`User deleted event processed for user: ${removedId}`)
  }
}
// Order event handlers
/**
 * Side effects triggered by order lifecycle events. Each handler runs its
 * steps one after another and finishes by recording an analytics event.
 */
class OrderEventHandlers {
  /**
   * order.created: confirmation email, stock reservation, shipping task, analytics.
   * @param {{data: {order: object}, timestamp: string}} event
   */
  static async handleOrderCreated(event) {
    const placedOrder = event.data.order

    // Send the order confirmation email
    await EmailService.sendOrderConfirmation(placedOrder)
    // Reserve stock for the ordered items
    await InventoryService.reserveItems(placedOrder.items)
    // Create the shipping task
    await ShippingService.createShippingTask(placedOrder)
    // Record the analytics event
    await AnalyticsService.track('order_created', {
      orderId: placedOrder.id,
      userId: placedOrder.userId,
      amount: placedOrder.total,
      timestamp: event.timestamp
    })
  }

  /**
   * order.paid: payment confirmation, shipping kickoff, status update, analytics.
   * @param {{data: {order: object, payment: object}, timestamp: string}} event
   */
  static async handleOrderPaid(event) {
    const { data, timestamp } = event
    const { order, payment } = data
    const orderId = order.id

    // Send the payment confirmation
    await EmailService.sendPaymentConfirmation(order, payment)
    // Kick off the shipping flow
    await ShippingService.processShipping(orderId)
    // Update the order status
    await OrderService.updateStatus(orderId, 'paid')
    // Record the analytics event
    await AnalyticsService.track('order_paid', {
      orderId,
      paymentId: payment.id,
      amount: payment.amount,
      timestamp
    })
  }

  /**
   * order.shipped: email and SMS notifications, status update, analytics.
   * @param {{data: {order: object, tracking: object}, timestamp: string}} event
   */
  static async handleOrderShipped(event) {
    const { data, timestamp } = event
    const { order, tracking } = data
    const trackingNumber = tracking.number

    // Send the shipping notification email
    await EmailService.sendShippingNotification(order, tracking)
    // Send the SMS notification
    await SMSService.sendShippingNotification(order.phone, trackingNumber)
    // Update the order status
    await OrderService.updateStatus(order.id, 'shipped')
    // Record the analytics event
    await AnalyticsService.track('order_shipped', {
      orderId: order.id,
      trackingNumber,
      timestamp
    })
  }
}
// Event middleware
/**
 * Logs the type, id and timestamp of an event and passes it on unchanged.
 * @param {object} event
 * @returns {Promise<object>} the same event object.
 */
const loggingMiddleware = async (event) => {
  const { eventType: type, id, timestamp } = event
  console.log('Processing event:', { type, id, timestamp })
  return event
}
/**
 * Rejects events without an `eventType` and fills in a generated `id` and
 * the current ISO timestamp when they are missing. The event is updated in
 * place.
 * @param {object} event
 * @returns {Promise<object>} the same event object.
 * @throws {Error} 'Event type is required'
 */
const validationMiddleware = async (event) => {
  if (!event.eventType) {
    throw new Error('Event type is required')
  }

  event.id ||= generateEventId()
  event.timestamp ||= new Date().toISOString()
  return event
}
/**
 * Stamps the event with processing metadata (source, version, environment),
 * replacing any `metadata` it already carried. The event is updated in place.
 * @param {object} event
 * @returns {Promise<object>} the same event object.
 */
const enrichmentMiddleware = async (event) => {
  const environment = process.env.NODE_ENV || 'development'
  // Attach metadata
  return Object.assign(event, {
    metadata: {
      source: 'serverless-function',
      version: '1.0.0',
      environment
    }
  })
}
// Initialize the event processor
const eventProcessor = new EventProcessor()
// Register middleware. EventProcessor.process applies them in registration
// order, so validation (which fills in a missing id/timestamp) runs before
// the event is logged and enriched.
eventProcessor.use(validationMiddleware)
eventProcessor.use(loggingMiddleware)
eventProcessor.use(enrichmentMiddleware)
// Register event handlers, keyed by event type. The static methods are passed
// as plain function references.
eventProcessor.on('user.created', UserEventHandlers.handleUserCreated)
eventProcessor.on('user.updated', UserEventHandlers.handleUserUpdated)
eventProcessor.on('user.deleted', UserEventHandlers.handleUserDeleted)
eventProcessor.on('order.created', OrderEventHandlers.handleOrderCreated)
eventProcessor.on('order.paid', OrderEventHandlers.handleOrderPaid)
eventProcessor.on('order.shipped', OrderEventHandlers.handleOrderShipped)
// Lambda处理器
exports.eventHandler = async (event, context) => {
try {
// 处理SQS消息
if (event.Records) {
const results = await Promise.allSettled(
event.Records.map(record => {
const eventData = JSON.parse(record.body)
return eventProcessor.process(eventData)
})
)
return {
statusCode: 200,
body: JSON.stringify({
processed: results.length,
successful: results.filter(r => r.status === 'fulfilled').length,
failed: results.filter(r => r.status === 'rejected').length
})
}
}
// 处理直接事件
await eventProcessor.process(event)
return {
statusCode: 200,
body: JSON.stringify({ message: 'Event processed successfully' })
}
} catch (error) {
console.error('Event handler error:', error)
return {
statusCode: 500,
body: JSON.stringify({ error: error.message })
}
}
}
// Utility functions
/**
 * Builds an event id of the form `evt_<epoch millis>_<random base-36 suffix>`.
 * The suffix comes from Math.random and has at most nine characters.
 * @returns {string}
 */
function generateEventId() {
  const issuedAt = Date.now()
  const randomSuffix = Math.random().toString(36).slice(2, 11)
  return ['evt', issuedAt, randomSuffix].join('_')
}
数据库集成
Serverless数据库模式
// Serverless数据库连接管理
class ServerlessDatabase {
constructor(config) {
this.config = config
this.pool = null
this.connectionCount = 0
this.maxConnections = config.maxConnections || 5
}
async getConnection() {
if (!this.pool) {
await this.createPool()
}
if (this.connectionCount >= this.maxConnections) {
throw new Error('Connection pool exhausted')
}
this.connectionCount++
return this.pool
}
async createPool() {
const { Pool } = require('pg')
this.pool = new Pool({
connectionString: this.config.connectionString,
ssl: this.config.ssl,
max: this.maxConnections,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000,
})
// 连接事件监听
this.pool.on('connect', () => {
console.log('Database connected')
})
this.pool.on('error', (err) => {
console.error('Database error:', err)
})
}
async query(text, params) {
const client = await this.getConnection()
try {
const start = Date.now()
const result = await client.query(text, params)
const duration = Date.now() - start
console.log('Query executed:', { text, duration, rows: result.rowCount })
return result
} finally {
this.connectionCount--
}
}
async transaction(callback) {
const client = await this.pool.connect()
try {
await client.query('BEGIN')
const result = await callback(client)
await client.query('COMMIT')
return result
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
async close() {
if (this.pool) {
await this.pool.end()
this.pool = null
}
}
}
// Data access layer
/**
 * SQL access to the `users` table.
 *
 * Values are always bound as parameters. Column names and sort options
 * cannot be bound, so they are validated before being placed into the SQL
 * text; anything that is not a plain identifier is rejected.
 */
class UserRepository {
  static #IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/

  /** Throws unless `name` is a plain SQL identifier (letters, digits, underscore). */
  static #assertIdentifier(name, what) {
    if (typeof name !== 'string' || !UserRepository.#IDENTIFIER.test(name)) {
      throw new Error(`Invalid ${what}: ${String(name)}`)
    }
  }

  /**
   * @param {{query: (text: string, params?: Array) => Promise<object>}} database
   */
  constructor(database) {
    this.db = database
  }

  /** @returns {Promise<object|null>} the row with that id, or null. */
  async findById(id) {
    const result = await this.db.query(
      'SELECT * FROM users WHERE id = $1',
      [id]
    )
    return result.rows[0] || null
  }

  /** @returns {Promise<object|null>} the row with that email, or null. */
  async findByEmail(email) {
    const result = await this.db.query(
      'SELECT * FROM users WHERE email = $1',
      [email]
    )
    return result.rows[0] || null
  }

  /**
   * Inserts a user. The password is stored exactly as given.
   * @param {{name: string, email: string, password: string}} userData
   * @returns {Promise<object>} the inserted row.
   */
  async create(userData) {
    const { name, email, password } = userData
    const result = await this.db.query(
      `INSERT INTO users (name, email, password, created_at, updated_at)
       VALUES ($1, $2, $3, NOW(), NOW())
       RETURNING *`,
      [name, email, password]
    )
    return result.rows[0]
  }

  /**
   * Updates the given columns; entries whose value is undefined are skipped.
   * @param {string|number} id
   * @param {object} userData - column name → new value.
   * @returns {Promise<object|null>} the updated row, or null when no row has that id.
   * @throws {Error} 'No fields to update' when nothing is left to set, or
   *   'Invalid column name: …' when a key is not a plain identifier.
   */
  async update(id, userData) {
    const fields = []
    const values = []
    for (const [key, value] of Object.entries(userData)) {
      if (value === undefined) {
        continue
      }
      // Keys may come straight from a request body; never splice them into
      // SQL unchecked.
      UserRepository.#assertIdentifier(key, 'column name')
      values.push(value)
      fields.push(`${key} = $${values.length}`)
    }
    if (fields.length === 0) {
      throw new Error('No fields to update')
    }
    fields.push('updated_at = NOW()')
    values.push(id)

    const query = `UPDATE users SET ${fields.join(', ')} WHERE id = $${values.length} RETURNING *`
    const result = await this.db.query(query, values)
    return result.rows[0] || null
  }

  /** @returns {Promise<boolean>} true when a row was deleted. */
  async delete(id) {
    const result = await this.db.query(
      'DELETE FROM users WHERE id = $1',
      [id]
    )
    return result.rowCount > 0
  }

  /**
   * Lists users with optional name/email search, ordering and paging.
   * @param {{limit?: number, offset?: number, search?: string, orderBy?: string, orderDirection?: string}} [options]
   * @returns {Promise<object[]>}
   * @throws {Error} when `orderBy` is not a plain identifier or
   *   `orderDirection` is not ASC/DESC (case-insensitive).
   */
  async findMany(options = {}) {
    const {
      limit = 10,
      offset = 0,
      search = '',
      orderBy = 'created_at',
      orderDirection = 'DESC'
    } = options

    UserRepository.#assertIdentifier(orderBy, 'order column')
    const direction = String(orderDirection).toUpperCase()
    if (direction !== 'ASC' && direction !== 'DESC') {
      throw new Error(`Invalid order direction: ${String(orderDirection)}`)
    }

    let query = 'SELECT * FROM users WHERE 1=1'
    const params = []
    if (search) {
      query += ' AND (name ILIKE $1 OR email ILIKE $2)'
      params.push(`%${search}%`, `%${search}%`)
    }
    query += ` ORDER BY ${orderBy} ${direction}`
    query += ` LIMIT $${params.length + 1} OFFSET $${params.length + 2}`
    params.push(limit, offset)

    const result = await this.db.query(query, params)
    return result.rows
  }

  /**
   * Counts users matching the optional search term.
   * @param {{search?: string}} [options]
   * @returns {Promise<number>}
   */
  async count(options = {}) {
    const { search = '' } = options
    let query = 'SELECT COUNT(*) FROM users WHERE 1=1'
    const params = []
    if (search) {
      query += ' AND (name ILIKE $1 OR email ILIKE $2)'
      params.push(`%${search}%`, `%${search}%`)
    }
    const result = await this.db.query(query, params)
    return Number.parseInt(result.rows[0].count, 10)
  }
}
// Cache layer
/**
 * In-memory TTL cache. Entries expire through a timer and are also checked
 * for staleness on read.
 */
class CacheManager {
  /**
   * @param {{ttl?: number, prefix?: string}} [config] - `ttl` is in seconds
   *   (default 300); `prefix` is prepended to every key (default 'cache:').
   *   Extra properties are kept on `this.config`.
   */
  constructor(config = {}) {
    this.config = {
      ...config,
      // Applied after the spread so an explicit undefined/null in `config`
      // cannot clobber the defaults.
      ttl: config.ttl ?? 300, // default TTL: 5 minutes
      prefix: config.prefix ?? 'cache:'
    }
    this.cache = new Map()
    this.timers = new Map()
  }

  /** @returns {string} the key with the configured prefix. */
  generateKey(key) {
    return `${this.config.prefix}${key}`
  }

  /**
   * Stores a value and (re)starts its expiry timer.
   * @param {string} key
   * @param {any} value
   * @param {number} [ttl] - seconds; defaults to the configured TTL.
   */
  set(key, value, ttl = this.config.ttl) {
    const cacheKey = this.generateKey(key)

    // Cancel the timer of the entry being replaced
    if (this.timers.has(cacheKey)) {
      clearTimeout(this.timers.get(cacheKey))
    }

    // Store the value
    this.cache.set(cacheKey, {
      value,
      createdAt: Date.now(),
      ttl
    })

    // Schedule expiry
    const timer = setTimeout(() => {
      this.delete(key)
    }, ttl * 1000)
    // A pending expiry timer must not keep the Node.js process alive.
    timer.unref?.()
    this.timers.set(cacheKey, timer)
  }

  /**
   * @param {string} key
   * @returns {any|null} the cached value, or null when absent or expired.
   */
  get(key) {
    const item = this.cache.get(this.generateKey(key))
    if (!item) {
      return null
    }

    // Expired entries are removed on read even if their timer has not fired yet
    if (Date.now() - item.createdAt > item.ttl * 1000) {
      this.delete(key)
      return null
    }
    return item.value
  }

  /**
   * Removes an entry and cancels its timer.
   * @returns {boolean} true when an entry was removed.
   */
  delete(key) {
    const cacheKey = this.generateKey(key)

    // Cancel the timer
    if (this.timers.has(cacheKey)) {
      clearTimeout(this.timers.get(cacheKey))
      this.timers.delete(cacheKey)
    }

    // Remove the entry
    return this.cache.delete(cacheKey)
  }

  /** Removes every entry and cancels every timer. */
  clear() {
    this.timers.forEach(timer => clearTimeout(timer))
    this.timers.clear()
    this.cache.clear()
  }

  /**
   * Returns the cached value, or computes, stores and returns it.
   * A cached null is indistinguishable from a miss, so a factory that
   * resolves to null runs again on the next call.
   * @param {string} key
   * @param {() => any|Promise<any>} factory
   * @param {number} [ttl] - seconds; defaults to the configured TTL.
   */
  async getOrSet(key, factory, ttl) {
    let value = this.get(key)
    if (value === null) {
      value = await factory()
      this.set(key, value, ttl)
    }
    return value
  }
}
// Service layer
/**
 * User operations on top of a repository, with read-through caching.
 * No method returns or caches the stored password.
 */
class UserService {
  /** Returns a copy of a user record without its `password` property. */
  static #withoutPassword(user) {
    const { password, ...safeUser } = user
    return safeUser
  }

  /**
   * @param {object} repository - must provide findById, create, update,
   *   delete, findMany and count.
   * @param {object} cache - must provide getOrSet, set and delete.
   */
  constructor(repository, cache) {
    this.repository = repository
    this.cache = cache
  }

  /**
   * @param {string|number} id
   * @returns {Promise<object|null>} the user without password (cached for
   *   5 minutes), or null.
   */
  async getUserById(id) {
    const cacheKey = `user:${id}`
    return this.cache.getOrSet(cacheKey, async () => {
      const user = await this.repository.findById(id)
      // Copy instead of mutating the repository's row
      return user ? UserService.#withoutPassword(user) : user
    }, 300) // cache for 5 minutes
  }

  /**
   * Creates a user and caches the result.
   * @param {object} userData
   * @returns {Promise<object>} the created user without password.
   */
  async createUser(userData) {
    const user = await this.repository.create(userData)
    const safeUser = UserService.#withoutPassword(user)
    this.cache.set(`user:${user.id}`, safeUser)
    return safeUser
  }

  /**
   * Updates a user and refreshes the cache entry.
   * @param {string|number} id
   * @param {object} userData
   * @returns {Promise<object|null>} the updated user without password, or
   *   the repository's empty result when no user has that id.
   */
  async updateUser(id, userData) {
    const user = await this.repository.update(id, userData)
    if (!user) {
      return user
    }
    const safeUser = UserService.#withoutPassword(user)
    this.cache.set(`user:${id}`, safeUser)
    // Return the sanitized copy: the raw row still carries the password.
    return safeUser
  }

  /**
   * Deletes a user and evicts the cache entry.
   * @returns {Promise<boolean>} whether a user was deleted.
   */
  async deleteUser(id) {
    const deleted = await this.repository.delete(id)
    if (deleted) {
      this.cache.delete(`user:${id}`)
    }
    return deleted
  }

  /**
   * Lists users; results are cached per option set for 1 minute.
   * @param {{limit?: number, offset?: number, search?: string}} [options]
   * @returns {Promise<{users: object[], total: number, pagination: {limit: number, offset: number, total: number}}>}
   */
  async getUsers(options = {}) {
    const cacheKey = `users:${JSON.stringify(options)}`
    return this.cache.getOrSet(cacheKey, async () => {
      const [users, total] = await Promise.all([
        this.repository.findMany(options),
        this.repository.count(options)
      ])
      return {
        // Strip the password field
        users: users.map((user) => UserService.#withoutPassword(user)),
        total,
        pagination: {
          limit: options.limit || 10,
          offset: options.offset || 0,
          total
        }
      }
    }, 60) // cache for 1 minute
  }
}
// Initialization
// Module-level wiring: one database wrapper, one cache, and a user service
// built on top of them. These instances are created when the module is loaded.
const database = new ServerlessDatabase({
connectionString: process.env.DATABASE_URL,
// SSL is only enabled in production, and without certificate verification
ssl: process.env.NODE_ENV === 'production' ? { rejectUnauthorized: false } : false,
maxConnections: 5
})
const cache = new CacheManager({
ttl: 300,
prefix: 'myapp:'
})
const userRepository = new UserRepository(database)
const userService = new UserService(userRepository, cache)
// Export the services: the ready-made instances plus the classes themselves
module.exports = {
database,
cache,
userService,
UserService,
UserRepository,
CacheManager,
ServerlessDatabase
}
总结
Serverless架构的核心要点:
- 函数即服务:无服务器计算、事件驱动、按需扩展
- 事件驱动:异步处理、消息队列、重试机制
- 数据库集成:连接池管理、缓存策略、事务处理
- 成本优化:按使用付费、冷启动优化、资源管理
- 监控运维:日志聚合、性能监控、错误追踪
Serverless 架构为现代应用开发提供了新的可能性。通过合理的设计和实践,可以构建出高可用、高性能、低成本的云原生应用。