- Unit tests: password hashing, notification providers, webhook field parsing - Integration tests: auth routes (register/login/confirm-email/logout), invite flow, Evotor webhooks (/user/create, /user/verify, /user/token), admin panel (access control, activate/suspend/delete/reset-password) - conftest: SQLite in-memory engine, transactional sessions, factory-boy factories (UserFactory with UserRoleEnum variants) - Fix bcrypt: replace passlib (broken on Python 3.14 + bcrypt 5.x) with direct bcrypt calls; drop passlib from requirements.txt - Fix datetime.utcnow() deprecation across routes and tests - Fix Jinja2 TemplateResponse signature (request as first positional arg) - Add coverage config to pyproject.toml Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
272 lines
10 KiB
Python
272 lines
10 KiB
Python
import secrets
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
from fastapi import APIRouter, Depends, Request
|
||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||
from sqlalchemy.orm import Session
|
||
|
||
from web.auth.password import hash_password
|
||
from web.auth.rbac import require_role
|
||
from web.auth.session import get_current_user
|
||
from web.config import settings
|
||
from web.database import get_db
|
||
from web.models.rbac import Permission, Role, UserRole, role_permissions
|
||
from web.models.user import User, UserRoleEnum, UserStatusEnum
|
||
from web.notifications.tasks import send_email_task
|
||
from web.templates_env import templates
|
||
|
||
# Router for the admin panel; every route registered on it is served under /admin.
router = APIRouter(prefix="/admin")
|
||
|
||
# Number of users shown per page of the admin user list.
PAGE_SIZE = 25
|
||
|
||
|
||
def _render(request: Request, template: str, ctx: dict) -> HTMLResponse:
    """Render *template* for *request* with the shared admin context defaults.

    Args:
        request: The incoming request; handed to ``TemplateResponse`` as its
            first positional argument.
        template: Name of the template to render.
        ctx: Template variables. The mapping is copied, so the caller's dict
            is left untouched.

    Returns:
        The response produced by ``templates.TemplateResponse``.
    """
    # Work on a copy: previously the request was stored in the caller's dict
    # and popped straight back out, mutating the argument for no benefit.
    context = dict(ctx)
    # A caller-supplied "request" entry is discarded, exactly as before; the
    # request travels only as the positional argument.
    context.pop("request", None)
    context.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID)
    return templates.TemplateResponse(request, template, context)
|
||
|
||
|
||
def _admin_user(request: Request, db: Session) -> User:
    """Return the current user, requiring the admin or system role.

    Args:
        request: The incoming request, forwarded to ``get_current_user``.
        db: Database session, forwarded to ``get_current_user``.

    Returns:
        The current user when their role is ``admin`` or ``system``.

    Raises:
        HTTPException: With status 403 when the user has neither role.
        Exception: Anything raised by ``get_current_user`` propagates
            unchanged (the former ``try/except: raise`` wrapper was a no-op).
    """
    # Kept as a function-scope import so this block does not depend on the
    # module-level import list.
    from fastapi import HTTPException

    user = get_current_user(request, db)
    if user.role not in (UserRoleEnum.admin, UserRoleEnum.system):
        raise HTTPException(403, "Недостаточно прав")
    return user
