Files
evo-sync/web/routes/admin.py
mguschin 5e7be16755 feat: remove register, add evo webhooks, admin view-as user
- Remove /register route and nav links (users created via Evotor webhook)
- Fix evotor_webhooks.py: use phone=None instead of phone="" to avoid unique constraint
- Add admin "view as user" feature: POST /admin/users/{id}/view-as sets viewed_user_id
  in session; POST /admin/view-as/stop clears it
- catalog, vk_catalog, sync, connections GET routes use get_viewed_user() so admins
  see another user's data while browsing
- Orange banner shown at top when admin is viewing as another user

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 20:44:25 +03:00

296 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import secrets
from datetime import datetime, timezone, timedelta
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from sqlalchemy.orm import Session
from web.auth.password import hash_password
from web.auth.rbac import require_role
from web.auth.session import get_current_user
from web.config import settings
from web.database import get_db
from web.models.rbac import Permission, Role, UserRole, role_permissions
from web.models.user import User, UserRoleEnum, UserStatusEnum
from web.notifications.tasks import send_email_task
from web.templates_env import templates
# Router for the admin pages; every route registered on it is served under /admin.
router = APIRouter(prefix="/admin")
# Number of users shown per page in the admin user list.
PAGE_SIZE = 25
def _render(request: Request, template: str, ctx: dict) -> HTMLResponse:
    """Render ``template`` with ``ctx`` and return the template response.

    ``ctx`` is modified in place: any ``"request"`` entry is dropped (the
    request is handed to ``TemplateResponse`` as its first positional
    argument instead) and ``"jivosite_widget_id"`` is filled in from the
    settings unless the caller already provided one.
    """
    ctx.pop("request", None)
    ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID)
    return templates.TemplateResponse(request, template, ctx)
def _admin_user(request: Request, db: Session) -> User:
    """Return the current user, requiring the admin or system role.

    Any exception raised by ``get_current_user`` propagates unchanged.

    Raises:
        HTTPException: 403 when the user's role is neither admin nor system.
    """
    user = get_current_user(request, db)
    if user.role in (UserRoleEnum.admin, UserRoleEnum.system):
        return user

    # Imported lazily, only on the rejection path.
    from fastapi import HTTPException

    raise HTTPException(403, "Недостаточно прав")
