发布于

Node.js实时应用开发:WebSocket与Socket.io构建实时通信系统

作者

Node.js实时应用开发:WebSocket与Socket.io构建实时通信系统

实时通信是现代Web应用的重要特性,Node.js凭借其事件驱动架构在实时应用开发方面具有天然优势。本文将详细介绍如何构建高性能的实时通信系统。

WebSocket基础和原生实现

项目环境搭建

{
  "name": "nodejs-realtime-app",
  "version": "1.0.0",
  "description": "Node.js Real-time Application with WebSocket and Socket.io",
  "main": "src/server.js",
  "scripts": {
    "start": "node src/server.js",
    "dev": "nodemon src/server.js",
    "test": "jest",
    "build": "webpack --mode production",
    "client": "cd client && npm start"
  },
  "dependencies": {
    "express": "^4.18.2",
    "socket.io": "^4.7.2",
    "ws": "^8.13.0",
    "redis": "^4.6.7",
    "mongoose": "^7.5.0",
    "jsonwebtoken": "^9.0.2",
    "bcryptjs": "^2.4.3",
    "cors": "^2.8.5",
    "helmet": "^7.0.0",
    "compression": "^1.7.4",
    "winston": "^3.10.0",
    "dotenv": "^16.3.1",
    "uuid": "^9.0.0",
    "joi": "^17.9.2"
  },
  "devDependencies": {
    "nodemon": "^3.0.1",
    "jest": "^29.6.2",
    "supertest": "^6.3.3",
    "webpack": "^5.88.2",
    "webpack-cli": "^5.1.4"
  }
}

原生WebSocket服务器实现

// src/websocket/nativeWebSocket.js - 原生WebSocket实现
const WebSocket = require('ws');
const http = require('http');
const url = require('url');
const jwt = require('jsonwebtoken');
const logger = require('../utils/logger');

class NativeWebSocketServer {
  constructor(server) {
    this.wss = new WebSocket.Server({
      server,
      verifyClient: this.verifyClient.bind(this)
    });
    
    this.clients = new Map(); // 存储客户端连接
    this.rooms = new Map();   // 存储房间信息
    
    this.setupEventHandlers();
  }

  // 客户端验证
  verifyClient(info) {
    try {
      const query = url.parse(info.req.url, true).query;
      const token = query.token;
      
      if (!token) {
        logger.warn('WebSocket connection rejected: No token provided');
        return false;
      }
      
      const decoded = jwt.verify(token, process.env.JWT_SECRET);
      info.req.user = decoded;
      
      return true;
    } catch (error) {
      logger.warn('WebSocket connection rejected: Invalid token', error.message);
      return false;
    }
  }

  setupEventHandlers() {
    this.wss.on('connection', (ws, req) => {
      const user = req.user;
      const clientId = this.generateClientId();
      
      // 存储客户端信息
      this.clients.set(clientId, {
        ws,
        user,
        rooms: new Set(),
        lastPing: Date.now()
      });
      
      logger.info(`WebSocket client connected: ${user.username} (${clientId})`);
      
      // 发送连接确认
      this.sendMessage(ws, {
        type: 'connection',
        data: {
          clientId,
          message: 'Connected successfully',
          timestamp: new Date().toISOString()
        }
      });
      
      // 设置消息处理
      ws.on('message', (data) => {
        this.handleMessage(clientId, data);
      });
      
      // 设置连接关闭处理
      ws.on('close', () => {
        this.handleDisconnection(clientId);
      });
      
      // 设置错误处理
      ws.on('error', (error) => {
        logger.error(`WebSocket error for client ${clientId}:`, error);
      });
      
      // 设置心跳检测
      ws.on('pong', () => {
        const client = this.clients.get(clientId);
        if (client) {
          client.lastPing = Date.now();
        }
      });
    });
    
    // 启动心跳检测
    this.startHeartbeat();
  }

