Generating commit message...

This commit is contained in:
2025-08-30 14:33:49 +08:00
parent 4d469e95f0
commit 7f9bfbb381
99 changed files with 69225 additions and 35 deletions

View File

@@ -0,0 +1,73 @@
const mysql = require('mysql2/promise');
// 数据库配置
const dbConfig = {
host: process.env.DB_HOST || '192.168.0.240',
port: process.env.DB_PORT || 3306,
user: process.env.DB_USER || 'root',
password: process.env.DB_PASSWORD || 'aiot$Aiot123',
database: process.env.DB_NAME || 'jiebandata',
connectionLimit: 10,
// 移除无效的配置选项 acquireTimeout 和 timeout
charset: 'utf8mb4',
timezone: '+08:00',
// 连接池配置
waitForConnections: true,
queueLimit: 0
};
// 创建连接池
const pool = mysql.createPool(dbConfig);
// 测试数据库连接
async function testConnection() {
try {
const connection = await pool.getConnection();
console.log('✅ MySQL数据库连接成功');
connection.release();
return true;
} catch (error) {
console.error('❌ MySQL数据库连接失败:', error.message);
return false;
}
}
// 执行查询
async function query(sql, params = []) {
try {
const [rows] = await pool.execute(sql, params);
return rows;
} catch (error) {
console.error('数据库查询错误:', error.message);
throw error;
}
}
// 执行事务
async function transaction(callback) {
const connection = await pool.getConnection();
try {
await connection.beginTransaction();
const result = await callback(connection);
await connection.commit();
return result;
} catch (error) {
await connection.rollback();
throw error;
} finally {
connection.release();
}
}
// 关闭连接池
async function closePool() {
await pool.end();
}
module.exports = {
pool,
query,
transaction,
testConnection,
closePool
};

View File

@@ -0,0 +1,203 @@
const amqp = require('amqplib');
class RabbitMQConfig {
constructor() {
this.connection = null;
this.channel = null;
this.isConnected = false;
this.exchanges = new Map();
this.queues = new Map();
}
// 获取连接URL
getConnectionUrl() {
const host = process.env.RABBITMQ_HOST || 'localhost';
const port = process.env.RABBITMQ_PORT || 5672;
const username = process.env.RABBITMQ_USERNAME || 'guest';
const password = process.env.RABBITMQ_PASSWORD || 'guest';
const vhost = process.env.RABBITMQ_VHOST || '/';
return `amqp://${username}:${password}@${host}:${port}/${vhost}`;
}
// 连接RabbitMQ
async connect() {
if (this.isConnected) {
return { connection: this.connection, channel: this.channel };
}
try {
const url = this.getConnectionUrl();
this.connection = await amqp.connect(url);
this.connection.on('error', (err) => {
console.error('RabbitMQ连接错误:', err);
this.isConnected = false;
});
this.connection.on('close', () => {
console.log('❌ RabbitMQ连接关闭');
this.isConnected = false;
});
this.channel = await this.connection.createChannel();
this.channel.on('error', (err) => {
console.error('RabbitMQ通道错误:', err);
});
this.channel.on('close', () => {
console.log('❌ RabbitMQ通道关闭');
});
this.isConnected = true;
console.log('✅ RabbitMQ连接成功');
// 声明默认交换器
await this.setupDefaultExchanges();
return { connection: this.connection, channel: this.channel };
} catch (error) {
console.error('RabbitMQ连接失败:', error);
throw error;
}
}
// 设置默认交换器
async setupDefaultExchanges() {
const exchanges = [
{ name: 'jiebanke.direct', type: 'direct', durable: true },
{ name: 'jiebanke.topic', type: 'topic', durable: true },
{ name: 'jiebanke.fanout', type: 'fanout', durable: true },
{ name: 'jiebanke.delay', type: 'x-delayed-message', durable: true, arguments: { 'x-delayed-type': 'direct' } }
];
for (const exchange of exchanges) {
await this.channel.assertExchange(exchange.name, exchange.type, {
durable: exchange.durable,
arguments: exchange.arguments
});
this.exchanges.set(exchange.name, exchange);
}
}
// 声明队列
async assertQueue(queueName, options = {}) {
if (!this.isConnected) {
await this.connect();
}
const queueOptions = {
durable: true,
arguments: {
'x-message-ttl': 86400000, // 24小时消息过期时间
...options.arguments
},
...options
};
const queue = await this.channel.assertQueue(queueName, queueOptions);
this.queues.set(queueName, queue);
return queue;
}
// 绑定队列到交换器
async bindQueue(queueName, exchangeName, routingKey = '') {
if (!this.isConnected) {
await this.connect();
}
await this.channel.bindQueue(queueName, exchangeName, routingKey);
console.log(`✅ 队列 ${queueName} 绑定到交换器 ${exchangeName},路由键: ${routingKey}`);
}
// 发布消息
async publish(exchangeName, routingKey, message, options = {}) {
if (!this.isConnected) {
await this.connect();
}
const messageBuffer = Buffer.from(JSON.stringify({
timestamp: new Date().toISOString(),
data: message
}));
const publishOptions = {
persistent: true,
contentType: 'application/json',
...options
};
return this.channel.publish(exchangeName, routingKey, messageBuffer, publishOptions);
}
// 消费消息
async consume(queueName, callback, options = {}) {
if (!this.isConnected) {
await this.connect();
}
const consumeOptions = {
noAck: false,
...options
};
return this.channel.consume(queueName, async (msg) => {
try {
if (msg !== null) {
const content = JSON.parse(msg.content.toString());
await callback(content, msg);
this.channel.ack(msg);
}
} catch (error) {
console.error('消息处理错误:', error);
this.channel.nack(msg, false, false); // 不重新入队
}
}, consumeOptions);
}
// 健康检查
async healthCheck() {
try {
if (!this.isConnected) {
throw new Error('RabbitMQ未连接');
}
return {
status: 'healthy',
host: process.env.RABBITMQ_HOST || 'localhost',
port: process.env.RABBITMQ_PORT || 5672,
connected: this.isConnected
};
} catch (error) {
return {
status: 'unhealthy',
error: error.message,
host: process.env.RABBITMQ_HOST || 'localhost',
port: process.env.RABBITMQ_PORT || 5672,
connected: false
};
}
}
// 优雅关闭
async close() {
try {
if (this.channel) {
await this.channel.close();
}
if (this.connection) {
await this.connection.close();
}
this.isConnected = false;
console.log('✅ RabbitMQ连接已关闭');
} catch (error) {
console.error('关闭RabbitMQ连接时出错:', error);
}
}
}
// 创建全局RabbitMQ实例
const rabbitMQConfig = new RabbitMQConfig();
module.exports = rabbitMQConfig;

