Files
jiebanke/docs/安全文档.md

54 KiB
Raw Permalink Blame History

结伴客项目安全文档

1. 安全概述

1.1 安全目标

  • 数据保护:确保用户数据和业务数据的机密性、完整性和可用性
  • 系统安全:防范各类网络攻击和安全威胁
  • 合规要求:满足相关法律法规和行业标准要求
  • 隐私保护:保护用户隐私信息不被泄露或滥用

1.2 安全架构

graph TB
    A[用户] --> B[CDN/WAF]
    B --> C[负载均衡器]
    C --> D[Web应用防火墙]
    D --> E[反向代理]
    E --> F[应用服务器]
    
    F --> G[数据库]
    F --> H[缓存服务]
    F --> I[文件存储]
    
    J[安全监控] --> K[入侵检测]
    J --> L[日志审计]
    J --> M[漏洞扫描]
    
    N[身份认证] --> O[JWT令牌]
    N --> P[多因子认证]
    N --> Q[权限控制]
    
    subgraph "安全防护层"
        B
        D
        E
    end
    
    subgraph "应用安全层"
        F
        N
    end
    
    subgraph "数据安全层"
        G
        H
        I
    end
    
    subgraph "监控审计层"
        J
        K
        L
        M
    end

1.3 安全原则

  • 最小权限原则:用户和系统组件只获得完成任务所需的最小权限
  • 纵深防御:多层安全防护,单点失效不影响整体安全
  • 零信任架构:不信任任何用户或设备,始终验证身份和权限
  • 数据分类保护:根据数据敏感性实施不同级别的保护措施

2. 威胁分析

2.1 威胁模型

graph LR
    A[威胁来源] --> B[外部攻击者]
    A --> C[内部威胁]
    A --> D[第三方风险]
    
    B --> B1[黑客攻击]
    B --> B2[恶意软件]
    B --> B3[DDoS攻击]
    
    C --> C1[内部人员]
    C --> C2[权限滥用]
    C --> C3[数据泄露]
    
    D --> D1[供应商风险]
    D --> D2[第三方服务]
    D --> D3[开源组件]

2.2 风险评估矩阵

威胁类型 可能性 影响程度 风险等级 防护措施
SQL注入 参数化查询、输入验证
XSS攻击 输出编码、CSP策略
CSRF攻击 CSRF令牌、同源检查
暴力破解 账户锁定、验证码
数据泄露 数据加密、访问控制
DDoS攻击 流量清洗、限流
内部威胁 权限管理、审计日志

2.3 攻击场景分析

2.3.1 Web应用攻击

// 常见攻击示例和防护
const securityExamples = {
  // SQL注入攻击示例
  sqlInjection: {
    vulnerable: "SELECT * FROM users WHERE id = " + userId,
    secure: "SELECT * FROM users WHERE id = ?", // 使用参数化查询
    prevention: [
      "使用ORM框架",
      "参数化查询",
      "输入验证",
      "最小权限数据库用户"
    ]
  },
  
  // XSS攻击示例
  xssAttack: {
    vulnerable: `<div>${userInput}</div>`,
    secure: `<div>${escapeHtml(userInput)}</div>`,
    prevention: [
      "输出编码",
      "CSP策略",
      "输入验证",
      "使用安全的模板引擎"
    ]
  },
  
  // CSRF攻击示例
  csrfAttack: {
    vulnerable: "POST /api/transfer without token",
    secure: "POST /api/transfer with CSRF token",
    prevention: [
      "CSRF令牌",
      "SameSite Cookie",
      "双重提交Cookie",
      "验证Referer头"
    ]
  }
};

3. 身份认证与授权

3.1 认证架构

sequenceDiagram
    participant U as 用户
    participant F as 前端
    participant A as 认证服务
    participant R as 资源服务
    participant D as 数据库
    
    U->>F: 登录请求
    F->>A: 提交凭证
    A->>D: 验证用户
    D-->>A: 用户信息
    A->>A: 生成JWT令牌
    A-->>F: 返回令牌
    F-->>U: 登录成功
    
    U->>F: 访问资源
    F->>R: 携带JWT令牌
    R->>A: 验证令牌
    A-->>R: 令牌有效
    R->>D: 查询数据
    D-->>R: 返回数据
    R-->>F: 返回结果
    F-->>U: 显示内容

3.2 JWT令牌安全配置

// JWT配置
const jwtConfig = {
  // 令牌配置
  secret: process.env.JWT_SECRET, // 256位随机密钥
  algorithm: 'HS256',
  expiresIn: '2h', // 访问令牌2小时过期
  refreshExpiresIn: '7d', // 刷新令牌7天过期
  
  // 安全选项
  issuer: 'jiebanke.com',
  audience: 'jiebanke-api',
  notBefore: 0,
  
  // 令牌载荷
  payload: {
    userId: 'user.id',
    username: 'user.username',
    role: 'user.role',
    permissions: 'user.permissions'
  }
};

// JWT中间件
const jwtMiddleware = (req, res, next) => {
  const token = req.headers.authorization?.replace('Bearer ', '');
  
  if (!token) {
    return res.status(401).json({ error: '缺少访问令牌' });
  }
  
  try {
    const decoded = jwt.verify(token, jwtConfig.secret, {
      issuer: jwtConfig.issuer,
      audience: jwtConfig.audience,
      algorithms: [jwtConfig.algorithm]
    });
    
    // 检查令牌黑名单
    if (await isTokenBlacklisted(token)) {
      return res.status(401).json({ error: '令牌已失效' });
    }
    
    req.user = decoded;
    next();
  } catch (error) {
    return res.status(401).json({ error: '无效的访问令牌' });
  }
};

3.3 权限控制系统

// RBAC权限模型
const rbacModel = {
  roles: {
    admin: {
      name: '管理员',
      permissions: ['*'] // 所有权限
    },
    moderator: {
      name: '版主',
      permissions: [
        'trip:read',
        'trip:update',
        'trip:delete',
        'user:read',
        'comment:moderate'
      ]
    },
    user: {
      name: '普通用户',
      permissions: [
        'trip:read',
        'trip:create',
        'trip:update:own',
        'comment:create',
        'profile:update:own'
      ]
    }
  },
  
  resources: {
    trip: ['create', 'read', 'update', 'delete'],
    user: ['create', 'read', 'update', 'delete'],
    comment: ['create', 'read', 'update', 'delete', 'moderate'],
    profile: ['read', 'update']
  }
};

