// websocket.js const WebSocket = require('./node_modules/ws'); const { verifyToken } = require('./jwt'); // 引入 JWT 验证逻辑 // 存储所有连接的客户端 const clients = new Map(); // 格式: userId => ws /** * 初始化 WebSocket 服务 * @param {http.Server} server - HTTP 服务器实例 * @param {Function} onUserDisconnected - 当用户 WS 断线时,需要调用的回调函数 */ function initWebSocket(server, onUserDisconnected) { const wss = new WebSocket.Server({ noServer: true }); // 处理升级事件 server.on('upgrade', (request, socket, head) => { const urlParams = new URLSearchParams(request.url.split('?')[1]); const token = urlParams.get('token'); if (!token) { socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); socket.destroy(); return; } try { const result = verifyToken(token); // 验证 JWT if (!result || result.error || !result.userId) { throw new Error(result.error || '无效的 Token'); } const userId = result.userId; // 确保 userId 存在 wss.handleUpgrade(request, socket, head, (ws) => { wss.emit('connection', ws, userId); }); } catch (error) { console.error('身份验证失败:', error.message); socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); socket.destroy(); } }); wss.on('connection', (ws, userId) => { console.log(`新客户端连接: userId=${userId}`); // 保存客户端连接 clients.set(userId, ws); // 通知客户端连接成功 ws.send( JSON.stringify({ type: 'connection_ack', message: '连接成功', userId, }) ); // 监听客户端发送的消息 ws.on('message', (message) => { console.log(`收到消息: ${message}`); try { const data = JSON.parse(message); handleMessage(userId, data); } catch (error) { console.error('消息解析失败:', error.message); ws.send( JSON.stringify({ type: 'error', message: '无效的消息格式', }) ); } }); // 监听客户端断开连接 ws.on('close', () => { console.log(`客户端断开连接: userId=${userId}`); // 1. 删除此用户在全局 clients 的记录 clients.delete(userId); // 2. 通知等待室进行房间移除、广播等后续操作 if (typeof onUserDisconnected === 'function') { onUserDisconnected(userId); } }); // 监听连接错误 ws.on('error', (error) => { console.error(`客户端 userId=${userId} 出现错误:`, error.message); }); }); console.log('WebSocket 服务已启动'); } /** * 处理客户端发送的消息 * @param {string|number} userId - 发送消息的用户 ID * @param {Object} data - 客户端发送的消息数据 */ function handleMessage(userId, data) { const clientWs = clients.get(userId); if (!clientWs) { console.error(`未找到 userId=${userId} 的连接`); return; } switch (data.type) { case 'ping': // 处理心跳检测 clientWs.send( JSON.stringify({ type: 'pong', message: '心跳检测成功', }) ); break; case 'broadcast': // 示例: 广播消息 broadcastMessage(userId, data.message); break; default: console.warn(`未处理的消息类型: ${data.type}`); } } /** * 广播消息给所有客户端 * @param {string} senderId - 发送消息的用户 ID * @param {string|Object} message - 广播的消息内容 */ function broadcastMessage(senderId, message) { // 在广播前记录日志 console.log(`Broadcasting message from ${senderId}: ${JSON.stringify(message)}`); // 遍历所有客户端进行广播 clients.forEach((ws) => { if (ws.readyState === WebSocket.OPEN) { ws.send( JSON.stringify({ type: 'broadcast', senderId, message, }) ); } }); } /** * 向单个用户发送消息 * @param {string} userId - 接收消息的用户 ID * @param {Object} message - 要发送的消息内容 */ function sendMessageToUser(userId, message) { const ws = clients.get(userId); if (ws && ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify(message)); console.log(`消息发送给 userId=${userId}:`, message); } else { console.error(`无法向 userId=${userId} 发送消息,用户可能已断开连接`); } } /** * 获取所有客户端连接 * @returns {Map} 当前所有 WebSocket 连接 */ function getClients() { return clients; } module.exports = { initWebSocket, sendMessageToUser, broadcastMessage, handleMessage, getClients, };