# 解班客项目安全文档 ## 1. 安全概述 ### 1.1 安全目标 - **数据保护**:确保用户数据和业务数据的机密性、完整性和可用性 - **系统安全**:防范各类网络攻击和安全威胁 - **合规要求**:满足相关法律法规和行业标准要求 - **隐私保护**:保护用户隐私信息不被泄露或滥用 ### 1.2 安全架构 ```mermaid graph TB A[用户] --> B[CDN/WAF] B --> C[负载均衡器] C --> D[Web应用防火墙] D --> E[反向代理] E --> F[应用服务器] F --> G[数据库] F --> H[缓存服务] F --> I[文件存储] J[安全监控] --> K[入侵检测] J --> L[日志审计] J --> M[漏洞扫描] N[身份认证] --> O[JWT令牌] N --> P[多因子认证] N --> Q[权限控制] subgraph "安全防护层" B D E end subgraph "应用安全层" F N end subgraph "数据安全层" G H I end subgraph "监控审计层" J K L M end ``` ### 1.3 安全原则 - **最小权限原则**:用户和系统组件只获得完成任务所需的最小权限 - **纵深防御**:多层安全防护,单点失效不影响整体安全 - **零信任架构**:不信任任何用户或设备,始终验证身份和权限 - **数据分类保护**:根据数据敏感性实施不同级别的保护措施 ## 2. 威胁分析 ### 2.1 威胁模型 ```mermaid graph LR A[威胁来源] --> B[外部攻击者] A --> C[内部威胁] A --> D[第三方风险] B --> B1[黑客攻击] B --> B2[恶意软件] B --> B3[DDoS攻击] C --> C1[内部人员] C --> C2[权限滥用] C --> C3[数据泄露] D --> D1[供应商风险] D --> D2[第三方服务] D --> D3[开源组件] ``` ### 2.2 风险评估矩阵 | 威胁类型 | 可能性 | 影响程度 | 风险等级 | 防护措施 | |---------|--------|----------|----------|----------| | SQL注入 | 中 | 高 | 高 | 参数化查询、输入验证 | | XSS攻击 | 高 | 中 | 高 | 输出编码、CSP策略 | | CSRF攻击 | 中 | 中 | 中 | CSRF令牌、同源检查 | | 暴力破解 | 高 | 中 | 高 | 账户锁定、验证码 | | 数据泄露 | 低 | 高 | 中 | 数据加密、访问控制 | | DDoS攻击 | 中 | 高 | 高 | 流量清洗、限流 | | 内部威胁 | 低 | 高 | 中 | 权限管理、审计日志 | ### 2.3 攻击场景分析 #### 2.3.1 Web应用攻击 ```javascript // 常见攻击示例和防护 const securityExamples = { // SQL注入攻击示例 sqlInjection: { vulnerable: "SELECT * FROM users WHERE id = " + userId, secure: "SELECT * FROM users WHERE id = ?", // 使用参数化查询 prevention: [ "使用ORM框架", "参数化查询", "输入验证", "最小权限数据库用户" ] }, // XSS攻击示例 xssAttack: { vulnerable: `
${userInput}
`, secure: `
${escapeHtml(userInput)}
`, prevention: [ "输出编码", "CSP策略", "输入验证", "使用安全的模板引擎" ] }, // CSRF攻击示例 csrfAttack: { vulnerable: "POST /api/transfer without token", secure: "POST /api/transfer with CSRF token", prevention: [ "CSRF令牌", "SameSite Cookie", "双重提交Cookie", "验证Referer头" ] } }; ``` ## 3. 身份认证与授权 ### 3.1 认证架构 ```mermaid sequenceDiagram participant U as 用户 participant F as 前端 participant A as 认证服务 participant R as 资源服务 participant D as 数据库 U->>F: 登录请求 F->>A: 提交凭证 A->>D: 验证用户 D-->>A: 用户信息 A->>A: 生成JWT令牌 A-->>F: 返回令牌 F-->>U: 登录成功 U->>F: 访问资源 F->>R: 携带JWT令牌 R->>A: 验证令牌 A-->>R: 令牌有效 R->>D: 查询数据 D-->>R: 返回数据 R-->>F: 返回结果 F-->>U: 显示内容 ``` ### 3.2 JWT令牌安全配置 ```javascript // JWT配置 const jwtConfig = { // 令牌配置 secret: process.env.JWT_SECRET, // 256位随机密钥 algorithm: 'HS256', expiresIn: '2h', // 访问令牌2小时过期 refreshExpiresIn: '7d', // 刷新令牌7天过期 // 安全选项 issuer: 'jiebanke.com', audience: 'jiebanke-api', notBefore: 0, // 令牌载荷 payload: { userId: 'user.id', username: 'user.username', role: 'user.role', permissions: 'user.permissions' } }; // JWT中间件 const jwtMiddleware = (req, res, next) => { const token = req.headers.authorization?.replace('Bearer ', ''); if (!token) { return res.status(401).json({ error: '缺少访问令牌' }); } try { const decoded = jwt.verify(token, jwtConfig.secret, { issuer: jwtConfig.issuer, audience: jwtConfig.audience, algorithms: [jwtConfig.algorithm] }); // 检查令牌黑名单 if (await isTokenBlacklisted(token)) { return res.status(401).json({ error: '令牌已失效' }); } req.user = decoded; next(); } catch (error) { return res.status(401).json({ error: '无效的访问令牌' }); } }; ``` ### 3.3 权限控制系统 ```javascript // RBAC权限模型 const rbacModel = { roles: { admin: { name: '管理员', permissions: ['*'] // 所有权限 }, moderator: { name: '版主', permissions: [ 'trip:read', 'trip:update', 'trip:delete', 'user:read', 'comment:moderate' ] }, user: { name: '普通用户', permissions: [ 'trip:read', 'trip:create', 'trip:update:own', 'comment:create', 'profile:update:own' ] } }, resources: { trip: ['create', 'read', 'update', 'delete'], user: ['create', 'read', 'update', 'delete'], comment: ['create', 'read', 'update', 'delete', 'moderate'], profile: ['read', 'update'] } }; // 权限检查中间件 const checkPermission = (resource, action) => { return (req, res, next) => { const user = req.user; const userRole = user.role; const userPermissions = rbacModel.roles[userRole]?.permissions || []; // 检查是否有通配符权限 if (userPermissions.includes('*')) { return next(); } // 检查具体权限 const requiredPermission = `${resource}:${action}`; const hasPermission = userPermissions.some(permission => { if (permission === requiredPermission) return true; if (permission.endsWith(':*') && requiredPermission.startsWith(permission.slice(0, -1))) return true; return false; }); // 检查资源所有权 if (!hasPermission && action.endsWith(':own')) { const basePermission = requiredPermission.replace(':own', ''); if (userPermissions.includes(basePermission + ':own')) { // 需要验证资源所有权 return checkResourceOwnership(resource, req, res, next); } } if (!hasPermission) { return res.status(403).json({ error: '权限不足' }); } next(); }; }; ``` ### 3.4 多因子认证 ```javascript // 多因子认证实现 const mfaService = { // 生成TOTP密钥 generateSecret: (userId) => { const secret = speakeasy.generateSecret({ name: `解班客 (${userId})`, issuer: '解班客', length: 32 }); return { secret: secret.base32, qrCode: qrcode.toDataURL(secret.otpauth_url) }; }, // 验证TOTP令牌 verifyToken: (secret, token) => { return speakeasy.totp.verify({ secret: secret, encoding: 'base32', token: token, window: 2 // 允许时间偏差 }); }, // 生成备用码 generateBackupCodes: () => { const codes = []; for (let i = 0; i < 10; i++) { codes.push(crypto.randomBytes(4).toString('hex').toUpperCase()); } return codes; } }; // MFA中间件 const mfaMiddleware = async (req, res, next) => { const user = req.user; // 检查用户是否启用了MFA const userMfa = await getUserMfaSettings(user.userId); if (!userMfa.enabled) { return next(); } const mfaToken = req.headers['x-mfa-token']; if (!mfaToken) { return res.status(401).json({ error: '需要多因子认证', mfaRequired: true }); } // 验证MFA令牌 const isValid = mfaService.verifyToken(userMfa.secret, mfaToken); if (!isValid) { // 尝试备用码 const backupCodeValid = await verifyBackupCode(user.userId, mfaToken); if (!backupCodeValid) { return res.status(401).json({ error: '多因子认证失败' }); } } next(); }; ``` ## 4. 数据安全 ### 4.1 数据分类与保护 | 数据类型 | 敏感级别 | 保护措施 | 访问控制 | |---------|----------|----------|----------| | 用户密码 | 极高 | bcrypt加密、盐值 | 仅认证服务访问 | | 身份证号 | 高 | AES-256加密 | 管理员+审计日志 | | 手机号码 | 高 | AES-256加密 | 业务需要+脱敏显示 | | 邮箱地址 | 中 | 明文存储 | 用户本人+管理员 | | 旅行信息 | 中 | 明文存储 | 公开+隐私设置 | | 系统日志 | 低 | 压缩存储 | 运维人员 | ### 4.2 数据加密实现 ```javascript // 数据加密服务 const encryptionService = { // AES-256-GCM加密 encrypt: (plaintext, key = process.env.ENCRYPTION_KEY) => { const iv = crypto.randomBytes(16); const cipher = crypto.createCipher('aes-256-gcm', key, iv); let encrypted = cipher.update(plaintext, 'utf8', 'hex'); encrypted += cipher.final('hex'); const authTag = cipher.getAuthTag(); return { encrypted, iv: iv.toString('hex'), authTag: authTag.toString('hex') }; }, // AES-256-GCM解密 decrypt: (encryptedData, key = process.env.ENCRYPTION_KEY) => { const decipher = crypto.createDecipher('aes-256-gcm', key, Buffer.from(encryptedData.iv, 'hex')); decipher.setAuthTag(Buffer.from(encryptedData.authTag, 'hex')); let decrypted = decipher.update(encryptedData.encrypted, 'hex', 'utf8'); decrypted += decipher.final('utf8'); return decrypted; }, // 密码哈希 hashPassword: async (password) => { const saltRounds = 12; return await bcrypt.hash(password, saltRounds); }, // 密码验证 verifyPassword: async (password, hash) => { return await bcrypt.compare(password, hash); } }; // 敏感数据处理中间件 const sensitiveDataMiddleware = { // 加密敏感字段 encryptSensitiveFields: (data, fields) => { const result = { ...data }; fields.forEach(field => { if (result[field]) { result[field] = encryptionService.encrypt(result[field]); } }); return result; }, // 解密敏感字段 decryptSensitiveFields: (data, fields) => { const result = { ...data }; fields.forEach(field => { if (result[field] && typeof result[field] === 'object') { result[field] = encryptionService.decrypt(result[field]); } }); return result; }, // 数据脱敏 maskSensitiveData: (data, field) => { if (!data[field]) return data; const value = data[field]; let masked; switch (field) { case 'phone': masked = value.replace(/(\d{3})\d{4}(\d{4})/, '$1****$2'); break; case 'email': masked = value.replace(/(.{2}).*(@.*)/, '$1***$2'); break; case 'idCard': masked = value.replace(/(\d{6})\d{8}(\d{4})/, '$1********$2'); break; default: masked = '***'; } return { ...data, [field]: masked }; } }; ``` ### 4.3 数据库安全配置 ```sql -- 数据库安全配置 -- 1. 创建专用数据库用户 CREATE USER 'jiebanke_app'@'%' IDENTIFIED BY 'strong_password_here'; CREATE USER 'jiebanke_readonly'@'%' IDENTIFIED BY 'readonly_password_here'; -- 2. 授予最小权限 GRANT SELECT, INSERT, UPDATE, DELETE ON jiebanke.* TO 'jiebanke_app'@'%'; GRANT SELECT ON jiebanke.* TO 'jiebanke_readonly'@'%'; -- 3. 禁用危险权限 REVOKE FILE ON *.* FROM 'jiebanke_app'@'%'; REVOKE PROCESS ON *.* FROM 'jiebanke_app'@'%'; REVOKE SUPER ON *.* FROM 'jiebanke_app'@'%'; -- 4. 启用SSL连接 ALTER USER 'jiebanke_app'@'%' REQUIRE SSL; -- 5. 设置连接限制 ALTER USER 'jiebanke_app'@'%' WITH MAX_CONNECTIONS_PER_HOUR 1000; ALTER USER 'jiebanke_app'@'%' WITH MAX_QUERIES_PER_HOUR 10000; -- 6. 创建审计表 CREATE TABLE security_audit_log ( id BIGINT AUTO_INCREMENT PRIMARY KEY, user_id INT, action VARCHAR(50), resource VARCHAR(100), ip_address VARCHAR(45), user_agent TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, INDEX idx_user_id (user_id), INDEX idx_action (action), INDEX idx_created_at (created_at) ); ``` ## 5. 网络安全 ### 5.1 网络架构安全 ```mermaid graph TB A[Internet] --> B[CDN] B --> C[WAF] C --> D[Load Balancer] D --> E[DMZ] E --> F[Web Server] F --> G[Internal Network] G --> H[App Server] G --> I[Database Server] J[VPN] --> G K[Bastion Host] --> G subgraph "Public Zone" B C D end subgraph "DMZ Zone" E F end subgraph "Internal Zone" G H I end subgraph "Management Zone" J K end ``` ### 5.2 防火墙配置 ```bash #!/bin/bash # firewall-config.sh - 防火墙配置脚本 # 清空现有规则 iptables -F iptables -X iptables -t nat -F iptables -t nat -X # 设置默认策略 iptables -P INPUT DROP iptables -P FORWARD DROP iptables -P OUTPUT ACCEPT # 允许本地回环 iptables -A INPUT -i lo -j ACCEPT iptables -A OUTPUT -o lo -j ACCEPT # 允许已建立的连接 iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT # 允许SSH (限制IP) iptables -A INPUT -p tcp --dport 22 -s 192.168.1.0/24 -j ACCEPT # 允许HTTP/HTTPS iptables -A INPUT -p tcp --dport 80 -j ACCEPT iptables -A INPUT -p tcp --dport 443 -j ACCEPT # 允许应用端口 (仅内网) iptables -A INPUT -p tcp --dport 3000 -s 192.168.1.0/24 -j ACCEPT # 防止DDoS攻击 iptables -A INPUT -p tcp --dport 80 -m limit --limit 25/minute --limit-burst 100 -j ACCEPT iptables -A INPUT -p tcp --dport 443 -m limit --limit 25/minute --limit-burst 100 -j ACCEPT # 防止端口扫描 iptables -A INPUT -m recent --name portscan --rcheck --seconds 86400 -j DROP iptables -A INPUT -m recent --name portscan --remove iptables -A INPUT -p tcp -m tcp --dport 139 -m recent --name portscan --set -j LOG --log-prefix "portscan:" iptables -A INPUT -p tcp -m tcp --dport 139 -j DROP # 保存规则 iptables-save > /etc/iptables/rules.v4 echo "防火墙配置完成" ``` ### 5.3 WAF规则配置 ```nginx # WAF配置 - ModSecurity规则 # /etc/nginx/modsec/main.conf # 基础配置 SecRuleEngine On SecRequestBodyAccess On SecResponseBodyAccess Off SecRequestBodyLimit 13107200 SecRequestBodyNoFilesLimit 131072 # 日志配置 SecAuditEngine RelevantOnly SecAuditLog /var/log/nginx/modsec_audit.log SecAuditLogFormat JSON # SQL注入防护 SecRule ARGS "@detectSQLi" \ "id:1001,\ phase:2,\ block,\ msg:'SQL Injection Attack Detected',\ logdata:'Matched Data: %{MATCHED_VAR} found within %{MATCHED_VAR_NAME}',\ tag:'application-multi',\ tag:'language-multi',\ tag:'platform-multi',\ tag:'attack-sqli'" # XSS攻击防护 SecRule ARGS "@detectXSS" \ "id:1002,\ phase:2,\ block,\ msg:'XSS Attack Detected',\ logdata:'Matched Data: %{MATCHED_VAR} found within %{MATCHED_VAR_NAME}',\ tag:'application-multi',\ tag:'language-multi',\ tag:'platform-multi',\ tag:'attack-xss'" # 文件上传限制 SecRule FILES_TMPNAMES "@inspectFile /etc/nginx/modsec/file_inspection.lua" \ "id:1003,\ phase:2,\ block,\ msg:'Malicious File Upload Detected'" # 频率限制 SecRule IP:REQUEST_COUNT "@gt 100" \ "id:1004,\ phase:1,\ deny,\ msg:'Rate Limiting - Too Many Requests',\ setvar:ip.request_count=+1,\ expirevar:ip.request_count=60" # 地理位置限制 SecRule REMOTE_ADDR "@geoLookup" \ "id:1005,\ phase:1,\ chain,\ msg:'Request from blocked country'" SecRule GEO:COUNTRY_CODE "@within CN US JP KR" \ "t:none,\ deny" ``` ### 5.4 SSL/TLS配置 ```nginx # SSL/TLS安全配置 server { listen 443 ssl http2; server_name api.jiebanke.com; # SSL证书配置 ssl_certificate /etc/ssl/certs/jiebanke.crt; ssl_certificate_key /etc/ssl/private/jiebanke.key; # SSL协议和加密套件 ssl_protocols TLSv1.2 TLSv1.3; ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-CHACHA20-POLY1305; ssl_prefer_server_ciphers off; # SSL会话配置 ssl_session_cache shared:SSL:10m; ssl_session_timeout 10m; ssl_session_tickets off; # OCSP装订 ssl_stapling on; ssl_stapling_verify on; ssl_trusted_certificate /etc/ssl/certs/ca-certificates.crt; resolver 8.8.8.8 8.8.4.4 valid=300s; resolver_timeout 5s; # 安全头部 add_header Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" always; add_header X-Frame-Options "SAMEORIGIN" always; add_header X-Content-Type-Options "nosniff" always; add_header X-XSS-Protection "1; mode=block" always; add_header Referrer-Policy "strict-origin-when-cross-origin" always; add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; font-src 'self' data:; connect-src 'self'; frame-ancestors 'self';" always; # 隐藏服务器信息 server_tokens off; more_clear_headers Server; location / { proxy_pass http://backend; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; # 安全头部 proxy_hide_header X-Powered-By; proxy_hide_header Server; } } ``` ## 6. 应用安全 ### 6.1 输入验证与过滤 ```javascript // 输入验证中间件 const inputValidation = { // 通用验证规则 rules: { username: { type: 'string', minLength: 3, maxLength: 20, pattern: /^[a-zA-Z0-9_]+$/, sanitize: true }, email: { type: 'email', maxLength: 100, sanitize: true }, password: { type: 'string', minLength: 8, maxLength: 128, pattern: /^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]/ }, phone: { type: 'string', pattern: /^1[3-9]\d{9}$/, sanitize: true } }, // 验证函数 validate: (data, rules) => { const errors = []; for (const [field, rule] of Object.entries(rules)) { const value = data[field]; // 必填检查 if (rule.required && (!value || value.trim() === '')) { errors.push(`${field} 是必填字段`); continue; } if (!value) continue; // 类型检查 if (rule.type === 'email' && !validator.isEmail(value)) { errors.push(`${field} 格式不正确`); } // 长度检查 if (rule.minLength && value.length < rule.minLength) { errors.push(`${field} 长度不能少于 ${rule.minLength} 个字符`); } if (rule.maxLength && value.length > rule.maxLength) { errors.push(`${field} 长度不能超过 ${rule.maxLength} 个字符`); } // 正则检查 if (rule.pattern && !rule.pattern.test(value)) { errors.push(`${field} 格式不正确`); } } return errors; }, // 数据清理 sanitize: (data, rules) => { const sanitized = {}; for (const [field, rule] of Object.entries(rules)) { let value = data[field]; if (!value) { sanitized[field] = value; continue; } if (rule.sanitize) { // HTML转义 value = validator.escape(value); // 去除首尾空格 value = value.trim(); // SQL注入防护 value = value.replace(/['"\\]/g, '\\$&'); } sanitized[field] = value; } return sanitized; } }; // 验证中间件 const validateInput = (rules) => { return (req, res, next) => { const errors = inputValidation.validate(req.body, rules); if (errors.length > 0) { return res.status(400).json({ error: '输入验证失败', details: errors }); } // 清理输入数据 req.body = inputValidation.sanitize(req.body, rules); next(); }; }; ``` ### 6.2 CSRF防护 ```javascript // CSRF防护中间件 const csrfProtection = { // 生成CSRF令牌 generateToken: (req) => { const token = crypto.randomBytes(32).toString('hex'); req.session.csrfToken = token; return token; }, // 验证CSRF令牌 verifyToken: (req) => { const sessionToken = req.session.csrfToken; const requestToken = req.headers['x-csrf-token'] || req.body._csrf; return sessionToken && requestToken && sessionToken === requestToken; }, // CSRF中间件 middleware: (req, res, next) => { // GET请求不需要CSRF验证 if (req.method === 'GET') { return next(); } // API请求使用JWT,不需要CSRF验证 if (req.path.startsWith('/api/')) { return next(); } if (!csrfProtection.verifyToken(req)) { return res.status(403).json({ error: 'CSRF令牌验证失败' }); } next(); } }; // 双重提交Cookie CSRF防护 const doubleSubmitCsrf = { middleware: (req, res, next) => { if (req.method === 'GET') { // 设置CSRF Cookie const token = crypto.randomBytes(32).toString('hex'); res.cookie('csrf-token', token, { httpOnly: false, secure: process.env.NODE_ENV === 'production', sameSite: 'strict' }); return next(); } const cookieToken = req.cookies['csrf-token']; const headerToken = req.headers['x-csrf-token']; if (!cookieToken || !headerToken || cookieToken !== headerToken) { return res.status(403).json({ error: 'CSRF验证失败' }); } next(); } }; ``` ### 6.3 会话安全 ```javascript // 会话安全配置 const sessionConfig = { secret: process.env.SESSION_SECRET, name: 'jiebanke.sid', resave: false, saveUninitialized: false, rolling: true, cookie: { secure: process.env.NODE_ENV === 'production', httpOnly: true, maxAge: 2 * 60 * 60 * 1000, // 2小时 sameSite: 'strict' }, store: new RedisStore({ client: redisClient, prefix: 'sess:', ttl: 2 * 60 * 60 // 2小时 }) }; // 会话安全中间件 const sessionSecurity = { // 会话固定攻击防护 regenerateSession: (req, res, next) => { if (req.session && req.session.regenerate) { req.session.regenerate((err) => { if (err) { return next(err); } next(); }); } else { next(); } }, // 会话劫持防护 validateSession: (req, res, next) => { if (!req.session.userAgent) { req.session.userAgent = req.headers['user-agent']; req.session.ipAddress = req.ip; } else { // 检查User-Agent和IP是否一致 if (req.session.userAgent !== req.headers['user-agent'] || req.session.ipAddress !== req.ip) { req.session.destroy(); return res.status(401).json({ error: '会话已失效,请重新登录' }); } } next(); }, // 并发会话控制 limitConcurrentSessions: async (req, res, next) => { if (!req.user) return next(); const userId = req.user.userId; const currentSessionId = req.sessionID; // 获取用户的所有活跃会话 const activeSessions = await redis.smembers(`user:${userId}:sessions`); // 限制最多3个并发会话 if (activeSessions.length >= 3 && !activeSessions.includes(currentSessionId)) { // 删除最旧的会话 const oldestSession = activeSessions[0]; await redis.srem(`user:${userId}:sessions`, oldestSession); await redis.del(`sess:${oldestSession}`); } // 添加当前会话 await redis.sadd(`user:${userId}:sessions`, currentSessionId); await redis.expire(`user:${userId}:sessions`, 2 * 60 * 60); next(); } }; ``` ## 7. 安全监控与审计 ### 7.1 安全事件监控 ```javascript // 安全事件监控系统 const securityMonitor = { // 事件类型定义 eventTypes: { LOGIN_SUCCESS: 'login_success', LOGIN_FAILURE: 'login_failure', PASSWORD_CHANGE: 'password_change', PERMISSION_DENIED: 'permission_denied', SUSPICIOUS_ACTIVITY: 'suspicious_activity', DATA_ACCESS: 'data_access', ADMIN_ACTION: 'admin_action' }, // 记录安全事件 logEvent: async (eventType, userId, details = {}) => { const event = { id: uuidv4(), type: eventType, userId: userId, timestamp: new Date(), ipAddress: details.ipAddress, userAgent: details.userAgent, resource: details.resource, action: details.action, result: details.result, riskLevel: details.riskLevel || 'low', metadata: details.metadata || {} }; // 存储到数据库 await SecurityAuditLog.create(event); // 发送到日志系统 logger.info('Security Event', event); // 检查是否需要告警 await this.checkAlerts(event); }, // 告警检查 checkAlerts: async (event) => { const alerts = []; // 登录失败次数检查 if (event.type === this.eventTypes.LOGIN_FAILURE) { const recentFailures = await this.getRecentEvents( event.userId, this.eventTypes.LOGIN_FAILURE, 15 * 60 * 1000 // 15分钟内 ); if (recentFailures.length >= 5) { alerts.push({ type: 'BRUTE_FORCE_ATTACK', severity: 'high', message: `用户 ${event.userId} 15分钟内登录失败${recentFailures.length}次` }); } } // 异常IP访问检查 if (event.ipAddress) { const userIPs = await this.getUserRecentIPs(event.userId, 24 * 60 * 60 * 1000); if (userIPs.length > 1 && !userIPs.includes(event.ipAddress)) { alerts.push({ type: 'UNUSUAL_IP_ACCESS', severity: 'medium', message: `用户 ${event.userId} 从新IP地址 ${event.ipAddress} 访问` }); } } // 权限提升检查 if (event.type === this.eventTypes.ADMIN_ACTION && event.userId) { alerts.push({ type: 'ADMIN_ACTION', severity: 'medium', message: `管理员 ${event.userId} 执行了操作: ${event.action}` }); } // 发送告警 for (const alert of alerts) { await this.sendAlert(alert, event); } }, // 发送安全告警 sendAlert: async (alert, event) => { const message = { title: `安全告警: ${alert.type}`, severity: alert.severity, message: alert.message, timestamp: event.timestamp, details: event }; // 发送到钉钉 await this.sendDingTalkAlert(message); // 发送邮件 if (alert.severity === 'high') { await this.sendEmailAlert(message); } // 记录告警 await SecurityAlert.create({ type: alert.type, severity: alert.severity, message: alert.message, eventId: event.id, status: 'open' }); } }; // 安全监控中间件 const securityMonitorMiddleware = (req, res, next) => { // 记录请求信息 const originalSend = res.send; res.send = function(data) { const statusCode = res.statusCode; // 记录安全相关事件 if (req.path.includes('/auth/login')) { const eventType = statusCode === 200 ? securityMonitor.eventTypes.LOGIN_SUCCESS : securityMonitor.eventTypes.LOGIN_FAILURE; securityMonitor.logEvent(eventType, req.body.username, { ipAddress: req.ip, userAgent: req.headers['user-agent'], result: statusCode === 200 ? 'success' : 'failure' }); } if (statusCode === 403) { securityMonitor.logEvent(securityMonitor.eventTypes.PERMISSION_DENIED, req.user?.userId, { ipAddress: req.ip, userAgent: req.headers['user-agent'], resource: req.path, action: req.method }); } originalSend.call(this, data); }; next(); }; ``` ### 7.2 入侵检测系统 ```python #!/usr/bin/env python3 # intrusion_detection.py - 入侵检测系统 import re import json import time from collections import defaultdict, deque from datetime import datetime, timedelta class IntrusionDetectionSystem: def __init__(self): self.attack_patterns = { 'sql_injection': [ r"(\bunion\b.*\bselect\b)", r"(\bselect\b.*\bfrom\b.*\bwhere\b)", r"(\bdrop\b.*\btable\b)", r"(\binsert\b.*\binto\b)", r"(\bdelete\b.*\bfrom\b)" ], 'xss_attack': [ r"]*>.*?", r"javascript:", r"on\w+\s*=", r"]*>.*?" ], 'path_traversal': [ r"\.\./", r"\.\.\\", r"/etc/passwd", r"/etc/shadow" ], 'command_injection': [ r";\s*(cat|ls|pwd|whoami)", r"\|\s*(cat|ls|pwd|whoami)", r"&&\s*(cat|ls|pwd|whoami)", r"`.*`" ] } self.ip_requests = defaultdict(lambda: deque(maxlen=100)) self.failed_logins = defaultdict(lambda: deque(maxlen=50)) def analyze_request(self, request_data): """分析单个请求""" alerts = [] # 检查攻击模式 for attack_type, patterns in self.attack_patterns.items(): for pattern in patterns: if self._check_pattern(request_data, pattern): alerts.append({ 'type': attack_type, 'severity': 'high', 'pattern': pattern, 'timestamp': datetime.now(), 'source_ip': request_data.get('ip'), 'request_uri': request_data.get('uri'), 'user_agent': request_data.get('user_agent') }) # 检查频率异常 ip = request_data.get('ip') if ip: self.ip_requests[ip].append(time.time()) if self._check_rate_limit(ip): alerts.append({ 'type': 'rate_limit_exceeded', 'severity': 'medium', 'source_ip': ip, 'request_count': len(self.ip_requests[ip]), 'timestamp': datetime.now() }) # 检查登录失败 if request_data.get('path') == '/auth/login' and request_data.get('status') >= 400: user = request_data.get('username', ip) self.failed_logins[user].append(time.time()) if len(self.failed_logins[user]) >= 5: alerts.append({ 'type': 'brute_force_attack', 'severity': 'high', 'target_user': user, 'source_ip': ip, 'failed_attempts': len(self.failed_logins[user]), 'timestamp': datetime.now() }) return alerts def _check_pattern(self, request_data, pattern): """检查请求是否匹配攻击模式""" text_fields = [ request_data.get('uri', ''), request_data.get('query_string', ''), request_data.get('post_data', ''), request_data.get('headers', {}).get('user-agent', '') ] for field in text_fields: if re.search(pattern, field, re.IGNORECASE): return True return False def _check_rate_limit(self, ip): """检查IP请求频率""" now = time.time() requests = self.ip_requests[ip] # 1分钟内超过100个请求 recent_requests = [req for req in requests if now - req < 60] return len(recent_requests) > 100 def generate_report(self, alerts): """生成入侵检测报告""" if not alerts: return None report = { 'timestamp': datetime.now().isoformat(), 'total_alerts': len(alerts), 'severity_breakdown': defaultdict(int), 'attack_types': defaultdict(int), 'top_source_ips': defaultdict(int), 'alerts': alerts } for alert in alerts: report['severity_breakdown'][alert['severity']] += 1 report['attack_types'][alert['type']] += 1 if 'source_ip' in alert: report['top_source_ips'][alert['source_ip']] += 1 return report def block_ip(self, ip, duration=3600): """阻止IP访问""" # 这里应该调用防火墙API或更新IP黑名单 print(f"Blocking IP {ip} for {duration} seconds") # 示例:更新iptables规则 import subprocess try: subprocess.run([ 'iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP' ], check=True) # 设置定时解除阻止 subprocess.run([ 'at', f'now + {duration} seconds' ], input=f'iptables -D INPUT -s {ip} -j DROP\n', text=True, check=True) except subprocess.CalledProcessError as e: print(f"Failed to block IP {ip}: {e}") # 使用示例 if __name__ == "__main__": ids = IntrusionDetectionSystem() # 模拟请求数据 request_data = { 'ip': '192.168.1.100', 'uri': '/api/users?id=1 UNION SELECT * FROM users', 'method': 'GET', 'status': 200, 'user_agent': 'Mozilla/5.0...' } alerts = ids.analyze_request(request_data) if alerts: report = ids.generate_report(alerts) print(json.dumps(report, indent=2, default=str)) # 自动阻止高危IP for alert in alerts: if alert['severity'] == 'high' and 'source_ip' in alert: ids.block_ip(alert['source_ip']) ``` ## 8. 安全测试 ### 8.1 自动化安全测试 ```python #!/usr/bin/env python3 # security_test.py - 自动化安全测试 import requests import json import time from urllib.parse import urljoin class SecurityTester: def __init__(self, base_url): self.base_url = base_url self.session = requests.Session() self.vulnerabilities = [] def test_sql_injection(self): """SQL注入测试""" print("Testing SQL Injection...") payloads = [ "' OR '1'='1", "'; DROP TABLE users; --", "' UNION SELECT * FROM users --", "1' AND (SELECT COUNT(*) FROM users) > 0 --" ] test_endpoints = [ "/api/users", "/api/trips", "/api/auth/login" ] for endpoint in test_endpoints: for payload in payloads: params = {'id': payload} try: response = self.session.get( urljoin(self.base_url, endpoint), params=params, timeout=10 ) # 检查是否存在SQL错误信息 error_indicators = [ 'mysql_fetch_array', 'ORA-01756', 'Microsoft OLE DB Provider', 'SQLServer JDBC Driver', 'PostgreSQL query failed' ] for indicator in error_indicators: if indicator.lower() in response.text.lower(): self.vulnerabilities.append({ 'type': 'SQL Injection', 'severity': 'High', 'endpoint': endpoint, 'payload': payload, 'evidence': indicator }) break except requests.RequestException as e: print(f"Request failed: {e}") def test_xss(self): """XSS测试""" print("Testing XSS...") payloads = [ "", "", "javascript:alert('XSS')", "" ] test_endpoints = [ "/api/comments", "/api/trips", "/api/users/profile" ] for endpoint in test_endpoints: for payload in payloads: data = {'content': payload, 'title': payload} try: response = self.session.post( urljoin(self.base_url, endpoint), json=data, timeout=10 ) # 检查响应中是否包含未转义的payload if payload in response.text: self.vulnerabilities.append({ 'type': 'Cross-Site Scripting (XSS)', 'severity': 'High', 'endpoint': endpoint, 'payload': payload, 'evidence': 'Payload reflected in response' }) except requests.RequestException as e: print(f"Request failed: {e}") def test_authentication_bypass(self): """认证绕过测试""" print("Testing Authentication Bypass...") protected_endpoints = [ "/api/admin/users", "/api/admin/system", "/api/users/profile" ] for endpoint in protected_endpoints: try: # 不带认证头的请求 response = self.session.get( urljoin(self.base_url, endpoint), timeout=10 ) if response.status_code == 200: self.vulnerabilities.append({ 'type': 'Authentication Bypass', 'severity': 'High', 'endpoint': endpoint, 'evidence': f'Status code: {response.status_code}' }) # 测试无效token headers = {'Authorization': 'Bearer invalid_token'} response = self.session.get( urljoin(self.base_url, endpoint), headers=headers, timeout=10 ) if response.status_code == 200: self.vulnerabilities.append({ 'type': 'Invalid Token Accepted', 'severity': 'High', 'endpoint': endpoint, 'evidence': 'Invalid token accepted' }) except requests.RequestException as e: print(f"Request failed: {e}") def test_brute_force_protection(self): """暴力破解防护测试""" print("Testing Brute Force Protection...") login_endpoint = "/api/auth/login" credentials = { 'username': 'testuser', 'password': 'wrongpassword' } attempt_count = 0 max_attempts = 10 for i in range(max_attempts): try: response = self.session.post( urljoin(self.base_url, login_endpoint), json=credentials, timeout=10 ) attempt_count += 1 # 检查是否有速率限制 if response.status_code == 429: print(f"Rate limiting detected after {attempt_count} attempts") break elif response.status_code == 401: continue else: time.sleep(1) # 避免过快请求 except requests.RequestException as e: print(f"Request failed: {e}") break if attempt_count >= max_attempts: self.vulnerabilities.append({ 'type': 'No Brute Force Protection', 'severity': 'Medium', 'endpoint': login_endpoint, 'evidence': f'Allowed {max_attempts} failed login attempts' }) def test_file_upload_security(self): """文件上传安全测试""" print("Testing File Upload Security...") upload_endpoint = "/api/upload" # 测试恶意文件上传 malicious_files = [ ('shell.php', '', 'application/x-php'), ('script.js', 'alert("XSS")', 'application/javascript'), ('test.exe', b'\x4d\x5a\x90\x00', 'application/x-executable') ] for filename, content, content_type in malicious_files: files = { 'file': (filename, content, content_type) } try: response = self.session.post( urljoin(self.base_url, upload_endpoint), files=files, timeout=10 ) if response.status_code == 200: self.vulnerabilities.append({ 'type': 'Malicious File Upload', 'severity': 'High', 'endpoint': upload_endpoint, 'payload': filename, 'evidence': 'Malicious file accepted' }) except requests.RequestException as e: print(f"Request failed: {e}") def generate_report(self): """生成安全测试报告""" report = { 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), 'target': self.base_url, 'total_vulnerabilities': len(self.vulnerabilities), 'severity_breakdown': { 'High': 0, 'Medium': 0, 'Low': 0 }, 'vulnerabilities': self.vulnerabilities } for vuln in self.vulnerabilities: severity = vuln.get('severity', 'Low') report['severity_breakdown'][severity] += 1 return report def run_all_tests(self): """运行所有安全测试""" print(f"Starting security tests for {self.base_url}") self.test_sql_injection() self.test_xss() self.test_authentication_bypass() self.test_brute_force_protection() self.test_file_upload_security() report = self.generate_report() print("\n" + "="*50) print("SECURITY TEST REPORT") print("="*50) print(f"Target: {report['target']}") print(f"Total Vulnerabilities: {report['total_vulnerabilities']}") print(f"High: {report['severity_breakdown']['High']}") print(f"Medium: {report['severity_breakdown']['Medium']}") print(f"Low: {report['severity_breakdown']['Low']}") if self.vulnerabilities: print("\nVulnerabilities Found:") for i, vuln in enumerate(self.vulnerabilities, 1): print(f"\n{i}. {vuln['type']} ({vuln['severity']})") print(f" Endpoint: {vuln['endpoint']}") if 'payload' in vuln: print(f" Payload: {vuln['payload']}") print(f" Evidence: {vuln['evidence']}") else: print("\nNo vulnerabilities found!") return report # 使用示例 if __name__ == "__main__": tester = SecurityTester("http://localhost:3000") report = tester.run_all_tests() # 保存报告 with open(f"security_report_{int(time.time())}.json", 'w') as f: json.dump(report, f, indent=2) ``` ## 9. 应急响应 ### 9.1 安全事件响应流程 ```mermaid graph TD A[安全事件发生] --> B[事件检测] B --> C[事件分类] C --> D[影响评估] D --> E[响应级别确定] E --> F[一级响应] E --> G[二级响应] E --> H[三级响应] F --> I[立即隔离] G --> J[限制访问] H --> K[监控观察] I --> L[深入调查] J --> L K --> L L --> M[制定恢复计划] M --> N[系统恢复] N --> O[事后总结] ``` ### 9.2 应急响应脚本 ```bash #!/bin/bash # emergency-response.sh - 应急响应脚本 set -e LOG_FILE="/var/log/security/emergency-response.log" BACKUP_DIR="/opt/backup/emergency" QUARANTINE_DIR="/opt/quarantine" # 日志函数 log_message() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE" } # 发送告警 send_alert() { local message="$1" local level="$2" # 发送钉钉告警 curl -X POST "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN" \ -H "Content-Type: application/json" \ -d "{ \"msgtype\": \"text\", \"text\": { \"content\": \"【安全应急】[$level] $message\" }, \"at\": { \"isAtAll\": true } }" > /dev/null 2>&1 log_message "Alert sent: [$level] $message" } # 隔离受感染系统 isolate_system() { log_message "开始系统隔离..." # 断开网络连接 iptables -P INPUT DROP iptables -P OUTPUT DROP iptables -P FORWARD DROP # 保留SSH连接用于管理 iptables -A INPUT -p tcp --dport 22 -s 192.168.1.0/24 -j ACCEPT iptables -A OUTPUT -p tcp --sport 22 -d 192.168.1.0/24 -j ACCEPT # 停止非关键服务 systemctl stop nginx systemctl stop apache2 systemctl stop mysql send_alert "系统已隔离,网络连接已断开" "CRITICAL" log_message "系统隔离完成" } # 备份关键数据 backup_critical_data() { log_message "开始备份关键数据..." mkdir -p "$BACKUP_DIR/$(date +%Y%m%d_%H%M%S)" CURRENT_BACKUP="$BACKUP_DIR/$(date +%Y%m%d_%H%M%S)" # 备份数据库 mysqldump -u root -p"$DB_PASSWORD" --all-databases > "$CURRENT_BACKUP/database_backup.sql" # 备份配置文件 cp -r /etc/nginx "$CURRENT_BACKUP/" cp -r /etc/mysql "$CURRENT_BACKUP/" cp -r /opt/jiebanke/config "$CURRENT_BACKUP/" # 备份日志文件 cp -r /var/log "$CURRENT_BACKUP/" # 创建备份清单 find "$CURRENT_BACKUP" -type f -exec ls -la {} \; > "$CURRENT_BACKUP/backup_manifest.txt" log_message "数据备份完成: $CURRENT_BACKUP" } # 收集证据 collect_evidence() { log_message "开始收集安全事件证据..." EVIDENCE_DIR="/opt/evidence/$(date +%Y%m%d_%H%M%S)" mkdir -p "$EVIDENCE_DIR" # 系统信息 uname -a > "$EVIDENCE_DIR/system_info.txt" ps aux > "$EVIDENCE_DIR/processes.txt" netstat -tulpn > "$EVIDENCE_DIR/network_connections.txt" lsof > "$EVIDENCE_DIR/open_files.txt" # 用户信息 who > "$EVIDENCE_DIR/logged_users.txt" last -n 100 > "$EVIDENCE_DIR/login_history.txt" # 文件系统信息 find /tmp -type f -mtime -1 -ls > "$EVIDENCE_DIR/recent_tmp_files.txt" find /var/www -type f -mtime -1 -ls > "$EVIDENCE_DIR/recent_web_files.txt" # 网络流量 tcpdump -i any -w "$EVIDENCE_DIR/network_traffic.pcap" -c 1000 & TCPDUMP_PID=$! sleep 30 kill $TCPDUMP_PID 2>/dev/null || true log_message "证据收集完成: $EVIDENCE_DIR" } # 恶意文件隔离 quarantine_malicious_files() { local file_path="$1" if [ ! -f "$file_path" ]; then log_message "文件不存在: $file_path" return 1 fi log_message "隔离恶意文件: $file_path" mkdir -p "$QUARANTINE_DIR" # 计算文件哈希 FILE_HASH=$(sha256sum "$file_path" | cut -d' ' -f1) # 移动文件到隔离区 mv "$file_path" "$QUARANTINE_DIR/${FILE_HASH}_$(basename $file_path)" # 记录隔离信息 echo "$(date '+%Y-%m-%d %H:%M:%S') - $file_path -> $QUARANTINE_DIR/${FILE_HASH}_$(basename $file_path)" >> "$QUARANTINE_DIR/quarantine.log" log_message "文件已隔离: $QUARANTINE_DIR/${FILE_HASH}_$(basename $file_path)" } # 阻止恶意IP block_malicious_ip() { local ip="$1" local duration="${2:-3600}" # 默认1小时 log_message "阻止恶意IP: $ip (持续时间: ${duration}秒)" # 添加iptables规则 iptables -A INPUT -s "$ip" -j DROP # 记录到黑名单 echo "$ip" >> /etc/security/ip_blacklist.txt # 设置定时解除阻止 echo "iptables -D INPUT -s $ip -j DROP" | at now + ${duration} seconds send_alert "已阻止恶意IP: $ip" "HIGH" log_message "IP阻止完成: $ip" } # 系统恢复 system_recovery() { log_message "开始系统恢复..." # 恢复网络连接 iptables -F iptables -P INPUT ACCEPT iptables -P OUTPUT ACCEPT iptables -P FORWARD ACCEPT # 重启服务 systemctl start mysql systemctl start nginx # 验证服务状态 if systemctl is-active --quiet mysql && systemctl is-active --quiet nginx; then log_message "服务恢复成功" send_alert "系统恢复完成,服务正常运行" "INFO" else log_message "服务恢复失败" send_alert "系统恢复失败,需要人工干预" "CRITICAL" fi } # 主函数 main() { case "$1" in "isolate") isolate_system ;; "backup") backup_critical_data ;; "evidence") collect_evidence ;; "quarantine") if [ -z "$2" ]; then echo "用法: $0 quarantine " exit 1 fi quarantine_malicious_files "$2" ;; "block-ip") if [ -z "$2" ]; then echo "用法: $0 block-ip [duration]" exit 1 fi block_malicious_ip "$2" "$3" ;; "recover") system_recovery ;; "full-response") log_message "开始完整应急响应流程" backup_critical_data collect_evidence isolate_system send_alert "完整应急响应流程已执行" "CRITICAL" ;; *) echo "用法: $0 {isolate|backup|evidence|quarantine|block-ip|recover|full-response}" echo " isolate - 隔离系统" echo " backup - 备份关键数据" echo " evidence - 收集证据" echo " quarantine - 隔离恶意文件" echo " block-ip - 阻止恶意IP" echo " recover - 系统恢复" echo " full-response - 执行完整应急响应" exit 1 ;; esac } main "$@" ``` ### 9.3 事件响应手册 #### 9.3.1 数据泄露响应 **响应步骤:** 1. **立即行动**(0-1小时) - 确认泄露范围和影响 - 停止数据泄露源 - 保护剩余数据 - 通知应急响应团队 2. **深入调查**(1-24小时) - 分析泄露原因 - 确定泄露的具体数据 - 评估业务影响 - 收集相关证据 3. **通知相关方**(24-72小时) - 通知监管机构 - 通知受影响用户 - 发布公告说明 - 配合调查工作 #### 9.3.2 恶意软件感染响应 **响应步骤:** 1. **隔离感染系统** - 断开网络连接 - 停止相关服务 - 防止横向传播 2. **分析恶意软件** - 识别恶意软件类型 - 分析攻击向量 - 评估损害程度 3. **清除和恢复** - 清除恶意软件 - 修复受损文件 - 恢复系统功能 - 加强防护措施 ## 10. 总结 ### 10.1 安全架构总结 解班客项目的安全架构采用了多层防护策略,包括: - **网络安全层**:WAF、防火墙、DDoS防护 - **应用安全层**:身份认证、权限控制、输入验证 - **数据安全层**:数据加密、访问控制、备份恢复 - **监控审计层**:安全监控、日志审计、入侵检测 ### 10.2 关键安全措施 1. **身份认证**:JWT令牌 + 多因子认证 2. **权限控制**:基于角色的访问控制(RBAC) 3. **数据保护**:AES-256加密 + 数据分类保护 4. **网络防护**:多层防火墙 + WAF规则 5. **安全监控**:实时监控 + 自动告警 6. **应急响应**:完整的事件响应流程 ### 10.3 持续改进 安全是一个持续的过程,需要: - **定期安全评估**:每季度进行安全评估 - **漏洞扫描**:每月进行自动化漏洞扫描 - **安全培训**:定期对开发和运维人员进行安全培训 - **威胁情报**:关注最新的安全威胁和漏洞信息 - **安全更新**:及时更新系统和组件的安全补丁 ### 10.4 合规要求 项目需要满足以下合规要求: - **《网络安全法》**:数据保护和网络安全要求 - **《数据安全法》**:数据分类分级保护 - **《个人信息保护法》**:个人信息处理规范 - **等保2.0**:信息系统安全等级保护 通过实施本安全文档中的各项措施,可以有效保护解班客项目的安全,确保用户数据和业务系统的安全稳定运行。