// 权限检查中间件
const checkPermission = (resource, action) => {
  return (req, res, next) => {
    const user = req.user;
    const userRole = user.role;
    const userPermissions = rbacModel.roles[userRole]?.permissions || [];
    
    // 检查是否有通配符权限
    if (userPermissions.includes('*')) {
      return next();
    }
    
    // 检查具体权限
    const requiredPermission = `${resource}:${action}`;
    const hasPermission = userPermissions.some(permission => {
      if (permission === requiredPermission) return true;
      if (permission.endsWith(':*') && requiredPermission.startsWith(permission.slice(0, -1))) return true;
      return false;
    });
    
    // 检查资源所有权
    if (!hasPermission && action.endsWith(':own')) {
      const basePermission = requiredPermission.replace(':own', '');
      if (userPermissions.includes(basePermission + ':own')) {
        // 需要验证资源所有权
        return checkResourceOwnership(resource, req, res, next);
      }
    }
    
    if (!hasPermission) {
      return res.status(403).json({ error: '权限不足' });
    }
    
    next();
  };
};

3.4 多因子认证

// 多因子认证实现
const mfaService = {
  // 生成TOTP密钥
  generateSecret: (userId) => {
    const secret = speakeasy.generateSecret({
      name: `结伴客 (${userId})`,
issuer: '结伴客',
      length: 32
    });
    
    return {
      secret: secret.base32,
      qrCode: qrcode.toDataURL(secret.otpauth_url)
    };
  },
  
  // 验证TOTP令牌
  verifyToken: (secret, token) => {
    return speakeasy.totp.verify({
      secret: secret,
      encoding: 'base32',
      token: token,
      window: 2 // 允许时间偏差
    });
  },
  
  // 生成备用码
  generateBackupCodes: () => {
    const codes = [];
    for (let i = 0; i < 10; i++) {
      codes.push(crypto.randomBytes(4).toString('hex').toUpperCase());
    }
    return codes;
  }
};

// MFA中间件
const mfaMiddleware = async (req, res, next) => {
  const user = req.user;
  
  // 检查用户是否启用了MFA
  const userMfa = await getUserMfaSettings(user.userId);
  if (!userMfa.enabled) {
    return next();
  }
  
  const mfaToken = req.headers['x-mfa-token'];
  if (!mfaToken) {
    return res.status(401).json({ 
      error: '需要多因子认证',
      mfaRequired: true 
    });
  }
  
  // 验证MFA令牌
  const isValid = mfaService.verifyToken(userMfa.secret, mfaToken);
  if (!isValid) {
    // 尝试备用码
    const backupCodeValid = await verifyBackupCode(user.userId, mfaToken);
    if (!backupCodeValid) {
      return res.status(401).json({ error: '多因子认证失败' });
    }
  }
  
  next();
};

4. 数据安全

4.1 数据分类与保护

数据类型 敏感级别 保护措施 访问控制
用户密码 极高 bcrypt加密、盐值 仅认证服务访问
身份证号 AES-256加密 管理员+审计日志
手机号码 AES-256加密 业务需要+脱敏显示
邮箱地址 明文存储 用户本人+管理员
旅行信息 明文存储 公开+隐私设置
系统日志 压缩存储 运维人员

4.2 数据加密实现

// 数据加密服务
const encryptionService = {
  // AES-256-GCM加密
  encrypt: (plaintext, key = process.env.ENCRYPTION_KEY) => {
    const iv = crypto.randomBytes(16);
    const cipher = crypto.createCipher('aes-256-gcm', key, iv);
    
    let encrypted = cipher.update(plaintext, 'utf8', 'hex');
    encrypted += cipher.final('hex');
    
    const authTag = cipher.getAuthTag();
    
    return {
      encrypted,
      iv: iv.toString('hex'),
      authTag: authTag.toString('hex')
    };
  },
  
  // AES-256-GCM解密
  decrypt: (encryptedData, key = process.env.ENCRYPTION_KEY) => {
    const decipher = crypto.createDecipher('aes-256-gcm', key, 
      Buffer.from(encryptedData.iv, 'hex'));
    
    decipher.setAuthTag(Buffer.from(encryptedData.authTag, 'hex'));
    
    let decrypted = decipher.update(encryptedData.encrypted, 'hex', 'utf8');
    decrypted += decipher.final('utf8');
    
    return decrypted;
  },
  
  // 密码哈希
  hashPassword: async (password) => {
    const saltRounds = 12;
    return await bcrypt.hash(password, saltRounds);
  },
  
  // 密码验证
  verifyPassword: async (password, hash) => {
    return await bcrypt.compare(password, hash);
  }
};

// 敏感数据处理中间件
const sensitiveDataMiddleware = {
  // 加密敏感字段
  encryptSensitiveFields: (data, fields) => {
    const result = { ...data };
    
    fields.forEach(field => {
      if (result[field]) {
        result[field] = encryptionService.encrypt(result[field]);
      }
    });
    
    return result;
  },
  
  // 解密敏感字段
  decryptSensitiveFields: (data, fields) => {
    const result = { ...data };
    
    fields.forEach(field => {
      if (result[field] && typeof result[field] === 'object') {
        result[field] = encryptionService.decrypt(result[field]);
      }
    });
    
    return result;
  },
  
  // 数据脱敏
  maskSensitiveData: (data, field) => {
    if (!data[field]) return data;
    
    const value = data[field];
    let masked;
    
    switch (field) {
      case 'phone':
        masked = value.replace(/(\d{3})\d{4}(\d{4})/, '$1****$2');
        break;
      case 'email':
        masked = value.replace(/(.{2}).*(@.*)/, '$1***$2');
        break;
      case 'idCard':
        masked = value.replace(/(\d{6})\d{8}(\d{4})/, '$1********$2');
        break;
      default:
        masked = '***';
    }
    
    return { ...data, [field]: masked };
  }
};

4.3 数据库安全配置

-- 数据库安全配置
-- 1. 创建专用数据库用户
CREATE USER 'jiebanke_app'@'%' IDENTIFIED BY 'strong_password_here';
CREATE USER 'jiebanke_readonly'@'%' IDENTIFIED BY 'readonly_password_here';

-- 2. 授予最小权限
GRANT SELECT, INSERT, UPDATE, DELETE ON jiebanke.* TO 'jiebanke_app'@'%';
GRANT SELECT ON jiebanke.* TO 'jiebanke_readonly'@'%';

-- 3. 禁用危险权限
REVOKE FILE ON *.* FROM 'jiebanke_app'@'%';
REVOKE PROCESS ON *.* FROM 'jiebanke_app'@'%';
REVOKE SUPER ON *.* FROM 'jiebanke_app'@'%';

-- 4. 启用SSL连接
ALTER USER 'jiebanke_app'@'%' REQUIRE SSL;

-- 5. 设置连接限制
ALTER USER 'jiebanke_app'@'%' WITH MAX_CONNECTIONS_PER_HOUR 1000;
ALTER USER 'jiebanke_app'@'%' WITH MAX_QUERIES_PER_HOUR 10000;

-- 6. 创建审计表
CREATE TABLE security_audit_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    user_id INT,
    action VARCHAR(50),
    resource VARCHAR(100),
    ip_address VARCHAR(45),
    user_agent TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_user_id (user_id),
    INDEX idx_action (action),
    INDEX idx_created_at (created_at)
);