# ── User list ─────────────────────────────────────────────────────────────────
@router.get("/users")
async def admin_users(request: Request, db: Session = Depends(get_db)):
    """List users with optional search, status/role filters and pagination.

    Query parameters:
        search: substring matched case-insensitively against first name,
            last name, email and phone.
        status: a ``UserStatusEnum`` value; unrecognised values are ignored.
        role: a ``UserRoleEnum`` value; unrecognised values are ignored.
        page: 1-based page number. Missing or non-numeric values fall back
            to page 1, and values below 1 are clamped to 1.

    Redirects to ``/login`` when the admin check raises for any reason.
    """
    try:
        admin = _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    params = request.query_params
    search = params.get("search", "").strip()
    status_filter = params.get("status", "")
    role_filter = params.get("role", "")

    # A hand-edited URL such as ?page=abc must not crash the listing.
    try:
        page = max(1, int(params.get("page", 1)))
    except ValueError:
        page = 1

    q = db.query(User)
    if search:
        pattern = f"%{search}%"
        q = q.filter(
            (User.first_name.ilike(pattern)) |
            (User.last_name.ilike(pattern)) |
            (User.email.ilike(pattern)) |
            (User.phone.ilike(pattern))
        )
    if status_filter:
        try:
            q = q.filter(User.status == UserStatusEnum(status_filter))
        except ValueError:
            # Unknown status value: show the list unfiltered by status.
            pass
    if role_filter:
        try:
            q = q.filter(User.role == UserRoleEnum(role_filter))
        except ValueError:
            # Unknown role value: show the list unfiltered by role.
            pass

    total = q.count()
    users = (
        q.order_by(User.created_at.desc())
        .offset((page - 1) * PAGE_SIZE)
        .limit(PAGE_SIZE)
        .all()
    )
    # Ceiling division; always report at least one page, even when empty.
    total_pages = max(1, (total + PAGE_SIZE - 1) // PAGE_SIZE)

    return _render(request, "admin/users.html", {
        "user": admin,
        "users": users,
        "search": search,
        "status_filter": status_filter,
        "role_filter": role_filter,
        "page": page,
        "total_pages": total_pages,
        "total": total,
    })
# ── User detail ───────────────────────────────────────────────────────────────
@router.get("/users/{user_id}")
async def admin_user_detail(user_id: int, request: Request, db: Session = Depends(get_db)):
    """Show the detail page for the user with id ``user_id``.

    Redirects to ``/login`` when the admin check raises, and back to the
    user list when no such user exists.
    """
    try:
        current_admin = _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    found = db.get(User, user_id)
    if found:
        page_ctx = {"user": current_admin, "target": found}
        return _render(request, "admin/user_detail.html", page_ctx)
    return RedirectResponse("/admin/users", 303)
# ── View-as ───────────────────────────────────────────────────────────────────
@router.post("/users/{user_id}/view-as")
async def admin_view_as(user_id: int, request: Request, db: Session = Depends(get_db)):
    """Start viewing the site as the user with id ``user_id``.

    When the user exists, its id is stored in the session under
    ``"viewed_user_id"``. The response redirects to ``/connections`` whether
    or not the user was found, or to ``/login`` when the admin check raises.
    """
    try:
        _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    if db.get(User, user_id):
        request.session["viewed_user_id"] = user_id
    return RedirectResponse("/connections", 303)
@router.post("/view-as/stop")
async def admin_view_as_stop(request: Request, db: Session = Depends(get_db)):
    """Stop viewing as another user and return to the admin user list.

    Removes ``"viewed_user_id"`` from the session if it is present.
    Redirects to ``/login`` when the admin check raises.
    """
    try:
        _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    session = request.session
    if "viewed_user_id" in session:
        del session["viewed_user_id"]
    return RedirectResponse("/admin/users", 303)
# ── User actions ──────────────────────────────────────────────────────────────
@router.post("/users/{user_id}/activate")
async def admin_activate(user_id: int, request: Request, db: Session = Depends(get_db)):
    """Set the user's status to active and return to its detail page.

    A missing user is ignored; the redirect to the detail URL happens either
    way. Redirects to ``/login`` when the admin check raises.
    """
    try:
        _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    detail_url = f"/admin/users/{user_id}"
    account = db.get(User, user_id)
    if not account:
        return RedirectResponse(detail_url, 303)

    account.status = UserStatusEnum.active
    db.commit()
    return RedirectResponse(detail_url, 303)
@router.post("/users/{user_id}/suspend")
async def admin_suspend(user_id: int, request: Request, db: Session = Depends(get_db)):
    """Set the user's status to suspended and return to its detail page.

    A missing user is ignored; the redirect to the detail URL happens either
    way. Redirects to ``/login`` when the admin check raises.
    """
    try:
        _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    detail_url = f"/admin/users/{user_id}"
    account = db.get(User, user_id)
    if not account:
        return RedirectResponse(detail_url, 303)

    account.status = UserStatusEnum.suspended
    db.commit()
    return RedirectResponse(detail_url, 303)
@router.post("/users/{user_id}/reset-password")
async def admin_reset_password(user_id: int, request: Request, db: Session = Depends(get_db)):
    """Issue a password-reset token for the user and email the reset link.

    The token and its expiry (now, as naive UTC, plus
    ``settings.PASSWORD_RESET_EXPIRE_MINUTES``) are saved before the email
    task is queued. The redirect carries ``success=reset_sent`` even when the
    user does not exist. Redirects to ``/login`` when the admin check raises.
    """
    try:
        _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    done_url = f"/admin/users/{user_id}?success=reset_sent"
    account = db.get(User, user_id)
    if not account:
        return RedirectResponse(done_url, 303)

    reset_token = secrets.token_urlsafe(32)
    # The expiry is stored as a naive datetime expressed in UTC.
    utc_now_naive = datetime.now(timezone.utc).replace(tzinfo=None)
    lifetime = timedelta(minutes=settings.PASSWORD_RESET_EXPIRE_MINUTES)
    account.password_reset_token = reset_token
    account.password_reset_expires = utc_now_naive + lifetime
    db.commit()

    reset_url = f"{settings.BASE_URL}/reset-password?token={reset_token}"
    body = f'<p>Сброс пароля (запрошен администратором): <a href="{reset_url}">{reset_url}</a></p>'
    send_email_task.delay(account.email, "Сброс пароля — ЭВОСИНК", body)
    return RedirectResponse(done_url, 303)
@router.post("/users/{user_id}/send-invite")
async def admin_send_invite(user_id: int, request: Request, db: Session = Depends(get_db)):
    """Issue an invite token for the user and email the invite link.

    The token and its expiry (now, as naive UTC, plus
    ``settings.INVITE_EXPIRE_HOURS``) are saved before the email task is
    queued. The redirect carries ``success=invite_sent`` even when the user
    does not exist. Redirects to ``/login`` when the admin check raises.
    """
    try:
        _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    done_url = f"/admin/users/{user_id}?success=invite_sent"
    account = db.get(User, user_id)
    if not account:
        return RedirectResponse(done_url, 303)

    invite_token = secrets.token_urlsafe(32)
    # The expiry is stored as a naive datetime expressed in UTC.
    utc_now_naive = datetime.now(timezone.utc).replace(tzinfo=None)
    account.invite_token = invite_token
    account.invite_expires = utc_now_naive + timedelta(hours=settings.INVITE_EXPIRE_HOURS)
    db.commit()

    invite_url = f"{settings.BASE_URL}/invite?token={invite_token}"
    paragraphs = [
        "<p>Вам отправлено приглашение в ЭВОСИНК.</p>",
        f'<p><a href="{invite_url}">{invite_url}</a></p>',
        f"<p>Ссылка действительна {settings.INVITE_EXPIRE_HOURS} часов.</p>",
    ]
    send_email_task.delay(account.email, "Приглашение в ЭВОСИНК", "".join(paragraphs))
    return RedirectResponse(done_url, 303)
@router.post("/users/{user_id}/edit")
async def admin_edit_user(user_id: int, request: Request, db: Session = Depends(get_db)):
    """Update a user's profile fields from the submitted form.

    Form fields:
        first_name, last_name: required; an empty value re-renders the
            detail page with an error list.
        email, phone: applied only when non-empty, so an empty field leaves
            the stored value untouched.
        role: applied only when the acting admin has the system role and the
            value is a valid ``UserRoleEnum``; otherwise ignored.

    If the commit violates a database constraint (for example a duplicate
    email or phone), the transaction is rolled back and the detail page is
    re-rendered with an error instead of letting the exception escape.

    Redirects to ``/login`` when the admin check raises, and to the user list
    when the user does not exist.
    """
    # Local import: this is the only place in the module that needs it.
    from sqlalchemy.exc import IntegrityError

    try:
        admin = _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    user = db.get(User, user_id)
    if not user:
        return RedirectResponse("/admin/users", 303)

    form = await request.form()
    data = {k: str(v).strip() for k, v in form.items()}

    errors = []
    if not data.get("first_name"):
        errors.append("Имя обязательно")
    if not data.get("last_name"):
        errors.append("Фамилия обязательна")
    if errors:
        return _render(request, "admin/user_detail.html", {
            "user": admin, "target": user, "errors": errors,
        })

    user.first_name = data["first_name"]
    user.last_name = data["last_name"]
    if data.get("email"):
        user.email = data["email"]
    if data.get("phone"):
        user.phone = data["phone"]
    # Only the system role may change another user's role.
    if data.get("role") and admin.role == UserRoleEnum.system:
        try:
            user.role = UserRoleEnum(data["role"])
        except ValueError:
            # Unknown role value: keep the current role.
            pass

    try:
        db.commit()
    except IntegrityError:
        # Discard the pending changes so the session stays usable.
        db.rollback()
        return _render(request, "admin/user_detail.html", {
            "user": admin,
            "target": user,
            "errors": ["Email или телефон уже используется другим пользователем"],
        })
    return RedirectResponse(f"/admin/users/{user_id}?success=saved", 303)
@router.post("/users/{user_id}/delete")
async def admin_delete_user(user_id: int, request: Request, db: Session = Depends(get_db)):
    """Delete a user; only an admin with the system role may do so.

    An admin without the system role is sent back to the user's detail page
    and nothing is deleted. A missing user is ignored. Redirects to
    ``/login`` when the admin check raises.
    """
    try:
        acting_admin = _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)

    if acting_admin.role == UserRoleEnum.system:
        doomed = db.get(User, user_id)
        if doomed:
            db.delete(doomed)
            db.commit()
        return RedirectResponse("/admin/users", 303)

    return RedirectResponse(f"/admin/users/{user_id}", 303)
