更新PM2配置并添加相关脚本
This commit is contained in:
@@ -1,203 +0,0 @@
|
||||
const amqp = require('amqplib');
|
||||
|
||||
class RabbitMQConfig {
|
||||
constructor() {
|
||||
this.connection = null;
|
||||
this.channel = null;
|
||||
this.isConnected = false;
|
||||
this.exchanges = new Map();
|
||||
this.queues = new Map();
|
||||
}
|
||||
|
||||
// 获取连接URL
|
||||
getConnectionUrl() {
|
||||
const host = process.env.RABBITMQ_HOST || 'rabbitmq.jiebanke.com';
|
||||
const port = process.env.RABBITMQ_PORT || 5672;
|
||||
const username = process.env.RABBITMQ_USERNAME || 'guest';
|
||||
const password = process.env.RABBITMQ_PASSWORD || 'guest';
|
||||
const vhost = process.env.RABBITMQ_VHOST || '/';
|
||||
|
||||
return `amqp://${username}:${password}@${host}:${port}/${vhost}`;
|
||||
}
|
||||
|
||||
// 连接RabbitMQ
|
||||
async connect() {
|
||||
if (this.isConnected) {
|
||||
return { connection: this.connection, channel: this.channel };
|
||||
}
|
||||
|
||||
try {
|
||||
const url = this.getConnectionUrl();
|
||||
this.connection = await amqp.connect(url);
|
||||
|
||||
this.connection.on('error', (err) => {
|
||||
console.error('RabbitMQ连接错误:', err);
|
||||
this.isConnected = false;
|
||||
});
|
||||
|
||||
this.connection.on('close', () => {
|
||||
console.log('❌ RabbitMQ连接关闭');
|
||||
this.isConnected = false;
|
||||
});
|
||||
|
||||
this.channel = await this.connection.createChannel();
|
||||
|
||||
this.channel.on('error', (err) => {
|
||||
console.error('RabbitMQ通道错误:', err);
|
||||
});
|
||||
|
||||
this.channel.on('close', () => {
|
||||
console.log('❌ RabbitMQ通道关闭');
|
||||
});
|
||||
|
||||
this.isConnected = true;
|
||||
console.log('✅ RabbitMQ连接成功');
|
||||
|
||||
// 声明默认交换器
|
||||
await this.setupDefaultExchanges();
|
||||
|
||||
return { connection: this.connection, channel: this.channel };
|
||||
} catch (error) {
|
||||
console.error('RabbitMQ连接失败:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// 设置默认交换器
|
||||
async setupDefaultExchanges() {
|
||||
const exchanges = [
|
||||
{ name: 'jiebanke.direct', type: 'direct', durable: true },
|
||||
{ name: 'jiebanke.topic', type: 'topic', durable: true },
|
||||
{ name: 'jiebanke.fanout', type: 'fanout', durable: true },
|
||||
{ name: 'jiebanke.delay', type: 'x-delayed-message', durable: true, arguments: { 'x-delayed-type': 'direct' } }
|
||||
];
|
||||
|
||||
for (const exchange of exchanges) {
|
||||
await this.channel.assertExchange(exchange.name, exchange.type, {
|
||||
durable: exchange.durable,
|
||||
arguments: exchange.arguments
|
||||
});
|
||||
this.exchanges.set(exchange.name, exchange);
|
||||
}
|
||||
}
|
||||
|
||||
// 声明队列
|
||||
async assertQueue(queueName, options = {}) {
|
||||
if (!this.isConnected) {
|
||||
await this.connect();
|
||||
}
|
||||
|
||||
const queueOptions = {
|
||||
durable: true,
|
||||
arguments: {
|
||||
'x-message-ttl': 86400000, // 24小时消息过期时间
|
||||
...options.arguments
|
||||
},
|
||||
...options
|
||||
};
|
||||
|
||||
const queue = await this.channel.assertQueue(queueName, queueOptions);
|
||||
this.queues.set(queueName, queue);
|
||||
return queue;
|
||||
}
|
||||
|
||||
// 绑定队列到交换器
|
||||
async bindQueue(queueName, exchangeName, routingKey = '') {
|
||||
if (!this.isConnected) {
|
||||
await this.connect();
|
||||
}
|
||||
|
||||
await this.channel.bindQueue(queueName, exchangeName, routingKey);
|
||||
console.log(`✅ 队列 ${queueName} 绑定到交换器 ${exchangeName},路由键: ${routingKey}`);
|
||||
}
|
||||
|
||||
// 发布消息
|
||||
async publish(exchangeName, routingKey, message, options = {}) {
|
||||
if (!this.isConnected) {
|
||||
await this.connect();
|
||||
}
|
||||
|
||||
const messageBuffer = Buffer.from(JSON.stringify({
|
||||
timestamp: new Date().toISOString(),
|
||||
data: message
|
||||
}));
|
||||
|
||||
const publishOptions = {
|
||||
persistent: true,
|
||||
contentType: 'application/json',
|
||||
...options
|
||||
};
|
||||
|
||||
return this.channel.publish(exchangeName, routingKey, messageBuffer, publishOptions);
|
||||
}
|
||||
|
||||
// 消费消息
|
||||
async consume(queueName, callback, options = {}) {
|
||||
if (!this.isConnected) {
|
||||
await this.connect();
|
||||
}
|
||||
|
||||
const consumeOptions = {
|
||||
noAck: false,
|
||||
...options
|
||||
};
|
||||
|
||||
return this.channel.consume(queueName, async (msg) => {
|
||||
try {
|
||||
if (msg !== null) {
|
||||
const content = JSON.parse(msg.content.toString());
|
||||
await callback(content, msg);
|
||||
this.channel.ack(msg);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('消息处理错误:', error);
|
||||
this.channel.nack(msg, false, false); // 不重新入队
|
||||
}
|
||||
}, consumeOptions);
|
||||
}
|
||||
|
||||
// 健康检查
|
||||
async healthCheck() {
|
||||
try {
|
||||
if (!this.isConnected) {
|
||||
throw new Error('RabbitMQ未连接');
|
||||
}
|
||||
|
||||
return {
|
||||
status: 'healthy',
|
||||
host: process.env.RABBITMQ_HOST || 'rabbitmq.jiebanke.com',
|
||||
port: process.env.RABBITMQ_PORT || 5672,
|
||||
connected: this.isConnected
|
||||
};
|
||||
} catch (error) {
|
||||
return {
|
||||
status: 'unhealthy',
|
||||
error: error.message,
|
||||
host: process.env.RABBITMQ_HOST || 'rabbitmq.jiebanke.com',
|
||||
port: process.env.RABBITMQ_PORT || 5672,
|
||||
connected: false
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// 优雅关闭
|
||||
async close() {
|
||||
try {
|
||||
if (this.channel) {
|
||||
await this.channel.close();
|
||||
}
|
||||
if (this.connection) {
|
||||
await this.connection.close();
|
||||
}
|
||||
this.isConnected = false;
|
||||
console.log('✅ RabbitMQ连接已关闭');
|
||||
} catch (error) {
|
||||
console.error('关闭RabbitMQ连接时出错:', error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 创建全局RabbitMQ实例
|
||||
const rabbitMQConfig = new RabbitMQConfig();
|
||||
|
||||
module.exports = rabbitMQConfig;
|
||||
@@ -1,119 +0,0 @@
|
||||
const redis = require('redis');
|
||||
|
||||
class RedisConfig {
|
||||
constructor() {
|
||||
this.client = null;
|
||||
this.isConnected = false;
|
||||
}
|
||||
|
||||
// 创建Redis客户端
|
||||
createClient() {
|
||||
const redisConfig = {
|
||||
socket: {
|
||||
host: process.env.REDIS_HOST || 'redis.jiebanke.com',
|
||||
port: process.env.REDIS_PORT || 6379,
|
||||
reconnectStrategy: (retries) => {
|
||||
const delay = Math.min(retries * 100, 3000);
|
||||
console.log(`Redis连接重试第${retries + 1}次,延迟${delay}ms`);
|
||||
return delay;
|
||||
}
|
||||
},
|
||||
password: process.env.REDIS_PASSWORD || null,
|
||||
database: process.env.REDIS_DB || 0
|
||||
};
|
||||
|
||||
// 移除空配置项
|
||||
if (!redisConfig.password) delete redisConfig.password;
|
||||
|
||||
this.client = redis.createClient(redisConfig);
|
||||
|
||||
// 错误处理
|
||||
this.client.on('error', (err) => {
|
||||
console.error('Redis错误:', err);
|
||||
this.isConnected = false;
|
||||
});
|
||||
|
||||
// 连接成功
|
||||
this.client.on('connect', () => {
|
||||
console.log('✅ Redis连接中...');
|
||||
});
|
||||
|
||||
// 准备就绪
|
||||
this.client.on('ready', () => {
|
||||
this.isConnected = true;
|
||||
console.log('✅ Redis连接就绪');
|
||||
});
|
||||
|
||||
// 连接断开
|
||||
this.client.on('end', () => {
|
||||
this.isConnected = false;
|
||||
console.log('❌ Redis连接断开');
|
||||
});
|
||||
|
||||
// 重连
|
||||
this.client.on('reconnecting', () => {
|
||||
console.log('🔄 Redis重新连接中...');
|
||||
});
|
||||
|
||||
return this.client;
|
||||
}
|
||||
|
||||
// 连接Redis
|
||||
async connect() {
|
||||
if (this.client && this.isConnected) {
|
||||
return this.client;
|
||||
}
|
||||
|
||||
// 开发环境下,如果Redis未配置,则不连接
|
||||
if (process.env.NODE_ENV === 'development' &&
|
||||
(!process.env.REDIS_HOST || process.env.REDIS_HOST === 'redis.jiebanke.com')) {
|
||||
console.log('⚠️ 开发环境未配置Redis,跳过连接');
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
this.createClient();
|
||||
await this.client.connect();
|
||||
return this.client;
|
||||
} catch (error) {
|
||||
console.error('Redis连接失败:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// 断开连接
|
||||
async disconnect() {
|
||||
if (this.client) {
|
||||
await this.client.quit();
|
||||
this.isConnected = false;
|
||||
console.log('✅ Redis连接已关闭');
|
||||
}
|
||||
}
|
||||
|
||||
// 获取客户端状态
|
||||
getStatus() {
|
||||
return {
|
||||
isConnected: this.isConnected,
|
||||
host: process.env.REDIS_HOST || 'redis.jiebanke.com',
|
||||
port: process.env.REDIS_PORT || 6379
|
||||
};
|
||||
}
|
||||
|
||||
// 健康检查
|
||||
async healthCheck() {
|
||||
try {
|
||||
if (!this.isConnected) {
|
||||
throw new Error('Redis未连接');
|
||||
}
|
||||
await this.client.ping();
|
||||
return { status: 'healthy', ...this.getStatus() };
|
||||
} catch (error) {
|
||||
return { status: 'unhealthy', error: error.message, ...this.getStatus() };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 创建全局Redis实例
|
||||
const redisConfig = new RedisConfig();
|
||||
|
||||
module.exports = redisConfig;
|
||||
Reference in New Issue
Block a user