5. 网络安全

5.1 网络架构安全

graph TB
    A[Internet] --> B[CDN]
    B --> C[WAF]
    C --> D[Load Balancer]
    D --> E[DMZ]
    
    E --> F[Web Server]
    F --> G[Internal Network]
    G --> H[App Server]
    G --> I[Database Server]
    
    J[VPN] --> G
    K[Bastion Host] --> G
    
    subgraph "Public Zone"
        B
        C
        D
    end
    
    subgraph "DMZ Zone"
        E
        F
    end
    
    subgraph "Internal Zone"
        G
        H
        I
    end
    
    subgraph "Management Zone"
        J
        K
    end

5.2 防火墙配置

#!/bin/bash
# firewall-config.sh - 防火墙配置脚本

# 清空现有规则
iptables -F
iptables -X
iptables -t nat -F
iptables -t nat -X

# 设置默认策略
iptables -P INPUT DROP
iptables -P FORWARD DROP
iptables -P OUTPUT ACCEPT

# 允许本地回环
iptables -A INPUT -i lo -j ACCEPT
iptables -A OUTPUT -o lo -j ACCEPT

# 允许已建立的连接
iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT

# 允许SSH (限制IP)
iptables -A INPUT -p tcp --dport 22 -s 192.168.1.0/24 -j ACCEPT

# 允许HTTP/HTTPS
iptables -A INPUT -p tcp --dport 80 -j ACCEPT
iptables -A INPUT -p tcp --dport 443 -j ACCEPT

# 允许应用端口 (仅内网)
iptables -A INPUT -p tcp --dport 3000 -s 192.168.1.0/24 -j ACCEPT

# 防止DDoS攻击
iptables -A INPUT -p tcp --dport 80 -m limit --limit 25/minute --limit-burst 100 -j ACCEPT
iptables -A INPUT -p tcp --dport 443 -m limit --limit 25/minute --limit-burst 100 -j ACCEPT

# 防止端口扫描
iptables -A INPUT -m recent --name portscan --rcheck --seconds 86400 -j DROP
iptables -A INPUT -m recent --name portscan --remove
iptables -A INPUT -p tcp -m tcp --dport 139 -m recent --name portscan --set -j LOG --log-prefix "portscan:"
iptables -A INPUT -p tcp -m tcp --dport 139 -j DROP

# 保存规则
iptables-save > /etc/iptables/rules.v4

echo "防火墙配置完成"

5.3 WAF规则配置

# WAF配置 - ModSecurity规则
# /etc/nginx/modsec/main.conf

# 基础配置
SecRuleEngine On
SecRequestBodyAccess On
SecResponseBodyAccess Off
SecRequestBodyLimit 13107200
SecRequestBodyNoFilesLimit 131072

# 日志配置
SecAuditEngine RelevantOnly
SecAuditLog /var/log/nginx/modsec_audit.log
SecAuditLogFormat JSON

# SQL注入防护
SecRule ARGS "@detectSQLi" \
    "id:1001,\
    phase:2,\
    block,\
    msg:'SQL Injection Attack Detected',\
    logdata:'Matched Data: %{MATCHED_VAR} found within %{MATCHED_VAR_NAME}',\
    tag:'application-multi',\
    tag:'language-multi',\
    tag:'platform-multi',\
    tag:'attack-sqli'"

# XSS攻击防护
SecRule ARGS "@detectXSS" \
    "id:1002,\
    phase:2,\
    block,\
    msg:'XSS Attack Detected',\
    logdata:'Matched Data: %{MATCHED_VAR} found within %{MATCHED_VAR_NAME}',\
    tag:'application-multi',\
    tag:'language-multi',\
    tag:'platform-multi',\
    tag:'attack-xss'"

# 文件上传限制
SecRule FILES_TMPNAMES "@inspectFile /etc/nginx/modsec/file_inspection.lua" \
    "id:1003,\
    phase:2,\
    block,\
    msg:'Malicious File Upload Detected'"

# 频率限制
SecRule IP:REQUEST_COUNT "@gt 100" \
    "id:1004,\
    phase:1,\
    deny,\
    msg:'Rate Limiting - Too Many Requests',\
    setvar:ip.request_count=+1,\
    expirevar:ip.request_count=60"

# 地理位置限制
SecRule REMOTE_ADDR "@geoLookup" \
    "id:1005,\
    phase:1,\
    chain,\
    msg:'Request from blocked country'"
SecRule GEO:COUNTRY_CODE "@within CN US JP KR" \
    "t:none,\
    deny"

5.4 SSL/TLS配置

# SSL/TLS安全配置
server {
    listen 443 ssl http2;
    server_name api.jiebanke.com;
    
    # SSL证书配置
    ssl_certificate /etc/ssl/certs/jiebanke.crt;
    ssl_certificate_key /etc/ssl/private/jiebanke.key;
    
    # SSL协议和加密套件
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-CHACHA20-POLY1305;
    ssl_prefer_server_ciphers off;
    
    # SSL会话配置
    ssl_session_cache shared:SSL:10m;
    ssl_session_timeout 10m;
    ssl_session_tickets off;
    
    # OCSP装订
    ssl_stapling on;
    ssl_stapling_verify on;
    ssl_trusted_certificate /etc/ssl/certs/ca-certificates.crt;
    resolver 8.8.8.8 8.8.4.4 valid=300s;
    resolver_timeout 5s;
    
    # 安全头部
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" always;
    add_header X-Frame-Options "SAMEORIGIN" always;
    add_header X-Content-Type-Options "nosniff" always;
    add_header X-XSS-Protection "1; mode=block" always;
    add_header Referrer-Policy "strict-origin-when-cross-origin" always;
    add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; font-src 'self' data:; connect-src 'self'; frame-ancestors 'self';" always;
    
    # 隐藏服务器信息
    server_tokens off;
    more_clear_headers Server;
    
    location / {
        proxy_pass http://backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # 安全头部
        proxy_hide_header X-Powered-By;
        proxy_hide_header Server;
    }
}

6. 应用安全

6.1 输入验证与过滤