119
backend/src/config/redis.js Normal file
View File

@@ -0,0 +1,119 @@
const redis = require('redis');
class RedisConfig {
constructor() {
this.client = null;
this.isConnected = false;
}
// 创建Redis客户端
createClient() {
const redisConfig = {
socket: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
reconnectStrategy: (retries) => {
const delay = Math.min(retries * 100, 3000);
console.log(`Redis连接重试第${retries + 1}次,延迟${delay}ms`);
return delay;
}
},
password: process.env.REDIS_PASSWORD || null,
database: process.env.REDIS_DB || 0
};
// 移除空配置项
if (!redisConfig.password) delete redisConfig.password;
this.client = redis.createClient(redisConfig);
// 错误处理
this.client.on('error', (err) => {
console.error('Redis错误:', err);
this.isConnected = false;
});
// 连接成功
this.client.on('connect', () => {
console.log('✅ Redis连接中...');
});
// 准备就绪
this.client.on('ready', () => {
this.isConnected = true;
console.log('✅ Redis连接就绪');
});
// 连接断开
this.client.on('end', () => {
this.isConnected = false;
console.log('❌ Redis连接断开');
});
// 重连
this.client.on('reconnecting', () => {
console.log('🔄 Redis重新连接中...');
});
return this.client;
}
// 连接Redis
async connect() {
if (this.client && this.isConnected) {
return this.client;
}
// 开发环境下如果Redis未配置则不连接
if (process.env.NODE_ENV === 'development' &&
(!process.env.REDIS_HOST || process.env.REDIS_HOST === 'localhost')) {
console.log('⚠️ 开发环境未配置Redis跳过连接');
return null;
}
try {
this.createClient();
await this.client.connect();
return this.client;
} catch (error) {
console.error('Redis连接失败:', error);
throw error;
}
}
// 断开连接
async disconnect() {
if (this.client) {
await this.client.quit();
this.isConnected = false;
console.log('✅ Redis连接已关闭');
}
}
// 获取客户端状态
getStatus() {
return {
isConnected: this.isConnected,
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379
};
}
// 健康检查
async healthCheck() {
try {
if (!this.isConnected) {
throw new Error('Redis未连接');
}
await this.client.ping();
return { status: 'healthy', ...this.getStatus() };
} catch (error) {
return { status: 'unhealthy', error: error.message, ...this.getStatus() };
}
}
}
// 创建全局Redis实例
const redisConfig = new RedisConfig();
module.exports = redisConfig;