  handleMessage(clientId, data) {
    try {
      const message = JSON.parse(data);
      const client = this.clients.get(clientId);
      
      if (!client) {
        logger.warn(`Message from unknown client: ${clientId}`);
        return;
      }
      
      logger.info(`Message from ${client.user.username}:`, message);
      
      switch (message.type) {
        case 'join_room':
          this.handleJoinRoom(clientId, message.data);
          break;
        case 'leave_room':
          this.handleLeaveRoom(clientId, message.data);
          break;
        case 'chat_message':
          this.handleChatMessage(clientId, message.data);
          break;
        case 'private_message':
          this.handlePrivateMessage(clientId, message.data);
          break;
        case 'typing':
          this.handleTyping(clientId, message.data);
          break;
        case 'ping':
          this.handlePing(clientId);
          break;
        default:
          logger.warn(`Unknown message type: ${message.type}`);
      }
    } catch (error) {
      logger.error(`Error handling message from client ${clientId}:`, error);
    }
  }

  handleJoinRoom(clientId, data) {
    const { roomId } = data;
    const client = this.clients.get(clientId);
    
    if (!client) return;
    
    // 添加客户端到房间
    if (!this.rooms.has(roomId)) {
      this.rooms.set(roomId, new Set());
    }
    
    this.rooms.get(roomId).add(clientId);
    client.rooms.add(roomId);
    
    // 通知房间内其他用户
    this.broadcastToRoom(roomId, {
      type: 'user_joined',
      data: {
        user: client.user,
        roomId,
        timestamp: new Date().toISOString()
      }
    }, clientId);
    
    // 确认加入成功
    this.sendMessage(client.ws, {
      type: 'room_joined',
      data: {
        roomId,
        message: `Joined room ${roomId}`,
        timestamp: new Date().toISOString()
      }
    });
    
    logger.info(`Client ${client.user.username} joined room ${roomId}`);
  }

  handleLeaveRoom(clientId, data) {
    const { roomId } = data;
    const client = this.clients.get(clientId);
    
    if (!client) return;
    
    // 从房间移除客户端
    if (this.rooms.has(roomId)) {
      this.rooms.get(roomId).delete(clientId);
      
      // 如果房间为空,删除房间
      if (this.rooms.get(roomId).size === 0) {
        this.rooms.delete(roomId);
      }
    }
    
    client.rooms.delete(roomId);
    
    // 通知房间内其他用户
    this.broadcastToRoom(roomId, {
      type: 'user_left',
      data: {
        user: client.user,
        roomId,
        timestamp: new Date().toISOString()
      }
    });
    
    logger.info(`Client ${client.user.username} left room ${roomId}`);
  }

  handleChatMessage(clientId, data) {
    const { roomId, message } = data;
    const client = this.clients.get(clientId);
    
    if (!client || !client.rooms.has(roomId)) {
      return;
    }
    
    const chatMessage = {
      type: 'chat_message',
      data: {
        id: this.generateMessageId(),
        user: client.user,
        message,
        roomId,
        timestamp: new Date().toISOString()
      }
    };
    
    // 广播消息到房间
    this.broadcastToRoom(roomId, chatMessage);
    
    logger.info(`Chat message in room ${roomId} from ${client.user.username}: ${message}`);
  }

  handlePrivateMessage(clientId, data) {
    const { targetUserId, message } = data;
    const client = this.clients.get(clientId);
    
    if (!client) return;
    
    // 查找目标用户
    const targetClient = Array.from(this.clients.values())
      .find(c => c.user.id === targetUserId);
    
    if (!targetClient) {
      this.sendMessage(client.ws, {
        type: 'error',
        data: {
          message: 'Target user not found or offline',
          timestamp: new Date().toISOString()
        }
      });
      return;
    }
    
    const privateMessage = {
      type: 'private_message',
      data: {
        id: this.generateMessageId(),
        from: client.user,
        message,
        timestamp: new Date().toISOString()
      }
    };
    
    // 发送给目标用户
    this.sendMessage(targetClient.ws, privateMessage);
    
    // 发送确认给发送者
    this.sendMessage(client.ws, {
      type: 'message_sent',
      data: {
        targetUser: targetClient.user,
        message,
        timestamp: new Date().toISOString()
      }
    });
    
    logger.info(`Private message from ${client.user.username} to ${targetClient.user.username}`);
  }