// 输入验证中间件
const inputValidation = {
  // 通用验证规则
  rules: {
    username: {
      type: 'string',
      minLength: 3,
      maxLength: 20,
      pattern: /^[a-zA-Z0-9_]+$/,
      sanitize: true
    },
    email: {
      type: 'email',
      maxLength: 100,
      sanitize: true
    },
    password: {
      type: 'string',
      minLength: 8,
      maxLength: 128,
      pattern: /^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]/
    },
    phone: {
      type: 'string',
      pattern: /^1[3-9]\d{9}$/,
      sanitize: true
    }
  },
  
  // 验证函数
  validate: (data, rules) => {
    const errors = [];
    
    for (const [field, rule] of Object.entries(rules)) {
      const value = data[field];
      
      // 必填检查
      if (rule.required && (!value || value.trim() === '')) {
        errors.push(`${field} 是必填字段`);
        continue;
      }
      
      if (!value) continue;
      
      // 类型检查
      if (rule.type === 'email' && !validator.isEmail(value)) {
        errors.push(`${field} 格式不正确`);
      }
      
      // 长度检查
      if (rule.minLength && value.length < rule.minLength) {
        errors.push(`${field} 长度不能少于 ${rule.minLength} 个字符`);
      }
      
      if (rule.maxLength && value.length > rule.maxLength) {
        errors.push(`${field} 长度不能超过 ${rule.maxLength} 个字符`);
      }
      
      // 正则检查
      if (rule.pattern && !rule.pattern.test(value)) {
        errors.push(`${field} 格式不正确`);
      }
    }
    
    return errors;
  },
  
  // 数据清理
  sanitize: (data, rules) => {
    const sanitized = {};
    
    for (const [field, rule] of Object.entries(rules)) {
      let value = data[field];
      
      if (!value) {
        sanitized[field] = value;
        continue;
      }
      
      if (rule.sanitize) {
        // HTML转义
        value = validator.escape(value);
        // 去除首尾空格
        value = value.trim();
        // SQL注入防护
        value = value.replace(/['"\\]/g, '\\$&');
      }
      
      sanitized[field] = value;
    }
    
    return sanitized;
  }
};

// 验证中间件
const validateInput = (rules) => {
  return (req, res, next) => {
    const errors = inputValidation.validate(req.body, rules);
    
    if (errors.length > 0) {
      return res.status(400).json({
        error: '输入验证失败',
        details: errors
      });
    }
    
    // 清理输入数据
    req.body = inputValidation.sanitize(req.body, rules);
    next();
  };
};

6.2 CSRF防护

// CSRF防护中间件
const csrfProtection = {
  // 生成CSRF令牌
  generateToken: (req) => {
    const token = crypto.randomBytes(32).toString('hex');
    req.session.csrfToken = token;
    return token;
  },
  
  // 验证CSRF令牌
  verifyToken: (req) => {
    const sessionToken = req.session.csrfToken;
    const requestToken = req.headers['x-csrf-token'] || req.body._csrf;
    
    return sessionToken && requestToken && sessionToken === requestToken;
  },
  
  // CSRF中间件
  middleware: (req, res, next) => {
    // GET请求不需要CSRF验证
    if (req.method === 'GET') {
      return next();
    }
    
    // API请求使用JWT不需要CSRF验证
    if (req.path.startsWith('/api/')) {
      return next();
    }
    
    if (!csrfProtection.verifyToken(req)) {
      return res.status(403).json({ error: 'CSRF令牌验证失败' });
    }
    
    next();
  }
};

// 双重提交Cookie CSRF防护
const doubleSubmitCsrf = {
  middleware: (req, res, next) => {
    if (req.method === 'GET') {
      // 设置CSRF Cookie
      const token = crypto.randomBytes(32).toString('hex');
      res.cookie('csrf-token', token, {
        httpOnly: false,
        secure: process.env.NODE_ENV === 'production',
        sameSite: 'strict'
      });
      return next();
    }
    
    const cookieToken = req.cookies['csrf-token'];
    const headerToken = req.headers['x-csrf-token'];
    
    if (!cookieToken || !headerToken || cookieToken !== headerToken) {
      return res.status(403).json({ error: 'CSRF验证失败' });
    }
    
    next();
  }
};

6.3 会话安全

// 会话安全配置
const sessionConfig = {
  secret: process.env.SESSION_SECRET,
  name: 'jiebanke.sid',
  resave: false,
  saveUninitialized: false,
  rolling: true,
  cookie: {
    secure: process.env.NODE_ENV === 'production',
    httpOnly: true,
    maxAge: 2 * 60 * 60 * 1000, // 2小时
    sameSite: 'strict'
  },
  store: new RedisStore({
    client: redisClient,
    prefix: 'sess:',
    ttl: 2 * 60 * 60 // 2小时
  })
};

// 会话安全中间件
const sessionSecurity = {
  // 会话固定攻击防护
  regenerateSession: (req, res, next) => {
    if (req.session && req.session.regenerate) {
      req.session.regenerate((err) => {
        if (err) {
          return next(err);
        }
        next();
      });
    } else {
      next();
    }
  },
  
  // 会话劫持防护
  validateSession: (req, res, next) => {
    if (!req.session.userAgent) {
      req.session.userAgent = req.headers['user-agent'];
      req.session.ipAddress = req.ip;
    } else {
      // 检查User-Agent和IP是否一致
      if (req.session.userAgent !== req.headers['user-agent'] ||
          req.session.ipAddress !== req.ip) {
        req.session.destroy();
        return res.status(401).json({ error: '会话已失效,请重新登录' });
      }
    }
    
    next();
  },
  
  // 并发会话控制
  limitConcurrentSessions: async (req, res, next) => {
    if (!req.user) return next();
    
    const userId = req.user.userId;
    const currentSessionId = req.sessionID;
    
    // 获取用户的所有活跃会话
    const activeSessions = await redis.smembers(`user:${userId}:sessions`);
    
    // 限制最多3个并发会话
    if (activeSessions.length >= 3 && !activeSessions.includes(currentSessionId)) {
      // 删除最旧的会话
      const oldestSession = activeSessions[0];
      await redis.srem(`user:${userId}:sessions`, oldestSession);
      await redis.del(`sess:${oldestSession}`);
    }
    
    // 添加当前会话
    await redis.sadd(`user:${userId}:sessions`, currentSessionId);
    await redis.expire(`user:${userId}:sessions`, 2 * 60 * 60);
    
    next();
  }
};

7. 安全监控与审计

7.1 安全事件监控

// 安全事件监控系统
const securityMonitor = {
  // 事件类型定义
  eventTypes: {
    LOGIN_SUCCESS: 'login_success',
    LOGIN_FAILURE: 'login_failure',
    PASSWORD_CHANGE: 'password_change',
    PERMISSION_DENIED: 'permission_denied',
    SUSPICIOUS_ACTIVITY: 'suspicious_activity',
    DATA_ACCESS: 'data_access',
    ADMIN_ACTION: 'admin_action'
  },
  
  // 记录安全事件
  logEvent: async (eventType, userId, details = {}) => {
    const event = {
      id: uuidv4(),
      type: eventType,
      userId: userId,
      timestamp: new Date(),
      ipAddress: details.ipAddress,
      userAgent: details.userAgent,
      resource: details.resource,
      action: details.action,
      result: details.result,
      riskLevel: details.riskLevel || 'low',
      metadata: details.metadata || {}
    };
    
    // 存储到数据库
    await SecurityAuditLog.create(event);
    
    // 发送到日志系统
    logger.info('Security Event', event);
    
    // 检查是否需要告警
    await this.checkAlerts(event);
  },
  
  // 告警检查
  checkAlerts: async (event) => {
    const alerts = [];
    
    // 登录失败次数检查
    if (event.type === this.eventTypes.LOGIN_FAILURE) {
      const recentFailures = await this.getRecentEvents(
        event.userId, 
        this.eventTypes.LOGIN_FAILURE, 
        15 * 60 * 1000 // 15分钟内
      );
      
      if (recentFailures.length >= 5) {
        alerts.push({
          type: 'BRUTE_FORCE_ATTACK',
          severity: 'high',
          message: `用户 ${event.userId} 15分钟内登录失败${recentFailures.length}次`
        });
      }
    }
    
    // 异常IP访问检查
    if (event.ipAddress) {
      const userIPs = await this.getUserRecentIPs(event.userId, 24 * 60 * 60 * 1000);
      if (userIPs.length > 1 && !userIPs.includes(event.ipAddress)) {
        alerts.push({
          type: 'UNUSUAL_IP_ACCESS',
          severity: 'medium',
          message: `用户 ${event.userId} 从新IP地址 ${event.ipAddress} 访问`
        });
      }
    }
    
    // 权限提升检查
    if (event.type === this.eventTypes.ADMIN_ACTION && event.userId) {
      alerts.push({
        type: 'ADMIN_ACTION',
        severity: 'medium',
        message: `管理员 ${event.userId} 执行了操作: ${event.action}`
      });
    }
    
    // 发送告警
    for (const alert of alerts) {
      await this.sendAlert(alert, event);
    }
  },
  
  // 发送安全告警
  sendAlert: async (alert, event) => {
    const message = {
      title: `安全告警: ${alert.type}`,
      severity: alert.severity,
      message: alert.message,
      timestamp: event.timestamp,
      details: event
    };
    
    // 发送到钉钉
    await this.sendDingTalkAlert(message);
    
    // 发送邮件
    if (alert.severity === 'high') {
      await this.sendEmailAlert(message);
    }
    
    // 记录告警
    await SecurityAlert.create({
      type: alert.type,
      severity: alert.severity,
      message: alert.message,
      eventId: event.id,
      status: 'open'
    });
  }
};

// 安全监控中间件
const securityMonitorMiddleware = (req, res, next) => {
  // 记录请求信息
  const originalSend = res.send;
  res.send = function(data) {
    const statusCode = res.statusCode;
    
    // 记录安全相关事件
    if (req.path.includes('/auth/login')) {
      const eventType = statusCode === 200 ? 
        securityMonitor.eventTypes.LOGIN_SUCCESS : 
        securityMonitor.eventTypes.LOGIN_FAILURE;
      
      securityMonitor.logEvent(eventType, req.body.username, {
        ipAddress: req.ip,
        userAgent: req.headers['user-agent'],
        result: statusCode === 200 ? 'success' : 'failure'
      });
    }
    
    if (statusCode === 403) {
      securityMonitor.logEvent(securityMonitor.eventTypes.PERMISSION_DENIED, req.user?.userId, {
        ipAddress: req.ip,
        userAgent: req.headers['user-agent'],
        resource: req.path,
        action: req.method
      });
    }
    
    originalSend.call(this, data);
  };
  
  next();
};

7.2 入侵检测系统

#!/usr/bin/env python3
# intrusion_detection.py - 入侵检测系统

import re
import json
import time
from collections import defaultdict, deque
from datetime import datetime, timedelta

class IntrusionDetectionSystem:
    def __init__(self):
        self.attack_patterns = {
            'sql_injection': [
                r"(\bunion\b.*\bselect\b)",
                r"(\bselect\b.*\bfrom\b.*\bwhere\b)",
                r"(\bdrop\b.*\btable\b)",
                r"(\binsert\b.*\binto\b)",
                r"(\bdelete\b.*\bfrom\b)"
            ],
            'xss_attack': [
                r"<script[^>]*>.*?</script>",
                r"javascript:",
                r"on\w+\s*=",
                r"<iframe[^>]*>.*?</iframe>"
            ],
            'path_traversal': [
                r"\.\./",
                r"\.\.\\",
                r"/etc/passwd",
                r"/etc/shadow"
            ],
            'command_injection': [
                r";\s*(cat|ls|pwd|whoami)",
                r"\|\s*(cat|ls|pwd|whoami)",
                r"&&\s*(cat|ls|pwd|whoami)",
                r"`.*`"
            ]
        }
        
        self.ip_requests = defaultdict(lambda: deque(maxlen=100))
        self.failed_logins = defaultdict(lambda: deque(maxlen=50))
        
    def analyze_request(self, request_data):
        """分析单个请求"""
        alerts = []
        
        # 检查攻击模式
        for attack_type, patterns in self.attack_patterns.items():
            for pattern in patterns:
                if self._check_pattern(request_data, pattern):
                    alerts.append({
                        'type': attack_type,
                        'severity': 'high',
                        'pattern': pattern,
                        'timestamp': datetime.now(),
                        'source_ip': request_data.get('ip'),
                        'request_uri': request_data.get('uri'),
                        'user_agent': request_data.get('user_agent')
                    })
        
        # 检查频率异常
        ip = request_data.get('ip')
        if ip:
            self.ip_requests[ip].append(time.time())
            if self._check_rate_limit(ip):
                alerts.append({
                    'type': 'rate_limit_exceeded',
                    'severity': 'medium',
                    'source_ip': ip,
                    'request_count': len(self.ip_requests[ip]),
                    'timestamp': datetime.now()
                })
        
        # 检查登录失败
        if request_data.get('path') == '/auth/login' and request_data.get('status') >= 400:
            user = request_data.get('username', ip)
            self.failed_logins[user].append(time.time())
            if len(self.failed_logins[user]) >= 5:
                alerts.append({
                    'type': 'brute_force_attack',
                    'severity': 'high',
                    'target_user': user,
                    'source_ip': ip,
                    'failed_attempts': len(self.failed_logins[user]),
                    'timestamp': datetime.now()
                })
        
        return alerts
    
    def _check_pattern(self, request_data, pattern):
        """检查请求是否匹配攻击模式"""
        text_fields = [
            request_data.get('uri', ''),
            request_data.get('query_string', ''),
            request_data.get('post_data', ''),
            request_data.get('headers', {}).get('user-agent', '')
        ]
        
        for field in text_fields:
            if re.search(pattern, field, re.IGNORECASE):
                return True
        return False
    
    def _check_rate_limit(self, ip):
        """检查IP请求频率"""
        now = time.time()
        requests = self.ip_requests[ip]
        
        # 1分钟内超过100个请求
        recent_requests = [req for req in requests if now - req < 60]
        return len(recent_requests) > 100
    
    def generate_report(self, alerts):
        """生成入侵检测报告"""
        if not alerts:
            return None
        
        report = {
            'timestamp': datetime.now().isoformat(),
            'total_alerts': len(alerts),
            'severity_breakdown': defaultdict(int),
            'attack_types': defaultdict(int),
            'top_source_ips': defaultdict(int),
            'alerts': alerts
        }
        
        for alert in alerts:
            report['severity_breakdown'][alert['severity']] += 1
            report['attack_types'][alert['type']] += 1
            if 'source_ip' in alert:
                report['top_source_ips'][alert['source_ip']] += 1
        
        return report
    
    def block_ip(self, ip, duration=3600):
        """阻止IP访问"""
        # 这里应该调用防火墙API或更新IP黑名单
        print(f"Blocking IP {ip} for {duration} seconds")
        
        # 示例更新iptables规则
        import subprocess
        try:
            subprocess.run([
                'iptables', '-A', 'INPUT', 
                '-s', ip, '-j', 'DROP'
            ], check=True)
            
            # 设置定时解除阻止
            subprocess.run([
                'at', f'now + {duration} seconds'
            ], input=f'iptables -D INPUT -s {ip} -j DROP\n', 
            text=True, check=True)
            
        except subprocess.CalledProcessError as e:
            print(f"Failed to block IP {ip}: {e}")

# 使用示例
if __name__ == "__main__":
    ids = IntrusionDetectionSystem()
    
    # 模拟请求数据
    request_data = {
        'ip': '192.168.1.100',
        'uri': '/api/users?id=1 UNION SELECT * FROM users',
        'method': 'GET',
        'status': 200,
        'user_agent': 'Mozilla/5.0...'
    }
    
    alerts = ids.analyze_request(request_data)
    if alerts:
        report = ids.generate_report(alerts)
        print(json.dumps(report, indent=2, default=str))
        
        # 自动阻止高危IP
        for alert in alerts:
            if alert['severity'] == 'high' and 'source_ip' in alert:
                ids.block_ip(alert['source_ip'])

8. 安全测试

8.1 自动化安全测试

#!/usr/bin/env python3
# security_test.py - 自动化安全测试

import requests
import json
import time
from urllib.parse import urljoin

class SecurityTester:
    def __init__(self, base_url):
        self.base_url = base_url
        self.session = requests.Session()
        self.vulnerabilities = []
    
    def test_sql_injection(self):
        """SQL注入测试"""
        print("Testing SQL Injection...")
        
        payloads = [
            "' OR '1'='1",
            "'; DROP TABLE users; --",
            "' UNION SELECT * FROM users --",
            "1' AND (SELECT COUNT(*) FROM users) > 0 --"
        ]
        
        test_endpoints = [
            "/api/users",
            "/api/trips",
            "/api/auth/login"
        ]
        
        for endpoint in test_endpoints:
            for payload in payloads:
                params = {'id': payload}
                try:
                    response = self.session.get(
                        urljoin(self.base_url, endpoint),
                        params=params,
                        timeout=10
                    )
                    
                    # 检查是否存在SQL错误信息
                    error_indicators = [
                        'mysql_fetch_array',
                        'ORA-01756',
                        'Microsoft OLE DB Provider',
                        'SQLServer JDBC Driver',
                        'PostgreSQL query failed'
                    ]
                    
                    for indicator in error_indicators:
                        if indicator.lower() in response.text.lower():
                            self.vulnerabilities.append({
                                'type': 'SQL Injection',
                                'severity': 'High',
                                'endpoint': endpoint,
                                'payload': payload,
                                'evidence': indicator
                            })
                            break
                            
                except requests.RequestException as e:
                    print(f"Request failed: {e}")
    
    def test_xss(self):
        """XSS测试"""
        print("Testing XSS...")
        
        payloads = [
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>",
            "javascript:alert('XSS')",
            "<svg onload=alert('XSS')>"
        ]
        
        test_endpoints = [
            "/api/comments",
            "/api/trips",
            "/api/users/profile"
        ]
        
        for endpoint in test_endpoints:
            for payload in payloads:
                data = {'content': payload, 'title': payload}
                try:
                    response = self.session.post(
                        urljoin(self.base_url, endpoint),
                        json=data,
                        timeout=10
                    )
                    
                    # 检查响应中是否包含未转义的payload
                    if payload in response.text:
                        self.vulnerabilities.append({
                            'type': 'Cross-Site Scripting (XSS)',
                            'severity': 'High',
                            'endpoint': endpoint,
                            'payload': payload,
                            'evidence': 'Payload reflected in response'
                        })
                        
                except requests.RequestException as e:
                    print(f"Request failed: {e}")
    
    def test_authentication_bypass(self):
        """认证绕过测试"""
        print("Testing Authentication Bypass...")
        
        protected_endpoints = [
            "/api/admin/users",
            "/api/admin/system",
            "/api/users/profile"
        ]
        
        for endpoint in protected_endpoints:
            try:
                # 不带认证头的请求
                response = self.session.get(
                    urljoin(self.base_url, endpoint),
                    timeout=10
                )
                
                if response.status_code == 200:
                    self.vulnerabilities.append({
                        'type': 'Authentication Bypass',
                        'severity': 'High',
                        'endpoint': endpoint,
                        'evidence': f'Status code: {response.status_code}'
                    })
                    
                # 测试无效token
                headers = {'Authorization': 'Bearer invalid_token'}
                response = self.session.get(
                    urljoin(self.base_url, endpoint),
                    headers=headers,
                    timeout=10
                )
                
                if response.status_code == 200:
                    self.vulnerabilities.append({
                        'type': 'Invalid Token Accepted',
                        'severity': 'High',
                        'endpoint': endpoint,
                        'evidence': 'Invalid token accepted'
                    })
                    
            except requests.RequestException as e:
                print(f"Request failed: {e}")
    
    def test_brute_force_protection(self):
        """暴力破解防护测试"""
        print("Testing Brute Force Protection...")
        
        login_endpoint = "/api/auth/login"
        credentials = {
            'username': 'testuser',
            'password': 'wrongpassword'
        }
        
        attempt_count = 0
        max_attempts = 10
        
        for i in range(max_attempts):
            try:
                response = self.session.post(
                    urljoin(self.base_url, login_endpoint),
                    json=credentials,
                    timeout=10
                )
                
                attempt_count += 1
                
                # 检查是否有速率限制
                if response.status_code == 429:
                    print(f"Rate limiting detected after {attempt_count} attempts")
                    break
                elif response.status_code == 401:
                    continue
                else:
                    time.sleep(1)  # 避免过快请求
                    
            except requests.RequestException as e:
                print(f"Request failed: {e}")
                break
        
        if attempt_count >= max_attempts:
            self.vulnerabilities.append({
                'type': 'No Brute Force Protection',
                'severity': 'Medium',
                'endpoint': login_endpoint,
                'evidence': f'Allowed {max_attempts} failed login attempts'
            })
    
    def test_file_upload_security(self):
        """文件上传安全测试"""
        print("Testing File Upload Security...")
        
        upload_endpoint = "/api/upload"
        
        # 测试恶意文件上传
        malicious_files = [
            ('shell.php', '<?php system($_GET["cmd"]); ?>', 'application/x-php'),
            ('script.js', 'alert("XSS")', 'application/javascript'),
            ('test.exe', b'\x4d\x5a\x90\x00', 'application/x-executable')
        ]
        
        for filename, content, content_type in malicious_files:
            files = {
                'file': (filename, content, content_type)
            }
            
            try:
                response = self.session.post(
                    urljoin(self.base_url, upload_endpoint),
                    files=files,
                    timeout=10
                )
                
                if response.status_code == 200:
                    self.vulnerabilities.append({
                        'type': 'Malicious File Upload',
                        'severity': 'High',
                        'endpoint': upload_endpoint,
                        'payload': filename,
                        'evidence': 'Malicious file accepted'
                    })
                    
            except requests.RequestException as e:
                print(f"Request failed: {e}")
    
    def generate_report(self):
        """生成安全测试报告"""
        report = {
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'target': self.base_url,
            'total_vulnerabilities': len(self.vulnerabilities),
            'severity_breakdown': {
                'High': 0,
                'Medium': 0,
                'Low': 0
            },
            'vulnerabilities': self.vulnerabilities
        }
        
        for vuln in self.vulnerabilities:
            severity = vuln.get('severity', 'Low')
            report['severity_breakdown'][severity] += 1
        
        return report
    
    def run_all_tests(self):
        """运行所有安全测试"""
        print(f"Starting security tests for {self.base_url}")
        
        self.test_sql_injection()
        self.test_xss()
        self.test_authentication_bypass()
        self.test_brute_force_protection()
        self.test_file_upload_security()
        
        report = self.generate_report()
        
        print("\n" + "="*50)
        print("SECURITY TEST REPORT")
        print("="*50)
        print(f"Target: {report['target']}")
        print(f"Total Vulnerabilities: {report['total_vulnerabilities']}")
        print(f"High: {report['severity_breakdown']['High']}")
        print(f"Medium: {report['severity_breakdown']['Medium']}")
        print(f"Low: {report['severity_breakdown']['Low']}")
        
        if self.vulnerabilities:
            print("\nVulnerabilities Found:")
            for i, vuln in enumerate(self.vulnerabilities, 1):
                print(f"\n{i}. {vuln['type']} ({vuln['severity']})")
                print(f"   Endpoint: {vuln['endpoint']}")
                if 'payload' in vuln:
                    print(f"   Payload: {vuln['payload']}")
                print(f"   Evidence: {vuln['evidence']}")
        else:
            print("\nNo vulnerabilities found!")
        
        return report

# 使用示例
if __name__ == "__main__":
    tester = SecurityTester("http://localhost:3000")
    report = tester.run_all_tests()
    
    # 保存报告
    with open(f"security_report_{int(time.time())}.json", 'w') as f:
        json.dump(report, f, indent=2)

9. 应急响应

9.1 安全事件响应流程

graph TD
    A[安全事件发生] --> B[事件检测]
    B --> C[事件分类]
    C --> D[影响评估]
    D --> E[响应级别确定]
    
    E --> F[一级响应]
    E --> G[二级响应]
    E --> H[三级响应]
    
    F --> I[立即隔离]
    G --> J[限制访问]
    H --> K[监控观察]
    
    I --> L[深入调查]
    J --> L
    K --> L
    
    L --> M[制定恢复计划]
    M --> N[系统恢复]
    N --> O[事后总结]

9.2 应急响应脚本

#!/bin/bash
# emergency-response.sh - 应急响应脚本

set -e

LOG_FILE="/var/log/security/emergency-response.log"
BACKUP_DIR="/opt/backup/emergency"
QUARANTINE_DIR="/opt/quarantine"

# 日志函数
log_message() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

# 发送告警
send_alert() {
    local message="$1"
    local level="$2"
    
    # 发送钉钉告警
    curl -X POST "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN" \
        -H "Content-Type: application/json" \
        -d "{
            \"msgtype\": \"text\",
            \"text\": {
                \"content\": \"【安全应急】[$level] $message\"
            },
            \"at\": {
                \"isAtAll\": true
            }
        }" > /dev/null 2>&1
    
    log_message "Alert sent: [$level] $message"
}

