Files
evo-sync/web/routes/auth.py
mguschin 5ead89e0cf feat: Evotor user lifecycle, RBAC, admin panel
- Receive Evotor webhooks: POST /user/create, /user/verify, /user/token
- Create users in pending status; match to existing users by email/phone
- Send invite link via Celery notification task; user sets password at /invite
- Abstract EmailProvider/SMSProvider with ConsoleEmailProvider default
- Role-based access control: role enum on users + roles/permissions tables
- Admin panel: /admin/users (list, filter, search, paginate), user detail card
  with activate/suspend/reset-password/send-invite/edit/delete actions
- Admin roles management: /admin/roles with per-role permission assignment
- Extend user profile card: role, status, Evotor ID, email confirmation badge
- Auth routes: register, login, logout, confirm-email, forgot/reset password
- Alembic migrations 0002 (full schema + new fields) and 0003 (RBAC + seeds)
- Port Pico CSS + Bootstrap Icons UI from Node.js commit (854c912)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 12:01:36 +03:00

171 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import secrets
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from sqlalchemy import or_
from sqlalchemy.orm import Session
from web.auth.password import hash_password, verify_password
from web.auth.session import get_session_user_id
from web.config import settings
from web.database import get_db
from web.models.user import User, UserStatusEnum
from web.notifications.tasks import send_email_task
from web.templates_env import templates
router = APIRouter()
def _render(request: Request, template: str, ctx: dict) -> HTMLResponse:
ctx["request"] = request
ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID)
return templates.TemplateResponse(template, ctx)
@router.get("/register")
async def register_get(request: Request, db: Session = Depends(get_db)):
if get_session_user_id(request):
return RedirectResponse("/profile", 303)
return _render(request, "register.html", {"user": None})
@router.post("/register")
async def register_post(request: Request, db: Session = Depends(get_db)):
form = await request.form()
data = {k: str(v).strip() for k, v in form.items()}
errors = []
if not data.get("email"):
errors.append("Email обязателен")
if not data.get("phone"):
errors.append("Телефон обязателен")
if not data.get("password"):
errors.append("Пароль обязателен")
if len(data.get("password", "")) < 8:
errors.append("Пароль должен содержать минимум 8 символов")
if data.get("password") != data.get("password_confirm"):
errors.append("Пароли не совпадают")
if not errors:
existing = db.query(User).filter(
or_(User.email == data["email"], User.phone == data["phone"])
).first()
if existing:
if existing.email == data["email"]:
errors.append("Пользователь с таким email уже существует")
else:
errors.append("Пользователь с таким телефоном уже существует")
if errors:
return _render(request, "register.html", {"user": None, "errors": errors, "form": data})
token = secrets.token_urlsafe(32)
user = User(
first_name=data.get("first_name", ""),
last_name=data.get("last_name", ""),
email=data["email"],
phone=data["phone"],
password_hash=await _hash(data["password"]),
email_confirm_token=token,
status=UserStatusEnum.pending,
)
db.add(user)
db.commit()
confirm_url = f"{settings.BASE_URL}/confirm-email?token={token}"
html = f'<p>Подтвердите email: <a href="{confirm_url}">{confirm_url}</a></p>'
send_email_task.delay(user.email, "Подтвердите ваш email — ЭВОСИНК", html)
return _render(request, "confirm_email.html", {"user": None})
@router.get("/confirm-email")
async def confirm_email(request: Request, db: Session = Depends(get_db)):
token = request.query_params.get("token")
if not token:
return _render(request, "message.html", {
"user": None, "title": "Ошибка", "message": "Неверная или устаревшая ссылка.",
"link": "/login", "link_text": "Войти",
})
user = db.query(User).filter(User.email_confirm_token == token).first()
if not user:
return _render(request, "message.html", {
"user": None, "title": "Ошибка", "message": "Неверная или устаревшая ссылка.",
"link": "/login", "link_text": "Войти",
})
user.is_email_confirmed = True
user.email_confirm_token = None
user.status = UserStatusEnum.active
db.commit()
return _render(request, "email_confirmed.html", {"user": None})
@router.get("/resend-confirm")
async def resend_confirm(request: Request, db: Session = Depends(get_db)):
user_id = get_session_user_id(request)
if not user_id:
return RedirectResponse("/login", 303)
user = db.get(User, user_id)
if not user or user.is_email_confirmed:
return RedirectResponse("/profile", 303)
token = secrets.token_urlsafe(32)
user.email_confirm_token = token
db.commit()
confirm_url = f"{settings.BASE_URL}/confirm-email?token={token}"
html = f'<p>Подтвердите email: <a href="{confirm_url}">{confirm_url}</a></p>'
send_email_task.delay(user.email, "Подтвердите ваш email — ЭВОСИНК", html)
return _render(request, "message.html", {
"user": user,
"title": "Письмо отправлено",
"message": "Проверьте почту и нажмите на ссылку для подтверждения.",
"link": "/profile", "link_text": "Назад",
})
@router.get("/login")
async def login_get(request: Request, db: Session = Depends(get_db)):
if get_session_user_id(request):
return RedirectResponse("/profile", 303)
return _render(request, "login.html", {"user": None})
@router.post("/login")
async def login_post(request: Request, db: Session = Depends(get_db)):
form = await request.form()
email = str(form.get("email", "")).strip()
password = str(form.get("password", ""))
errors = []
if not email:
errors.append("Email обязателен")
if not password:
errors.append("Пароль обязателен")
if not errors:
user = db.query(User).filter(User.email == email).first()
if not user or not user.password_hash or not verify_password(password, user.password_hash):
errors.append("Неверный email или пароль")
elif user.status == UserStatusEnum.suspended:
errors.append("Ваш аккаунт заблокирован. Обратитесь к администратору.")
elif not user.is_email_confirmed:
errors.append("Пожалуйста, подтвердите ваш email")
if errors:
return _render(request, "login.html", {
"user": None, "errors": errors, "form": {"email": email},
})
request.session["user_id"] = user.id
return RedirectResponse("/profile", 303)
@router.get("/logout")
async def logout(request: Request):
request.session.clear()
return RedirectResponse("/login", 303)
async def _hash(plain: str) -> str:
import asyncio
return await asyncio.get_event_loop().run_in_executor(None, hash_password, plain)