- 发布于
Node.js实时应用开发:WebSocket与Socket.io构建实时通信系统
- 作者

- 姓名
- 全能波
- GitHub
- @weicracker
Node.js实时应用开发:WebSocket与Socket.io构建实时通信系统
实时通信是现代Web应用的重要特性,Node.js凭借其事件驱动架构在实时应用开发方面具有天然优势。本文将详细介绍如何构建高性能的实时通信系统。
WebSocket基础和原生实现
项目环境搭建
{
"name": "nodejs-realtime-app",
"version": "1.0.0",
"description": "Node.js Real-time Application with WebSocket and Socket.io",
"main": "src/server.js",
"scripts": {
"start": "node src/server.js",
"dev": "nodemon src/server.js",
"test": "jest",
"build": "webpack --mode production",
"client": "cd client && npm start"
},
"dependencies": {
"express": "^4.18.2",
"socket.io": "^4.7.2",
"ws": "^8.13.0",
"redis": "^4.6.7",
"mongoose": "^7.5.0",
"jsonwebtoken": "^9.0.2",
"bcryptjs": "^2.4.3",
"cors": "^2.8.5",
"helmet": "^7.0.0",
"compression": "^1.7.4",
"winston": "^3.10.0",
"dotenv": "^16.3.1",
"uuid": "^9.0.0",
"joi": "^17.9.2"
},
"devDependencies": {
"nodemon": "^3.0.1",
"jest": "^29.6.2",
"supertest": "^6.3.3",
"webpack": "^5.88.2",
"webpack-cli": "^5.1.4"
}
}
原生WebSocket服务器实现
// src/websocket/nativeWebSocket.js - 原生WebSocket实现
const WebSocket = require('ws');
const http = require('http');
const url = require('url');
const jwt = require('jsonwebtoken');
const logger = require('../utils/logger');
class NativeWebSocketServer {
constructor(server) {
this.wss = new WebSocket.Server({
server,
verifyClient: this.verifyClient.bind(this)
});
this.clients = new Map(); // 存储客户端连接
this.rooms = new Map(); // 存储房间信息
this.setupEventHandlers();
}
// 客户端验证
verifyClient(info) {
try {
const query = url.parse(info.req.url, true).query;
const token = query.token;
if (!token) {
logger.warn('WebSocket connection rejected: No token provided');
return false;
}
const decoded = jwt.verify(token, process.env.JWT_SECRET);
info.req.user = decoded;
return true;
} catch (error) {
logger.warn('WebSocket connection rejected: Invalid token', error.message);
return false;
}
}
setupEventHandlers() {
this.wss.on('connection', (ws, req) => {
const user = req.user;
const clientId = this.generateClientId();
// 存储客户端信息
this.clients.set(clientId, {
ws,
user,
rooms: new Set(),
lastPing: Date.now()
});
logger.info(`WebSocket client connected: ${user.username} (${clientId})`);
// 发送连接确认
this.sendMessage(ws, {
type: 'connection',
data: {
clientId,
message: 'Connected successfully',
timestamp: new Date().toISOString()
}
});
// 设置消息处理
ws.on('message', (data) => {
this.handleMessage(clientId, data);
});
// 设置连接关闭处理
ws.on('close', () => {
this.handleDisconnection(clientId);
});
// 设置错误处理
ws.on('error', (error) => {
logger.error(`WebSocket error for client ${clientId}:`, error);
});
// 设置心跳检测
ws.on('pong', () => {
const client = this.clients.get(clientId);
if (client) {
client.lastPing = Date.now();
}
});
});
// 启动心跳检测
this.startHeartbeat();
}
handleMessage(clientId, data) {
try {
const message = JSON.parse(data);
const client = this.clients.get(clientId);
if (!client) {
logger.warn(`Message from unknown client: ${clientId}`);
return;
}
logger.info(`Message from ${client.user.username}:`, message);
switch (message.type) {
case 'join_room':
this.handleJoinRoom(clientId, message.data);
break;
case 'leave_room':
this.handleLeaveRoom(clientId, message.data);
break;
case 'chat_message':
this.handleChatMessage(clientId, message.data);
break;
case 'private_message':
this.handlePrivateMessage(clientId, message.data);
break;
case 'typing':
this.handleTyping(clientId, message.data);
break;
case 'ping':
this.handlePing(clientId);
break;
default:
logger.warn(`Unknown message type: ${message.type}`);
}
} catch (error) {
logger.error(`Error handling message from client ${clientId}:`, error);
}
}
handleJoinRoom(clientId, data) {
const { roomId } = data;
const client = this.clients.get(clientId);
if (!client) return;
// 添加客户端到房间
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Set());
}
this.rooms.get(roomId).add(clientId);
client.rooms.add(roomId);
// 通知房间内其他用户
this.broadcastToRoom(roomId, {
type: 'user_joined',
data: {
user: client.user,
roomId,
timestamp: new Date().toISOString()
}
}, clientId);
// 确认加入成功
this.sendMessage(client.ws, {
type: 'room_joined',
data: {
roomId,
message: `Joined room ${roomId}`,
timestamp: new Date().toISOString()
}
});
logger.info(`Client ${client.user.username} joined room ${roomId}`);
}
handleLeaveRoom(clientId, data) {
const { roomId } = data;
const client = this.clients.get(clientId);
if (!client) return;
// 从房间移除客户端
if (this.rooms.has(roomId)) {
this.rooms.get(roomId).delete(clientId);
// 如果房间为空,删除房间
if (this.rooms.get(roomId).size === 0) {
this.rooms.delete(roomId);
}
}
client.rooms.delete(roomId);
// 通知房间内其他用户
this.broadcastToRoom(roomId, {
type: 'user_left',
data: {
user: client.user,
roomId,
timestamp: new Date().toISOString()
}
});
logger.info(`Client ${client.user.username} left room ${roomId}`);
}
handleChatMessage(clientId, data) {
const { roomId, message } = data;
const client = this.clients.get(clientId);
if (!client || !client.rooms.has(roomId)) {
return;
}
const chatMessage = {
type: 'chat_message',
data: {
id: this.generateMessageId(),
user: client.user,
message,
roomId,
timestamp: new Date().toISOString()
}
};
// 广播消息到房间
this.broadcastToRoom(roomId, chatMessage);
logger.info(`Chat message in room ${roomId} from ${client.user.username}: ${message}`);
}
handlePrivateMessage(clientId, data) {
const { targetUserId, message } = data;
const client = this.clients.get(clientId);
if (!client) return;
// 查找目标用户
const targetClient = Array.from(this.clients.values())
.find(c => c.user.id === targetUserId);
if (!targetClient) {
this.sendMessage(client.ws, {
type: 'error',
data: {
message: 'Target user not found or offline',
timestamp: new Date().toISOString()
}
});
return;
}
const privateMessage = {
type: 'private_message',
data: {
id: this.generateMessageId(),
from: client.user,
message,
timestamp: new Date().toISOString()
}
};
// 发送给目标用户
this.sendMessage(targetClient.ws, privateMessage);
// 发送确认给发送者
this.sendMessage(client.ws, {
type: 'message_sent',
data: {
targetUser: targetClient.user,
message,
timestamp: new Date().toISOString()
}
});
logger.info(`Private message from ${client.user.username} to ${targetClient.user.username}`);
}
handleTyping(clientId, data) {
const { roomId, isTyping } = data;
const client = this.clients.get(clientId);
if (!client || !client.rooms.has(roomId)) return;
this.broadcastToRoom(roomId, {
type: 'typing',
data: {
user: client.user,
roomId,
isTyping,
timestamp: new Date().toISOString()
}
}, clientId);
}
handlePing(clientId) {
const client = this.clients.get(clientId);
if (client) {
this.sendMessage(client.ws, {
type: 'pong',
data: {
timestamp: new Date().toISOString()
}
});
}
}
handleDisconnection(clientId) {
const client = this.clients.get(clientId);
if (!client) return;
// 从所有房间移除客户端
client.rooms.forEach(roomId => {
this.handleLeaveRoom(clientId, { roomId });
});
// 移除客户端
this.clients.delete(clientId);
logger.info(`Client ${client.user.username} disconnected (${clientId})`);
}
// 工具方法
sendMessage(ws, message) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(message));
}
}
broadcastToRoom(roomId, message, excludeClientId = null) {
const room = this.rooms.get(roomId);
if (!room) return;
room.forEach(clientId => {
if (clientId !== excludeClientId) {
const client = this.clients.get(clientId);
if (client) {
this.sendMessage(client.ws, message);
}
}
});
}
broadcastToAll(message, excludeClientId = null) {
this.clients.forEach((client, clientId) => {
if (clientId !== excludeClientId) {
this.sendMessage(client.ws, message);
}
});
}
generateClientId() {
return `client_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
generateMessageId() {
return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
startHeartbeat() {
setInterval(() => {
const now = Date.now();
const timeout = 30000; // 30秒超时
this.clients.forEach((client, clientId) => {
if (now - client.lastPing > timeout) {
logger.info(`Client ${clientId} timed out, closing connection`);
client.ws.terminate();
this.handleDisconnection(clientId);
} else {
// 发送心跳
if (client.ws.readyState === WebSocket.OPEN) {
client.ws.ping();
}
}
});
}, 15000); // 每15秒检查一次
}
// 获取统计信息
getStats() {
return {
connectedClients: this.clients.size,
activeRooms: this.rooms.size,
roomDetails: Array.from(this.rooms.entries()).map(([roomId, clients]) => ({
roomId,
clientCount: clients.size
}))
};
}
}
module.exports = NativeWebSocketServer;
Socket.io高级实现
// src/socketio/socketServer.js - Socket.io实现
const socketIo = require('socket.io');
const jwt = require('jsonwebtoken');
const logger = require('../utils/logger');
const RedisAdapter = require('./redisAdapter');
class SocketIOServer {
constructor(server) {
this.io = socketIo(server, {
cors: {
origin: process.env.CLIENT_URL || "http://localhost:3000",
methods: ["GET", "POST"],
credentials: true
},
transports: ['websocket', 'polling']
});
// 设置Redis适配器用于集群支持
if (process.env.REDIS_URL) {
this.io.adapter(RedisAdapter.createAdapter());
}
this.connectedUsers = new Map();
this.userRooms = new Map();
this.setupMiddleware();
this.setupEventHandlers();
}
setupMiddleware() {
// 认证中间件
this.io.use((socket, next) => {
try {
const token = socket.handshake.auth.token || socket.handshake.query.token;
if (!token) {
return next(new Error('Authentication error: No token provided'));
}
const decoded = jwt.verify(token, process.env.JWT_SECRET);
socket.user = decoded;
logger.info(`Socket.io authentication successful for user: ${decoded.username}`);
next();
} catch (error) {
logger.warn('Socket.io authentication failed:', error.message);
next(new Error('Authentication error: Invalid token'));
}
});
// 速率限制中间件
this.io.use((socket, next) => {
const rateLimiter = new Map();
socket.use((packet, next) => {
const now = Date.now();
const userId = socket.user.id;
const userLimit = rateLimiter.get(userId) || { count: 0, resetTime: now + 60000 };
if (now > userLimit.resetTime) {
userLimit.count = 0;
userLimit.resetTime = now + 60000;
}
if (userLimit.count >= 100) { // 每分钟最多100个消息
return next(new Error('Rate limit exceeded'));
}
userLimit.count++;
rateLimiter.set(userId, userLimit);
next();
});
next();
});
}
setupEventHandlers() {
this.io.on('connection', (socket) => {
const user = socket.user;
// 存储用户连接
this.connectedUsers.set(user.id, {
socket,
user,
rooms: new Set(),
status: 'online',
lastActivity: Date.now()
});
logger.info(`Socket.io client connected: ${user.username} (${socket.id})`);
// 发送连接确认
socket.emit('connected', {
message: 'Connected successfully',
user: user,
timestamp: new Date().toISOString()
});
// 广播用户上线状态
socket.broadcast.emit('user_status', {
user: user,
status: 'online',
timestamp: new Date().toISOString()
});
this.setupSocketEvents(socket);
});
}
setupSocketEvents(socket) {
const user = socket.user;
// 加入房间
socket.on('join_room', async (data) => {
try {
const { roomId, roomType = 'public' } = data;
await socket.join(roomId);
// 更新用户房间信息
const userConnection = this.connectedUsers.get(user.id);
if (userConnection) {
userConnection.rooms.add(roomId);
}
// 获取房间信息
const roomInfo = await this.getRoomInfo(roomId);
// 通知房间内其他用户
socket.to(roomId).emit('user_joined', {
user: user,
roomId: roomId,
roomInfo: roomInfo,
timestamp: new Date().toISOString()
});
// 确认加入成功
socket.emit('room_joined', {
roomId: roomId,
roomInfo: roomInfo,
message: `Joined room ${roomId}`,
timestamp: new Date().toISOString()
});
logger.info(`User ${user.username} joined room ${roomId}`);
} catch (error) {
logger.error('Error joining room:', error);
socket.emit('error', { message: 'Failed to join room' });
}
});
// 离开房间
socket.on('leave_room', async (data) => {
try {
const { roomId } = data;
await socket.leave(roomId);
// 更新用户房间信息
const userConnection = this.connectedUsers.get(user.id);
if (userConnection) {
userConnection.rooms.delete(roomId);
}
// 通知房间内其他用户
socket.to(roomId).emit('user_left', {
user: user,
roomId: roomId,
timestamp: new Date().toISOString()
});
socket.emit('room_left', {
roomId: roomId,
message: `Left room ${roomId}`,
timestamp: new Date().toISOString()
});
logger.info(`User ${user.username} left room ${roomId}`);
} catch (error) {
logger.error('Error leaving room:', error);
socket.emit('error', { message: 'Failed to leave room' });
}
});
// 聊天消息
socket.on('chat_message', async (data) => {
try {
const { roomId, message, messageType = 'text' } = data;
// 验证用户是否在房间中
const rooms = Array.from(socket.rooms);
if (!rooms.includes(roomId)) {
socket.emit('error', { message: 'You are not in this room' });
return;
}
const chatMessage = {
id: this.generateMessageId(),
user: user,
message: message,
messageType: messageType,
roomId: roomId,
timestamp: new Date().toISOString()
};
// 保存消息到数据库(可选)
await this.saveMessage(chatMessage);
// 广播消息到房间
this.io.to(roomId).emit('chat_message', chatMessage);
logger.info(`Chat message in room ${roomId} from ${user.username}`);
} catch (error) {
logger.error('Error handling chat message:', error);
socket.emit('error', { message: 'Failed to send message' });
}
});
// 私聊消息
socket.on('private_message', async (data) => {
try {
const { targetUserId, message, messageType = 'text' } = data;
const targetConnection = this.connectedUsers.get(targetUserId);
if (!targetConnection) {
socket.emit('error', { message: 'Target user is offline' });
return;
}
const privateMessage = {
id: this.generateMessageId(),
from: user,
to: targetConnection.user,
message: message,
messageType: messageType,
timestamp: new Date().toISOString()
};
// 保存私聊消息
await this.savePrivateMessage(privateMessage);
// 发送给目标用户
targetConnection.socket.emit('private_message', privateMessage);
// 发送确认给发送者
socket.emit('message_sent', {
targetUser: targetConnection.user,
message: privateMessage,
timestamp: new Date().toISOString()
});
logger.info(`Private message from ${user.username} to ${targetConnection.user.username}`);
} catch (error) {
logger.error('Error handling private message:', error);
socket.emit('error', { message: 'Failed to send private message' });
}
});
// 输入状态
socket.on('typing', (data) => {
const { roomId, isTyping } = data;
socket.to(roomId).emit('typing', {
user: user,
roomId: roomId,
isTyping: isTyping,
timestamp: new Date().toISOString()
});
});
// 用户状态更新
socket.on('status_update', (data) => {
const { status } = data;
const userConnection = this.connectedUsers.get(user.id);
if (userConnection) {
userConnection.status = status;
userConnection.lastActivity = Date.now();
}
// 广播状态更新
socket.broadcast.emit('user_status', {
user: user,
status: status,
timestamp: new Date().toISOString()
});
});
// 文件分享
socket.on('file_share', async (data) => {
try {
const { roomId, fileInfo } = data;
const fileMessage = {
id: this.generateMessageId(),
user: user,
messageType: 'file',
fileInfo: fileInfo,
roomId: roomId,
timestamp: new Date().toISOString()
};
// 保存文件消息
await this.saveMessage(fileMessage);
// 广播文件分享
this.io.to(roomId).emit('file_shared', fileMessage);
logger.info(`File shared in room ${roomId} by ${user.username}: ${fileInfo.name}`);
} catch (error) {
logger.error('Error handling file share:', error);
socket.emit('error', { message: 'Failed to share file' });
}
});
// 断开连接处理
socket.on('disconnect', (reason) => {
logger.info(`Socket.io client disconnected: ${user.username} (${socket.id}), reason: ${reason}`);
// 移除用户连接
this.connectedUsers.delete(user.id);
// 广播用户离线状态
socket.broadcast.emit('user_status', {
user: user,
status: 'offline',
timestamp: new Date().toISOString()
});
});
// 错误处理
socket.on('error', (error) => {
logger.error(`Socket.io error for user ${user.username}:`, error);
});
}
// 工具方法
async getRoomInfo(roomId) {
const sockets = await this.io.in(roomId).fetchSockets();
return {
id: roomId,
userCount: sockets.length,
users: sockets.map(socket => socket.user)
};
}
async saveMessage(message) {
// 这里可以实现消息持久化逻辑
// 例如保存到MongoDB或其他数据库
logger.info('Message saved:', message.id);
}
async savePrivateMessage(message) {
// 实现私聊消息持久化
logger.info('Private message saved:', message.id);
}
generateMessageId() {
return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
// 获取在线用户列表
getOnlineUsers() {
return Array.from(this.connectedUsers.values()).map(conn => ({
user: conn.user,
status: conn.status,
lastActivity: conn.lastActivity
}));
}
// 获取服务器统计信息
async getServerStats() {
const sockets = await this.io.fetchSockets();
return {
connectedClients: sockets.length,
onlineUsers: this.connectedUsers.size,
totalRooms: this.io.sockets.adapter.rooms.size
};
}
// 向特定用户发送消息
sendToUser(userId, event, data) {
const userConnection = this.connectedUsers.get(userId);
if (userConnection) {
userConnection.socket.emit(event, data);
return true;
}
return false;
}
// 向所有用户广播消息
broadcast(event, data) {
this.io.emit(event, data);
}
// 向房间广播消息
broadcastToRoom(roomId, event, data) {
this.io.to(roomId).emit(event, data);
}
}
module.exports = SocketIOServer;
Redis适配器和集群支持
// src/socketio/redisAdapter.js - Redis适配器
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');
const logger = require('../utils/logger');
class RedisAdapter {
static createAdapter() {
try {
const pubClient = createClient({
url: process.env.REDIS_URL
});
const subClient = pubClient.duplicate();
pubClient.on('error', (err) => {
logger.error('Redis pub client error:', err);
});
subClient.on('error', (err) => {
logger.error('Redis sub client error:', err);
});
pubClient.on('connect', () => {
logger.info('Redis pub client connected');
});
subClient.on('connect', () => {
logger.info('Redis sub client connected');
});
return createAdapter(pubClient, subClient);
} catch (error) {
logger.error('Failed to create Redis adapter:', error);
throw error;
}
}
}
module.exports = RedisAdapter;
总结
Node.js实时应用开发的核心要点:
🎯 技术选择
- WebSocket:原生实时通信协议
- Socket.io:功能丰富的实时通信库
- Redis:集群支持和消息持久化
- JWT:安全的用户认证机制
✅ 核心功能
- 实时聊天和私聊
- 房间管理和用户状态
- 文件分享和多媒体支持
- 心跳检测和断线重连
🚀 高级特性
- 集群部署和负载均衡
- 消息持久化和历史记录
- 速率限制和安全防护
- 性能监控和统计分析
💡 最佳实践
- 认证和授权机制
- 错误处理和日志记录
- 内存管理和性能优化
- 可扩展的架构设计
掌握Node.js实时应用开发,构建高性能通信系统!
实时通信是现代应用的核心功能,Node.js的事件驱动特性使其在这一领域具有天然优势,通过合理的架构设计可以构建出高性能、可扩展的实时应用系统。