# 隔离受感染系统
isolate_system() {
    log_message "开始系统隔离..."
    
    # 断开网络连接
    iptables -P INPUT DROP
    iptables -P OUTPUT DROP
    iptables -P FORWARD DROP
    
    # 保留SSH连接用于管理
    iptables -A INPUT -p tcp --dport 22 -s 192.168.1.0/24 -j ACCEPT
    iptables -A OUTPUT -p tcp --sport 22 -d 192.168.1.0/24 -j ACCEPT
    
    # 停止非关键服务
    systemctl stop nginx
    systemctl stop apache2
    systemctl stop mysql
    
    send_alert "系统已隔离,网络连接已断开" "CRITICAL"
    log_message "系统隔离完成"
}

# 备份关键数据
backup_critical_data() {
    log_message "开始备份关键数据..."
    
    mkdir -p "$BACKUP_DIR/$(date +%Y%m%d_%H%M%S)"
    CURRENT_BACKUP="$BACKUP_DIR/$(date +%Y%m%d_%H%M%S)"
    
    # 备份数据库
    mysqldump -u root -p"$DB_PASSWORD" --all-databases > "$CURRENT_BACKUP/database_backup.sql"
    
    # 备份配置文件
    cp -r /etc/nginx "$CURRENT_BACKUP/"
    cp -r /etc/mysql "$CURRENT_BACKUP/"
    cp -r /opt/jiebanke/config "$CURRENT_BACKUP/"
    
    # 备份日志文件
    cp -r /var/log "$CURRENT_BACKUP/"
    
    # 创建备份清单
    find "$CURRENT_BACKUP" -type f -exec ls -la {} \; > "$CURRENT_BACKUP/backup_manifest.txt"
    
    log_message "数据备份完成: $CURRENT_BACKUP"
}

