由于本次代码变更内容为空,无法生成有效的提交信息。请提供具体的代码变更内容以便生成合适的提交信息。登录、微信登录等认证功能- 添加管理员登录功能
- 实现个人资料更新和密码修改- 配置数据库连接和 Alembic 迁移 - 添加健康检查和系统统计接口 - 实现自定义错误处理和响应格式 - 配置 FastAPI 应用和中间件
This commit is contained in:
121
fastapi-backend/app/api/api.py
Normal file
121
fastapi-backend/app/api/api.py
Normal file
@@ -0,0 +1,121 @@
|
||||
from fastapi import APIRouter, FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.exceptions import RequestValidationError
|
||||
from starlette.exceptions import HTTPException as StarletteHTTPException
|
||||
|
||||
from app.api.endpoints import auth, users
|
||||
from app.core.config import settings
|
||||
from app.utils.response import error_response
|
||||
|
||||
# 创建FastAPI应用
|
||||
app = FastAPI(
|
||||
title=settings.PROJECT_NAME,
|
||||
openapi_url=f"{settings.API_V1_STR}/openapi.json",
|
||||
docs_url="/api-docs",
|
||||
redoc_url="/redoc",
|
||||
)
|
||||
|
||||
# 配置CORS
|
||||
if settings.BACKEND_CORS_ORIGINS:
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
else:
|
||||
# 默认CORS配置
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# 创建API路由
|
||||
api_router = APIRouter()
|
||||
|
||||
# 添加各个端点路由
|
||||
api_router.include_router(auth.router, prefix="/auth", tags=["认证"])
|
||||
api_router.include_router(users.router, prefix="/users", tags=["用户"])
|
||||
|
||||
# 将API路由添加到应用
|
||||
app.include_router(api_router, prefix=settings.API_V1_STR)
|
||||
|
||||
|
||||
# 自定义异常处理
|
||||
@app.exception_handler(StarletteHTTPException)
|
||||
async def http_exception_handler(request, exc):
|
||||
return JSONResponse(
|
||||
status_code=exc.status_code,
|
||||
content=error_response(
|
||||
message=str(exc.detail),
|
||||
code=exc.status_code
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@app.exception_handler(RequestValidationError)
|
||||
async def validation_exception_handler(request, exc):
|
||||
errors = []
|
||||
for error in exc.errors():
|
||||
error_msg = f"{error['loc'][-1]}: {error['msg']}"
|
||||
errors.append(error_msg)
|
||||
|
||||
return JSONResponse(
|
||||
status_code=400,
|
||||
content=error_response(
|
||||
message="请求参数验证失败",
|
||||
code=400,
|
||||
data={"errors": errors}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
# 健康检查路由
|
||||
@app.get("/health")
|
||||
async def health_check():
|
||||
import platform
|
||||
import psutil
|
||||
from datetime import datetime
|
||||
|
||||
return {
|
||||
"status": "OK",
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"uptime": psutil.boot_time(),
|
||||
"environment": settings.DEBUG and "development" or "production",
|
||||
"no_db_mode": settings.NO_DB_MODE,
|
||||
"system_info": {
|
||||
"python_version": platform.python_version(),
|
||||
"platform": platform.platform(),
|
||||
"cpu_count": psutil.cpu_count(),
|
||||
"memory": {
|
||||
"total": psutil.virtual_memory().total,
|
||||
"available": psutil.virtual_memory().available,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# 系统统计路由
|
||||
@app.get("/system-stats")
|
||||
async def system_stats():
|
||||
import platform
|
||||
import psutil
|
||||
from datetime import datetime
|
||||
|
||||
return {
|
||||
"status": "OK",
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"environment": settings.DEBUG and "development" or "production",
|
||||
"python_version": platform.python_version(),
|
||||
"memory_usage": dict(psutil.virtual_memory()._asdict()),
|
||||
"uptime": psutil.boot_time(),
|
||||
"cpu_count": psutil.cpu_count(),
|
||||
"platform": platform.platform(),
|
||||
"architecture": platform.architecture(),
|
||||
"no_db_mode": settings.NO_DB_MODE
|
||||
}
|
||||
115
fastapi-backend/app/api/deps.py
Normal file
115
fastapi-backend/app/api/deps.py
Normal file
@@ -0,0 +1,115 @@
|
||||
from typing import Generator, Optional
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
from jose import jwt, JWTError
|
||||
from pydantic import ValidationError
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.security import verify_password
|
||||
from app.crud.user import user, admin
|
||||
from app.db.session import SessionLocal
|
||||
from app.models.user import User, Admin
|
||||
from app.schemas.user import TokenPayload
|
||||
|
||||
# OAuth2密码承载令牌
|
||||
oauth2_scheme = OAuth2PasswordBearer(
|
||||
tokenUrl=f"{settings.API_V1_STR}/auth/login"
|
||||
)
|
||||
|
||||
# 获取数据库会话
|
||||
def get_db() -> Generator:
|
||||
try:
|
||||
db = SessionLocal()
|
||||
yield db
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
# 获取当前用户
|
||||
def get_current_user(
|
||||
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
|
||||
) -> User:
|
||||
try:
|
||||
payload = jwt.decode(
|
||||
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
|
||||
)
|
||||
token_data = TokenPayload(**payload)
|
||||
|
||||
# 检查令牌是否过期
|
||||
if token_data.exp is None:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="令牌无效",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
except (JWTError, ValidationError):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="无法验证凭据",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
# 获取用户
|
||||
current_user = user.get(db, user_id=token_data.sub)
|
||||
if not current_user:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在")
|
||||
|
||||
# 检查用户是否活跃
|
||||
if not user.is_active(current_user):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="账户已被禁用")
|
||||
|
||||
return current_user
|
||||
|
||||
# 获取当前活跃用户
|
||||
def get_current_active_user(
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> User:
|
||||
if not user.is_active(current_user):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="账户已被禁用")
|
||||
return current_user
|
||||
|
||||
# 获取当前管理员
|
||||
def get_current_admin(
|
||||
db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
|
||||
) -> Admin:
|
||||
try:
|
||||
payload = jwt.decode(
|
||||
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
|
||||
)
|
||||
token_data = TokenPayload(**payload)
|
||||
except (JWTError, ValidationError):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="无法验证凭据",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
# 获取管理员
|
||||
current_admin = admin.get(db, admin_id=token_data.sub)
|
||||
if not current_admin:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="管理员不存在")
|
||||
|
||||
# 检查管理员是否活跃
|
||||
if not admin.is_active(current_admin):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="账户已被禁用")
|
||||
|
||||
return current_admin
|
||||
|
||||
# 获取当前活跃管理员
|
||||
def get_current_active_admin(
|
||||
current_admin: Admin = Depends(get_current_admin),
|
||||
) -> Admin:
|
||||
if not admin.is_active(current_admin):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="账户已被禁用")
|
||||
return current_admin
|
||||
|
||||
# 获取超级管理员
|
||||
def get_current_super_admin(
|
||||
current_admin: Admin = Depends(get_current_admin),
|
||||
) -> Admin:
|
||||
if current_admin.role != "super_admin":
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="需要超级管理员权限"
|
||||
)
|
||||
return current_admin
|
||||
332
fastapi-backend/app/api/endpoints/auth.py
Normal file
332
fastapi-backend/app/api/endpoints/auth.py
Normal file
@@ -0,0 +1,332 @@
|
||||
from datetime import timedelta
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, Body, Depends, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.api.deps import get_db, get_current_user
|
||||
from app.core.config import settings
|
||||
from app.core.security import create_access_token, create_refresh_token
|
||||
from app.crud.user import user, admin
|
||||
from app.schemas.user import (
|
||||
UserCreate, UserResponse, UserLogin, UserWithToken,
|
||||
Token, PasswordChange, AdminWithToken
|
||||
)
|
||||
from app.utils.response import success_response, error_response
|
||||
from app.utils.errors import BadRequestError, UnauthorizedError, ForbiddenError, NotFoundError
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post("/register", response_model=UserWithToken, status_code=status.HTTP_201_CREATED)
|
||||
def register(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
user_in: UserCreate,
|
||||
) -> Any:
|
||||
"""
|
||||
用户注册
|
||||
"""
|
||||
# 检查用户名是否已存在
|
||||
if user.get_by_username(db, username=user_in.username):
|
||||
raise BadRequestError("用户名已存在")
|
||||
|
||||
# 检查邮箱是否已存在
|
||||
if user_in.email and user.get_by_email(db, email=user_in.email):
|
||||
raise BadRequestError("邮箱已存在")
|
||||
|
||||
# 检查手机号是否已存在
|
||||
if user_in.phone and user.get_by_phone(db, phone=user_in.phone):
|
||||
raise BadRequestError("手机号已存在")
|
||||
|
||||
# 创建新用户
|
||||
db_user = user.create(db, obj_in=user_in)
|
||||
|
||||
# 更新最后登录时间
|
||||
user.update_last_login(db, db_obj=db_user)
|
||||
|
||||
# 生成令牌
|
||||
access_token = create_access_token(db_user.id)
|
||||
refresh_token = create_refresh_token(db_user.id)
|
||||
|
||||
return success_response(
|
||||
data={
|
||||
"user": db_user,
|
||||
"token": access_token,
|
||||
"refresh_token": refresh_token
|
||||
},
|
||||
message="注册成功",
|
||||
code=201
|
||||
)
|
||||
|
||||
|
||||
@router.post("/login", response_model=UserWithToken)
|
||||
def login(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
form_data: OAuth2PasswordRequestForm = Depends()
|
||||
) -> Any:
|
||||
"""
|
||||
用户登录
|
||||
"""
|
||||
# 验证用户
|
||||
db_user = user.authenticate(db, username=form_data.username, password=form_data.password)
|
||||
if not db_user:
|
||||
raise UnauthorizedError("用户名或密码错误")
|
||||
|
||||
# 检查用户状态
|
||||
if not user.is_active(db_user):
|
||||
raise ForbiddenError("账户已被禁用")
|
||||
|
||||
# 更新最后登录时间
|
||||
user.update_last_login(db, db_obj=db_user)
|
||||
|
||||
# 生成令牌
|
||||
access_token = create_access_token(db_user.id)
|
||||
refresh_token = create_refresh_token(db_user.id)
|
||||
|
||||
return success_response(
|
||||
data={
|
||||
"user": db_user,
|
||||
"token": access_token,
|
||||
"refresh_token": refresh_token
|
||||
},
|
||||
message="登录成功"
|
||||
)
|
||||
|
||||
|
||||
@router.post("/login/json", response_model=UserWithToken)
|
||||
def login_json(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
login_in: UserLogin
|
||||
) -> Any:
|
||||
"""
|
||||
用户登录(JSON格式)
|
||||
"""
|
||||
# 验证用户
|
||||
db_user = user.authenticate(db, username=login_in.username, password=login_in.password)
|
||||
if not db_user:
|
||||
raise UnauthorizedError("用户名或密码错误")
|
||||
|
||||
# 检查用户状态
|
||||
if not user.is_active(db_user):
|
||||
raise ForbiddenError("账户已被禁用")
|
||||
|
||||
# 更新最后登录时间
|
||||
user.update_last_login(db, db_obj=db_user)
|
||||
|
||||
# 生成令牌
|
||||
access_token = create_access_token(db_user.id)
|
||||
refresh_token = create_refresh_token(db_user.id)
|
||||
|
||||
return success_response(
|
||||
data={
|
||||
"user": db_user,
|
||||
"token": access_token,
|
||||
"refresh_token": refresh_token
|
||||
},
|
||||
message="登录成功"
|
||||
)
|
||||
|
||||
|
||||
@router.post("/refresh-token", response_model=Token)
|
||||
def refresh_token(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
refresh_token: str = Body(..., embed=True)
|
||||
) -> Any:
|
||||
"""
|
||||
刷新访问令牌
|
||||
"""
|
||||
try:
|
||||
from jose import jwt
|
||||
from pydantic import ValidationError
|
||||
from app.schemas.user import TokenPayload
|
||||
|
||||
payload = jwt.decode(
|
||||
refresh_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
|
||||
)
|
||||
token_data = TokenPayload(**payload)
|
||||
|
||||
# 检查令牌类型
|
||||
if token_data.type != "refresh":
|
||||
raise UnauthorizedError("无效的刷新令牌")
|
||||
|
||||
# 检查用户是否存在
|
||||
db_user = user.get(db, user_id=token_data.sub)
|
||||
if not db_user:
|
||||
raise NotFoundError("用户不存在")
|
||||
|
||||
# 检查用户状态
|
||||
if not user.is_active(db_user):
|
||||
raise ForbiddenError("账户已被禁用")
|
||||
|
||||
# 生成新令牌
|
||||
access_token = create_access_token(db_user.id)
|
||||
new_refresh_token = create_refresh_token(db_user.id)
|
||||
|
||||
return success_response(
|
||||
data={
|
||||
"access_token": access_token,
|
||||
"refresh_token": new_refresh_token,
|
||||
"token_type": "bearer"
|
||||
}
|
||||
)
|
||||
except (jwt.JWTError, ValidationError):
|
||||
raise UnauthorizedError("无效的刷新令牌")
|
||||
|
||||
|
||||
@router.get("/me", response_model=UserResponse)
|
||||
def get_current_user_info(
|
||||
current_user: UserResponse = Depends(get_current_user)
|
||||
) -> Any:
|
||||
"""
|
||||
获取当前用户信息
|
||||
"""
|
||||
return success_response(data=current_user)
|
||||
|
||||
|
||||
@router.put("/profile", response_model=UserResponse)
|
||||
def update_profile(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: UserResponse = Depends(get_current_user),
|
||||
profile_in: dict = Body(...)
|
||||
) -> Any:
|
||||
"""
|
||||
更新用户个人信息
|
||||
"""
|
||||
# 更新用户信息
|
||||
db_user = user.update(db, db_obj=current_user, obj_in=profile_in)
|
||||
|
||||
return success_response(
|
||||
data=db_user,
|
||||
message="个人信息更新成功"
|
||||
)
|
||||
|
||||
|
||||
@router.put("/password", response_model=dict)
|
||||
def change_password(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: UserResponse = Depends(get_current_user),
|
||||
password_in: PasswordChange
|
||||
) -> Any:
|
||||
"""
|
||||
修改密码
|
||||
"""
|
||||
from app.core.security import verify_password
|
||||
|
||||
# 验证当前密码
|
||||
if not verify_password(password_in.current_password, current_user.password_hash):
|
||||
raise UnauthorizedError("当前密码错误")
|
||||
|
||||
# 更新密码
|
||||
user.update_password(db, db_obj=current_user, new_password=password_in.new_password)
|
||||
|
||||
return success_response(message="密码修改成功")
|
||||
|
||||
|
||||
@router.post("/admin/login", response_model=AdminWithToken)
|
||||
def admin_login(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
login_in: UserLogin
|
||||
) -> Any:
|
||||
"""
|
||||
管理员登录
|
||||
"""
|
||||
# 验证管理员
|
||||
db_admin = admin.authenticate(db, username=login_in.username, password=login_in.password)
|
||||
if not db_admin:
|
||||
raise UnauthorizedError("用户名或密码错误")
|
||||
|
||||
# 检查管理员状态
|
||||
if not admin.is_active(db_admin):
|
||||
raise ForbiddenError("账户已被禁用")
|
||||
|
||||
# 更新最后登录时间
|
||||
admin.update_last_login(db, db_obj=db_admin)
|
||||
|
||||
# 生成令牌
|
||||
access_token = create_access_token(db_admin.id)
|
||||
refresh_token = create_refresh_token(db_admin.id)
|
||||
|
||||
return success_response(
|
||||
data={
|
||||
"admin": db_admin,
|
||||
"token": access_token,
|
||||
"refresh_token": refresh_token
|
||||
},
|
||||
message="管理员登录成功"
|
||||
)
|
||||
|
||||
|
||||
@router.post("/wechat", response_model=UserWithToken)
|
||||
def wechat_login(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
code: str = Body(...),
|
||||
user_info: dict = Body(None)
|
||||
) -> Any:
|
||||
"""
|
||||
微信登录/注册
|
||||
"""
|
||||
# 模拟获取微信用户信息
|
||||
wechat_user_info = {
|
||||
"openid": f"mock_openid_{code}",
|
||||
"unionid": f"mock_unionid_{code}",
|
||||
"nickname": user_info.get("nickName") if user_info else "微信用户",
|
||||
"avatar": user_info.get("avatarUrl") if user_info else "",
|
||||
"gender": "male" if user_info and user_info.get("gender") == 1 else
|
||||
"female" if user_info and user_info.get("gender") == 2 else "other"
|
||||
}
|
||||
|
||||
# 查找是否已存在微信用户
|
||||
db_user = user.get_by_wechat_openid(db, openid=wechat_user_info["openid"])
|
||||
|
||||
if db_user:
|
||||
# 更新最后登录时间
|
||||
user.update_last_login(db, db_obj=db_user)
|
||||
else:
|
||||
# 创建新用户(微信注册)
|
||||
import secrets
|
||||
import string
|
||||
from app.schemas.user import UserCreate
|
||||
|
||||
# 生成随机密码
|
||||
alphabet = string.ascii_letters + string.digits
|
||||
random_password = ''.join(secrets.choice(alphabet) for _ in range(12))
|
||||
|
||||
# 创建用户
|
||||
user_in = UserCreate(
|
||||
username=f"wx_{wechat_user_info['openid'][-8:]}",
|
||||
password=random_password,
|
||||
nickname=wechat_user_info["nickname"],
|
||||
user_type="farmer"
|
||||
)
|
||||
|
||||
db_user = user.create(db, obj_in=user_in)
|
||||
|
||||
# 更新微信信息
|
||||
user.update(db, db_obj=db_user, obj_in={
|
||||
"wechat_openid": wechat_user_info["openid"],
|
||||
"wechat_unionid": wechat_user_info["unionid"],
|
||||
"avatar_url": wechat_user_info["avatar"],
|
||||
"gender": wechat_user_info["gender"]
|
||||
})
|
||||
|
||||
# 生成令牌
|
||||
access_token = create_access_token(db_user.id)
|
||||
refresh_token = create_refresh_token(db_user.id)
|
||||
|
||||
return success_response(
|
||||
data={
|
||||
"user": db_user,
|
||||
"token": access_token,
|
||||
"refresh_token": refresh_token
|
||||
},
|
||||
message="微信登录成功"
|
||||
)
|
||||
174
fastapi-backend/app/api/endpoints/users.py
Normal file
174
fastapi-backend/app/api/endpoints/users.py
Normal file
@@ -0,0 +1,174 @@
|
||||
from typing import Any, List
|
||||
|
||||
from fastapi import APIRouter, Body, Depends, Query, Path, status
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.api.deps import get_db, get_current_user, get_current_admin, get_current_super_admin
|
||||
from app.crud.user import user
|
||||
from app.models.user import User
|
||||
from app.schemas.user import (
|
||||
UserResponse, UserUpdate, UserListResponse,
|
||||
PaginationResponse, UserStatistics, BatchUserStatusUpdate
|
||||
)
|
||||
from app.utils.response import success_response
|
||||
from app.utils.errors import NotFoundError, BadRequestError
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("/profile", response_model=UserResponse)
|
||||
def get_user_profile(
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> Any:
|
||||
"""
|
||||
获取当前用户个人信息
|
||||
"""
|
||||
return success_response(data=current_user)
|
||||
|
||||
|
||||
@router.put("/profile", response_model=UserResponse)
|
||||
def update_user_profile(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
user_in: UserUpdate,
|
||||
) -> Any:
|
||||
"""
|
||||
更新当前用户个人信息
|
||||
"""
|
||||
# 更新用户信息
|
||||
db_user = user.update(db, db_obj=current_user, obj_in=user_in)
|
||||
|
||||
return success_response(
|
||||
data=db_user,
|
||||
message="个人信息更新成功"
|
||||
)
|
||||
|
||||
|
||||
@router.get("", response_model=UserListResponse)
|
||||
def get_users(
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: User = Depends(get_current_admin),
|
||||
page: int = Query(1, ge=1, description="页码"),
|
||||
page_size: int = Query(20, ge=1, le=100, description="每页数量"),
|
||||
user_type: str = Query(None, description="用户类型"),
|
||||
status: str = Query(None, description="用户状态"),
|
||||
keyword: str = Query(None, description="搜索关键词"),
|
||||
) -> Any:
|
||||
"""
|
||||
获取用户列表(管理员)
|
||||
"""
|
||||
# 计算分页参数
|
||||
skip = (page - 1) * page_size
|
||||
|
||||
# 获取用户列表
|
||||
users_list = user.get_multi(
|
||||
db, skip=skip, limit=page_size,
|
||||
user_type=user_type, status=status, keyword=keyword
|
||||
)
|
||||
|
||||
# 获取用户总数
|
||||
total = user.count(
|
||||
db, user_type=user_type, status=status, keyword=keyword
|
||||
)
|
||||
|
||||
# 计算总页数
|
||||
total_pages = (total + page_size - 1) // page_size
|
||||
|
||||
# 构建分页信息
|
||||
pagination = PaginationResponse(
|
||||
page=page,
|
||||
page_size=page_size,
|
||||
total=total,
|
||||
total_pages=total_pages
|
||||
)
|
||||
|
||||
return success_response(
|
||||
data={
|
||||
"users": users_list,
|
||||
"pagination": pagination
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@router.get("/{user_id}", response_model=UserResponse)
|
||||
def get_user_by_id(
|
||||
user_id: int = Path(..., ge=1),
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: User = Depends(get_current_admin),
|
||||
) -> Any:
|
||||
"""
|
||||
获取用户详情(管理员)
|
||||
"""
|
||||
# 获取用户
|
||||
db_user = user.get(db, user_id=user_id)
|
||||
if not db_user:
|
||||
raise NotFoundError("用户不存在")
|
||||
|
||||
return success_response(data=db_user)
|
||||
|
||||
|
||||
@router.get("/statistics", response_model=UserStatistics)
|
||||
def get_user_statistics(
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: User = Depends(get_current_admin),
|
||||
) -> Any:
|
||||
"""
|
||||
获取用户统计信息(管理员)
|
||||
"""
|
||||
# 获取统计信息
|
||||
statistics = user.get_statistics(db)
|
||||
|
||||
return success_response(data=statistics)
|
||||
|
||||
|
||||
@router.post("/batch-status", response_model=dict)
|
||||
def batch_update_user_status(
|
||||
*,
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: User = Depends(get_current_admin),
|
||||
batch_in: BatchUserStatusUpdate,
|
||||
) -> Any:
|
||||
"""
|
||||
批量操作用户状态(管理员)
|
||||
"""
|
||||
# 检查用户ID列表是否为空
|
||||
if not batch_in.user_ids:
|
||||
raise BadRequestError("用户ID列表不能为空")
|
||||
|
||||
# 批量更新用户状态
|
||||
affected_rows = user.batch_update_status(
|
||||
db, user_ids=batch_in.user_ids, status=batch_in.status
|
||||
)
|
||||
|
||||
return success_response(
|
||||
data={
|
||||
"message": f"成功更新{affected_rows}个用户的状态",
|
||||
"affected_rows": affected_rows
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@router.delete("/{user_id}", response_model=dict)
|
||||
def delete_user(
|
||||
user_id: int = Path(..., ge=1),
|
||||
db: Session = Depends(get_db),
|
||||
current_admin: User = Depends(get_current_super_admin),
|
||||
) -> Any:
|
||||
"""
|
||||
删除用户(超级管理员)
|
||||
"""
|
||||
# 获取用户
|
||||
db_user = user.get(db, user_id=user_id)
|
||||
if not db_user:
|
||||
raise NotFoundError("用户不存在")
|
||||
|
||||
# 删除用户
|
||||
user.remove(db, user_id=user_id)
|
||||
|
||||
return success_response(
|
||||
data={
|
||||
"message": "用户删除成功",
|
||||
"user_id": user_id
|
||||
}
|
||||
)
|
||||
Reference in New Issue
Block a user