Files
evo-sync/web/routes/admin.py
mguschin fc65e591b3 test: add test suite with 65 tests, 73% coverage
- Unit tests: password hashing, notification providers, webhook field parsing
- Integration tests: auth routes (register/login/confirm-email/logout),
  invite flow, Evotor webhooks (/user/create, /user/verify, /user/token),
  admin panel (access control, activate/suspend/delete/reset-password)
- conftest: SQLite in-memory engine, transactional sessions, factory-boy
  factories (UserFactory with UserRoleEnum variants)
- Fix bcrypt: replace passlib (broken on Python 3.14 + bcrypt 5.x) with
  direct bcrypt calls; drop passlib from requirements.txt
- Fix datetime.utcnow() deprecation across routes and tests
- Fix Jinja2 TemplateResponse signature (request as first positional arg)
- Add coverage config to pyproject.toml

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 12:27:42 +03:00

272 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import secrets
from datetime import datetime, timezone, timedelta
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from sqlalchemy.orm import Session
from web.auth.password import hash_password
from web.auth.rbac import require_role
from web.auth.session import get_current_user
from web.config import settings
from web.database import get_db
from web.models.rbac import Permission, Role, UserRole, role_permissions
from web.models.user import User, UserRoleEnum, UserStatusEnum
from web.notifications.tasks import send_email_task
from web.templates_env import templates
router = APIRouter(prefix="/admin")
PAGE_SIZE = 25
def _render(request: Request, template: str, ctx: dict) -> HTMLResponse:
ctx["request"] = request
ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID)
return templates.TemplateResponse(ctx.pop("request"), template, ctx)
def _admin_user(request: Request, db: Session) -> User:
"""Get current user and verify admin/system role."""
try:
user = get_current_user(request, db)
except Exception:
raise
if user.role not in (UserRoleEnum.admin, UserRoleEnum.system):
from fastapi import HTTPException
raise HTTPException(403, "Недостаточно прав")
return user
# ── User list ─────────────────────────────────────────────────────────────────
@router.get("/users")
async def admin_users(request: Request, db: Session = Depends(get_db)):
try:
admin = _admin_user(request, db)
except Exception:
return RedirectResponse("/login", 303)
q = db.query(User)
search = request.query_params.get("search", "").strip()
status_filter = request.query_params.get("status", "")
role_filter = request.query_params.get("role", "")
page = max(1, int(request.query_params.get("page", 1)))
if search:
q = q.filter(
(User.first_name.ilike(f"%{search}%")) |
(User.last_name.ilike(f"%{search}%")) |
(User.email.ilike(f"%{search}%")) |
(User.phone.ilike(f"%{search}%"))
)
if status_filter:
try:
q = q.filter(User.status == UserStatusEnum(status_filter))
except ValueError:
pass
if role_filter:
try:
q = q.filter(User.role == UserRoleEnum(role_filter))
except ValueError:
pass
total = q.count()
users = q.order_by(User.created_at.desc()).offset((page - 1) * PAGE_SIZE).limit(PAGE_SIZE).all()
total_pages = max(1, (total + PAGE_SIZE - 1) // PAGE_SIZE)
return _render(request, "admin/users.html", {
"user": admin,
"users": users,
"search": search,
"status_filter": status_filter,
"role_filter": role_filter,
"page": page,
"total_pages": total_pages,
"total": total,
})
# ── User detail ───────────────────────────────────────────────────────────────
@router.get("/users/{user_id}")
async def admin_user_detail(user_id: int, request: Request, db: Session = Depends(get_db)):
try:
admin = _admin_user(request, db)
except Exception:
return RedirectResponse("/login", 303)
target = db.get(User, user_id)
if not target:
return RedirectResponse("/admin/users", 303)
return _render(request, "admin/user_detail.html", {"user": admin, "target": target})
# ── User actions ──────────────────────────────────────────────────────────────
@router.post("/users/{user_id}/activate")
async def admin_activate(user_id: int, request: Request, db: Session = Depends(get_db)):
try:
_admin_user(request, db)
except Exception:
return RedirectResponse("/login", 303)
user = db.get(User, user_id)
if user:
user.status = UserStatusEnum.active
db.commit()
return RedirectResponse(f"/admin/users/{user_id}", 303)
@router.post("/users/{user_id}/suspend")
async def admin_suspend(user_id: int, request: Request, db: Session = Depends(get_db)):
try:
_admin_user(request, db)
except Exception:
return RedirectResponse("/login", 303)
user = db.get(User, user_id)
if user:
user.status = UserStatusEnum.suspended
db.commit()
return RedirectResponse(f"/admin/users/{user_id}", 303)
@router.post("/users/{user_id}/reset-password")
async def admin_reset_password(user_id: int, request: Request, db: Session = Depends(get_db)):
try:
_admin_user(request, db)
except Exception:
return RedirectResponse("/login", 303)
user = db.get(User, user_id)
if user:
token = secrets.token_urlsafe(32)
user.password_reset_token = token
user.password_reset_expires = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(
minutes=settings.PASSWORD_RESET_EXPIRE_MINUTES
)
db.commit()
reset_url = f"{settings.BASE_URL}/reset-password?token={token}"
html = f'<p>Сброс пароля (запрошен администратором): <a href="{reset_url}">{reset_url}</a></p>'
send_email_task.delay(user.email, "Сброс пароля — ЭВОСИНК", html)
return RedirectResponse(f"/admin/users/{user_id}?success=reset_sent", 303)
@router.post("/users/{user_id}/send-invite")
async def admin_send_invite(user_id: int, request: Request, db: Session = Depends(get_db)):
try:
_admin_user(request, db)
except Exception:
return RedirectResponse("/login", 303)
user = db.get(User, user_id)
if user:
token = secrets.token_urlsafe(32)
user.invite_token = token
user.invite_expires = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=settings.INVITE_EXPIRE_HOURS)
db.commit()
invite_url = f"{settings.BASE_URL}/invite?token={token}"
html = (
f"<p>Вам отправлено приглашение в ЭВОСИНК.</p>"
f'<p><a href="{invite_url}">{invite_url}</a></p>'
f"<p>Ссылка действительна {settings.INVITE_EXPIRE_HOURS} часов.</p>"
)
send_email_task.delay(user.email, "Приглашение в ЭВОСИНК", html)
return RedirectResponse(f"/admin/users/{user_id}?success=invite_sent", 303)
@router.post("/users/{user_id}/edit")
async def admin_edit_user(user_id: int, request: Request, db: Session = Depends(get_db)):
try:
admin = _admin_user(request, db)
except Exception:
return RedirectResponse("/login", 303)
user = db.get(User, user_id)
if not user:
return RedirectResponse("/admin/users", 303)
form = await request.form()
data = {k: str(v).strip() for k, v in form.items()}
errors = []
if not data.get("first_name"):
errors.append("Имя обязательно")
if not data.get("last_name"):
errors.append("Фамилия обязательна")
if errors:
return _render(request, "admin/user_detail.html", {
"user": admin, "target": user, "errors": errors,
})
user.first_name = data["first_name"]
user.last_name = data["last_name"]
if data.get("email"):
user.email = data["email"]
if data.get("phone"):
user.phone = data["phone"]
if data.get("role") and admin.role == UserRoleEnum.system:
try:
user.role = UserRoleEnum(data["role"])
except ValueError:
pass
db.commit()
return RedirectResponse(f"/admin/users/{user_id}?success=saved", 303)
@router.post("/users/{user_id}/delete")
async def admin_delete_user(user_id: int, request: Request, db: Session = Depends(get_db)):
try:
admin = _admin_user(request, db)
except Exception:
return RedirectResponse("/login", 303)
if admin.role != UserRoleEnum.system:
return RedirectResponse(f"/admin/users/{user_id}", 303)
user = db.get(User, user_id)
if user:
db.delete(user)
db.commit()
return RedirectResponse("/admin/users", 303)
# ── Roles ─────────────────────────────────────────────────────────────────────
@router.get("/roles")
async def admin_roles(request: Request, db: Session = Depends(get_db)):
try:
admin = _admin_user(request, db)
except Exception:
return RedirectResponse("/login", 303)
if admin.role != UserRoleEnum.system:
return RedirectResponse("/admin/users", 303)
roles = db.query(Role).order_by(Role.id).all()
permissions = db.query(Permission).order_by(Permission.name).all()
role_perm_ids: dict[int, set[int]] = {}
for role in roles:
rows = db.execute(
role_permissions.select().where(role_permissions.c.role_id == role.id)
).fetchall()
role_perm_ids[role.id] = {r.permission_id for r in rows}
return _render(request, "admin/roles.html", {
"user": admin, "roles": roles, "permissions": permissions,
"role_perm_ids": role_perm_ids,
})
@router.post("/roles/{role_id}/permissions")
async def admin_update_role_permissions(
role_id: int, request: Request, db: Session = Depends(get_db)
):
try:
admin = _admin_user(request, db)
except Exception:
return RedirectResponse("/login", 303)
if admin.role != UserRoleEnum.system:
return RedirectResponse("/admin/roles", 303)
form = await request.form()
selected_ids = {int(v) for k, v in form.items() if k.startswith("perm_")}
# Remove all existing, re-insert selected
db.execute(role_permissions.delete().where(role_permissions.c.role_id == role_id))
for perm_id in selected_ids:
db.execute(role_permissions.insert().values(role_id=role_id, permission_id=perm_id))
db.commit()
return RedirectResponse("/admin/roles", 303)