# 收集证据
collect_evidence() {
    log_message "开始收集安全事件证据..."
    
    EVIDENCE_DIR="/opt/evidence/$(date +%Y%m%d_%H%M%S)"
    mkdir -p "$EVIDENCE_DIR"
    
    # 系统信息
    uname -a > "$EVIDENCE_DIR/system_info.txt"
    ps aux > "$EVIDENCE_DIR/processes.txt"
    netstat -tulpn > "$EVIDENCE_DIR/network_connections.txt"
    lsof > "$EVIDENCE_DIR/open_files.txt"
    
    # 用户信息
    who > "$EVIDENCE_DIR/logged_users.txt"
    last -n 100 > "$EVIDENCE_DIR/login_history.txt"
    
    # 文件系统信息
    find /tmp -type f -mtime -1 -ls > "$EVIDENCE_DIR/recent_tmp_files.txt"
    find /var/www -type f -mtime -1 -ls > "$EVIDENCE_DIR/recent_web_files.txt"
    
    # 网络流量
    tcpdump -i any -w "$EVIDENCE_DIR/network_traffic.pcap" -c 1000 &
    TCPDUMP_PID=$!
    sleep 30
    kill $TCPDUMP_PID 2>/dev/null || true
    
    log_message "证据收集完成: $EVIDENCE_DIR"
}