|
||
|
||
|
||
# ── User list ─────────────────────────────────────────────────────────────────
|
||
|
||
@router.get("/users")
async def admin_users(request: Request, db: Session = Depends(get_db)):
    """Render the paginated, filterable list of users.

    Query parameters read from the request:
        search: Substring matched case-insensitively against first name,
            last name, email and phone.
        status: A ``UserStatusEnum`` value; unknown values are ignored.
        role: A ``UserRoleEnum`` value; unknown values are ignored.
        page: 1-based page number; missing, non-numeric or values below 1
            fall back to the first page.

    Returns:
        The rendered ``admin/users.html`` page, or a 303 redirect to
        ``/login`` when ``_admin_user`` raises for any reason.
    """
    try:
        admin = _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    params = request.query_params
    search = params.get("search", "").strip()
    status_filter = params.get("status", "")
    role_filter = params.get("role", "")

    # A hand-edited URL such as ?page=abc must not crash the handler.
    try:
        page = max(1, int(params.get("page", 1)))
    except ValueError:
        page = 1

    q = db.query(User)

    if search:
        pattern = f"%{search}%"
        q = q.filter(
            User.first_name.ilike(pattern)
            | User.last_name.ilike(pattern)
            | User.email.ilike(pattern)
            | User.phone.ilike(pattern)
        )

    # Both enum filters follow the same rule: apply when the raw value maps
    # to an enum member, silently skip it otherwise.
    enum_filters = (
        (status_filter, UserStatusEnum, User.status),
        (role_filter, UserRoleEnum, User.role),
    )
    for raw_value, enum_cls, column in enum_filters:
        if not raw_value:
            continue
        try:
            q = q.filter(column == enum_cls(raw_value))
        except ValueError:
            continue

    total = q.count()
    offset = (page - 1) * PAGE_SIZE
    users = q.order_by(User.created_at.desc()).offset(offset).limit(PAGE_SIZE).all()
    # Ceiling division, with at least one page even for an empty result.
    total_pages = max(1, (total + PAGE_SIZE - 1) // PAGE_SIZE)

    return _render(request, "admin/users.html", {
        "user": admin,
        "users": users,
        "search": search,
        "status_filter": status_filter,
        "role_filter": role_filter,
        "page": page,
        "total_pages": total_pages,
        "total": total,
    })
|
||
|
||
|
||
# ── User detail ───────────────────────────────────────────────────────────────
|
||
|
||
@router.get("/users/{user_id}")
async def admin_user_detail(
    user_id: int,
    request: Request,
    db: Session = Depends(get_db),
):
    """Render the detail page for the user with id *user_id*.

    Returns:
        The rendered ``admin/user_detail.html`` page; a 303 redirect to
        ``/admin/users`` when no such user exists; or a 303 redirect to
        ``/login`` when ``_admin_user`` raises for any reason.
    """
    try:
        admin = _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    target = db.get(User, user_id)
    if target:
        context = {"user": admin, "target": target}
        return _render(request, "admin/user_detail.html", context)
    # Unknown id: fall back to the list view.
    return RedirectResponse("/admin/users", 303)
|
||
|
||
|
||
# ── User actions ──────────────────────────────────────────────────────────────
|
||
|
||
@router.post("/users/{user_id}/activate")
async def admin_activate(
    user_id: int,
    request: Request,
    db: Session = Depends(get_db),
):
    """Set the user's status to ``active`` and return to their detail page.

    A missing user is not an error: nothing is written and the same redirect
    is returned. Any exception from ``_admin_user`` redirects to ``/login``.
    """
    try:
        _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    detail_url = f"/admin/users/{user_id}"
    account = db.get(User, user_id)
    if not account:
        return RedirectResponse(detail_url, 303)

    account.status = UserStatusEnum.active
    db.commit()
    return RedirectResponse(detail_url, 303)
|
||
|
||
|
||
@router.post("/users/{user_id}/suspend")
async def admin_suspend(
    user_id: int,
    request: Request,
    db: Session = Depends(get_db),
):
    """Set the user's status to ``suspended`` and return to their detail page.

    A missing user is not an error: nothing is written and the same redirect
    is returned. Any exception from ``_admin_user`` redirects to ``/login``.
    """
    try:
        _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    detail_url = f"/admin/users/{user_id}"
    account = db.get(User, user_id)
    if not account:
        return RedirectResponse(detail_url, 303)

    account.status = UserStatusEnum.suspended
    db.commit()
    return RedirectResponse(detail_url, 303)
|
||
|
||
|
||
@router.post("/users/{user_id}/reset-password")
async def admin_reset_password(
    user_id: int,
    request: Request,
    db: Session = Depends(get_db),
):
    """Issue a password-reset token for the user and queue the reset e-mail.

    When the user exists, a fresh URL-safe token and its expiry are stored,
    the change is committed, and ``send_email_task`` is queued with a link to
    ``{BASE_URL}/reset-password``. The redirect to the detail page carries
    ``success=reset_sent`` whether or not the user was found. Any exception
    from ``_admin_user`` redirects to ``/login``.
    """
    try:
        _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    account = db.get(User, user_id)
    if account:
        token = secrets.token_urlsafe(32)
        # The expiry is stored as a naive datetime holding the UTC wall time.
        issued_at = datetime.now(timezone.utc).replace(tzinfo=None)
        lifetime = timedelta(minutes=settings.PASSWORD_RESET_EXPIRE_MINUTES)

        account.password_reset_token = token
        account.password_reset_expires = issued_at + lifetime
        db.commit()

        reset_url = f"{settings.BASE_URL}/reset-password?token={token}"
        html = f'<p>Сброс пароля (запрошен администратором): <a href="{reset_url}">{reset_url}</a></p>'
        send_email_task.delay(account.email, "Сброс пароля — ЭВОСИНК", html)

    return RedirectResponse(f"/admin/users/{user_id}?success=reset_sent", 303)
|
||
|
||
|
||
@router.post("/users/{user_id}/send-invite")
async def admin_send_invite(
    user_id: int,
    request: Request,
    db: Session = Depends(get_db),
):
    """Issue an invite token for the user and queue the invitation e-mail.

    When the user exists, a fresh URL-safe token and its expiry are stored,
    the change is committed, and ``send_email_task`` is queued with a link to
    ``{BASE_URL}/invite``. The redirect to the detail page carries
    ``success=invite_sent`` whether or not the user was found. Any exception
    from ``_admin_user`` redirects to ``/login``.
    """
    try:
        _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    invitee = db.get(User, user_id)
    if invitee:
        token = secrets.token_urlsafe(32)
        # The expiry is stored as a naive datetime holding the UTC wall time.
        issued_at = datetime.now(timezone.utc).replace(tzinfo=None)

        invitee.invite_token = token
        invitee.invite_expires = issued_at + timedelta(hours=settings.INVITE_EXPIRE_HOURS)
        db.commit()

        invite_url = f"{settings.BASE_URL}/invite?token={token}"
        paragraphs = [
            "<p>Вам отправлено приглашение в ЭВОСИНК.</p>",
            f'<p><a href="{invite_url}">{invite_url}</a></p>',
            f"<p>Ссылка действительна {settings.INVITE_EXPIRE_HOURS} часов.</p>",
        ]
        send_email_task.delay(invitee.email, "Приглашение в ЭВОСИНК", "".join(paragraphs))

    return RedirectResponse(f"/admin/users/{user_id}?success=invite_sent", 303)
|
||
|
||
|
||
@router.post("/users/{user_id}/edit")
async def admin_edit_user(
    user_id: int,
    request: Request,
    db: Session = Depends(get_db),
):
    """Apply the submitted edit form to the user with id *user_id*.

    First and last name are mandatory; when either is blank the detail page
    is re-rendered with the error messages and nothing is saved. E-mail and
    phone are updated only when non-empty values are submitted. The role is
    changed only when the acting user has the ``system`` role and the
    submitted value is a valid ``UserRoleEnum`` member.

    Returns:
        A 303 redirect to the detail page with ``success=saved``; the
        re-rendered detail page on validation errors; a 303 redirect to
        ``/admin/users`` for an unknown id; or a 303 redirect to ``/login``
        when ``_admin_user`` raises for any reason.
    """
    try:
        admin = _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    target = db.get(User, user_id)
    if not target:
        return RedirectResponse("/admin/users", 303)

    submitted = await request.form()
    data = {key: str(value).strip() for key, value in submitted.items()}

    required_fields = (
        ("first_name", "Имя обязательно"),
        ("last_name", "Фамилия обязательна"),
    )
    errors = [message for field, message in required_fields if not data.get(field)]
    if errors:
        return _render(request, "admin/user_detail.html", {
            "user": admin, "target": target, "errors": errors,
        })

    target.first_name = data["first_name"]
    target.last_name = data["last_name"]

    # Optional contact fields: a blank submission keeps the stored value.
    # NOTE(review): there is no duplicate check on email/phone here — confirm
    # the commit below cannot fail on a uniqueness constraint.
    for optional_field in ("email", "phone"):
        if data.get(optional_field):
            setattr(target, optional_field, data[optional_field])

    requested_role = data.get("role")
    if requested_role and admin.role == UserRoleEnum.system:
        try:
            target.role = UserRoleEnum(requested_role)
        except ValueError:
            # Unknown role names are ignored; the current role is kept.
            pass

    db.commit()
    return RedirectResponse(f"/admin/users/{user_id}?success=saved", 303)
|
||
|
||
|
||
@router.post("/users/{user_id}/delete")
async def admin_delete_user(
    user_id: int,
    request: Request,
    db: Session = Depends(get_db),
):
    """Delete the user with id *user_id*; only the ``system`` role may do so.

    Returns:
        A 303 redirect to ``/admin/users`` after the attempt (a missing user
        is silently ignored); a 303 redirect back to the detail page when the
        acting user is not ``system``; or a 303 redirect to ``/login`` when
        ``_admin_user`` raises for any reason.
    """
    try:
        admin = _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    may_delete = admin.role == UserRoleEnum.system
    if not may_delete:
        return RedirectResponse(f"/admin/users/{user_id}", 303)

    doomed = db.get(User, user_id)
    if doomed:
        db.delete(doomed)
        db.commit()
    return RedirectResponse("/admin/users", 303)
|
||
|
||
|
||
# ── Roles ─────────────────────────────────────────────────────────────────────
|
||
|
||
@router.get("/roles")
async def admin_roles(request: Request, db: Session = Depends(get_db)):
    """Render the role/permission matrix; only the ``system`` role may view it.

    The template receives all roles (ordered by id), all permissions (ordered
    by name) and ``role_perm_ids``, a mapping from each role id to the set of
    permission ids linked to it in ``role_permissions``. Every listed role
    has an entry, possibly an empty set.

    Returns:
        The rendered ``admin/roles.html`` page; a 303 redirect to
        ``/admin/users`` when the acting user is not ``system``; or a 303
        redirect to ``/login`` when ``_admin_user`` raises for any reason.
    """
    try:
        admin = _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)
    if admin.role != UserRoleEnum.system:
        return RedirectResponse("/admin/users", 303)

    roles = db.query(Role).order_by(Role.id).all()
    permissions = db.query(Permission).order_by(Permission.name).all()

    # Load the association table once and group in Python, instead of
    # issuing one query per role.
    role_perm_ids: dict[int, set[int]] = {role.id: set() for role in roles}
    for link in db.execute(role_permissions.select()):
        granted = role_perm_ids.get(link.role_id)
        # Links pointing at a role that is not listed are skipped, matching
        # the per-role lookups this replaces.
        if granted is not None:
            granted.add(link.permission_id)

    return _render(request, "admin/roles.html", {
        "user": admin,
        "roles": roles,
        "permissions": permissions,
        "role_perm_ids": role_perm_ids,
    })
