/** * 数据库连接池配置 * @file database-pool.js * @description 配置和管理Sequelize数据库连接池 */ const { Sequelize } = require('sequelize'); const { EventEmitter } = require('events'); const logger = require('../utils/logger'); const ormConfig = require('./orm-config'); // 从环境变量获取数据库连接参数 const DB_DIALECT = process.env.DB_DIALECT || 'mysql'; const DB_STORAGE = process.env.DB_STORAGE || './database.sqlite'; const DB_NAME = process.env.DB_NAME || 'nxTest'; const DB_USER = process.env.DB_USER || 'root'; const DB_PASSWORD = process.env.DB_PASSWORD || 'Aiotagro@741'; const DB_HOST = process.env.DB_HOST || '129.211.213.226'; const DB_PORT = process.env.DB_PORT || 3306; // 数据库连接池事件发射器 class DatabasePoolEmitter extends EventEmitter {} const poolEvents = new DatabasePoolEmitter(); // 默认连接池配置 const DEFAULT_POOL_CONFIG = { max: parseInt(process.env.DB_POOL_MAX || '10'), // 最大连接数 min: parseInt(process.env.DB_POOL_MIN || '2'), // 最小连接数 acquire: parseInt(process.env.DB_POOL_ACQUIRE || '30000'), // 获取连接超时时间(毫秒) idle: parseInt(process.env.DB_POOL_IDLE || '10000'), // 连接空闲多久后释放(毫秒) evict: parseInt(process.env.DB_POOL_EVICT || '1000'), // 多久检查一次空闲连接(毫秒) }; // 创建Sequelize实例 let sequelize; if (DB_DIALECT === 'sqlite') { sequelize = new Sequelize({ dialect: 'sqlite', storage: DB_STORAGE, logging: (msg) => logger.debug(msg), benchmark: process.env.NODE_ENV !== 'production', pool: DEFAULT_POOL_CONFIG, define: ormConfig.defaultModelOptions }); } else { sequelize = new Sequelize(DB_NAME, DB_USER, DB_PASSWORD, { host: DB_HOST, port: DB_PORT, dialect: DB_DIALECT, logging: (msg) => logger.debug(msg), benchmark: process.env.NODE_ENV !== 'production', pool: DEFAULT_POOL_CONFIG, define: ormConfig.defaultModelOptions, dialectOptions: { charset: 'utf8mb4', supportBigNumbers: true, bigNumberStrings: true, dateStrings: true, multipleStatements: process.env.DB_MULTIPLE_STATEMENTS === 'true' }, timezone: '+08:00' }); } // 监听连接池事件 - 使用Sequelize实例的hooks sequelize.addHook('afterConnect', (connection, config) => { logger.info(`数据库连接已建立`); poolEvents.emit('connect', connection); }); sequelize.addHook('beforeDisconnect', (connection) => { logger.info(`数据库连接即将断开`); poolEvents.emit('disconnect', connection); }); // 注意:acquire和release事件在新版Sequelize中需要通过其他方式监听 // 这里我们使用定时器来监控连接池状态 setInterval(() => { if (sequelize.connectionManager && sequelize.connectionManager.pool) { const pool = sequelize.connectionManager.pool; poolEvents.emit('poolStatus', { size: pool.size || 0, available: pool.available || 0, using: pool.using || 0, waiting: pool.waiting || 0 }); } }, 30000); // 每30秒检查一次连接池状态 // 测试数据库连接 async function testConnection() { try { await sequelize.authenticate(); logger.info('数据库连接测试成功'); poolEvents.emit('connectionSuccess'); return { success: true, message: '数据库连接测试成功' }; } catch (error) { logger.error('数据库连接测试失败:', error); poolEvents.emit('connectionError', error); return { success: false, message: error.message }; } } // 获取连接池状态 async function getPoolStatus() { try { const pool = sequelize.connectionManager.pool; if (!pool) { return { error: '连接池未初始化' }; } // 获取连接池统计信息 const status = { all: pool.size, // 所有连接数 idle: pool.idleCount, // 空闲连接数 used: pool.size - pool.idleCount, // 使用中的连接数 waiting: pool.pending, // 等待连接的请求数 max: pool.options.max, // 最大连接数 min: pool.options.min, // 最小连接数 acquire: pool.options.acquire, // 获取连接超时时间 idle: pool.options.idle, // 空闲超时时间 created: new Date().toISOString(), // 状态创建时间 utilization: (pool.size > 0) ? ((pool.size - pool.idleCount) / pool.size) * 100 : 0 // 利用率 }; poolEvents.emit('poolStatus', status); return status; } catch (error) { logger.error('获取连接池状态失败:', error); return { error: error.message }; } } // 监控连接池 async function monitorPool(interval = 60000) { try { const status = await getPoolStatus(); logger.debug('连接池状态:', status); // 检查连接池利用率 if (status.utilization > 80) { logger.warn(`连接池利用率过高: ${status.utilization.toFixed(2)}%`); poolEvents.emit('highUtilization', status); } // 检查等待连接的请求数 if (status.waiting > 5) { logger.warn(`连接池等待请求过多: ${status.waiting}`); poolEvents.emit('highWaiting', status); } return status; } catch (error) { logger.error('监控连接池失败:', error); return { error: error.message }; } } // 关闭连接池 async function closePool() { try { await sequelize.close(); logger.info('数据库连接池已关闭'); poolEvents.emit('poolClosed'); return { success: true, message: '数据库连接池已关闭' }; } catch (error) { logger.error('关闭数据库连接池失败:', error); return { success: false, message: error.message }; } } // 重置连接池 async function resetPool() { try { await closePool(); // 重新初始化连接池 sequelize.connectionManager.initPools(); // 测试新的连接池 const testResult = await testConnection(); if (testResult.success) { logger.info('数据库连接池已重置'); poolEvents.emit('poolReset'); return { success: true, message: '数据库连接池已重置' }; } else { throw new Error(testResult.message); } } catch (error) { logger.error('重置数据库连接池失败:', error); poolEvents.emit('poolResetError', error); return { success: false, message: error.message }; } } // 优化连接池配置 async function optimizePool(config = {}) { try { // 获取当前状态 const currentStatus = await getPoolStatus(); // 计算新的配置 const newConfig = { max: config.max || Math.max(currentStatus.max, Math.ceil(currentStatus.used * 1.5)), min: config.min || Math.min(currentStatus.min, Math.floor(currentStatus.used * 0.5)), acquire: config.acquire || currentStatus.acquire, idle: config.idle || currentStatus.idle }; // 确保最小连接数不小于1 newConfig.min = Math.max(newConfig.min, 1); // 确保最大连接数不小于最小连接数 newConfig.max = Math.max(newConfig.max, newConfig.min); // 应用新配置 await closePool(); // 更新连接池配置 sequelize.options.pool = newConfig; // 重新初始化连接池 sequelize.connectionManager.initPools(); // 测试新的连接池 const testResult = await testConnection(); if (testResult.success) { logger.info('数据库连接池已优化:', newConfig); poolEvents.emit('poolOptimized', newConfig); return { success: true, message: '数据库连接池已优化', config: newConfig }; } else { throw new Error(testResult.message); } } catch (error) { logger.error('优化数据库连接池失败:', error); poolEvents.emit('poolOptimizationError', error); return { success: false, message: error.message }; } } // 获取数据库表列表 async function getTablesList() { try { const [results] = await sequelize.query( `SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? ORDER BY TABLE_NAME`, { replacements: [DB_NAME] } ); return results.map(row => row.TABLE_NAME); } catch (error) { logger.error('获取数据库表列表失败:', error); return []; } } // 导出模块 module.exports = { sequelize, testConnection, getPoolStatus, monitorPool, closePool, resetPool, optimizePool, getTablesList, events: poolEvents };