Files
evo-sync/web/routes/auth.py

124 lines
4.5 KiB
Python
Raw Normal View History

import secrets
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from sqlalchemy.orm import Session
from web.auth.password import verify_password
from web.auth.session import get_session_user_id
from web.config import settings
from web.database import get_db
from web.models.user import User, UserStatusEnum
from web.notifications.tasks import send_email_task
from web.templates_env import templates
# Router on which the route handlers in this module are registered
# via the @router.get / @router.post decorators.
router = APIRouter()
def _render(request: Request, template: str, ctx: dict) -> HTMLResponse:
    """Render ``template`` with ``ctx`` and return the template response.

    The request is passed to ``templates.TemplateResponse`` as its first
    argument, not through the context, so a ``"request"`` key already
    present in ``ctx`` is discarded.

    Note that ``ctx`` is modified in place: ``jivosite_widget_id`` is filled
    in from ``settings.JIVOSITE_WIDGET_ID`` unless the caller supplied one.
    """
    # Drop any caller-provided "request" entry; the request travels as the
    # first positional argument of TemplateResponse instead.
    ctx.pop("request", None)
    ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID)
    return templates.TemplateResponse(request, template, ctx)
@router.get("/register")
async def register_get(request: Request):
    """Answer ``GET /register`` with an empty 404 response.

    The handler never renders a form; presumably self-registration is
    switched off while the route is kept in place (confirm with the owners).
    """
    not_found = Response(status_code=404)
    return not_found
@router.post("/register")
async def register_post(request: Request):
    """Answer ``POST /register`` with an empty 404 response.

    The submitted form is never read; the handler rejects every request.
    """
    not_found = Response(status_code=404)
    return not_found
@router.get("/confirm-email")
async def confirm_email(request: Request, db: Session = Depends(get_db)):
    """Confirm a user's email address from the ``token`` query parameter.

    Looks up the user whose ``email_confirm_token`` equals the token. When
    the token is missing or matches nobody, the generic error page is
    rendered. Otherwise the email is marked confirmed, the token is cleared
    so the link cannot be reused, and the confirmation page is rendered.

    A suspended account keeps its ``suspended`` status: confirming an email
    must not undo the block that ``login_post`` enforces. Any other status
    is set to ``active``.
    """
    token = request.query_params.get("token")

    # Skip the query entirely when no token was supplied.
    user = None
    if token:
        user = db.query(User).filter(User.email_confirm_token == token).first()

    if not user:
        return _render(request, "message.html", {
            "user": None, "title": "Ошибка", "message": "Неверная или устаревшая ссылка.",
            "link": "/login", "link_text": "Войти",
        })

    user.is_email_confirmed = True
    user.email_confirm_token = None
    # Do not let a confirmation link lift a suspension.
    if user.status != UserStatusEnum.suspended:
        user.status = UserStatusEnum.active
    db.commit()
    return _render(request, "email_confirmed.html", {"user": None})
@router.get("/resend-confirm")
async def resend_confirm(request: Request, db: Session = Depends(get_db)):
    """Issue a new email-confirmation token and queue the confirmation email.

    Redirects to ``/login`` when the session carries no user id, and to
    ``/profile`` when the user cannot be loaded or the email is already
    confirmed. Otherwise a fresh token is stored on the user, the email is
    handed to ``send_email_task.delay``, and a notice page is rendered.
    """
    session_user_id = get_session_user_id(request)
    if not session_user_id:
        return RedirectResponse("/login", 303)

    account = db.get(User, session_user_id)
    if not account or account.is_email_confirmed:
        return RedirectResponse("/profile", 303)

    # Store the new token before the email is queued.
    fresh_token = secrets.token_urlsafe(32)
    account.email_confirm_token = fresh_token
    db.commit()

    link = f"{settings.BASE_URL}/confirm-email?token={fresh_token}"
    body = f'<p>Подтвердите email: <a href="{link}">{link}</a></p>'
    send_email_task.delay(account.email, "Подтвердите ваш email — ЭВОСИНК", body)

    page = {
        "user": account,
        "title": "Письмо отправлено",
        "message": "Проверьте почту и нажмите на ссылку для подтверждения.",
        "link": "/profile", "link_text": "Назад",
    }
    return _render(request, "message.html", page)
@router.get("/login")
async def login_get(request: Request, db: Session = Depends(get_db)):
    """Show the login form, or redirect to ``/profile`` if already signed in.

    "Signed in" means ``get_session_user_id`` returns a truthy value for the
    request. ``db`` is accepted but not used by this handler.
    """
    signed_in_user_id = get_session_user_id(request)
    if not signed_in_user_id:
        return _render(request, "login.html", {"user": None})
    return RedirectResponse("/profile", 303)
@router.post("/login")
async def login_post(request: Request, db: Session = Depends(get_db)):
    """Validate the submitted login form and sign the user in.

    Both fields are required; missing ones are reported together. When both
    are present the user is looked up by email and the password is checked
    against the stored hash. A matching account is still refused when it is
    suspended or its email is unconfirmed. On any error the login form is
    re-rendered with the messages and the entered email; on success the
    user id is written to the session and the client is redirected to
    ``/profile``.
    """
    form = await request.form()
    email = str(form.get("email", "")).strip()
    password = str(form.get("password", ""))

    # Required-field checks, reported in form order.
    required_checks = (
        (email, "Email обязателен"),
        (password, "Пароль обязателен"),
    )
    problems = [text for value, text in required_checks if not value]

    account = None
    if not problems:
        account = db.query(User).filter(User.email == email).first()
        # Unknown email, missing hash and wrong password share one message.
        credentials_ok = (
            account
            and account.password_hash
            and verify_password(password, account.password_hash)
        )
        if not credentials_ok:
            problems.append("Неверный email или пароль")
        elif account.status == UserStatusEnum.suspended:
            problems.append("Ваш аккаунт заблокирован. Обратитесь к администратору.")
        elif not account.is_email_confirmed:
            problems.append("Пожалуйста, подтвердите ваш email")

    if problems:
        page = {"user": None, "errors": problems, "form": {"email": email}}
        return _render(request, "login.html", page)

    request.session["user_id"] = account.id
    return RedirectResponse("/profile", 303)
@router.get("/logout")
async def logout(request: Request):
    """Empty the session and redirect the client to ``/login``."""
    session = request.session
    session.clear()
    return RedirectResponse("/login", 303)
async def _hash(plain: str) -> str:
    """Hash ``plain`` with ``hash_password`` without blocking the event loop.

    The hashing call is dispatched to the loop's default executor
    (``run_in_executor(None, ...)``) and its result is awaited.

    Args:
        plain: The plaintext value handed to ``hash_password``.

    Returns:
        Whatever ``hash_password`` returns for ``plain``.
    """
    import asyncio
    from web.auth.password import hash_password

    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented accessor there and never creates a loop implicitly.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, hash_password, plain)