340 lines
9.1 KiB
JavaScript
340 lines
9.1 KiB
JavaScript
|
|
/**
|
|||
|
|
* WebSocket实时通信系统
|
|||
|
|
* @file websocket.js
|
|||
|
|
* @description 实现实时数据推送,替代轮询机制
|
|||
|
|
*/
|
|||
|
|
const socketIO = require('socket.io');
|
|||
|
|
const jwt = require('jsonwebtoken');
|
|||
|
|
const { User, Role } = require('../models');
|
|||
|
|
const logger = require('./logger');
|
|||
|
|
|
|||
|
|
class WebSocketManager {
|
|||
|
|
constructor() {
|
|||
|
|
this.io = null;
|
|||
|
|
this.connectedClients = new Map(); // 存储连接的客户端信息
|
|||
|
|
this.rooms = {
|
|||
|
|
admins: 'admin_room',
|
|||
|
|
users: 'user_room',
|
|||
|
|
farms: 'farm_', // farm_1, farm_2 等
|
|||
|
|
devices: 'device_', // device_1, device_2 等
|
|||
|
|
};
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 初始化WebSocket服务器
|
|||
|
|
* @param {Object} server HTTP服务器实例
|
|||
|
|
*/
|
|||
|
|
init(server) {
|
|||
|
|
this.io = socketIO(server, {
|
|||
|
|
cors: {
|
|||
|
|
origin: ["http://localhost:5300", "http://localhost:3000"],
|
|||
|
|
methods: ["GET", "POST"],
|
|||
|
|
credentials: true
|
|||
|
|
},
|
|||
|
|
pingTimeout: 60000,
|
|||
|
|
pingInterval: 25000
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 中间件:认证
|
|||
|
|
this.io.use(async (socket, next) => {
|
|||
|
|
try {
|
|||
|
|
const token = socket.handshake.auth.token || socket.handshake.headers.authorization?.split(' ')[1];
|
|||
|
|
|
|||
|
|
if (!token) {
|
|||
|
|
return next(new Error('未提供认证令牌'));
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 验证JWT令牌
|
|||
|
|
const decoded = jwt.verify(token, process.env.JWT_SECRET || 'your_jwt_secret_key');
|
|||
|
|
|
|||
|
|
// 获取用户信息和角色
|
|||
|
|
const user = await User.findByPk(decoded.id, {
|
|||
|
|
include: [{
|
|||
|
|
model: Role,
|
|||
|
|
as: 'role',
|
|||
|
|
attributes: ['name']
|
|||
|
|
}]
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
if (!user) {
|
|||
|
|
return next(new Error('用户不存在'));
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 将用户信息附加到socket
|
|||
|
|
socket.user = {
|
|||
|
|
id: user.id,
|
|||
|
|
username: user.username,
|
|||
|
|
email: user.email,
|
|||
|
|
roles: user.role ? [user.role.name] : []
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
next();
|
|||
|
|
} catch (error) {
|
|||
|
|
logger.error('WebSocket认证失败:', error);
|
|||
|
|
next(new Error('认证失败'));
|
|||
|
|
}
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 连接事件处理
|
|||
|
|
this.io.on('connection', (socket) => {
|
|||
|
|
this.handleConnection(socket);
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
logger.info('WebSocket服务器初始化完成');
|
|||
|
|
return this.io;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 处理客户端连接
|
|||
|
|
* @param {Object} socket Socket实例
|
|||
|
|
*/
|
|||
|
|
handleConnection(socket) {
|
|||
|
|
const user = socket.user;
|
|||
|
|
|
|||
|
|
// 存储客户端信息
|
|||
|
|
this.connectedClients.set(socket.id, {
|
|||
|
|
userId: user.id,
|
|||
|
|
username: user.username,
|
|||
|
|
roles: user.roles,
|
|||
|
|
connectedAt: new Date()
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 加入相应的房间
|
|||
|
|
this.joinRooms(socket, user);
|
|||
|
|
|
|||
|
|
logger.info(`用户 ${user.username} 已连接 WebSocket,连接ID: ${socket.id}`);
|
|||
|
|
|
|||
|
|
// 发送连接成功消息
|
|||
|
|
socket.emit('connected', {
|
|||
|
|
message: '实时连接已建立',
|
|||
|
|
timestamp: new Date(),
|
|||
|
|
user: {
|
|||
|
|
id: user.id,
|
|||
|
|
username: user.username
|
|||
|
|
}
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 处理客户端事件
|
|||
|
|
this.setupSocketEvents(socket);
|
|||
|
|
|
|||
|
|
// 断开连接处理
|
|||
|
|
socket.on('disconnect', () => {
|
|||
|
|
this.handleDisconnection(socket);
|
|||
|
|
});
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 让用户加入相应的房间
|
|||
|
|
* @param {Object} socket Socket实例
|
|||
|
|
* @param {Object} user 用户信息
|
|||
|
|
*/
|
|||
|
|
joinRooms(socket, user) {
|
|||
|
|
// 所有用户加入用户房间
|
|||
|
|
socket.join(this.rooms.users);
|
|||
|
|
|
|||
|
|
// 管理员加入管理员房间
|
|||
|
|
if (user.roles && user.roles.includes('admin')) {
|
|||
|
|
socket.join(this.rooms.admins);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 可以根据业务需求加入特定的农场或设备房间
|
|||
|
|
// 这里暂时加入全局房间,后续可以根据用户权限细化
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 设置Socket事件监听
|
|||
|
|
* @param {Object} socket Socket实例
|
|||
|
|
*/
|
|||
|
|
setupSocketEvents(socket) {
|
|||
|
|
// 订阅特定农场的数据
|
|||
|
|
socket.on('subscribe_farm', (farmId) => {
|
|||
|
|
socket.join(`${this.rooms.farms}${farmId}`);
|
|||
|
|
logger.info(`用户 ${socket.user.username} 订阅农场 ${farmId} 的实时数据`);
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 取消订阅农场数据
|
|||
|
|
socket.on('unsubscribe_farm', (farmId) => {
|
|||
|
|
socket.leave(`${this.rooms.farms}${farmId}`);
|
|||
|
|
logger.info(`用户 ${socket.user.username} 取消订阅农场 ${farmId} 的实时数据`);
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 订阅特定设备的数据
|
|||
|
|
socket.on('subscribe_device', (deviceId) => {
|
|||
|
|
socket.join(`${this.rooms.devices}${deviceId}`);
|
|||
|
|
logger.info(`用户 ${socket.user.username} 订阅设备 ${deviceId} 的实时数据`);
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 心跳检测
|
|||
|
|
socket.on('ping', () => {
|
|||
|
|
socket.emit('pong', { timestamp: new Date() });
|
|||
|
|
});
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 处理客户端断开连接
|
|||
|
|
* @param {Object} socket Socket实例
|
|||
|
|
*/
|
|||
|
|
handleDisconnection(socket) {
|
|||
|
|
const clientInfo = this.connectedClients.get(socket.id);
|
|||
|
|
|
|||
|
|
if (clientInfo) {
|
|||
|
|
logger.info(`用户 ${clientInfo.username} 断开 WebSocket 连接,连接ID: ${socket.id}`);
|
|||
|
|
this.connectedClients.delete(socket.id);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 广播设备状态更新
|
|||
|
|
* @param {Object} deviceData 设备数据
|
|||
|
|
*/
|
|||
|
|
broadcastDeviceUpdate(deviceData) {
|
|||
|
|
if (!this.io) return;
|
|||
|
|
|
|||
|
|
// 向所有用户广播设备更新
|
|||
|
|
this.io.to(this.rooms.users).emit('device_update', {
|
|||
|
|
type: 'device_status',
|
|||
|
|
data: deviceData,
|
|||
|
|
timestamp: new Date()
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 向特定设备的订阅者发送更新
|
|||
|
|
this.io.to(`${this.rooms.devices}${deviceData.id}`).emit('device_detail_update', {
|
|||
|
|
type: 'device_detail',
|
|||
|
|
data: deviceData,
|
|||
|
|
timestamp: new Date()
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
logger.info(`设备状态更新已广播: 设备ID ${deviceData.id}`);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 广播预警信息
|
|||
|
|
* @param {Object} alertData 预警数据
|
|||
|
|
*/
|
|||
|
|
broadcastAlert(alertData) {
|
|||
|
|
if (!this.io) return;
|
|||
|
|
|
|||
|
|
// 向管理员发送紧急预警
|
|||
|
|
if (alertData.level === 'critical' || alertData.level === 'high') {
|
|||
|
|
this.io.to(this.rooms.admins).emit('urgent_alert', {
|
|||
|
|
type: 'urgent_alert',
|
|||
|
|
data: alertData,
|
|||
|
|
timestamp: new Date()
|
|||
|
|
});
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 向所有用户广播预警
|
|||
|
|
this.io.to(this.rooms.users).emit('alert_update', {
|
|||
|
|
type: 'new_alert',
|
|||
|
|
data: alertData,
|
|||
|
|
timestamp: new Date()
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 向特定农场的订阅者发送预警
|
|||
|
|
if (alertData.farm_id) {
|
|||
|
|
this.io.to(`${this.rooms.farms}${alertData.farm_id}`).emit('farm_alert', {
|
|||
|
|
type: 'farm_alert',
|
|||
|
|
data: alertData,
|
|||
|
|
timestamp: new Date()
|
|||
|
|
});
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info(`预警信息已广播: 预警ID ${alertData.id}, 级别: ${alertData.level}`);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 广播动物健康数据更新
|
|||
|
|
* @param {Object} animalData 动物数据
|
|||
|
|
*/
|
|||
|
|
broadcastAnimalUpdate(animalData) {
|
|||
|
|
if (!this.io) return;
|
|||
|
|
|
|||
|
|
this.io.to(this.rooms.users).emit('animal_update', {
|
|||
|
|
type: 'animal_health',
|
|||
|
|
data: animalData,
|
|||
|
|
timestamp: new Date()
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 向特定农场的订阅者发送动物更新
|
|||
|
|
if (animalData.farm_id) {
|
|||
|
|
this.io.to(`${this.rooms.farms}${animalData.farm_id}`).emit('farm_animal_update', {
|
|||
|
|
type: 'farm_animal',
|
|||
|
|
data: animalData,
|
|||
|
|
timestamp: new Date()
|
|||
|
|
});
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info(`动物健康数据更新已广播: 动物ID ${animalData.id}`);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 广播系统统计数据更新
|
|||
|
|
* @param {Object} statsData 统计数据
|
|||
|
|
*/
|
|||
|
|
broadcastStatsUpdate(statsData) {
|
|||
|
|
if (!this.io) return;
|
|||
|
|
|
|||
|
|
this.io.to(this.rooms.users).emit('stats_update', {
|
|||
|
|
type: 'system_stats',
|
|||
|
|
data: statsData,
|
|||
|
|
timestamp: new Date()
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
logger.info('系统统计数据更新已广播');
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 发送性能监控数据
|
|||
|
|
* @param {Object} performanceData 性能数据
|
|||
|
|
*/
|
|||
|
|
broadcastPerformanceUpdate(performanceData) {
|
|||
|
|
if (!this.io) return;
|
|||
|
|
|
|||
|
|
// 只向管理员发送性能数据
|
|||
|
|
this.io.to(this.rooms.admins).emit('performance_update', {
|
|||
|
|
type: 'system_performance',
|
|||
|
|
data: performanceData,
|
|||
|
|
timestamp: new Date()
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
logger.info('性能监控数据已发送给管理员');
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 获取连接统计信息
|
|||
|
|
* @returns {Object} 连接统计
|
|||
|
|
*/
|
|||
|
|
getConnectionStats() {
|
|||
|
|
return {
|
|||
|
|
totalConnections: this.connectedClients.size,
|
|||
|
|
connectedUsers: Array.from(this.connectedClients.values()).map(client => ({
|
|||
|
|
userId: client.userId,
|
|||
|
|
username: client.username,
|
|||
|
|
connectedAt: client.connectedAt
|
|||
|
|
}))
|
|||
|
|
};
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 向特定用户发送消息
|
|||
|
|
* @param {number} userId 用户ID
|
|||
|
|
* @param {string} event 事件名称
|
|||
|
|
* @param {Object} data 数据
|
|||
|
|
*/
|
|||
|
|
sendToUser(userId, event, data) {
|
|||
|
|
if (!this.io) return;
|
|||
|
|
|
|||
|
|
for (const [socketId, clientInfo] of this.connectedClients.entries()) {
|
|||
|
|
if (clientInfo.userId === userId) {
|
|||
|
|
this.io.to(socketId).emit(event, data);
|
|||
|
|
logger.info(`消息已发送给用户 ${clientInfo.username}: ${event}`);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 创建单例实例
|
|||
|
|
const webSocketManager = new WebSocketManager();
|
|||
|
|
|
|||
|
|
module.exports = webSocketManager;
|