# 后端管理开发文档 ## 版本历史 | 版本 | 日期 | 作者 | 变更说明 | |------|------|------|----------| | 1.0 | 2024-01-20 | 后端开发团队 | 初始版本 | ## 1. 项目概述 ### 1.1 项目介绍 畜牧养殖管理平台后端管理系统,为管理员提供系统管理、数据管理、业务监控等后台管理功能。基于微服务架构设计,提供高性能、高可用的后端服务支持。 ### 1.2 技术栈 - **开发语言**: Python 3.9+ - **Web框架**: FastAPI - **数据库**: MySQL 8.0 + Redis 6.0 + MongoDB 5.0 - **ORM框架**: SQLAlchemy + Alembic - **异步框架**: asyncio + aioredis + motor - **消息队列**: Celery + Redis - **缓存系统**: Redis - **搜索引擎**: Elasticsearch - **文件存储**: MinIO / 阿里云OSS - **监控系统**: Prometheus + Grafana - **日志系统**: ELK Stack - **容器化**: Docker + Docker Compose - **API文档**: Swagger/OpenAPI ### 1.3 项目结构 ``` backend/ ├── app/ │ ├── api/ # API路由 │ │ ├── v1/ # API版本1 │ │ │ ├── admin/ # 管理员相关接口 │ │ │ ├── auth/ # 认证相关接口 │ │ │ ├── user/ # 用户管理接口 │ │ │ ├── farm/ # 养殖场管理接口 │ │ │ ├── animal/ # 动物管理接口 │ │ │ ├── order/ # 订单管理接口 │ │ │ ├── finance/ # 财务管理接口 │ │ │ ├── system/ # 系统管理接口 │ │ │ └── statistics/ # 统计分析接口 │ │ └── deps.py # 依赖注入 │ ├── core/ # 核心配置 │ │ ├── config.py # 配置管理 │ │ ├── security.py # 安全相关 │ │ ├── database.py # 数据库配置 │ │ └── middleware.py # 中间件 │ ├── models/ # 数据模型 │ │ ├── admin.py # 管理员模型 │ │ ├── user.py # 用户模型 │ │ ├── farm.py # 养殖场模型 │ │ ├── animal.py # 动物模型 │ │ ├── order.py # 订单模型 │ │ └── system.py # 系统模型 │ ├── schemas/ # Pydantic模型 │ │ ├── admin.py # 管理员Schema │ │ ├── user.py # 用户Schema │ │ ├── farm.py # 养殖场Schema │ │ └── common.py # 通用Schema │ ├── services/ # 业务逻辑 │ │ ├── admin_service.py # 管理员服务 │ │ ├── user_service.py # 用户服务 │ │ ├── farm_service.py # 养殖场服务 │ │ ├── order_service.py # 订单服务 │ │ └── statistics_service.py # 统计服务 │ ├── utils/ # 工具函数 │ │ ├── auth.py # 认证工具 │ │ ├── cache.py # 缓存工具 │ │ ├── email.py # 邮件工具 │ │ ├── file.py # 文件工具 │ │ └── validators.py # 验证工具 │ ├── tasks/ # 异步任务 │ │ ├── celery_app.py # Celery配置 │ │ ├── email_tasks.py # 邮件任务 │ │ ├── report_tasks.py # 报表任务 │ │ └── cleanup_tasks.py # 清理任务 │ └── main.py # 应用入口 ├── migrations/ # 数据库迁移 ├── tests/ # 测试文件 ├── scripts/ # 脚本文件 ├── docs/ # 文档 ├── docker/ # Docker配置 ├── requirements/ # 依赖文件 │ ├── base.txt # 基础依赖 │ ├── dev.txt # 开发依赖 │ └── prod.txt # 生产依赖 ├── .env.example # 环境变量示例 ├── docker-compose.yml # Docker Compose配置 ├── Dockerfile # Docker镜像配置 └── README.md ``` ## 2. 开发环境搭建 ### 2.1 环境要求 - Python 3.9+ - MySQL 8.0+ - Redis 6.0+ - MongoDB 5.0+ - Docker & Docker Compose - Git ### 2.2 安装步骤 ```bash # 1. 克隆项目 git clone cd backend # 2. 创建虚拟环境 python -m venv venv source venv/bin/activate # Linux/Mac # 或 venv\Scripts\activate # Windows # 3. 安装依赖 pip install -r requirements/dev.txt # 4. 配置环境变量 cp .env.example .env # 编辑 .env 文件,配置数据库连接等信息 # 5. 启动数据库服务(使用Docker) docker-compose up -d mysql redis mongodb # 6. 运行数据库迁移 alembic upgrade head # 7. 启动开发服务器 uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 # 8. 启动Celery工作进程 celery -A app.tasks.celery_app worker --loglevel=info # 9. 启动Celery Beat调度器 celery -A app.tasks.celery_app beat --loglevel=info ``` ### 2.3 开发工具配置 - **IDE**: PyCharm / VSCode - **代码规范**: Black + isort + flake8 - **类型检查**: mypy - **测试框架**: pytest - **API测试**: Postman / Insomnia ## 3. 开发计划 ### 3.1 第一阶段:基础框架搭建(1周) #### 3.1.1 项目初始化 **任务**: 搭建基础项目架构 **负责人**: 后端架构师 **工期**: 2天 **详细任务**: - [ ] 创建FastAPI项目结构 - [ ] 配置数据库连接和ORM - [ ] 配置Redis缓存和会话管理 - [ ] 配置MongoDB文档数据库 - [ ] 配置Celery异步任务队列 - [ ] 配置日志系统和监控 - [ ] 配置Docker容器化环境 #### 3.1.2 认证授权系统 **任务**: 实现认证授权和权限控制 **负责人**: 后端工程师A **工期**: 3天 **详细任务**: - [ ] 实现JWT Token认证机制 - [ ] 实现管理员登录和注册 - [ ] 实现权限控制中间件 - [ ] 实现角色权限管理 - [ ] 实现Token刷新机制 - [ ] 实现登录日志记录 **接口列表**: ``` POST /api/v1/auth/login # 管理员登录 POST /api/v1/auth/logout # 管理员登出 POST /api/v1/auth/refresh # 刷新Token GET /api/v1/auth/profile # 获取个人信息 PUT /api/v1/auth/profile # 更新个人信息 POST /api/v1/auth/change-password # 修改密码 ``` #### 3.1.3 基础中间件和工具 **任务**: 开发基础中间件和工具函数 **负责人**: 后端工程师B **工期**: 2天 **详细任务**: - [ ] 实现CORS跨域中间件 - [ ] 实现请求日志中间件 - [ ] 实现异常处理中间件 - [ ] 实现限流中间件 - [ ] 实现缓存工具函数 - [ ] 实现文件上传工具 ### 3.2 第二阶段:核心业务模块开发(4-5周) #### 3.2.1 用户管理模块 **任务**: 实现用户管理功能 **负责人**: 后端工程师A **工期**: 5天 **详细任务**: - [ ] 用户数据模型设计 - [ ] 用户CRUD操作接口 - [ ] 用户搜索和筛选功能 - [ ] 用户状态管理(启用/禁用) - [ ] 用户批量操作功能 - [ ] 用户数据导入导出 - [ ] 用户操作日志记录 **接口列表**: ``` GET /api/v1/admin/users # 获取用户列表 GET /api/v1/admin/users/{id} # 获取用户详情 POST /api/v1/admin/users # 创建用户 PUT /api/v1/admin/users/{id} # 更新用户 DELETE /api/v1/admin/users/{id} # 删除用户 POST /api/v1/admin/users/batch # 批量操作用户 PUT /api/v1/admin/users/{id}/status # 更新用户状态 GET /api/v1/admin/users/export # 导出用户数据 POST /api/v1/admin/users/import # 导入用户数据 ``` #### 3.2.2 角色权限管理模块 **任务**: 实现角色权限管理功能 **负责人**: 后端工程师B **工期**: 4天 **详细任务**: - [ ] 角色权限数据模型设计 - [ ] 角色CRUD操作接口 - [ ] 权限树形结构管理 - [ ] 用户角色分配功能 - [ ] 权限验证装饰器 - [ ] 权限变更日志记录 **接口列表**: ``` GET /api/v1/admin/roles # 获取角色列表 GET /api/v1/admin/roles/{id} # 获取角色详情 POST /api/v1/admin/roles # 创建角色 PUT /api/v1/admin/roles/{id} # 更新角色 DELETE /api/v1/admin/roles/{id} # 删除角色 GET /api/v1/admin/permissions # 获取权限列表 POST /api/v1/admin/roles/{id}/permissions # 分配权限 GET /api/v1/admin/users/{id}/roles # 获取用户角色 POST /api/v1/admin/users/{id}/roles # 分配用户角色 ``` #### 3.2.3 养殖场管理模块 **任务**: 实现养殖场管理功能 **负责人**: 后端工程师C **工期**: 6天 **详细任务**: - [ ] 养殖场数据模型设计 - [ ] 养殖场CRUD操作接口 - [ ] 养殖场审核功能 - [ ] 养殖场统计分析 - [ ] 养殖场地理位置管理 - [ ] 养殖场认证管理 - [ ] 养殖场数据导出 **接口列表**: ``` GET /api/v1/admin/farms # 获取养殖场列表 GET /api/v1/admin/farms/{id} # 获取养殖场详情 POST /api/v1/admin/farms # 创建养殖场 PUT /api/v1/admin/farms/{id} # 更新养殖场 DELETE /api/v1/admin/farms/{id} # 删除养殖场 PUT /api/v1/admin/farms/{id}/audit # 审核养殖场 GET /api/v1/admin/farms/statistics # 养殖场统计 GET /api/v1/admin/farms/export # 导出养殖场数据 ``` #### 3.2.4 动物管理模块 **任务**: 实现动物档案管理功能 **负责人**: 后端工程师D **工期**: 6天 **详细任务**: - [ ] 动物档案数据模型设计 - [ ] 动物CRUD操作接口 - [ ] 动物健康记录管理 - [ ] 动物生长记录管理 - [ ] 动物统计分析功能 - [ ] 动物批量操作功能 - [ ] 动物数据导出功能 **接口列表**: ``` GET /api/v1/admin/animals # 获取动物列表 GET /api/v1/admin/animals/{id} # 获取动物详情 POST /api/v1/admin/animals # 创建动物档案 PUT /api/v1/admin/animals/{id} # 更新动物档案 DELETE /api/v1/admin/animals/{id} # 删除动物档案 GET /api/v1/admin/animals/{id}/health # 获取健康记录 POST /api/v1/admin/animals/{id}/health # 添加健康记录 GET /api/v1/admin/animals/{id}/growth # 获取生长记录 POST /api/v1/admin/animals/{id}/growth # 添加生长记录 GET /api/v1/admin/animals/statistics # 动物统计 POST /api/v1/admin/animals/batch # 批量操作动物 ``` #### 3.2.5 订单管理模块 **任务**: 实现订单管理功能 **负责人**: 后端工程师E **工期**: 7天 **详细任务**: - [ ] 订单数据模型设计 - [ ] 订单CRUD操作接口 - [ ] 订单状态管理功能 - [ ] 订单审核功能 - [ ] 订单统计分析 - [ ] 退款处理功能 - [ ] 订单数据导出 **接口列表**: ``` GET /api/v1/admin/orders # 获取订单列表 GET /api/v1/admin/orders/{id} # 获取订单详情 PUT /api/v1/admin/orders/{id} # 更新订单 DELETE /api/v1/admin/orders/{id} # 删除订单 PUT /api/v1/admin/orders/{id}/status # 更新订单状态 PUT /api/v1/admin/orders/{id}/audit # 审核订单 POST /api/v1/admin/orders/{id}/refund # 处理退款 GET /api/v1/admin/orders/statistics # 订单统计 GET /api/v1/admin/orders/export # 导出订单数据 ``` ### 3.3 第三阶段:财务和支付管理(2-3周) #### 3.3.1 财务管理模块 **任务**: 实现财务管理功能 **负责人**: 后端工程师A **工期**: 6天 **详细任务**: - [ ] 财务数据模型设计 - [ ] 收入支出统计接口 - [ ] 资金流水管理 - [ ] 财务报表生成 - [ ] 对账功能实现 - [ ] 财务数据导出 - [ ] 财务异常监控 **接口列表**: ``` GET /api/v1/admin/finance/overview # 财务概览 GET /api/v1/admin/finance/income # 收入统计 GET /api/v1/admin/finance/expense # 支出统计 GET /api/v1/admin/finance/flow # 资金流水 GET /api/v1/admin/finance/reports # 财务报表 POST /api/v1/admin/finance/reconcile # 对账处理 GET /api/v1/admin/finance/export # 导出财务数据 ``` #### 3.3.2 支付管理模块 **任务**: 实现支付管理功能 **负责人**: 后端工程师B **工期**: 5天 **详细任务**: - [ ] 支付记录数据模型 - [ ] 支付记录查询接口 - [ ] 支付方式配置管理 - [ ] 支付异常处理 - [ ] 支付统计分析 - [ ] 支付对账功能 - [ ] 支付数据导出 **接口列表**: ``` GET /api/v1/admin/payments # 获取支付记录 GET /api/v1/admin/payments/{id} # 获取支付详情 GET /api/v1/admin/payments/config # 获取支付配置 PUT /api/v1/admin/payments/config # 更新支付配置 GET /api/v1/admin/payments/exceptions # 支付异常 PUT /api/v1/admin/payments/{id}/handle # 处理支付异常 GET /api/v1/admin/payments/statistics # 支付统计 POST /api/v1/admin/payments/reconcile # 支付对账 ``` #### 3.3.3 结算管理模块 **任务**: 实现结算管理功能 **负责人**: 后端工程师C **工期**: 4天 **详细任务**: - [ ] 结算数据模型设计 - [ ] 结算规则配置 - [ ] 自动结算功能 - [ ] 结算记录管理 - [ ] 结算统计分析 - [ ] 结算数据导出 **接口列表**: ``` GET /api/v1/admin/settlements # 获取结算记录 GET /api/v1/admin/settlements/{id} # 获取结算详情 POST /api/v1/admin/settlements # 创建结算 PUT /api/v1/admin/settlements/{id} # 更新结算 GET /api/v1/admin/settlements/config # 获取结算配置 PUT /api/v1/admin/settlements/config # 更新结算配置 POST /api/v1/admin/settlements/auto # 自动结算 GET /api/v1/admin/settlements/statistics # 结算统计 ``` ### 3.4 第四阶段:系统管理和配置(1-2周) #### 3.4.1 系统配置模块 **任务**: 实现系统配置管理 **负责人**: 后端工程师D **工期**: 4天 **详细任务**: - [ ] 系统参数配置管理 - [ ] 字典数据管理 - [ ] 系统公告管理 - [ ] 邮件模板配置 - [ ] 短信模板配置 - [ ] 系统备份恢复 **接口列表**: ``` GET /api/v1/admin/system/config # 获取系统配置 PUT /api/v1/admin/system/config # 更新系统配置 GET /api/v1/admin/system/dict # 获取字典数据 POST /api/v1/admin/system/dict # 创建字典数据 PUT /api/v1/admin/system/dict/{id} # 更新字典数据 DELETE /api/v1/admin/system/dict/{id} # 删除字典数据 GET /api/v1/admin/system/notices # 获取系统公告 POST /api/v1/admin/system/notices # 创建系统公告 PUT /api/v1/admin/system/notices/{id} # 更新系统公告 DELETE /api/v1/admin/system/notices/{id} # 删除系统公告 ``` #### 3.4.2 日志管理模块 **任务**: 实现系统日志管理 **负责人**: 后端工程师E **工期**: 3天 **详细任务**: - [ ] 操作日志记录和查询 - [ ] 登录日志记录和查询 - [ ] 系统错误日志管理 - [ ] 日志搜索和筛选 - [ ] 日志导出功能 - [ ] 日志清理任务 **接口列表**: ``` GET /api/v1/admin/logs/operation # 获取操作日志 GET /api/v1/admin/logs/login # 获取登录日志 GET /api/v1/admin/logs/error # 获取错误日志 GET /api/v1/admin/logs/search # 搜索日志 GET /api/v1/admin/logs/export # 导出日志 POST /api/v1/admin/logs/cleanup # 清理日志 ``` #### 3.4.3 监控管理模块 **任务**: 实现系统监控功能 **负责人**: 后端工程师F **工期**: 3天 **详细任务**: - [ ] 系统性能监控 - [ ] 服务状态监控 - [ ] 数据库监控 - [ ] 接口调用监控 - [ ] 告警规则配置 - [ ] 监控报表生成 **接口列表**: ``` GET /api/v1/admin/monitor/system # 系统监控数据 GET /api/v1/admin/monitor/service # 服务监控数据 GET /api/v1/admin/monitor/database # 数据库监控数据 GET /api/v1/admin/monitor/api # 接口监控数据 GET /api/v1/admin/monitor/alerts # 获取告警规则 POST /api/v1/admin/monitor/alerts # 创建告警规则 PUT /api/v1/admin/monitor/alerts/{id} # 更新告警规则 DELETE /api/v1/admin/monitor/alerts/{id} # 删除告警规则 ``` ### 3.5 第五阶段:数据统计和报表(1-2周) #### 3.5.1 数据统计模块 **任务**: 实现数据统计分析功能 **负责人**: 后端工程师A **工期**: 5天 **详细任务**: - [ ] 用户数据统计分析 - [ ] 业务数据统计分析 - [ ] 交易数据统计分析 - [ ] 趋势分析算法实现 - [ ] 自定义统计报表 - [ ] 数据对比分析功能 **接口列表**: ``` GET /api/v1/admin/statistics/user # 用户统计 GET /api/v1/admin/statistics/business # 业务统计 GET /api/v1/admin/statistics/trade # 交易统计 GET /api/v1/admin/statistics/trend # 趋势分析 POST /api/v1/admin/statistics/custom # 自定义统计 GET /api/v1/admin/statistics/compare # 数据对比 ``` #### 3.5.2 报表管理模块 **任务**: 实现报表生成和管理 **负责人**: 后端工程师B **工期**: 4天 **详细任务**: - [ ] 报表模板管理 - [ ] 报表生成引擎 - [ ] 定时报表任务 - [ ] 报表分享功能 - [ ] 报表权限控制 - [ ] 报表数据缓存 **接口列表**: ``` GET /api/v1/admin/reports/templates # 获取报表模板 POST /api/v1/admin/reports/templates # 创建报表模板 PUT /api/v1/admin/reports/templates/{id} # 更新报表模板 DELETE /api/v1/admin/reports/templates/{id} # 删除报表模板 POST /api/v1/admin/reports/generate # 生成报表 GET /api/v1/admin/reports/{id} # 获取报表 GET /api/v1/admin/reports/schedule # 获取定时报表 POST /api/v1/admin/reports/schedule # 创建定时报表 POST /api/v1/admin/reports/{id}/share # 分享报表 ``` #### 3.5.3 数据导出模块 **任务**: 实现数据导出功能 **负责人**: 后端工程师C **工期**: 3天 **详细任务**: - [ ] Excel数据导出功能 - [ ] CSV数据导出功能 - [ ] PDF报表导出功能 - [ ] 大数据量分批导出 - [ ] 导出任务队列管理 - [ ] 导出文件管理 **接口列表**: ``` POST /api/v1/admin/export/excel # Excel导出 POST /api/v1/admin/export/csv # CSV导出 POST /api/v1/admin/export/pdf # PDF导出 GET /api/v1/admin/export/tasks # 获取导出任务 GET /api/v1/admin/export/tasks/{id} # 获取导出任务状态 GET /api/v1/admin/export/files # 获取导出文件列表 GET /api/v1/admin/export/files/{id} # 下载导出文件 ``` ### 3.6 第六阶段:测试和优化(1周) #### 3.6.1 功能测试 **任务**: 全面功能测试 **负责人**: 测试工程师 **工期**: 3天 **详细任务**: - [ ] 接口功能测试 - [ ] 权限控制测试 - [ ] 数据操作测试 - [ ] 异常处理测试 - [ ] 性能压力测试 #### 3.6.2 性能优化 **任务**: 性能优化和体验提升 **负责人**: 后端架构师 **工期**: 2天 **详细任务**: - [ ] 数据库查询优化 - [ ] 缓存策略优化 - [ ] 接口响应时间优化 - [ ] 内存使用优化 - [ ] 并发处理优化 ## 4. 技术实现 ### 4.1 项目配置 ```python # app/core/config.py from pydantic import BaseSettings from typing import Optional class Settings(BaseSettings): # 应用配置 APP_NAME: str = "畜牧养殖管理平台后端管理系统" APP_VERSION: str = "1.0.0" DEBUG: bool = False # 数据库配置 DATABASE_URL: str REDIS_URL: str MONGODB_URL: str # 安全配置 SECRET_KEY: str ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 REFRESH_TOKEN_EXPIRE_DAYS: int = 7 # 文件存储配置 UPLOAD_PATH: str = "uploads" MAX_FILE_SIZE: int = 10 * 1024 * 1024 # 10MB # 邮件配置 SMTP_HOST: Optional[str] = None SMTP_PORT: Optional[int] = None SMTP_USERNAME: Optional[str] = None SMTP_PASSWORD: Optional[str] = None # Celery配置 CELERY_BROKER_URL: str CELERY_RESULT_BACKEND: str class Config: env_file = ".env" settings = Settings() ``` ### 4.2 数据库模型 ```python # app/models/admin.py from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.sql import func Base = declarative_base() class Admin(Base): __tablename__ = "admins" id = Column(Integer, primary_key=True, index=True) username = Column(String(50), unique=True, index=True, nullable=False) email = Column(String(100), unique=True, index=True, nullable=False) hashed_password = Column(String(255), nullable=False) full_name = Column(String(100)) avatar = Column(String(255)) is_active = Column(Boolean, default=True) is_superuser = Column(Boolean, default=False) last_login = Column(DateTime) created_at = Column(DateTime, server_default=func.now()) updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now()) class Role(Base): __tablename__ = "roles" id = Column(Integer, primary_key=True, index=True) name = Column(String(50), unique=True, nullable=False) description = Column(Text) permissions = Column(Text) # JSON格式存储权限列表 is_active = Column(Boolean, default=True) created_at = Column(DateTime, server_default=func.now()) updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now()) class AdminRole(Base): __tablename__ = "admin_roles" id = Column(Integer, primary_key=True, index=True) admin_id = Column(Integer, nullable=False) role_id = Column(Integer, nullable=False) created_at = Column(DateTime, server_default=func.now()) ``` ### 4.3 API路由实现 ```python # app/api/v1/admin/users.py from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.orm import Session from typing import List, Optional from app.api.deps import get_db, get_current_admin from app.schemas.user import UserResponse, UserCreate, UserUpdate from app.services.user_service import UserService from app.utils.auth import require_permissions router = APIRouter() @router.get("/", response_model=List[UserResponse]) async def get_users( skip: int = Query(0, ge=0), limit: int = Query(20, ge=1, le=100), keyword: Optional[str] = Query(None), status: Optional[str] = Query(None), db: Session = Depends(get_db), current_admin = Depends(get_current_admin) ): """获取用户列表""" require_permissions(current_admin, ["user:read"]) user_service = UserService(db) users, total = await user_service.get_users( skip=skip, limit=limit, keyword=keyword, status=status ) return { "list": users, "total": total, "page": skip // limit + 1, "size": limit } @router.get("/{user_id}", response_model=UserResponse) async def get_user( user_id: int, db: Session = Depends(get_db), current_admin = Depends(get_current_admin) ): """获取用户详情""" require_permissions(current_admin, ["user:read"]) user_service = UserService(db) user = await user_service.get_user_by_id(user_id) if not user: raise HTTPException(status_code=404, detail="用户不存在") return user @router.post("/", response_model=UserResponse) async def create_user( user_data: UserCreate, db: Session = Depends(get_db), current_admin = Depends(get_current_admin) ): """创建用户""" require_permissions(current_admin, ["user:create"]) user_service = UserService(db) # 检查用户名和邮箱是否已存在 if await user_service.get_user_by_username(user_data.username): raise HTTPException(status_code=400, detail="用户名已存在") if await user_service.get_user_by_email(user_data.email): raise HTTPException(status_code=400, detail="邮箱已存在") user = await user_service.create_user(user_data) # 记录操作日志 await log_operation( admin_id=current_admin.id, action="create_user", resource_type="user", resource_id=user.id, details=f"创建用户: {user.username}" ) return user @router.put("/{user_id}", response_model=UserResponse) async def update_user( user_id: int, user_data: UserUpdate, db: Session = Depends(get_db), current_admin = Depends(get_current_admin) ): """更新用户""" require_permissions(current_admin, ["user:update"]) user_service = UserService(db) user = await user_service.get_user_by_id(user_id) if not user: raise HTTPException(status_code=404, detail="用户不存在") updated_user = await user_service.update_user(user_id, user_data) # 记录操作日志 await log_operation( admin_id=current_admin.id, action="update_user", resource_type="user", resource_id=user_id, details=f"更新用户: {user.username}" ) return updated_user @router.delete("/{user_id}") async def delete_user( user_id: int, db: Session = Depends(get_db), current_admin = Depends(get_current_admin) ): """删除用户""" require_permissions(current_admin, ["user:delete"]) user_service = UserService(db) user = await user_service.get_user_by_id(user_id) if not user: raise HTTPException(status_code=404, detail="用户不存在") await user_service.delete_user(user_id) # 记录操作日志 await log_operation( admin_id=current_admin.id, action="delete_user", resource_type="user", resource_id=user_id, details=f"删除用户: {user.username}" ) return {"message": "用户删除成功"} @router.post("/batch") async def batch_operation_users( operation: str, user_ids: List[int], db: Session = Depends(get_db), current_admin = Depends(get_current_admin) ): """批量操作用户""" require_permissions(current_admin, ["user:update", "user:delete"]) user_service = UserService(db) if operation == "delete": require_permissions(current_admin, ["user:delete"]) result = await user_service.batch_delete_users(user_ids) elif operation == "activate": result = await user_service.batch_update_status(user_ids, "active") elif operation == "deactivate": result = await user_service.batch_update_status(user_ids, "inactive") else: raise HTTPException(status_code=400, detail="不支持的操作类型") # 记录操作日志 await log_operation( admin_id=current_admin.id, action=f"batch_{operation}_users", resource_type="user", resource_id=None, details=f"批量{operation}用户: {len(user_ids)}个" ) return result ``` ### 4.4 业务服务层 ```python # app/services/user_service.py from sqlalchemy.orm import Session from sqlalchemy import and_, or_ from typing import List, Tuple, Optional from datetime import datetime from app.models.user import User from app.schemas.user import UserCreate, UserUpdate from app.utils.security import get_password_hash from app.utils.cache import cache_manager class UserService: def __init__(self, db: Session): self.db = db async def get_users( self, skip: int = 0, limit: int = 20, keyword: Optional[str] = None, status: Optional[str] = None ) -> Tuple[List[User], int]: """获取用户列表""" query = self.db.query(User) # 搜索条件 if keyword: query = query.filter( or_( User.username.contains(keyword), User.email.contains(keyword), User.phone.contains(keyword) ) ) # 状态筛选 if status: query = query.filter(User.status == status) # 获取总数 total = query.count() # 分页查询 users = query.offset(skip).limit(limit).all() return users, total async def get_user_by_id(self, user_id: int) -> Optional[User]: """根据ID获取用户""" # 先从缓存获取 cache_key = f"user:{user_id}" cached_user = await cache_manager.get(cache_key) if cached_user: return cached_user user = self.db.query(User).filter(User.id == user_id).first() # 缓存用户信息 if user: await cache_manager.set(cache_key, user, expire=300) return user async def get_user_by_username(self, username: str) -> Optional[User]: """根据用户名获取用户""" return self.db.query(User).filter(User.username == username).first() async def get_user_by_email(self, email: str) -> Optional[User]: """根据邮箱获取用户""" return self.db.query(User).filter(User.email == email).first() async def create_user(self, user_data: UserCreate) -> User: """创建用户""" # 密码加密 hashed_password = get_password_hash(user_data.password) user = User( username=user_data.username, email=user_data.email, phone=user_data.phone, hashed_password=hashed_password, full_name=user_data.full_name, status="active", created_at=datetime.utcnow() ) self.db.add(user) self.db.commit() self.db.refresh(user) # 清除相关缓存 await self._clear_user_cache(user.id) return user async def update_user(self, user_id: int, user_data: UserUpdate) -> User: """更新用户""" user = self.db.query(User).filter(User.id == user_id).first() if not user: return None # 更新字段 for field, value in user_data.dict(exclude_unset=True).items(): if field == "password" and value: setattr(user, "hashed_password", get_password_hash(value)) else: setattr(user, field, value) user.updated_at = datetime.utcnow() self.db.commit() self.db.refresh(user) # 清除缓存 await self._clear_user_cache(user_id) return user async def delete_user(self, user_id: int) -> bool: """删除用户""" user = self.db.query(User).filter(User.id == user_id).first() if not user: return False self.db.delete(user) self.db.commit() # 清除缓存 await self._clear_user_cache(user_id) return True async def batch_delete_users(self, user_ids: List[int]) -> dict: """批量删除用户""" deleted_count = self.db.query(User).filter(User.id.in_(user_ids)).delete() self.db.commit() # 清除缓存 for user_id in user_ids: await self._clear_user_cache(user_id) return {"deleted_count": deleted_count} async def batch_update_status(self, user_ids: List[int], status: str) -> dict: """批量更新用户状态""" updated_count = self.db.query(User).filter(User.id.in_(user_ids)).update( {"status": status, "updated_at": datetime.utcnow()} ) self.db.commit() # 清除缓存 for user_id in user_ids: await self._clear_user_cache(user_id) return {"updated_count": updated_count} async def _clear_user_cache(self, user_id: int): """清除用户相关缓存""" cache_keys = [ f"user:{user_id}", "users:list:*", # 清除列表缓存 ] for key in cache_keys: await cache_manager.delete(key) ``` ### 4.5 权限控制实现 ```python # app/utils/auth.py from functools import wraps from fastapi import HTTPException, status from typing import List def require_permissions(admin, required_permissions: List[str]): """权限验证装饰器""" if admin.is_superuser: return True admin_permissions = get_admin_permissions(admin) for permission in required_permissions: if permission not in admin_permissions: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"缺少权限: {permission}" ) return True def get_admin_permissions(admin) -> List[str]: """获取管理员权限列表""" permissions = [] # 从数据库获取管理员角色和权限 # 这里简化处理,实际应该从数据库查询 if hasattr(admin, 'roles'): for role in admin.roles: if role.permissions: permissions.extend(role.permissions.split(',')) return list(set(permissions)) def permission_required(permissions: List[str]): """权限验证装饰器""" def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): # 从kwargs中获取current_admin current_admin = kwargs.get('current_admin') if not current_admin: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="未认证" ) require_permissions(current_admin, permissions) return await func(*args, **kwargs) return wrapper return decorator ``` ### 4.6 异步任务实现 ```python # app/tasks/report_tasks.py from celery import Celery from app.core.config import settings from app.services.report_service import ReportService from app.utils.email import send_email import pandas as pd from datetime import datetime celery_app = Celery( "admin_tasks", broker=settings.CELERY_BROKER_URL, backend=settings.CELERY_RESULT_BACKEND ) @celery_app.task def generate_user_report(admin_id: int, report_params: dict): """生成用户报表任务""" try: report_service = ReportService() # 生成报表数据 data = report_service.generate_user_report(report_params) # 生成Excel文件 filename = f"user_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" filepath = f"reports/{filename}" df = pd.DataFrame(data) df.to_excel(filepath, index=False) # 发送邮件通知 admin = get_admin_by_id(admin_id) if admin and admin.email: send_email( to_email=admin.email, subject="用户报表生成完成", content=f"您请求的用户报表已生成完成,请登录系统下载。\n文件名: {filename}", attachments=[filepath] ) return { "status": "success", "filename": filename, "filepath": filepath } except Exception as e: return { "status": "error", "error": str(e) } @celery_app.task def cleanup_old_files(): """清理旧文件任务""" import os from datetime import datetime, timedelta # 清理30天前的报表文件 cutoff_date = datetime.now() - timedelta(days=30) reports_dir = "reports" if os.path.exists(reports_dir): for filename in os.listdir(reports_dir): filepath = os.path.join(reports_dir, filename) if os.path.isfile(filepath): file_time = datetime.fromtimestamp(os.path.getctime(filepath)) if file_time < cutoff_date: os.remove(filepath) return {"status": "success", "message": "旧文件清理完成"} # 定时任务配置 celery_app.conf.beat_schedule = { 'cleanup-old-files': { 'task': 'app.tasks.report_tasks.cleanup_old_files', 'schedule': 86400.0, # 每天执行一次 }, } ``` ## 5. 性能优化 ### 5.1 数据库优化 - **索引优化**: 为常用查询字段添加索引 - **查询优化**: 使用合适的查询语句和连接方式 - **分页优化**: 使用游标分页处理大数据量 - **连接池**: 配置合适的数据库连接池 - **读写分离**: 配置主从数据库读写分离 ### 5.2 缓存策略 - **Redis缓存**: 缓存热点数据和查询结果 - **应用缓存**: 使用内存缓存提升响应速度 - **缓存更新**: 合理的缓存更新和失效策略 - **缓存预热**: 系统启动时预热关键缓存 - **缓存监控**: 监控缓存命中率和性能 ### 5.3 接口优化 - **异步处理**: 使用异步编程提升并发性能 - **批量操作**: 提供批量操作接口减少请求次数 - **数据压缩**: 对响应数据进行压缩 - **限流控制**: 实现接口限流防止过载 - **响应优化**: 优化响应数据结构和大小 ## 6. 安全措施 ### 6.1 认证安全 - **JWT Token**: 使用JWT进行身份认证 - **Token刷新**: 实现Token自动刷新机制 - **密码加密**: 使用bcrypt加密存储密码 - **登录限制**: 实现登录失败次数限制 - **会话管理**: 安全的会话管理和超时控制 ### 6.2 权限控制 - **RBAC模型**: 基于角色的访问控制 - **细粒度权限**: 实现资源级权限控制 - **权限验证**: 接口级权限验证 - **权限缓存**: 权限信息缓存优化 - **权限审计**: 权限变更审计日志 ### 6.3 数据安全 - **SQL注入防护**: 使用ORM防止SQL注入 - **XSS防护**: 输入输出数据过滤 - **CSRF防护**: 实现CSRF Token验证 - **数据加密**: 敏感数据加密存储 - **数据脱敏**: 日志和导出数据脱敏 ## 7. 测试策略 ### 7.1 单元测试 ```python # tests/test_user_service.py import pytest from sqlalchemy.orm import Session from app.services.user_service import UserService from app.schemas.user import UserCreate @pytest.fixture def user_service(db_session: Session): return UserService(db_session) @pytest.fixture def sample_user_data(): return UserCreate( username="testuser", email="test@example.com", phone="13800138000", password="password123", full_name="测试用户" ) async def test_create_user(user_service: UserService, sample_user_data: UserCreate): """测试创建用户""" user = await user_service.create_user(sample_user_data) assert user.username == sample_user_data.username assert user.email == sample_user_data.email assert user.phone == sample_user_data.phone assert user.full_name == sample_user_data.full_name assert user.status == "active" async def test_get_user_by_id(user_service: UserService, sample_user_data: UserCreate): """测试根据ID获取用户""" # 先创建用户 created_user = await user_service.create_user(sample_user_data) # 根据ID获取用户 user = await user_service.get_user_by_id(created_user.id) assert user is not None assert user.id == created_user.id assert user.username == sample_user_data.username async def test_get_users_with_pagination(user_service: UserService): """测试分页获取用户列表""" # 创建多个测试用户 for i in range(25): user_data = UserCreate( username=f"user{i}", email=f"user{i}@example.com", phone=f"1380013800{i:02d}", password="password123", full_name=f"用户{i}" ) await user_service.create_user(user_data) # 测试分页 users, total = await user_service.get_users(skip=0, limit=10) assert len(users) == 10 assert total >= 25 # 测试第二页 users_page2, _ = await user_service.get_users(skip=10, limit=10) assert len(users_page2) == 10 assert users[0].id != users_page2[0].id ``` ### 7.2 集成测试 ```python # tests/test_api_users.py import pytest from fastapi.testclient import TestClient from app.main import app client = TestClient(app) def test_get_users_unauthorized(): """测试未授权访问用户列表""" response = client.get("/api/v1/admin/users") assert response.status_code == 401 def test_get_users_authorized(admin_token): """测试授权访问用户列表""" headers = {"Authorization": f"Bearer {admin_token}"} response = client.get("/api/v1/admin/users", headers=headers) assert response.status_code == 200 data = response.json() assert "list" in data assert "total" in data assert "page" in data assert "size" in data def test_create_user(admin_token): """测试创建用户""" headers = {"Authorization": f"Bearer {admin_token}"} user_data = { "username": "newuser", "email": "newuser@example.com", "phone": "13800138000", "password": "password123", "full_name": "新用户" } response = client.post("/api/v1/admin/users", json=user_data, headers=headers) assert response.status_code == 200 data = response.json() assert data["username"] == user_data["username"] assert data["email"] == user_data["email"] assert "id" in data def test_create_user_duplicate_username(admin_token): """测试创建重复用户名的用户""" headers = {"Authorization": f"Bearer {admin_token}"} user_data = { "username": "duplicateuser", "email": "user1@example.com", "phone": "13800138001", "password": "password123", "full_name": "用户1" } # 第一次创建 response1 = client.post("/api/v1/admin/users", json=user_data, headers=headers) assert response1.status_code == 200 # 第二次创建相同用户名 user_data["email"] = "user2@example.com" response2 = client.post("/api/v1/admin/users", json=user_data, headers=headers) assert response2.status_code == 400 assert "用户名已存在" in response2.json()["detail"] ``` ### 7.3 性能测试 ```python # tests/test_performance.py import pytest import asyncio import time from concurrent.futures import ThreadPoolExecutor from app.services.user_service import UserService async def test_concurrent_user_creation(user_service: UserService): """测试并发创建用户性能""" async def create_user(index): user_data = UserCreate( username=f"perfuser{index}", email=f"perfuser{index}@example.com", phone=f"1380013{index:04d}", password="password123", full_name=f"性能测试用户{index}" ) return await user_service.create_user(user_data) # 并发创建100个用户 start_time = time.time() tasks = [create_user(i) for i in range(100)] results = await asyncio.gather(*tasks) end_time = time.time() # 验证结果 assert len(results) == 100 assert all(user.id is not None for user in results) # 性能断言(100个用户创建应在10秒内完成) assert end_time - start_time < 10.0 async def test_large_dataset_query_performance(user_service: UserService): """测试大数据集查询性能""" # 创建大量测试数据 for i in range(1000): user_data = UserCreate( username=f"biguser{i}", email=f"biguser{i}@example.com", phone=f"1390013{i:04d}", password="password123", full_name=f"大数据测试用户{i}" ) await user_service.create_user(user_data) # 测试查询性能 start_time = time.time() users, total = await user_service.get_users(skip=0, limit=50) end_time = time.time() # 验证结果 assert len(users) == 50 assert total >= 1000 # 性能断言(查询应在1秒内完成) assert end_time - start_time < 1.0 ``` ## 8. 部署配置 ### 8.1 Docker配置 ```dockerfile # Dockerfile FROM python:3.9-slim WORKDIR /app # 安装系统依赖 RUN apt-get update && apt-get install -y \ gcc \ default-libmysqlclient-dev \ && rm -rf /var/lib/apt/lists/* # 复制依赖文件 COPY requirements/prod.txt requirements.txt # 安装Python依赖 RUN pip install --no-cache-dir -r requirements.txt # 复制应用代码 COPY . . # 创建非root用户 RUN useradd --create-home --shell /bin/bash app \ && chown -R app:app /app USER app # 暴露端口 EXPOSE 8000 # 启动命令 CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] ``` ### 8.2 Docker Compose配置 ```yaml # docker-compose.yml version: '3.8' services: app: build: . ports: - "8000:8000" environment: - DATABASE_URL=mysql+pymysql://user:password@mysql:3306/xlxumu - REDIS_URL=redis://redis:6379/0 - MONGODB_URL=mongodb://mongodb:27017/xlxumu depends_on: - mysql - redis - mongodb volumes: - ./logs:/app/logs restart: unless-stopped mysql: image: mysql:8.0 environment: MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD} MYSQL_DATABASE: xlxumu MYSQL_USER: ${MYSQL_USER} MYSQL_PASSWORD: ${MYSQL_PASSWORD} volumes: - mysql_data:/var/lib/mysql - ./init.sql:/docker-entrypoint-initdb.d/init.sql ports: - "3306:3306" restart: unless-stopped redis: image: redis:6-alpine volumes: - redis_data:/data ports: - "6379:6379" restart: unless-stopped mongodb: image: mongo:5.0 environment: MONGO_INITDB_ROOT_USERNAME: ${MONGO_ROOT_USERNAME} MONGO_INITDB_ROOT_PASSWORD: ${MONGO_ROOT_PASSWORD} volumes: - mongodb_data:/data/db ports: - "27017:27017" restart: unless-stopped nginx: image: nginx:alpine ports: - "80:80" - "443:443" volumes: - ./nginx.conf:/etc/nginx/nginx.conf - ./ssl:/etc/nginx/ssl depends_on: - app restart: unless-stopped volumes: mysql_data: redis_data: mongodb_data: ``` ### 8.3 Nginx配置 ```nginx # nginx.conf events { worker_connections 1024; } http { upstream backend { server app:8000; } server { listen 80; server_name api.xlxumu.com; location / { proxy_pass http://backend; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; # 超时设置 proxy_connect_timeout 60s; proxy_send_timeout 60s; proxy_read_timeout 60s; } location /health { access_log off; return 200 "healthy\n"; } # 静态文件 location /static/ { alias /app/static/; expires 30d; } } } ``` ## 9. 监控运维 ### 9.1 健康检查 ```python # app/api/v1/health.py from fastapi import APIRouter, Depends from sqlalchemy.orm import Session from app.core.database import get_db from app.core.redis import get_redis import asyncio router = APIRouter() @router.get("/health") async def health_check(db: Session = Depends(get_db)): """系统健康检查""" try: # 检查数据库连接 db.execute("SELECT 1") # 检查Redis连接 redis = await get_redis() await redis.ping() return { "status": "healthy", "timestamp": datetime.utcnow().isoformat(), "services": { "database": "connected", "redis": "connected" } } except Exception as e: return { "status": "unhealthy", "error": str(e), "timestamp": datetime.utcnow().isoformat() } @router.get("/metrics") async def get_metrics(): """获取系统指标""" return { "cpu_usage": psutil.cpu_percent(), "memory_usage": psutil.virtual_memory().percent, "disk_usage": psutil.disk_usage('/').percent, "active_connections": len(asyncio.all_tasks()) } ``` ### 9.2 日志配置 ```python # app/core/logging.py import logging import sys from pathlib import Path from loguru import logger class InterceptHandler(logging.Handler): def emit(self, record): # 获取对应的Loguru级别 try: level = logger.level(record.levelname).name except ValueError: level = record.levelno # 查找调用者 frame, depth = logging.currentframe(), 2 while frame.f_code.co_filename == logging.__file__: frame = frame.f_back depth += 1 logger.opt(depth=depth, exception=record.exc_info).log( level, record.getMessage() ) def setup_logging(): """配置日志系统""" # 移除默认处理器 logger.remove() # 控制台输出 logger.add( sys.stderr, format="{time:YYYY-MM-DD HH:mm:ss} | " "{level: <8} | " "{name}:{function}:{line} - " "{message}", level="INFO" ) # 文件输出 log_path = Path("logs") log_path.mkdir(exist_ok=True) logger.add( log_path / "app.log", rotation="1 day", retention="30 days", format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", level="INFO" ) logger.add( log_path / "error.log", rotation="1 day", retention="30 days", format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", level="ERROR" ) # 拦截标准库日志 logging.basicConfig(handlers=[InterceptHandler()], level=0) ``` ### 9.3 性能监控 ```python # app/middleware/metrics.py import time from fastapi import Request, Response from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST # 定义指标 REQUEST_COUNT = Counter( 'http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'] ) REQUEST_DURATION = Histogram( 'http_request_duration_seconds', 'HTTP request duration', ['method', 'endpoint'] ) async def metrics_middleware(request: Request, call_next): """性能监控中间件""" start_time = time.time() response = await call_next(request) # 记录指标 duration = time.time() - start_time endpoint = request.url.path method = request.method status = response.status_code REQUEST_COUNT.labels( method=method, endpoint=endpoint, status=status ).inc() REQUEST_DURATION.labels( method=method, endpoint=endpoint ).observe(duration) return response @router.get("/metrics") async def get_prometheus_metrics(): """Prometheus指标端点""" return Response( generate_latest(), media_type=CONTENT_TYPE_LATEST ) ``` ### 9.4 告警配置 ```yaml # monitoring/alerting.yml groups: - name: backend-admin rules: - alert: HighErrorRate expr: rate(http_requests_total{status=~"5.."}[5m]) > 0.1 for: 2m labels: severity: critical annotations: summary: "后端管理系统错误率过高" description: "错误率超过10%,持续2分钟" - alert: HighResponseTime expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2 for: 5m labels: severity: warning annotations: summary: "响应时间过长" description: "95%请求响应时间超过2秒" - alert: DatabaseConnectionFailed expr: up{job="mysql"} == 0 for: 1m labels: severity: critical annotations: summary: "数据库连接失败" description: "MySQL数据库连接中断" - alert: RedisConnectionFailed expr: up{job="redis"} == 0 for: 1m labels: severity: warning annotations: summary: "Redis连接失败" description: "Redis缓存服务连接中断" - alert: HighCPUUsage expr: cpu_usage > 80 for: 5m labels: severity: warning annotations: summary: "CPU使用率过高" description: "CPU使用率超过80%,持续5分钟" - alert: HighMemoryUsage expr: memory_usage > 85 for: 5m labels: severity: warning annotations: summary: "内存使用率过高" description: "内存使用率超过85%,持续5分钟" ``` ## 10. 总结 ### 10.1 开发亮点 1. **现代化架构**:基于FastAPI的异步架构,支持高并发 2. **完善的认证**:JWT + RBAC权限控制系统 3. **数据库优化**:多数据库支持,读写分离,缓存策略 4. **API标准化**:RESTful设计,自动生成文档 5. **监控体系**:完整的日志、指标、告警系统 6. **容器化部署**:Docker + Kubernetes支持 7. **自动化测试**:单元测试 + 集成测试覆盖 ### 10.2 技术特色 - **异步编程**:全面使用async/await提升性能 - **依赖注入**:FastAPI原生依赖注入系统 - **数据验证**:Pydantic模型自动验证 - **ORM优化**:SQLAlchemy异步支持 - **缓存策略**:多层缓存架构 - **消息队列**:Celery异步任务处理 - **搜索引擎**:Elasticsearch全文搜索 ### 10.3 扩展性设计 - **微服务架构**:支持服务拆分和独立部署 - **水平扩展**:支持负载均衡和集群部署 - **数据库分片**:支持数据库水平分片 - **缓存分布式**:Redis集群支持 - **消息队列**:支持分布式任务处理 - **API版本化**:支持多版本API并存 - **插件系统**:支持功能模块化扩展 ### 10.4 后续优化 1. **性能优化**: - 数据库查询优化 - 缓存命中率提升 - API响应时间优化 - 内存使用优化 2. **功能扩展**: - 实时通知系统 - 数据分析平台 - AI智能推荐 - 移动端API 3. **运维优化**: - 自动化部署流程 - 监控告警完善 - 日志分析系统 - 性能调优工具 4. **安全加固**: - API安全扫描 - 数据加密升级 - 访问控制优化 - 安全审计系统 --- **文档版本**: v1.0.0 **最后更新**: 2024年12月 **维护团队**: 后端开发团队