# 恶意文件隔离
quarantine_malicious_files() {
    local file_path="$1"
    
    if [ ! -f "$file_path" ]; then
        log_message "文件不存在: $file_path"
        return 1
    fi
    
    log_message "隔离恶意文件: $file_path"
    
    mkdir -p "$QUARANTINE_DIR"
    
    # 计算文件哈希
    FILE_HASH=$(sha256sum "$file_path" | cut -d' ' -f1)
    
    # 移动文件到隔离区
    mv "$file_path" "$QUARANTINE_DIR/${FILE_HASH}_$(basename $file_path)"
    
    # 记录隔离信息
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $file_path -> $QUARANTINE_DIR/${FILE_HASH}_$(basename $file_path)" >> "$QUARANTINE_DIR/quarantine.log"
    
    log_message "文件已隔离: $QUARANTINE_DIR/${FILE_HASH}_$(basename $file_path)"
}

# 阻止恶意IP
block_malicious_ip() {
    local ip="$1"
    local duration="${2:-3600}"  # 默认1小时
    
    log_message "阻止恶意IP: $ip (持续时间: ${duration}秒)"
    
    # 添加iptables规则
    iptables -A INPUT -s "$ip" -j DROP
    
    # 记录到黑名单
    echo "$ip" >> /etc/security/ip_blacklist.txt
    
    # 设置定时解除阻止
    echo "iptables -D INPUT -s $ip -j DROP" | at now + ${duration} seconds
    
    send_alert "已阻止恶意IP: $ip" "HIGH"
    log_message "IP阻止完成: $ip"
}