  handleTyping(clientId, data) {
    const { roomId, isTyping } = data;
    const client = this.clients.get(clientId);
    
    if (!client || !client.rooms.has(roomId)) return;
    
    this.broadcastToRoom(roomId, {
      type: 'typing',
      data: {
        user: client.user,
        roomId,
        isTyping,
        timestamp: new Date().toISOString()
      }
    }, clientId);
  }

  handlePing(clientId) {
    const client = this.clients.get(clientId);
    if (client) {
      this.sendMessage(client.ws, {
        type: 'pong',
        data: {
          timestamp: new Date().toISOString()
        }
      });
    }
  }

  handleDisconnection(clientId) {
    const client = this.clients.get(clientId);
    
    if (!client) return;
    
    // 从所有房间移除客户端
    client.rooms.forEach(roomId => {
      this.handleLeaveRoom(clientId, { roomId });
    });
    
    // 移除客户端
    this.clients.delete(clientId);
    
    logger.info(`Client ${client.user.username} disconnected (${clientId})`);
  }

  // 工具方法
  sendMessage(ws, message) {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(message));
    }
  }

  broadcastToRoom(roomId, message, excludeClientId = null) {
    const room = this.rooms.get(roomId);
    if (!room) return;
    
    room.forEach(clientId => {
      if (clientId !== excludeClientId) {
        const client = this.clients.get(clientId);
        if (client) {
          this.sendMessage(client.ws, message);
        }
      }
    });
  }

  broadcastToAll(message, excludeClientId = null) {
    this.clients.forEach((client, clientId) => {
      if (clientId !== excludeClientId) {
        this.sendMessage(client.ws, message);
      }
    });
  }

  generateClientId() {
    return `client_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }

  generateMessageId() {
    return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }

  startHeartbeat() {
    setInterval(() => {
      const now = Date.now();
      const timeout = 30000; // 30秒超时
      
      this.clients.forEach((client, clientId) => {
        if (now - client.lastPing > timeout) {
          logger.info(`Client ${clientId} timed out, closing connection`);
          client.ws.terminate();
          this.handleDisconnection(clientId);
        } else {
          // 发送心跳
          if (client.ws.readyState === WebSocket.OPEN) {
            client.ws.ping();
          }
        }
      });
    }, 15000); // 每15秒检查一次
  }

  // 获取统计信息
  getStats() {
    return {
      connectedClients: this.clients.size,
      activeRooms: this.rooms.size,
      roomDetails: Array.from(this.rooms.entries()).map(([roomId, clients]) => ({
        roomId,
        clientCount: clients.size
      }))
    };
  }
}

module.exports = NativeWebSocketServer;

Socket.io高级实现

// src/socketio/socketServer.js - Socket.io实现
const socketIo = require('socket.io');
const jwt = require('jsonwebtoken');
const logger = require('../utils/logger');
const RedisAdapter = require('./redisAdapter');

class SocketIOServer {
  constructor(server) {
    this.io = socketIo(server, {
      cors: {
        origin: process.env.CLIENT_URL || "http://localhost:3000",
        methods: ["GET", "POST"],
        credentials: true
      },
      transports: ['websocket', 'polling']
    });
    
    // 设置Redis适配器用于集群支持
    if (process.env.REDIS_URL) {
      this.io.adapter(RedisAdapter.createAdapter());
    }
    
    this.connectedUsers = new Map();
    this.userRooms = new Map();
    
    this.setupMiddleware();
    this.setupEventHandlers();
  }

  setupMiddleware() {
    // 认证中间件
    this.io.use((socket, next) => {
      try {
        const token = socket.handshake.auth.token || socket.handshake.query.token;
        
        if (!token) {
          return next(new Error('Authentication error: No token provided'));
        }
        
        const decoded = jwt.verify(token, process.env.JWT_SECRET);
        socket.user = decoded;
        
        logger.info(`Socket.io authentication successful for user: ${decoded.username}`);
        next();
      } catch (error) {
        logger.warn('Socket.io authentication failed:', error.message);
        next(new Error('Authentication error: Invalid token'));
      }
    });

    // 速率限制中间件
    this.io.use((socket, next) => {
      const rateLimiter = new Map();
      
      socket.use((packet, next) => {
        const now = Date.now();
        const userId = socket.user.id;
        const userLimit = rateLimiter.get(userId) || { count: 0, resetTime: now + 60000 };
        
        if (now > userLimit.resetTime) {
          userLimit.count = 0;
          userLimit.resetTime = now + 60000;
        }
        
        if (userLimit.count >= 100) { // 每分钟最多100个消息
          return next(new Error('Rate limit exceeded'));
        }
        
        userLimit.count++;
        rateLimiter.set(userId, userLimit);
        next();
      });
      
      next();
    });
  }

  setupEventHandlers() {
    this.io.on('connection', (socket) => {
      const user = socket.user;
      
      // 存储用户连接
      this.connectedUsers.set(user.id, {
        socket,
        user,
        rooms: new Set(),
        status: 'online',
        lastActivity: Date.now()
      });
      
      logger.info(`Socket.io client connected: ${user.username} (${socket.id})`);
      
      // 发送连接确认
      socket.emit('connected', {
        message: 'Connected successfully',
        user: user,
        timestamp: new Date().toISOString()
      });
      
      // 广播用户上线状态
      socket.broadcast.emit('user_status', {
        user: user,
        status: 'online',
        timestamp: new Date().toISOString()
      });
      
      this.setupSocketEvents(socket);
    });
  }

  setupSocketEvents(socket) {
    const user = socket.user;
    
    // 加入房间
    socket.on('join_room', async (data) => {
      try {
        const { roomId, roomType = 'public' } = data;
        
        await socket.join(roomId);
        
        // 更新用户房间信息
        const userConnection = this.connectedUsers.get(user.id);
        if (userConnection) {
          userConnection.rooms.add(roomId);
        }
        
        // 获取房间信息
        const roomInfo = await this.getRoomInfo(roomId);
        
        // 通知房间内其他用户
        socket.to(roomId).emit('user_joined', {
          user: user,
          roomId: roomId,
          roomInfo: roomInfo,
          timestamp: new Date().toISOString()
        });
        
        // 确认加入成功
        socket.emit('room_joined', {
          roomId: roomId,
          roomInfo: roomInfo,
          message: `Joined room ${roomId}`,
          timestamp: new Date().toISOString()
        });
        
        logger.info(`User ${user.username} joined room ${roomId}`);
      } catch (error) {
        logger.error('Error joining room:', error);
        socket.emit('error', { message: 'Failed to join room' });
      }
    });
    
    // 离开房间
    socket.on('leave_room', async (data) => {
      try {
        const { roomId } = data;
        
        await socket.leave(roomId);
        
        // 更新用户房间信息
        const userConnection = this.connectedUsers.get(user.id);
        if (userConnection) {
          userConnection.rooms.delete(roomId);
        }
        
        // 通知房间内其他用户
        socket.to(roomId).emit('user_left', {
          user: user,
          roomId: roomId,
          timestamp: new Date().toISOString()
        });
        
        socket.emit('room_left', {
          roomId: roomId,
          message: `Left room ${roomId}`,
          timestamp: new Date().toISOString()
        });
        
        logger.info(`User ${user.username} left room ${roomId}`);
      } catch (error) {
        logger.error('Error leaving room:', error);
        socket.emit('error', { message: 'Failed to leave room' });
      }
    });
    
    // 聊天消息
    socket.on('chat_message', async (data) => {
      try {
        const { roomId, message, messageType = 'text' } = data;
        
        // 验证用户是否在房间中
        const rooms = Array.from(socket.rooms);
        if (!rooms.includes(roomId)) {
          socket.emit('error', { message: 'You are not in this room' });
          return;
        }
        
        const chatMessage = {
          id: this.generateMessageId(),
          user: user,
          message: message,
          messageType: messageType,
          roomId: roomId,
          timestamp: new Date().toISOString()
        };
        
        // 保存消息到数据库(可选)
        await this.saveMessage(chatMessage);
        
        // 广播消息到房间
        this.io.to(roomId).emit('chat_message', chatMessage);
        
        logger.info(`Chat message in room ${roomId} from ${user.username}`);
      } catch (error) {
        logger.error('Error handling chat message:', error);
        socket.emit('error', { message: 'Failed to send message' });
      }
    });
    
    // 私聊消息
    socket.on('private_message', async (data) => {
      try {
        const { targetUserId, message, messageType = 'text' } = data;
        
        const targetConnection = this.connectedUsers.get(targetUserId);
        
        if (!targetConnection) {
          socket.emit('error', { message: 'Target user is offline' });
          return;
        }
        
        const privateMessage = {
          id: this.generateMessageId(),
          from: user,
          to: targetConnection.user,
          message: message,
          messageType: messageType,
          timestamp: new Date().toISOString()
        };
        
        // 保存私聊消息
        await this.savePrivateMessage(privateMessage);
        
        // 发送给目标用户
        targetConnection.socket.emit('private_message', privateMessage);
        
        // 发送确认给发送者
        socket.emit('message_sent', {
          targetUser: targetConnection.user,
          message: privateMessage,
          timestamp: new Date().toISOString()
        });
        
        logger.info(`Private message from ${user.username} to ${targetConnection.user.username}`);
      } catch (error) {
        logger.error('Error handling private message:', error);
        socket.emit('error', { message: 'Failed to send private message' });
      }
    });
    
    // 输入状态
    socket.on('typing', (data) => {
      const { roomId, isTyping } = data;
      
      socket.to(roomId).emit('typing', {
        user: user,
        roomId: roomId,
        isTyping: isTyping,
        timestamp: new Date().toISOString()
      });
    });
    
    // 用户状态更新
    socket.on('status_update', (data) => {
      const { status } = data;
      
      const userConnection = this.connectedUsers.get(user.id);
      if (userConnection) {
        userConnection.status = status;
        userConnection.lastActivity = Date.now();
      }
      
      // 广播状态更新
      socket.broadcast.emit('user_status', {
        user: user,
        status: status,
        timestamp: new Date().toISOString()
      });
    });
    
    // 文件分享
    socket.on('file_share', async (data) => {
      try {
        const { roomId, fileInfo } = data;
        
        const fileMessage = {
          id: this.generateMessageId(),
          user: user,
          messageType: 'file',
          fileInfo: fileInfo,
          roomId: roomId,
          timestamp: new Date().toISOString()
        };
        
        // 保存文件消息
        await this.saveMessage(fileMessage);
        
        // 广播文件分享
        this.io.to(roomId).emit('file_shared', fileMessage);
        
        logger.info(`File shared in room ${roomId} by ${user.username}: ${fileInfo.name}`);
      } catch (error) {
        logger.error('Error handling file share:', error);
        socket.emit('error', { message: 'Failed to share file' });
      }
    });
    
    // 断开连接处理
    socket.on('disconnect', (reason) => {
      logger.info(`Socket.io client disconnected: ${user.username} (${socket.id}), reason: ${reason}`);
      
      // 移除用户连接
      this.connectedUsers.delete(user.id);
      
      // 广播用户离线状态
      socket.broadcast.emit('user_status', {
        user: user,
        status: 'offline',
        timestamp: new Date().toISOString()
      });
    });
    
    // 错误处理
    socket.on('error', (error) => {
      logger.error(`Socket.io error for user ${user.username}:`, error);
    });
  }

  // 工具方法
  async getRoomInfo(roomId) {
    const sockets = await this.io.in(roomId).fetchSockets();
    return {
      id: roomId,
      userCount: sockets.length,
      users: sockets.map(socket => socket.user)
    };
  }

  async saveMessage(message) {
    // 这里可以实现消息持久化逻辑
    // 例如保存到MongoDB或其他数据库
    logger.info('Message saved:', message.id);
  }

  async savePrivateMessage(message) {
    // 实现私聊消息持久化
    logger.info('Private message saved:', message.id);
  }

  generateMessageId() {
    return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }

  // 获取在线用户列表
  getOnlineUsers() {
    return Array.from(this.connectedUsers.values()).map(conn => ({
      user: conn.user,
      status: conn.status,
      lastActivity: conn.lastActivity
    }));
  }

  // 获取服务器统计信息
  async getServerStats() {
    const sockets = await this.io.fetchSockets();
    return {
      connectedClients: sockets.length,
      onlineUsers: this.connectedUsers.size,
      totalRooms: this.io.sockets.adapter.rooms.size
    };
  }

  // 向特定用户发送消息
  sendToUser(userId, event, data) {
    const userConnection = this.connectedUsers.get(userId);
    if (userConnection) {
      userConnection.socket.emit(event, data);
      return true;
    }
    return false;
  }

  // 向所有用户广播消息
  broadcast(event, data) {
    this.io.emit(event, data);
  }

  // 向房间广播消息
  broadcastToRoom(roomId, event, data) {
    this.io.to(roomId).emit(event, data);
  }
}

module.exports = SocketIOServer;

Redis适配器和集群支持

// src/socketio/redisAdapter.js - Redis适配器
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');
const logger = require('../utils/logger');

class RedisAdapter {
  static createAdapter() {
    try {
      const pubClient = createClient({
        url: process.env.REDIS_URL
      });
      
      const subClient = pubClient.duplicate();
      
      pubClient.on('error', (err) => {
        logger.error('Redis pub client error:', err);
      });
      
      subClient.on('error', (err) => {
        logger.error('Redis sub client error:', err);
      });
      
      pubClient.on('connect', () => {
        logger.info('Redis pub client connected');
      });
      
      subClient.on('connect', () => {
        logger.info('Redis sub client connected');
      });
      
      return createAdapter(pubClient, subClient);
    } catch (error) {
      logger.error('Failed to create Redis adapter:', error);
      throw error;
    }
  }
}

module.exports = RedisAdapter;

总结

Node.js实时应用开发的核心要点:

🎯 技术选择

  1. WebSocket:原生实时通信协议
  2. Socket.io:功能丰富的实时通信库
  3. Redis:集群支持和消息持久化
  4. JWT:安全的用户认证机制

✅ 核心功能

  • 实时聊天和私聊
  • 房间管理和用户状态
  • 文件分享和多媒体支持
  • 心跳检测和断线重连

🚀 高级特性

  • 集群部署和负载均衡
  • 消息持久化和历史记录
  • 速率限制和安全防护
  • 性能监控和统计分析

💡 最佳实践

  • 认证和授权机制
  • 错误处理和日志记录
  • 内存管理和性能优化
  • 可扩展的架构设计

掌握Node.js实时应用开发,构建高性能通信系统!


实时通信是现代应用的核心功能,Node.js的事件驱动特性使其在这一领域具有天然优势,通过合理的架构设计可以构建出高性能、可扩展的实时应用系统。