|
||
|
||
|
||
@router.post("/roles/{role_id}/permissions")
async def admin_update_role_permissions(
    role_id: int, request: Request, db: Session = Depends(get_db)
):
    """Replace the permission set of the role with id *role_id*.

    Every form field whose name starts with ``perm_`` contributes its value
    as a permission id. The role's existing ``role_permissions`` rows are
    deleted and the submitted ids inserted, all in one commit.

    Nothing is changed when a ``perm_`` value is not an integer or when no
    role with this id exists, so a malformed or stale form can neither crash
    the handler nor wipe or orphan permission rows.

    Returns:
        A 303 redirect to ``/admin/roles`` in every authorised case, or a 303
        redirect to ``/login`` when ``_admin_user`` raises for any reason.
    """
    try:
        admin = _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)
    if admin.role != UserRoleEnum.system:
        return RedirectResponse("/admin/roles", 303)

    form = await request.form()
    try:
        selected_ids = {int(v) for k, v in form.items() if k.startswith("perm_")}
    except (TypeError, ValueError):
        # Non-numeric (or non-text) value: reject the whole submission rather
        # than apply part of it.
        return RedirectResponse("/admin/roles", 303)

    # Do not write association rows for a role that does not exist.
    if db.query(Role).filter(Role.id == role_id).first() is None:
        return RedirectResponse("/admin/roles", 303)

    # Remove all existing links, then re-insert the selected ones.
    db.execute(role_permissions.delete().where(role_permissions.c.role_id == role_id))
    for perm_id in selected_ids:
        db.execute(role_permissions.insert().values(role_id=role_id, permission_id=perm_id))
    db.commit()
    return RedirectResponse("/admin/roles", 303)
|