Files
evo-sync/web/routes/auth.py

171 lines
6.5 KiB
Python
Raw Normal View History

import secrets
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from sqlalchemy import or_
from sqlalchemy.orm import Session
from web.auth.password import hash_password, verify_password
from web.auth.session import get_session_user_id
from web.config import settings
from web.database import get_db
from web.models.user import User, UserStatusEnum
from web.notifications.tasks import send_email_task
from web.templates_env import templates
router = APIRouter()
def _render(request: Request, template: str, ctx: dict) -> HTMLResponse:
ctx["request"] = request
ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID)
return templates.TemplateResponse(ctx.pop("request"), template, ctx)
@router.get("/register")
async def register_get(request: Request, db: Session = Depends(get_db)):
if get_session_user_id(request):
return RedirectResponse("/profile", 303)
return _render(request, "register.html", {"user": None})
@router.post("/register")
async def register_post(request: Request, db: Session = Depends(get_db)):
form = await request.form()
data = {k: str(v).strip() for k, v in form.items()}
errors = []
if not data.get("email"):
errors.append("Email обязателен")
if not data.get("phone"):
errors.append("Телефон обязателен")
if not data.get("password"):
errors.append("Пароль обязателен")
if len(data.get("password", "")) < 8:
errors.append("Пароль должен содержать минимум 8 символов")
if data.get("password") != data.get("password_confirm"):
errors.append("Пароли не совпадают")
if not errors:
existing = db.query(User).filter(
or_(User.email == data["email"], User.phone == data["phone"])
).first()
if existing:
if existing.email == data["email"]:
errors.append("Пользователь с таким email уже существует")
else:
errors.append("Пользователь с таким телефоном уже существует")
if errors:
return _render(request, "register.html", {"user": None, "errors": errors, "form": data})
token = secrets.token_urlsafe(32)
user = User(
first_name=data.get("first_name", ""),
last_name=data.get("last_name", ""),
email=data["email"],
phone=data["phone"],
password_hash=await _hash(data["password"]),
email_confirm_token=token,
status=UserStatusEnum.pending,
)
db.add(user)
db.commit()
confirm_url = f"{settings.BASE_URL}/confirm-email?token={token}"
html = f'<p>Подтвердите email: <a href="{confirm_url}">{confirm_url}</a></p>'
send_email_task.delay(user.email, "Подтвердите ваш email — ЭВОСИНК", html)
return _render(request, "confirm_email.html", {"user": None})
@router.get("/confirm-email")
async def confirm_email(request: Request, db: Session = Depends(get_db)):
token = request.query_params.get("token")
if not token:
return _render(request, "message.html", {
"user": None, "title": "Ошибка", "message": "Неверная или устаревшая ссылка.",
"link": "/login", "link_text": "Войти",
})
user = db.query(User).filter(User.email_confirm_token == token).first()
if not user:
return _render(request, "message.html", {
"user": None, "title": "Ошибка", "message": "Неверная или устаревшая ссылка.",
"link": "/login", "link_text": "Войти",
})
user.is_email_confirmed = True
user.email_confirm_token = None
user.status = UserStatusEnum.active
db.commit()
return _render(request, "email_confirmed.html", {"user": None})
@router.get("/resend-confirm")
async def resend_confirm(request: Request, db: Session = Depends(get_db)):
user_id = get_session_user_id(request)
if not user_id:
return RedirectResponse("/login", 303)
user = db.get(User, user_id)
if not user or user.is_email_confirmed:
return RedirectResponse("/profile", 303)
token = secrets.token_urlsafe(32)
user.email_confirm_token = token
db.commit()
confirm_url = f"{settings.BASE_URL}/confirm-email?token={token}"
html = f'<p>Подтвердите email: <a href="{confirm_url}">{confirm_url}</a></p>'
send_email_task.delay(user.email, "Подтвердите ваш email — ЭВОСИНК", html)
return _render(request, "message.html", {
"user": user,
"title": "Письмо отправлено",
"message": "Проверьте почту и нажмите на ссылку для подтверждения.",
"link": "/profile", "link_text": "Назад",
})
@router.get("/login")
async def login_get(request: Request, db: Session = Depends(get_db)):
if get_session_user_id(request):
return RedirectResponse("/profile", 303)
return _render(request, "login.html", {"user": None})
@router.post("/login")
async def login_post(request: Request, db: Session = Depends(get_db)):
form = await request.form()
email = str(form.get("email", "")).strip()
password = str(form.get("password", ""))
errors = []
if not email:
errors.append("Email обязателен")
if not password:
errors.append("Пароль обязателен")
if not errors:
user = db.query(User).filter(User.email == email).first()
if not user or not user.password_hash or not verify_password(password, user.password_hash):
errors.append("Неверный email или пароль")
elif user.status == UserStatusEnum.suspended:
errors.append("Ваш аккаунт заблокирован. Обратитесь к администратору.")
elif not user.is_email_confirmed:
errors.append("Пожалуйста, подтвердите ваш email")
if errors:
return _render(request, "login.html", {
"user": None, "errors": errors, "form": {"email": email},
})
request.session["user_id"] = user.id
return RedirectResponse("/profile", 303)
@router.get("/logout")
async def logout(request: Request):
request.session.clear()
return RedirectResponse("/login", 303)
async def _hash(plain: str) -> str:
import asyncio
return await asyncio.get_event_loop().run_in_executor(None, hash_password, plain)