Files
nxxmdata/backend/utils/websocket.js

340 lines
9.1 KiB
JavaScript
Raw Normal View History

2025-09-12 20:08:42 +08:00
/**
* WebSocket实时通信系统
* @file websocket.js
* @description 实现实时数据推送替代轮询机制
*/
const socketIO = require('socket.io');
const jwt = require('jsonwebtoken');
const { User, Role } = require('../models');
const logger = require('./logger');
class WebSocketManager {
constructor() {
this.io = null;
this.connectedClients = new Map(); // 存储连接的客户端信息
this.rooms = {
admins: 'admin_room',
users: 'user_room',
farms: 'farm_', // farm_1, farm_2 等
devices: 'device_', // device_1, device_2 等
};
}
/**
* 初始化WebSocket服务器
* @param {Object} server HTTP服务器实例
*/
init(server) {
this.io = socketIO(server, {
cors: {
origin: ["http://localhost:5300", "http://localhost:3000"],
methods: ["GET", "POST"],
credentials: true
},
pingTimeout: 60000,
pingInterval: 25000
});
// 中间件:认证
this.io.use(async (socket, next) => {
try {
const token = socket.handshake.auth.token || socket.handshake.headers.authorization?.split(' ')[1];
if (!token) {
return next(new Error('未提供认证令牌'));
}
// 验证JWT令牌
const decoded = jwt.verify(token, process.env.JWT_SECRET || 'your_jwt_secret_key');
// 获取用户信息和角色
const user = await User.findByPk(decoded.id, {
include: [{
model: Role,
as: 'role',
attributes: ['name']
}]
});
if (!user) {
return next(new Error('用户不存在'));
}
// 将用户信息附加到socket
socket.user = {
id: user.id,
username: user.username,
email: user.email,
roles: user.role ? [user.role.name] : []
};
next();
} catch (error) {
logger.error('WebSocket认证失败:', error);
next(new Error('认证失败'));
}
});
// 连接事件处理
this.io.on('connection', (socket) => {
this.handleConnection(socket);
});
logger.info('WebSocket服务器初始化完成');
return this.io;
}
/**
* 处理客户端连接
* @param {Object} socket Socket实例
*/
handleConnection(socket) {
const user = socket.user;
// 存储客户端信息
this.connectedClients.set(socket.id, {
userId: user.id,
username: user.username,
roles: user.roles,
connectedAt: new Date()
});
// 加入相应的房间
this.joinRooms(socket, user);
logger.info(`用户 ${user.username} 已连接 WebSocket连接ID: ${socket.id}`);
// 发送连接成功消息
socket.emit('connected', {
message: '实时连接已建立',
timestamp: new Date(),
user: {
id: user.id,
username: user.username
}
});
// 处理客户端事件
this.setupSocketEvents(socket);
// 断开连接处理
socket.on('disconnect', () => {
this.handleDisconnection(socket);
});
}
/**
* 让用户加入相应的房间
* @param {Object} socket Socket实例
* @param {Object} user 用户信息
*/
joinRooms(socket, user) {
// 所有用户加入用户房间
socket.join(this.rooms.users);
// 管理员加入管理员房间
if (user.roles && user.roles.includes('admin')) {
socket.join(this.rooms.admins);
}
// 可以根据业务需求加入特定的农场或设备房间
// 这里暂时加入全局房间,后续可以根据用户权限细化
}
/**
* 设置Socket事件监听
* @param {Object} socket Socket实例
*/
setupSocketEvents(socket) {
// 订阅特定农场的数据
socket.on('subscribe_farm', (farmId) => {
socket.join(`${this.rooms.farms}${farmId}`);
logger.info(`用户 ${socket.user.username} 订阅农场 ${farmId} 的实时数据`);
});
// 取消订阅农场数据
socket.on('unsubscribe_farm', (farmId) => {
socket.leave(`${this.rooms.farms}${farmId}`);
logger.info(`用户 ${socket.user.username} 取消订阅农场 ${farmId} 的实时数据`);
});
// 订阅特定设备的数据
socket.on('subscribe_device', (deviceId) => {
socket.join(`${this.rooms.devices}${deviceId}`);
logger.info(`用户 ${socket.user.username} 订阅设备 ${deviceId} 的实时数据`);
});
// 心跳检测
socket.on('ping', () => {
socket.emit('pong', { timestamp: new Date() });
});
}
/**
* 处理客户端断开连接
* @param {Object} socket Socket实例
*/
handleDisconnection(socket) {
const clientInfo = this.connectedClients.get(socket.id);
if (clientInfo) {
logger.info(`用户 ${clientInfo.username} 断开 WebSocket 连接连接ID: ${socket.id}`);
this.connectedClients.delete(socket.id);
}
}
/**
* 广播设备状态更新
* @param {Object} deviceData 设备数据
*/
broadcastDeviceUpdate(deviceData) {
if (!this.io) return;
// 向所有用户广播设备更新
this.io.to(this.rooms.users).emit('device_update', {
type: 'device_status',
data: deviceData,
timestamp: new Date()
});
// 向特定设备的订阅者发送更新
this.io.to(`${this.rooms.devices}${deviceData.id}`).emit('device_detail_update', {
type: 'device_detail',
data: deviceData,
timestamp: new Date()
});
logger.info(`设备状态更新已广播: 设备ID ${deviceData.id}`);
}
/**
* 广播预警信息
* @param {Object} alertData 预警数据
*/
broadcastAlert(alertData) {
if (!this.io) return;
// 向管理员发送紧急预警
if (alertData.level === 'critical' || alertData.level === 'high') {
this.io.to(this.rooms.admins).emit('urgent_alert', {
type: 'urgent_alert',
data: alertData,
timestamp: new Date()
});
}
// 向所有用户广播预警
this.io.to(this.rooms.users).emit('alert_update', {
type: 'new_alert',
data: alertData,
timestamp: new Date()
});
// 向特定农场的订阅者发送预警
if (alertData.farm_id) {
this.io.to(`${this.rooms.farms}${alertData.farm_id}`).emit('farm_alert', {
type: 'farm_alert',
data: alertData,
timestamp: new Date()
});
}
logger.info(`预警信息已广播: 预警ID ${alertData.id}, 级别: ${alertData.level}`);
}
/**
* 广播动物健康数据更新
* @param {Object} animalData 动物数据
*/
broadcastAnimalUpdate(animalData) {
if (!this.io) return;
this.io.to(this.rooms.users).emit('animal_update', {
type: 'animal_health',
data: animalData,
timestamp: new Date()
});
// 向特定农场的订阅者发送动物更新
if (animalData.farm_id) {
this.io.to(`${this.rooms.farms}${animalData.farm_id}`).emit('farm_animal_update', {
type: 'farm_animal',
data: animalData,
timestamp: new Date()
});
}
logger.info(`动物健康数据更新已广播: 动物ID ${animalData.id}`);
}
/**
* 广播系统统计数据更新
* @param {Object} statsData 统计数据
*/
broadcastStatsUpdate(statsData) {
if (!this.io) return;
this.io.to(this.rooms.users).emit('stats_update', {
type: 'system_stats',
data: statsData,
timestamp: new Date()
});
logger.info('系统统计数据更新已广播');
}
/**
* 发送性能监控数据
* @param {Object} performanceData 性能数据
*/
broadcastPerformanceUpdate(performanceData) {
if (!this.io) return;
// 只向管理员发送性能数据
this.io.to(this.rooms.admins).emit('performance_update', {
type: 'system_performance',
data: performanceData,
timestamp: new Date()
});
logger.info('性能监控数据已发送给管理员');
}
/**
* 获取连接统计信息
* @returns {Object} 连接统计
*/
getConnectionStats() {
return {
totalConnections: this.connectedClients.size,
connectedUsers: Array.from(this.connectedClients.values()).map(client => ({
userId: client.userId,
username: client.username,
connectedAt: client.connectedAt
}))
};
}
/**
* 向特定用户发送消息
* @param {number} userId 用户ID
* @param {string} event 事件名称
* @param {Object} data 数据
*/
sendToUser(userId, event, data) {
if (!this.io) return;
for (const [socketId, clientInfo] of this.connectedClients.entries()) {
if (clientInfo.userId === userId) {
this.io.to(socketId).emit(event, data);
logger.info(`消息已发送给用户 ${clientInfo.username}: ${event}`);
}
}
}
}
// 创建单例实例
const webSocketManager = new WebSocketManager();
module.exports = webSocketManager;