# ── Roles ─────────────────────────────────────────────────────────────────────
@router.get("/roles")
async def admin_roles(request: Request, db: Session = Depends(get_db)):
    """Show all roles with the permissions assigned to each.

    Only an admin with the system role may view the page; other admins are
    redirected to the user list. Redirects to ``/login`` when the admin check
    raises.

    The template receives ``role_perm_ids``, which maps every role id to the
    set of permission ids linked to it (an empty set when there are none).
    """
    try:
        admin = _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)
    if admin.role != UserRoleEnum.system:
        return RedirectResponse("/admin/users", 303)

    roles = db.query(Role).order_by(Role.id).all()
    permissions = db.query(Permission).order_by(Permission.name).all()

    # Load the whole association table once instead of one query per role.
    role_perm_ids: dict[int, set[int]] = {role.id: set() for role in roles}
    for row in db.execute(role_permissions.select()).fetchall():
        granted = role_perm_ids.get(row.role_id)
        # Rows pointing at a role that was not loaded are skipped.
        if granted is not None:
            granted.add(row.permission_id)

    return _render(request, "admin/roles.html", {
        "user": admin, "roles": roles, "permissions": permissions,
        "role_perm_ids": role_perm_ids,
    })
@router.post("/roles/{role_id}/permissions")
async def admin_update_role_permissions(
    role_id: int, request: Request, db: Session = Depends(get_db)
):
    """Replace the permissions of role ``role_id`` with the submitted set.

    Every form field whose name starts with ``perm_`` contributes its value
    as a permission id. If any such value is not an integer, the request is
    rejected before anything is changed, so a malformed submission cannot
    wipe the role's existing permissions or raise an unhandled error.

    Only an admin with the system role may update permissions; other admins
    are redirected to ``/admin/roles`` with no changes. Redirects to
    ``/login`` when the admin check raises.
    """
    try:
        admin = _admin_user(request, db)
    except Exception:
        return RedirectResponse("/login", 303)
    if admin.role != UserRoleEnum.system:
        return RedirectResponse("/admin/roles", 303)

    form = await request.form()
    # Validate before deleting: a bad value must leave the role untouched.
    try:
        selected_ids = {int(v) for k, v in form.items() if k.startswith("perm_")}
    except (TypeError, ValueError):
        return RedirectResponse("/admin/roles", 303)

    # Remove all existing links, then re-insert the selected ones; both
    # steps are committed together.
    db.execute(role_permissions.delete().where(role_permissions.c.role_id == role_id))
    for perm_id in selected_ids:
        db.execute(role_permissions.insert().values(role_id=role_id, permission_id=perm_id))
    db.commit()
    return RedirectResponse("/admin/roles", 303)