Files
nxxmdata/backend/utils/websocket.js
2025-09-12 20:08:42 +08:00

340 lines
9.1 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* WebSocket实时通信系统
* @file websocket.js
* @description 实现实时数据推送,替代轮询机制
*/
const socketIO = require('socket.io');
const jwt = require('jsonwebtoken');
const { User, Role } = require('../models');
const logger = require('./logger');
class WebSocketManager {
constructor() {
this.io = null;
this.connectedClients = new Map(); // 存储连接的客户端信息
this.rooms = {
admins: 'admin_room',
users: 'user_room',
farms: 'farm_', // farm_1, farm_2 等
devices: 'device_', // device_1, device_2 等
};
}
/**
* 初始化WebSocket服务器
* @param {Object} server HTTP服务器实例
*/
init(server) {
this.io = socketIO(server, {
cors: {
origin: ["http://localhost:5300", "http://localhost:3000"],
methods: ["GET", "POST"],
credentials: true
},
pingTimeout: 60000,
pingInterval: 25000
});
// 中间件:认证
this.io.use(async (socket, next) => {
try {
const token = socket.handshake.auth.token || socket.handshake.headers.authorization?.split(' ')[1];
if (!token) {
return next(new Error('未提供认证令牌'));
}
// 验证JWT令牌
const decoded = jwt.verify(token, process.env.JWT_SECRET || 'your_jwt_secret_key');
// 获取用户信息和角色
const user = await User.findByPk(decoded.id, {
include: [{
model: Role,
as: 'role',
attributes: ['name']
}]
});
if (!user) {
return next(new Error('用户不存在'));
}
// 将用户信息附加到socket
socket.user = {
id: user.id,
username: user.username,
email: user.email,
roles: user.role ? [user.role.name] : []
};
next();
} catch (error) {
logger.error('WebSocket认证失败:', error);
next(new Error('认证失败'));
}
});
// 连接事件处理
this.io.on('connection', (socket) => {
this.handleConnection(socket);
});
logger.info('WebSocket服务器初始化完成');
return this.io;
}
/**
* 处理客户端连接
* @param {Object} socket Socket实例
*/
handleConnection(socket) {
const user = socket.user;
// 存储客户端信息
this.connectedClients.set(socket.id, {
userId: user.id,
username: user.username,
roles: user.roles,
connectedAt: new Date()
});
// 加入相应的房间
this.joinRooms(socket, user);
logger.info(`用户 ${user.username} 已连接 WebSocket连接ID: ${socket.id}`);
// 发送连接成功消息
socket.emit('connected', {
message: '实时连接已建立',
timestamp: new Date(),
user: {
id: user.id,
username: user.username
}
});
// 处理客户端事件
this.setupSocketEvents(socket);
// 断开连接处理
socket.on('disconnect', () => {
this.handleDisconnection(socket);
});
}
/**
* 让用户加入相应的房间
* @param {Object} socket Socket实例
* @param {Object} user 用户信息
*/
joinRooms(socket, user) {
// 所有用户加入用户房间
socket.join(this.rooms.users);
// 管理员加入管理员房间
if (user.roles && user.roles.includes('admin')) {
socket.join(this.rooms.admins);
}
// 可以根据业务需求加入特定的农场或设备房间
// 这里暂时加入全局房间,后续可以根据用户权限细化
}
/**
* 设置Socket事件监听
* @param {Object} socket Socket实例
*/
setupSocketEvents(socket) {
// 订阅特定农场的数据
socket.on('subscribe_farm', (farmId) => {
socket.join(`${this.rooms.farms}${farmId}`);
logger.info(`用户 ${socket.user.username} 订阅农场 ${farmId} 的实时数据`);
});
// 取消订阅农场数据
socket.on('unsubscribe_farm', (farmId) => {
socket.leave(`${this.rooms.farms}${farmId}`);
logger.info(`用户 ${socket.user.username} 取消订阅农场 ${farmId} 的实时数据`);
});
// 订阅特定设备的数据
socket.on('subscribe_device', (deviceId) => {
socket.join(`${this.rooms.devices}${deviceId}`);
logger.info(`用户 ${socket.user.username} 订阅设备 ${deviceId} 的实时数据`);
});
// 心跳检测
socket.on('ping', () => {
socket.emit('pong', { timestamp: new Date() });
});
}
/**
* 处理客户端断开连接
* @param {Object} socket Socket实例
*/
handleDisconnection(socket) {
const clientInfo = this.connectedClients.get(socket.id);
if (clientInfo) {
logger.info(`用户 ${clientInfo.username} 断开 WebSocket 连接连接ID: ${socket.id}`);
this.connectedClients.delete(socket.id);
}
}
/**
* 广播设备状态更新
* @param {Object} deviceData 设备数据
*/
broadcastDeviceUpdate(deviceData) {
if (!this.io) return;
// 向所有用户广播设备更新
this.io.to(this.rooms.users).emit('device_update', {
type: 'device_status',
data: deviceData,
timestamp: new Date()
});
// 向特定设备的订阅者发送更新
this.io.to(`${this.rooms.devices}${deviceData.id}`).emit('device_detail_update', {
type: 'device_detail',
data: deviceData,
timestamp: new Date()
});
logger.info(`设备状态更新已广播: 设备ID ${deviceData.id}`);
}
/**
* 广播预警信息
* @param {Object} alertData 预警数据
*/
broadcastAlert(alertData) {
if (!this.io) return;
// 向管理员发送紧急预警
if (alertData.level === 'critical' || alertData.level === 'high') {
this.io.to(this.rooms.admins).emit('urgent_alert', {
type: 'urgent_alert',
data: alertData,
timestamp: new Date()
});
}
// 向所有用户广播预警
this.io.to(this.rooms.users).emit('alert_update', {
type: 'new_alert',
data: alertData,
timestamp: new Date()
});
// 向特定农场的订阅者发送预警
if (alertData.farm_id) {
this.io.to(`${this.rooms.farms}${alertData.farm_id}`).emit('farm_alert', {
type: 'farm_alert',
data: alertData,
timestamp: new Date()
});
}
logger.info(`预警信息已广播: 预警ID ${alertData.id}, 级别: ${alertData.level}`);
}
/**
* 广播动物健康数据更新
* @param {Object} animalData 动物数据
*/
broadcastAnimalUpdate(animalData) {
if (!this.io) return;
this.io.to(this.rooms.users).emit('animal_update', {
type: 'animal_health',
data: animalData,
timestamp: new Date()
});
// 向特定农场的订阅者发送动物更新
if (animalData.farm_id) {
this.io.to(`${this.rooms.farms}${animalData.farm_id}`).emit('farm_animal_update', {
type: 'farm_animal',
data: animalData,
timestamp: new Date()
});
}
logger.info(`动物健康数据更新已广播: 动物ID ${animalData.id}`);
}
/**
* 广播系统统计数据更新
* @param {Object} statsData 统计数据
*/
broadcastStatsUpdate(statsData) {
if (!this.io) return;
this.io.to(this.rooms.users).emit('stats_update', {
type: 'system_stats',
data: statsData,
timestamp: new Date()
});
logger.info('系统统计数据更新已广播');
}
/**
* 发送性能监控数据
* @param {Object} performanceData 性能数据
*/
broadcastPerformanceUpdate(performanceData) {
if (!this.io) return;
// 只向管理员发送性能数据
this.io.to(this.rooms.admins).emit('performance_update', {
type: 'system_performance',
data: performanceData,
timestamp: new Date()
});
logger.info('性能监控数据已发送给管理员');
}
/**
* 获取连接统计信息
* @returns {Object} 连接统计
*/
getConnectionStats() {
return {
totalConnections: this.connectedClients.size,
connectedUsers: Array.from(this.connectedClients.values()).map(client => ({
userId: client.userId,
username: client.username,
connectedAt: client.connectedAt
}))
};
}
/**
* 向特定用户发送消息
* @param {number} userId 用户ID
* @param {string} event 事件名称
* @param {Object} data 数据
*/
sendToUser(userId, event, data) {
if (!this.io) return;
for (const [socketId, clientInfo] of this.connectedClients.entries()) {
if (clientInfo.userId === userId) {
this.io.to(socketId).emit(event, data);
logger.info(`消息已发送给用户 ${clientInfo.username}: ${event}`);
}
}
}
}
// 创建单例实例
const webSocketManager = new WebSocketManager();
module.exports = webSocketManager;