54 KiB
后端管理开发文档
版本历史
| 版本 | 日期 | 作者 | 变更说明 |
|---|---|---|---|
| 1.0 | 2024-01-20 | 后端开发团队 | 初始版本 |
1. 项目概述
1.1 项目介绍
畜牧养殖管理平台后端管理系统,为管理员提供系统管理、数据管理、业务监控等后台管理功能。基于微服务架构设计,提供高性能、高可用的后端服务支持。
1.2 技术栈
- 开发语言: Python 3.9+
- Web框架: FastAPI
- 数据库: MySQL 8.0 + Redis 6.0 + MongoDB 5.0
- ORM框架: SQLAlchemy + Alembic
- 异步框架: asyncio + aioredis + motor
- 消息队列: Celery + Redis
- 缓存系统: Redis
- 搜索引擎: Elasticsearch
- 文件存储: MinIO / 阿里云OSS
- 监控系统: Prometheus + Grafana
- 日志系统: ELK Stack
- 容器化: Docker + Docker Compose
- API文档: Swagger/OpenAPI
1.3 项目结构
backend/
├── app/
│ ├── api/ # API路由
│ │ ├── v1/ # API版本1
│ │ │ ├── admin/ # 管理员相关接口
│ │ │ ├── auth/ # 认证相关接口
│ │ │ ├── user/ # 用户管理接口
│ │ │ ├── farm/ # 养殖场管理接口
│ │ │ ├── animal/ # 动物管理接口
│ │ │ ├── order/ # 订单管理接口
│ │ │ ├── finance/ # 财务管理接口
│ │ │ ├── system/ # 系统管理接口
│ │ │ └── statistics/ # 统计分析接口
│ │ └── deps.py # 依赖注入
│ ├── core/ # 核心配置
│ │ ├── config.py # 配置管理
│ │ ├── security.py # 安全相关
│ │ ├── database.py # 数据库配置
│ │ └── middleware.py # 中间件
│ ├── models/ # 数据模型
│ │ ├── admin.py # 管理员模型
│ │ ├── user.py # 用户模型
│ │ ├── farm.py # 养殖场模型
│ │ ├── animal.py # 动物模型
│ │ ├── order.py # 订单模型
│ │ └── system.py # 系统模型
│ ├── schemas/ # Pydantic模型
│ │ ├── admin.py # 管理员Schema
│ │ ├── user.py # 用户Schema
│ │ ├── farm.py # 养殖场Schema
│ │ └── common.py # 通用Schema
│ ├── services/ # 业务逻辑
│ │ ├── admin_service.py # 管理员服务
│ │ ├── user_service.py # 用户服务
│ │ ├── farm_service.py # 养殖场服务
│ │ ├── order_service.py # 订单服务
│ │ └── statistics_service.py # 统计服务
│ ├── utils/ # 工具函数
│ │ ├── auth.py # 认证工具
│ │ ├── cache.py # 缓存工具
│ │ ├── email.py # 邮件工具
│ │ ├── file.py # 文件工具
│ │ └── validators.py # 验证工具
│ ├── tasks/ # 异步任务
│ │ ├── celery_app.py # Celery配置
│ │ ├── email_tasks.py # 邮件任务
│ │ ├── report_tasks.py # 报表任务
│ │ └── cleanup_tasks.py # 清理任务
│ └── main.py # 应用入口
├── migrations/ # 数据库迁移
├── tests/ # 测试文件
├── scripts/ # 脚本文件
├── docs/ # 文档
├── docker/ # Docker配置
├── requirements/ # 依赖文件
│ ├── base.txt # 基础依赖
│ ├── dev.txt # 开发依赖
│ └── prod.txt # 生产依赖
├── .env.example # 环境变量示例
├── docker-compose.yml # Docker Compose配置
├── Dockerfile # Docker镜像配置
└── README.md
2. 开发环境搭建
2.1 环境要求
- Python 3.9+
- MySQL 8.0+
- Redis 6.0+
- MongoDB 5.0+
- Docker & Docker Compose
- Git
2.2 安装步骤
# 1. 克隆项目
git clone <repository-url>
cd backend
# 2. 创建虚拟环境
python -m venv venv
source venv/bin/activate # Linux/Mac
# 或
venv\Scripts\activate # Windows
# 3. 安装依赖
pip install -r requirements/dev.txt
# 4. 配置环境变量
cp .env.example .env
# 编辑 .env 文件,配置数据库连接等信息
# 5. 启动数据库服务(使用Docker)
docker-compose up -d mysql redis mongodb
# 6. 运行数据库迁移
alembic upgrade head
# 7. 启动开发服务器
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
# 8. 启动Celery工作进程
celery -A app.tasks.celery_app worker --loglevel=info
# 9. 启动Celery Beat调度器
celery -A app.tasks.celery_app beat --loglevel=info
2.3 开发工具配置
- IDE: PyCharm / VSCode
- 代码规范: Black + isort + flake8
- 类型检查: mypy
- 测试框架: pytest
- API测试: Postman / Insomnia
3. 开发计划
3.1 第一阶段:基础框架搭建(1周)
3.1.1 项目初始化
任务: 搭建基础项目架构 负责人: 后端架构师 工期: 2天 详细任务:
- 创建FastAPI项目结构
- 配置数据库连接和ORM
- 配置Redis缓存和会话管理
- 配置MongoDB文档数据库
- 配置Celery异步任务队列
- 配置日志系统和监控
- 配置Docker容器化环境
3.1.2 认证授权系统
任务: 实现认证授权和权限控制 负责人: 后端工程师A 工期: 3天 详细任务:
- 实现JWT Token认证机制
- 实现管理员登录和注册
- 实现权限控制中间件
- 实现角色权限管理
- 实现Token刷新机制
- 实现登录日志记录
接口列表:
POST /api/v1/auth/login # 管理员登录
POST /api/v1/auth/logout # 管理员登出
POST /api/v1/auth/refresh # 刷新Token
GET /api/v1/auth/profile # 获取个人信息
PUT /api/v1/auth/profile # 更新个人信息
POST /api/v1/auth/change-password # 修改密码
3.1.3 基础中间件和工具
任务: 开发基础中间件和工具函数 负责人: 后端工程师B 工期: 2天 详细任务:
- 实现CORS跨域中间件
- 实现请求日志中间件
- 实现异常处理中间件
- 实现限流中间件
- 实现缓存工具函数
- 实现文件上传工具
3.2 第二阶段:核心业务模块开发(4-5周)
3.2.1 用户管理模块
任务: 实现用户管理功能 负责人: 后端工程师A 工期: 5天 详细任务:
- 用户数据模型设计
- 用户CRUD操作接口
- 用户搜索和筛选功能
- 用户状态管理(启用/禁用)
- 用户批量操作功能
- 用户数据导入导出
- 用户操作日志记录
接口列表:
GET /api/v1/admin/users # 获取用户列表
GET /api/v1/admin/users/{id} # 获取用户详情
POST /api/v1/admin/users # 创建用户
PUT /api/v1/admin/users/{id} # 更新用户
DELETE /api/v1/admin/users/{id} # 删除用户
POST /api/v1/admin/users/batch # 批量操作用户
PUT /api/v1/admin/users/{id}/status # 更新用户状态
GET /api/v1/admin/users/export # 导出用户数据
POST /api/v1/admin/users/import # 导入用户数据
3.2.2 角色权限管理模块
任务: 实现角色权限管理功能 负责人: 后端工程师B 工期: 4天 详细任务:
- 角色权限数据模型设计
- 角色CRUD操作接口
- 权限树形结构管理
- 用户角色分配功能
- 权限验证装饰器
- 权限变更日志记录
接口列表:
GET /api/v1/admin/roles # 获取角色列表
GET /api/v1/admin/roles/{id} # 获取角色详情
POST /api/v1/admin/roles # 创建角色
PUT /api/v1/admin/roles/{id} # 更新角色
DELETE /api/v1/admin/roles/{id} # 删除角色
GET /api/v1/admin/permissions # 获取权限列表
POST /api/v1/admin/roles/{id}/permissions # 分配权限
GET /api/v1/admin/users/{id}/roles # 获取用户角色
POST /api/v1/admin/users/{id}/roles # 分配用户角色
3.2.3 养殖场管理模块
任务: 实现养殖场管理功能 负责人: 后端工程师C 工期: 6天 详细任务:
- 养殖场数据模型设计
- 养殖场CRUD操作接口
- 养殖场审核功能
- 养殖场统计分析
- 养殖场地理位置管理
- 养殖场认证管理
- 养殖场数据导出
接口列表:
GET /api/v1/admin/farms # 获取养殖场列表
GET /api/v1/admin/farms/{id} # 获取养殖场详情
POST /api/v1/admin/farms # 创建养殖场
PUT /api/v1/admin/farms/{id} # 更新养殖场
DELETE /api/v1/admin/farms/{id} # 删除养殖场
PUT /api/v1/admin/farms/{id}/audit # 审核养殖场
GET /api/v1/admin/farms/statistics # 养殖场统计
GET /api/v1/admin/farms/export # 导出养殖场数据
3.2.4 动物管理模块
任务: 实现动物档案管理功能 负责人: 后端工程师D 工期: 6天 详细任务:
- 动物档案数据模型设计
- 动物CRUD操作接口
- 动物健康记录管理
- 动物生长记录管理
- 动物统计分析功能
- 动物批量操作功能
- 动物数据导出功能
接口列表:
GET /api/v1/admin/animals # 获取动物列表
GET /api/v1/admin/animals/{id} # 获取动物详情
POST /api/v1/admin/animals # 创建动物档案
PUT /api/v1/admin/animals/{id} # 更新动物档案
DELETE /api/v1/admin/animals/{id} # 删除动物档案
GET /api/v1/admin/animals/{id}/health # 获取健康记录
POST /api/v1/admin/animals/{id}/health # 添加健康记录
GET /api/v1/admin/animals/{id}/growth # 获取生长记录
POST /api/v1/admin/animals/{id}/growth # 添加生长记录
GET /api/v1/admin/animals/statistics # 动物统计
POST /api/v1/admin/animals/batch # 批量操作动物
3.2.5 订单管理模块
任务: 实现订单管理功能 负责人: 后端工程师E 工期: 7天 详细任务:
- 订单数据模型设计
- 订单CRUD操作接口
- 订单状态管理功能
- 订单审核功能
- 订单统计分析
- 退款处理功能
- 订单数据导出
接口列表:
GET /api/v1/admin/orders # 获取订单列表
GET /api/v1/admin/orders/{id} # 获取订单详情
PUT /api/v1/admin/orders/{id} # 更新订单
DELETE /api/v1/admin/orders/{id} # 删除订单
PUT /api/v1/admin/orders/{id}/status # 更新订单状态
PUT /api/v1/admin/orders/{id}/audit # 审核订单
POST /api/v1/admin/orders/{id}/refund # 处理退款
GET /api/v1/admin/orders/statistics # 订单统计
GET /api/v1/admin/orders/export # 导出订单数据
3.3 第三阶段:财务和支付管理(2-3周)
3.3.1 财务管理模块
任务: 实现财务管理功能 负责人: 后端工程师A 工期: 6天 详细任务:
- 财务数据模型设计
- 收入支出统计接口
- 资金流水管理
- 财务报表生成
- 对账功能实现
- 财务数据导出
- 财务异常监控
接口列表:
GET /api/v1/admin/finance/overview # 财务概览
GET /api/v1/admin/finance/income # 收入统计
GET /api/v1/admin/finance/expense # 支出统计
GET /api/v1/admin/finance/flow # 资金流水
GET /api/v1/admin/finance/reports # 财务报表
POST /api/v1/admin/finance/reconcile # 对账处理
GET /api/v1/admin/finance/export # 导出财务数据
3.3.2 支付管理模块
任务: 实现支付管理功能 负责人: 后端工程师B 工期: 5天 详细任务:
- 支付记录数据模型
- 支付记录查询接口
- 支付方式配置管理
- 支付异常处理
- 支付统计分析
- 支付对账功能
- 支付数据导出
接口列表:
GET /api/v1/admin/payments # 获取支付记录
GET /api/v1/admin/payments/{id} # 获取支付详情
GET /api/v1/admin/payments/config # 获取支付配置
PUT /api/v1/admin/payments/config # 更新支付配置
GET /api/v1/admin/payments/exceptions # 支付异常
PUT /api/v1/admin/payments/{id}/handle # 处理支付异常
GET /api/v1/admin/payments/statistics # 支付统计
POST /api/v1/admin/payments/reconcile # 支付对账
3.3.3 结算管理模块
任务: 实现结算管理功能 负责人: 后端工程师C 工期: 4天 详细任务:
- 结算数据模型设计
- 结算规则配置
- 自动结算功能
- 结算记录管理
- 结算统计分析
- 结算数据导出
接口列表:
GET /api/v1/admin/settlements # 获取结算记录
GET /api/v1/admin/settlements/{id} # 获取结算详情
POST /api/v1/admin/settlements # 创建结算
PUT /api/v1/admin/settlements/{id} # 更新结算
GET /api/v1/admin/settlements/config # 获取结算配置
PUT /api/v1/admin/settlements/config # 更新结算配置
POST /api/v1/admin/settlements/auto # 自动结算
GET /api/v1/admin/settlements/statistics # 结算统计
3.4 第四阶段:系统管理和配置(1-2周)
3.4.1 系统配置模块
任务: 实现系统配置管理 负责人: 后端工程师D 工期: 4天 详细任务:
- 系统参数配置管理
- 字典数据管理
- 系统公告管理
- 邮件模板配置
- 短信模板配置
- 系统备份恢复
接口列表:
GET /api/v1/admin/system/config # 获取系统配置
PUT /api/v1/admin/system/config # 更新系统配置
GET /api/v1/admin/system/dict # 获取字典数据
POST /api/v1/admin/system/dict # 创建字典数据
PUT /api/v1/admin/system/dict/{id} # 更新字典数据
DELETE /api/v1/admin/system/dict/{id} # 删除字典数据
GET /api/v1/admin/system/notices # 获取系统公告
POST /api/v1/admin/system/notices # 创建系统公告
PUT /api/v1/admin/system/notices/{id} # 更新系统公告
DELETE /api/v1/admin/system/notices/{id} # 删除系统公告
3.4.2 日志管理模块
任务: 实现系统日志管理 负责人: 后端工程师E 工期: 3天 详细任务:
- 操作日志记录和查询
- 登录日志记录和查询
- 系统错误日志管理
- 日志搜索和筛选
- 日志导出功能
- 日志清理任务
接口列表:
GET /api/v1/admin/logs/operation # 获取操作日志
GET /api/v1/admin/logs/login # 获取登录日志
GET /api/v1/admin/logs/error # 获取错误日志
GET /api/v1/admin/logs/search # 搜索日志
GET /api/v1/admin/logs/export # 导出日志
POST /api/v1/admin/logs/cleanup # 清理日志
3.4.3 监控管理模块
任务: 实现系统监控功能 负责人: 后端工程师F 工期: 3天 详细任务:
- 系统性能监控
- 服务状态监控
- 数据库监控
- 接口调用监控
- 告警规则配置
- 监控报表生成
接口列表:
GET /api/v1/admin/monitor/system # 系统监控数据
GET /api/v1/admin/monitor/service # 服务监控数据
GET /api/v1/admin/monitor/database # 数据库监控数据
GET /api/v1/admin/monitor/api # 接口监控数据
GET /api/v1/admin/monitor/alerts # 获取告警规则
POST /api/v1/admin/monitor/alerts # 创建告警规则
PUT /api/v1/admin/monitor/alerts/{id} # 更新告警规则
DELETE /api/v1/admin/monitor/alerts/{id} # 删除告警规则
3.5 第五阶段:数据统计和报表(1-2周)
3.5.1 数据统计模块
任务: 实现数据统计分析功能 负责人: 后端工程师A 工期: 5天 详细任务:
- 用户数据统计分析
- 业务数据统计分析
- 交易数据统计分析
- 趋势分析算法实现
- 自定义统计报表
- 数据对比分析功能
接口列表:
GET /api/v1/admin/statistics/user # 用户统计
GET /api/v1/admin/statistics/business # 业务统计
GET /api/v1/admin/statistics/trade # 交易统计
GET /api/v1/admin/statistics/trend # 趋势分析
POST /api/v1/admin/statistics/custom # 自定义统计
GET /api/v1/admin/statistics/compare # 数据对比
3.5.2 报表管理模块
任务: 实现报表生成和管理 负责人: 后端工程师B 工期: 4天 详细任务:
- 报表模板管理
- 报表生成引擎
- 定时报表任务
- 报表分享功能
- 报表权限控制
- 报表数据缓存
接口列表:
GET /api/v1/admin/reports/templates # 获取报表模板
POST /api/v1/admin/reports/templates # 创建报表模板
PUT /api/v1/admin/reports/templates/{id} # 更新报表模板
DELETE /api/v1/admin/reports/templates/{id} # 删除报表模板
POST /api/v1/admin/reports/generate # 生成报表
GET /api/v1/admin/reports/{id} # 获取报表
GET /api/v1/admin/reports/schedule # 获取定时报表
POST /api/v1/admin/reports/schedule # 创建定时报表
POST /api/v1/admin/reports/{id}/share # 分享报表
3.5.3 数据导出模块
任务: 实现数据导出功能 负责人: 后端工程师C 工期: 3天 详细任务:
- Excel数据导出功能
- CSV数据导出功能
- PDF报表导出功能
- 大数据量分批导出
- 导出任务队列管理
- 导出文件管理
接口列表:
POST /api/v1/admin/export/excel # Excel导出
POST /api/v1/admin/export/csv # CSV导出
POST /api/v1/admin/export/pdf # PDF导出
GET /api/v1/admin/export/tasks # 获取导出任务
GET /api/v1/admin/export/tasks/{id} # 获取导出任务状态
GET /api/v1/admin/export/files # 获取导出文件列表
GET /api/v1/admin/export/files/{id} # 下载导出文件
3.6 第六阶段:测试和优化(1周)
3.6.1 功能测试
任务: 全面功能测试 负责人: 测试工程师 工期: 3天 详细任务:
- 接口功能测试
- 权限控制测试
- 数据操作测试
- 异常处理测试
- 性能压力测试
3.6.2 性能优化
任务: 性能优化和体验提升 负责人: 后端架构师 工期: 2天 详细任务:
- 数据库查询优化
- 缓存策略优化
- 接口响应时间优化
- 内存使用优化
- 并发处理优化
4. 技术实现
4.1 项目配置
# app/core/config.py
from pydantic import BaseSettings
from typing import Optional
class Settings(BaseSettings):
# 应用配置
APP_NAME: str = "畜牧养殖管理平台后端管理系统"
APP_VERSION: str = "1.0.0"
DEBUG: bool = False
# 数据库配置
DATABASE_URL: str
REDIS_URL: str
MONGODB_URL: str
# 安全配置
SECRET_KEY: str
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
# 文件存储配置
UPLOAD_PATH: str = "uploads"
MAX_FILE_SIZE: int = 10 * 1024 * 1024 # 10MB
# 邮件配置
SMTP_HOST: Optional[str] = None
SMTP_PORT: Optional[int] = None
SMTP_USERNAME: Optional[str] = None
SMTP_PASSWORD: Optional[str] = None
# Celery配置
CELERY_BROKER_URL: str
CELERY_RESULT_BACKEND: str
class Config:
env_file = ".env"
settings = Settings()
4.2 数据库模型
# app/models/admin.py
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
Base = declarative_base()
class Admin(Base):
__tablename__ = "admins"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
full_name = Column(String(100))
avatar = Column(String(255))
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
last_login = Column(DateTime)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
class Role(Base):
__tablename__ = "roles"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(50), unique=True, nullable=False)
description = Column(Text)
permissions = Column(Text) # JSON格式存储权限列表
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
class AdminRole(Base):
__tablename__ = "admin_roles"
id = Column(Integer, primary_key=True, index=True)
admin_id = Column(Integer, nullable=False)
role_id = Column(Integer, nullable=False)
created_at = Column(DateTime, server_default=func.now())
4.3 API路由实现
# app/api/v1/admin/users.py
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from typing import List, Optional
from app.api.deps import get_db, get_current_admin
from app.schemas.user import UserResponse, UserCreate, UserUpdate
from app.services.user_service import UserService
from app.utils.auth import require_permissions
router = APIRouter()
@router.get("/", response_model=List[UserResponse])
async def get_users(
skip: int = Query(0, ge=0),
limit: int = Query(20, ge=1, le=100),
keyword: Optional[str] = Query(None),
status: Optional[str] = Query(None),
db: Session = Depends(get_db),
current_admin = Depends(get_current_admin)
):
"""获取用户列表"""
require_permissions(current_admin, ["user:read"])
user_service = UserService(db)
users, total = await user_service.get_users(
skip=skip,
limit=limit,
keyword=keyword,
status=status
)
return {
"list": users,
"total": total,
"page": skip // limit + 1,
"size": limit
}
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int,
db: Session = Depends(get_db),
current_admin = Depends(get_current_admin)
):
"""获取用户详情"""
require_permissions(current_admin, ["user:read"])
user_service = UserService(db)
user = await user_service.get_user_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
return user
@router.post("/", response_model=UserResponse)
async def create_user(
user_data: UserCreate,
db: Session = Depends(get_db),
current_admin = Depends(get_current_admin)
):
"""创建用户"""
require_permissions(current_admin, ["user:create"])
user_service = UserService(db)
# 检查用户名和邮箱是否已存在
if await user_service.get_user_by_username(user_data.username):
raise HTTPException(status_code=400, detail="用户名已存在")
if await user_service.get_user_by_email(user_data.email):
raise HTTPException(status_code=400, detail="邮箱已存在")
user = await user_service.create_user(user_data)
# 记录操作日志
await log_operation(
admin_id=current_admin.id,
action="create_user",
resource_type="user",
resource_id=user.id,
details=f"创建用户: {user.username}"
)
return user
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(
user_id: int,
user_data: UserUpdate,
db: Session = Depends(get_db),
current_admin = Depends(get_current_admin)
):
"""更新用户"""
require_permissions(current_admin, ["user:update"])
user_service = UserService(db)
user = await user_service.get_user_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
updated_user = await user_service.update_user(user_id, user_data)
# 记录操作日志
await log_operation(
admin_id=current_admin.id,
action="update_user",
resource_type="user",
resource_id=user_id,
details=f"更新用户: {user.username}"
)
return updated_user
@router.delete("/{user_id}")
async def delete_user(
user_id: int,
db: Session = Depends(get_db),
current_admin = Depends(get_current_admin)
):
"""删除用户"""
require_permissions(current_admin, ["user:delete"])
user_service = UserService(db)
user = await user_service.get_user_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
await user_service.delete_user(user_id)
# 记录操作日志
await log_operation(
admin_id=current_admin.id,
action="delete_user",
resource_type="user",
resource_id=user_id,
details=f"删除用户: {user.username}"
)
return {"message": "用户删除成功"}
@router.post("/batch")
async def batch_operation_users(
operation: str,
user_ids: List[int],
db: Session = Depends(get_db),
current_admin = Depends(get_current_admin)
):
"""批量操作用户"""
require_permissions(current_admin, ["user:update", "user:delete"])
user_service = UserService(db)
if operation == "delete":
require_permissions(current_admin, ["user:delete"])
result = await user_service.batch_delete_users(user_ids)
elif operation == "activate":
result = await user_service.batch_update_status(user_ids, "active")
elif operation == "deactivate":
result = await user_service.batch_update_status(user_ids, "inactive")
else:
raise HTTPException(status_code=400, detail="不支持的操作类型")
# 记录操作日志
await log_operation(
admin_id=current_admin.id,
action=f"batch_{operation}_users",
resource_type="user",
resource_id=None,
details=f"批量{operation}用户: {len(user_ids)}个"
)
return result
4.4 业务服务层
# app/services/user_service.py
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from typing import List, Tuple, Optional
from datetime import datetime
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from app.utils.security import get_password_hash
from app.utils.cache import cache_manager
class UserService:
def __init__(self, db: Session):
self.db = db
async def get_users(
self,
skip: int = 0,
limit: int = 20,
keyword: Optional[str] = None,
status: Optional[str] = None
) -> Tuple[List[User], int]:
"""获取用户列表"""
query = self.db.query(User)
# 搜索条件
if keyword:
query = query.filter(
or_(
User.username.contains(keyword),
User.email.contains(keyword),
User.phone.contains(keyword)
)
)
# 状态筛选
if status:
query = query.filter(User.status == status)
# 获取总数
total = query.count()
# 分页查询
users = query.offset(skip).limit(limit).all()
return users, total
async def get_user_by_id(self, user_id: int) -> Optional[User]:
"""根据ID获取用户"""
# 先从缓存获取
cache_key = f"user:{user_id}"
cached_user = await cache_manager.get(cache_key)
if cached_user:
return cached_user
user = self.db.query(User).filter(User.id == user_id).first()
# 缓存用户信息
if user:
await cache_manager.set(cache_key, user, expire=300)
return user
async def get_user_by_username(self, username: str) -> Optional[User]:
"""根据用户名获取用户"""
return self.db.query(User).filter(User.username == username).first()
async def get_user_by_email(self, email: str) -> Optional[User]:
"""根据邮箱获取用户"""
return self.db.query(User).filter(User.email == email).first()
async def create_user(self, user_data: UserCreate) -> User:
"""创建用户"""
# 密码加密
hashed_password = get_password_hash(user_data.password)
user = User(
username=user_data.username,
email=user_data.email,
phone=user_data.phone,
hashed_password=hashed_password,
full_name=user_data.full_name,
status="active",
created_at=datetime.utcnow()
)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
# 清除相关缓存
await self._clear_user_cache(user.id)
return user
async def update_user(self, user_id: int, user_data: UserUpdate) -> User:
"""更新用户"""
user = self.db.query(User).filter(User.id == user_id).first()
if not user:
return None
# 更新字段
for field, value in user_data.dict(exclude_unset=True).items():
if field == "password" and value:
setattr(user, "hashed_password", get_password_hash(value))
else:
setattr(user, field, value)
user.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(user)
# 清除缓存
await self._clear_user_cache(user_id)
return user
async def delete_user(self, user_id: int) -> bool:
"""删除用户"""
user = self.db.query(User).filter(User.id == user_id).first()
if not user:
return False
self.db.delete(user)
self.db.commit()
# 清除缓存
await self._clear_user_cache(user_id)
return True
async def batch_delete_users(self, user_ids: List[int]) -> dict:
"""批量删除用户"""
deleted_count = self.db.query(User).filter(User.id.in_(user_ids)).delete()
self.db.commit()
# 清除缓存
for user_id in user_ids:
await self._clear_user_cache(user_id)
return {"deleted_count": deleted_count}
async def batch_update_status(self, user_ids: List[int], status: str) -> dict:
"""批量更新用户状态"""
updated_count = self.db.query(User).filter(User.id.in_(user_ids)).update(
{"status": status, "updated_at": datetime.utcnow()}
)
self.db.commit()
# 清除缓存
for user_id in user_ids:
await self._clear_user_cache(user_id)
return {"updated_count": updated_count}
async def _clear_user_cache(self, user_id: int):
"""清除用户相关缓存"""
cache_keys = [
f"user:{user_id}",
"users:list:*", # 清除列表缓存
]
for key in cache_keys:
await cache_manager.delete(key)
4.5 权限控制实现
# app/utils/auth.py
from functools import wraps
from fastapi import HTTPException, status
from typing import List
def require_permissions(admin, required_permissions: List[str]):
"""权限验证装饰器"""
if admin.is_superuser:
return True
admin_permissions = get_admin_permissions(admin)
for permission in required_permissions:
if permission not in admin_permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"缺少权限: {permission}"
)
return True
def get_admin_permissions(admin) -> List[str]:
"""获取管理员权限列表"""
permissions = []
# 从数据库获取管理员角色和权限
# 这里简化处理,实际应该从数据库查询
if hasattr(admin, 'roles'):
for role in admin.roles:
if role.permissions:
permissions.extend(role.permissions.split(','))
return list(set(permissions))
def permission_required(permissions: List[str]):
"""权限验证装饰器"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 从kwargs中获取current_admin
current_admin = kwargs.get('current_admin')
if not current_admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="未认证"
)
require_permissions(current_admin, permissions)
return await func(*args, **kwargs)
return wrapper
return decorator
4.6 异步任务实现
# app/tasks/report_tasks.py
from celery import Celery
from app.core.config import settings
from app.services.report_service import ReportService
from app.utils.email import send_email
import pandas as pd
from datetime import datetime
celery_app = Celery(
"admin_tasks",
broker=settings.CELERY_BROKER_URL,
backend=settings.CELERY_RESULT_BACKEND
)
@celery_app.task
def generate_user_report(admin_id: int, report_params: dict):
"""生成用户报表任务"""
try:
report_service = ReportService()
# 生成报表数据
data = report_service.generate_user_report(report_params)
# 生成Excel文件
filename = f"user_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
filepath = f"reports/{filename}"
df = pd.DataFrame(data)
df.to_excel(filepath, index=False)
# 发送邮件通知
admin = get_admin_by_id(admin_id)
if admin and admin.email:
send_email(
to_email=admin.email,
subject="用户报表生成完成",
content=f"您请求的用户报表已生成完成,请登录系统下载。\n文件名: {filename}",
attachments=[filepath]
)
return {
"status": "success",
"filename": filename,
"filepath": filepath
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
@celery_app.task
def cleanup_old_files():
"""清理旧文件任务"""
import os
from datetime import datetime, timedelta
# 清理30天前的报表文件
cutoff_date = datetime.now() - timedelta(days=30)
reports_dir = "reports"
if os.path.exists(reports_dir):
for filename in os.listdir(reports_dir):
filepath = os.path.join(reports_dir, filename)
if os.path.isfile(filepath):
file_time = datetime.fromtimestamp(os.path.getctime(filepath))
if file_time < cutoff_date:
os.remove(filepath)
return {"status": "success", "message": "旧文件清理完成"}
# 定时任务配置
celery_app.conf.beat_schedule = {
'cleanup-old-files': {
'task': 'app.tasks.report_tasks.cleanup_old_files',
'schedule': 86400.0, # 每天执行一次
},
}
5. 性能优化
5.1 数据库优化
- 索引优化: 为常用查询字段添加索引
- 查询优化: 使用合适的查询语句和连接方式
- 分页优化: 使用游标分页处理大数据量
- 连接池: 配置合适的数据库连接池
- 读写分离: 配置主从数据库读写分离
5.2 缓存策略
- Redis缓存: 缓存热点数据和查询结果
- 应用缓存: 使用内存缓存提升响应速度
- 缓存更新: 合理的缓存更新和失效策略
- 缓存预热: 系统启动时预热关键缓存
- 缓存监控: 监控缓存命中率和性能
5.3 接口优化
- 异步处理: 使用异步编程提升并发性能
- 批量操作: 提供批量操作接口减少请求次数
- 数据压缩: 对响应数据进行压缩
- 限流控制: 实现接口限流防止过载
- 响应优化: 优化响应数据结构和大小
6. 安全措施
6.1 认证安全
- JWT Token: 使用JWT进行身份认证
- Token刷新: 实现Token自动刷新机制
- 密码加密: 使用bcrypt加密存储密码
- 登录限制: 实现登录失败次数限制
- 会话管理: 安全的会话管理和超时控制
6.2 权限控制
- RBAC模型: 基于角色的访问控制
- 细粒度权限: 实现资源级权限控制
- 权限验证: 接口级权限验证
- 权限缓存: 权限信息缓存优化
- 权限审计: 权限变更审计日志
6.3 数据安全
- SQL注入防护: 使用ORM防止SQL注入
- XSS防护: 输入输出数据过滤
- CSRF防护: 实现CSRF Token验证
- 数据加密: 敏感数据加密存储
- 数据脱敏: 日志和导出数据脱敏
7. 测试策略
7.1 单元测试
# tests/test_user_service.py
import pytest
from sqlalchemy.orm import Session
from app.services.user_service import UserService
from app.schemas.user import UserCreate
@pytest.fixture
def user_service(db_session: Session):
return UserService(db_session)
@pytest.fixture
def sample_user_data():
return UserCreate(
username="testuser",
email="test@example.com",
phone="13800138000",
password="password123",
full_name="测试用户"
)
async def test_create_user(user_service: UserService, sample_user_data: UserCreate):
"""测试创建用户"""
user = await user_service.create_user(sample_user_data)
assert user.username == sample_user_data.username
assert user.email == sample_user_data.email
assert user.phone == sample_user_data.phone
assert user.full_name == sample_user_data.full_name
assert user.status == "active"
async def test_get_user_by_id(user_service: UserService, sample_user_data: UserCreate):
"""测试根据ID获取用户"""
# 先创建用户
created_user = await user_service.create_user(sample_user_data)
# 根据ID获取用户
user = await user_service.get_user_by_id(created_user.id)
assert user is not None
assert user.id == created_user.id
assert user.username == sample_user_data.username
async def test_get_users_with_pagination(user_service: UserService):
"""测试分页获取用户列表"""
# 创建多个测试用户
for i in range(25):
user_data = UserCreate(
username=f"user{i}",
email=f"user{i}@example.com",
phone=f"1380013800{i:02d}",
password="password123",
full_name=f"用户{i}"
)
await user_service.create_user(user_data)
# 测试分页
users, total = await user_service.get_users(skip=0, limit=10)
assert len(users) == 10
assert total >= 25
# 测试第二页
users_page2, _ = await user_service.get_users(skip=10, limit=10)
assert len(users_page2) == 10
assert users[0].id != users_page2[0].id
7.2 集成测试
# tests/test_api_users.py
import pytest
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
def test_get_users_unauthorized():
"""测试未授权访问用户列表"""
response = client.get("/api/v1/admin/users")
assert response.status_code == 401
def test_get_users_authorized(admin_token):
"""测试授权访问用户列表"""
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.get("/api/v1/admin/users", headers=headers)
assert response.status_code == 200
data = response.json()
assert "list" in data
assert "total" in data
assert "page" in data
assert "size" in data
def test_create_user(admin_token):
"""测试创建用户"""
headers = {"Authorization": f"Bearer {admin_token}"}
user_data = {
"username": "newuser",
"email": "newuser@example.com",
"phone": "13800138000",
"password": "password123",
"full_name": "新用户"
}
response = client.post("/api/v1/admin/users", json=user_data, headers=headers)
assert response.status_code == 200
data = response.json()
assert data["username"] == user_data["username"]
assert data["email"] == user_data["email"]
assert "id" in data
def test_create_user_duplicate_username(admin_token):
"""测试创建重复用户名的用户"""
headers = {"Authorization": f"Bearer {admin_token}"}
user_data = {
"username": "duplicateuser",
"email": "user1@example.com",
"phone": "13800138001",
"password": "password123",
"full_name": "用户1"
}
# 第一次创建
response1 = client.post("/api/v1/admin/users", json=user_data, headers=headers)
assert response1.status_code == 200
# 第二次创建相同用户名
user_data["email"] = "user2@example.com"
response2 = client.post("/api/v1/admin/users", json=user_data, headers=headers)
assert response2.status_code == 400
assert "用户名已存在" in response2.json()["detail"]
7.3 性能测试
# tests/test_performance.py
import pytest
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from app.services.user_service import UserService
async def test_concurrent_user_creation(user_service: UserService):
"""测试并发创建用户性能"""
async def create_user(index):
user_data = UserCreate(
username=f"perfuser{index}",
email=f"perfuser{index}@example.com",
phone=f"1380013{index:04d}",
password="password123",
full_name=f"性能测试用户{index}"
)
return await user_service.create_user(user_data)
# 并发创建100个用户
start_time = time.time()
tasks = [create_user(i) for i in range(100)]
results = await asyncio.gather(*tasks)
end_time = time.time()
# 验证结果
assert len(results) == 100
assert all(user.id is not None for user in results)
# 性能断言(100个用户创建应在10秒内完成)
assert end_time - start_time < 10.0
async def test_large_dataset_query_performance(user_service: UserService):
"""测试大数据集查询性能"""
# 创建大量测试数据
for i in range(1000):
user_data = UserCreate(
username=f"biguser{i}",
email=f"biguser{i}@example.com",
phone=f"1390013{i:04d}",
password="password123",
full_name=f"大数据测试用户{i}"
)
await user_service.create_user(user_data)
# 测试查询性能
start_time = time.time()
users, total = await user_service.get_users(skip=0, limit=50)
end_time = time.time()
# 验证结果
assert len(users) == 50
assert total >= 1000
# 性能断言(查询应在1秒内完成)
assert end_time - start_time < 1.0
8. 部署配置
8.1 Docker配置
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
default-libmysqlclient-dev \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements/prod.txt requirements.txt
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建非root用户
RUN useradd --create-home --shell /bin/bash app \
&& chown -R app:app /app
USER app
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
8.2 Docker Compose配置
# docker-compose.yml
version: '3.8'
services:
app:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=mysql+pymysql://user:password@mysql:3306/xlxumu
- REDIS_URL=redis://redis:6379/0
- MONGODB_URL=mongodb://mongodb:27017/xlxumu
depends_on:
- mysql
- redis
- mongodb
volumes:
- ./logs:/app/logs
restart: unless-stopped
mysql:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD}
MYSQL_DATABASE: xlxumu
MYSQL_USER: ${MYSQL_USER}
MYSQL_PASSWORD: ${MYSQL_PASSWORD}
volumes:
- mysql_data:/var/lib/mysql
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "3306:3306"
restart: unless-stopped
redis:
image: redis:6-alpine
volumes:
- redis_data:/data
ports:
- "6379:6379"
restart: unless-stopped
mongodb:
image: mongo:5.0
environment:
MONGO_INITDB_ROOT_USERNAME: ${MONGO_ROOT_USERNAME}
MONGO_INITDB_ROOT_PASSWORD: ${MONGO_ROOT_PASSWORD}
volumes:
- mongodb_data:/data/db
ports:
- "27017:27017"
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- app
restart: unless-stopped
volumes:
mysql_data:
redis_data:
mongodb_data:
8.3 Nginx配置
# nginx.conf
events {
worker_connections 1024;
}
http {
upstream backend {
server app:8000;
}
server {
listen 80;
server_name api.xlxumu.com;
location / {
proxy_pass http://backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 超时设置
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
}
location /health {
access_log off;
return 200 "healthy\n";
}
# 静态文件
location /static/ {
alias /app/static/;
expires 30d;
}
}
}
9. 监控运维
9.1 健康检查
# app/api/v1/health.py
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.redis import get_redis
import asyncio
router = APIRouter()
@router.get("/health")
async def health_check(db: Session = Depends(get_db)):
"""系统健康检查"""
try:
# 检查数据库连接
db.execute("SELECT 1")
# 检查Redis连接
redis = await get_redis()
await redis.ping()
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"services": {
"database": "connected",
"redis": "connected"
}
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e),
"timestamp": datetime.utcnow().isoformat()
}
@router.get("/metrics")
async def get_metrics():
"""获取系统指标"""
return {
"cpu_usage": psutil.cpu_percent(),
"memory_usage": psutil.virtual_memory().percent,
"disk_usage": psutil.disk_usage('/').percent,
"active_connections": len(asyncio.all_tasks())
}
9.2 日志配置
# app/core/logging.py
import logging
import sys
from pathlib import Path
from loguru import logger
class InterceptHandler(logging.Handler):
def emit(self, record):
# 获取对应的Loguru级别
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno
# 查找调用者
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
logger.opt(depth=depth, exception=record.exc_info).log(
level, record.getMessage()
)
def setup_logging():
"""配置日志系统"""
# 移除默认处理器
logger.remove()
# 控制台输出
logger.add(
sys.stderr,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
"<level>{message}</level>",
level="INFO"
)
# 文件输出
log_path = Path("logs")
log_path.mkdir(exist_ok=True)
logger.add(
log_path / "app.log",
rotation="1 day",
retention="30 days",
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level="INFO"
)
logger.add(
log_path / "error.log",
rotation="1 day",
retention="30 days",
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level="ERROR"
)
# 拦截标准库日志
logging.basicConfig(handlers=[InterceptHandler()], level=0)
9.3 性能监控
# app/middleware/metrics.py
import time
from fastapi import Request, Response
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
# 定义指标
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
REQUEST_DURATION = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint']
)
async def metrics_middleware(request: Request, call_next):
"""性能监控中间件"""
start_time = time.time()
response = await call_next(request)
# 记录指标
duration = time.time() - start_time
endpoint = request.url.path
method = request.method
status = response.status_code
REQUEST_COUNT.labels(
method=method,
endpoint=endpoint,
status=status
).inc()
REQUEST_DURATION.labels(
method=method,
endpoint=endpoint
).observe(duration)
return response
@router.get("/metrics")
async def get_prometheus_metrics():
"""Prometheus指标端点"""
return Response(
generate_latest(),
media_type=CONTENT_TYPE_LATEST
)
9.4 告警配置
# monitoring/alerting.yml
groups:
- name: backend-admin
rules:
- alert: HighErrorRate
expr: rate(http_requests_total{status=~"5.."}[5m]) > 0.1
for: 2m
labels:
severity: critical
annotations:
summary: "后端管理系统错误率过高"
description: "错误率超过10%,持续2分钟"
- alert: HighResponseTime
expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2
for: 5m
labels:
severity: warning
annotations:
summary: "响应时间过长"
description: "95%请求响应时间超过2秒"
- alert: DatabaseConnectionFailed
expr: up{job="mysql"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "数据库连接失败"
description: "MySQL数据库连接中断"
- alert: RedisConnectionFailed
expr: up{job="redis"} == 0
for: 1m
labels:
severity: warning
annotations:
summary: "Redis连接失败"
description: "Redis缓存服务连接中断"
- alert: HighCPUUsage
expr: cpu_usage > 80
for: 5m
labels:
severity: warning
annotations:
summary: "CPU使用率过高"
description: "CPU使用率超过80%,持续5分钟"
- alert: HighMemoryUsage
expr: memory_usage > 85
for: 5m
labels:
severity: warning
annotations:
summary: "内存使用率过高"
description: "内存使用率超过85%,持续5分钟"
10. 总结
10.1 开发亮点
- 现代化架构:基于FastAPI的异步架构,支持高并发
- 完善的认证:JWT + RBAC权限控制系统
- 数据库优化:多数据库支持,读写分离,缓存策略
- API标准化:RESTful设计,自动生成文档
- 监控体系:完整的日志、指标、告警系统
- 容器化部署:Docker + Kubernetes支持
- 自动化测试:单元测试 + 集成测试覆盖
10.2 技术特色
- 异步编程:全面使用async/await提升性能
- 依赖注入:FastAPI原生依赖注入系统
- 数据验证:Pydantic模型自动验证
- ORM优化:SQLAlchemy异步支持
- 缓存策略:多层缓存架构
- 消息队列:Celery异步任务处理
- 搜索引擎:Elasticsearch全文搜索
10.3 扩展性设计
- 微服务架构:支持服务拆分和独立部署
- 水平扩展:支持负载均衡和集群部署
- 数据库分片:支持数据库水平分片
- 缓存分布式:Redis集群支持
- 消息队列:支持分布式任务处理
- API版本化:支持多版本API并存
- 插件系统:支持功能模块化扩展
10.4 后续优化
-
性能优化:
- 数据库查询优化
- 缓存命中率提升
- API响应时间优化
- 内存使用优化
-
功能扩展:
- 实时通知系统
- 数据分析平台
- AI智能推荐
- 移动端API
-
运维优化:
- 自动化部署流程
- 监控告警完善
- 日志分析系统
- 性能调优工具
-
安全加固:
- API安全扫描
- 数据加密升级
- 访问控制优化
- 安全审计系统
文档版本: v1.0.0
最后更新: 2024年12月
维护团队: 后端开发团队