# 系统恢复
system_recovery() {
    log_message "开始系统恢复..."
    
    # 恢复网络连接
    iptables -F
    iptables -P INPUT ACCEPT
    iptables -P OUTPUT ACCEPT
    iptables -P FORWARD ACCEPT
    
    # 重启服务
    systemctl start mysql
    systemctl start nginx
    
    # 验证服务状态
    if systemctl is-active --quiet mysql && systemctl is-active --quiet nginx; then
        log_message "服务恢复成功"
        send_alert "系统恢复完成,服务正常运行" "INFO"
    else
        log_message "服务恢复失败"
        send_alert "系统恢复失败,需要人工干预" "CRITICAL"
    fi
}

# 主函数
main() {
    case "$1" in
        "isolate")
            isolate_system
            ;;
        "backup")
            backup_critical_data
            ;;
        "evidence")
            collect_evidence
            ;;
        "quarantine")
            if [ -z "$2" ]; then
                echo "用法: $0 quarantine <file_path>"
                exit 1
            fi
            quarantine_malicious_files "$2"
            ;;
        "block-ip")
            if [ -z "$2" ]; then
                echo "用法: $0 block-ip <ip_address> [duration]"
                exit 1
            fi
            block_malicious_ip "$2" "$3"
            ;;
        "recover")
            system_recovery
            ;;
        "full-response")
            log_message "开始完整应急响应流程"
            backup_critical_data
            collect_evidence
            isolate_system
            send_alert "完整应急响应流程已执行" "CRITICAL"
            ;;
        *)
            echo "用法: $0 {isolate|backup|evidence|quarantine|block-ip|recover|full-response}"
            echo "  isolate        - 隔离系统"
            echo "  backup         - 备份关键数据"
            echo "  evidence       - 收集证据"
            echo "  quarantine     - 隔离恶意文件"
            echo "  block-ip       - 阻止恶意IP"
            echo "  recover        - 系统恢复"
            echo "  full-response  - 执行完整应急响应"
            exit 1
            ;;
    esac
}

main "$@"

9.3 事件响应手册

9.3.1 数据泄露响应

响应步骤:

  1. 立即行动0-1小时

    • 确认泄露范围和影响
    • 停止数据泄露源
    • 保护剩余数据
    • 通知应急响应团队
  2. 深入调查1-24小时

    • 分析泄露原因
    • 确定泄露的具体数据
    • 评估业务影响
    • 收集相关证据
  3. 通知相关方24-72小时

    • 通知监管机构
    • 通知受影响用户
    • 发布公告说明
    • 配合调查工作

9.3.2 恶意软件感染响应

响应步骤:

  1. 隔离感染系统

    • 断开网络连接
    • 停止相关服务
    • 防止横向传播
  2. 分析恶意软件

    • 识别恶意软件类型
    • 分析攻击向量
    • 评估损害程度
  3. 清除和恢复

    • 清除恶意软件
    • 修复受损文件
    • 恢复系统功能
    • 加强防护措施

10. 总结

10.1 安全架构总结

结伴客项目的安全架构采用了多层防护策略,包括:

  • 网络安全层WAF、防火墙、DDoS防护
  • 应用安全层:身份认证、权限控制、输入验证
  • 数据安全层:数据加密、访问控制、备份恢复
  • 监控审计层:安全监控、日志审计、入侵检测

10.2 关键安全措施

  1. 身份认证JWT令牌 + 多因子认证
  2. 权限控制基于角色的访问控制RBAC
  3. 数据保护AES-256加密 + 数据分类保护
  4. 网络防护:多层防火墙 + WAF规则
  5. 安全监控:实时监控 + 自动告警
  6. 应急响应:完整的事件响应流程

10.3 持续改进

安全是一个持续的过程,需要:

  • 定期安全评估:每季度进行安全评估
  • 漏洞扫描:每月进行自动化漏洞扫描
  • 安全培训:定期对开发和运维人员进行安全培训
  • 威胁情报:关注最新的安全威胁和漏洞信息
  • 安全更新:及时更新系统和组件的安全补丁

10.4 合规要求

项目需要满足以下合规要求:

  • 《网络安全法》:数据保护和网络安全要求
  • 《数据安全法》:数据分类分级保护
  • 《个人信息保护法》:个人信息处理规范
  • 等保2.0:信息系统安全等级保护

通过实施本安全文档中的各项措施,可以有效保护结伴客项目的安全,确保用户数据和业务系统的安全稳定运行。