Files

102 lines
4.4 KiB
Python
Raw Permalink Normal View History

import secrets
from datetime import datetime, timezone, timedelta
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from sqlalchemy.orm import Session
from web.auth.password import hash_password
from web.config import settings
from web.database import get_db
from web.models.user import User
from web.notifications.tasks import send_email_task
from web.templates_env import templates
router = APIRouter()
def _render(request: Request, template: str, ctx: dict) -> HTMLResponse:
ctx["request"] = request
ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID)
return templates.TemplateResponse(ctx.pop("request"), template, ctx)
@router.get("/forgot-password")
async def forgot_get(request: Request):
return _render(request, "forgot_password.html", {"user": None})
@router.post("/forgot-password")
async def forgot_post(request: Request, db: Session = Depends(get_db)):
form = await request.form()
email = str(form.get("email", "")).strip()
user = db.query(User).filter(User.email == email).first()
if user:
token = secrets.token_urlsafe(32)
user.password_reset_token = token
user.password_reset_expires = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(
minutes=settings.PASSWORD_RESET_EXPIRE_MINUTES
)
db.commit()
reset_url = f"{settings.BASE_URL}/reset-password?token={token}"
html = f'<p>Сброс пароля: <a href="{reset_url}">{reset_url}</a></p>'
send_email_task.delay(user.email, "Сброс пароля — ЭВОСИНК", html)
# Always show same message to prevent user enumeration
return _render(request, "message.html", {
"user": None,
"title": "Ссылка отправлена",
"message": "Если указанный email зарегистрирован, вы получите ссылку для сброса пароля.",
"link": "/login", "link_text": "Войти",
})
@router.get("/reset-password")
async def reset_get(request: Request, db: Session = Depends(get_db)):
token = request.query_params.get("token", "")
user = db.query(User).filter(User.password_reset_token == token).first()
if not user or not user.password_reset_expires or user.password_reset_expires < datetime.now(timezone.utc).replace(tzinfo=None):
return _render(request, "message.html", {
"user": None, "title": "Ссылка недействительна",
"message": "Ссылка для сброса пароля устарела или недействительна.",
"link": "/forgot-password", "link_text": "Запросить новую ссылку",
})
return _render(request, "reset_password.html", {"user": None, "token": token})
@router.post("/reset-password")
async def reset_post(request: Request, db: Session = Depends(get_db)):
token = request.query_params.get("token", "")
form = await request.form()
password = str(form.get("password", ""))
password_confirm = str(form.get("password_confirm", ""))
errors = []
user = db.query(User).filter(User.password_reset_token == token).first()
if not user or not user.password_reset_expires or user.password_reset_expires < datetime.now(timezone.utc).replace(tzinfo=None):
return _render(request, "message.html", {
"user": None, "title": "Ссылка недействительна",
"message": "Ссылка для сброса пароля устарела.",
"link": "/forgot-password", "link_text": "Запросить новую ссылку",
})
if len(password) < 8:
errors.append("Пароль должен содержать минимум 8 символов")
if password != password_confirm:
errors.append("Пароли не совпадают")
if errors:
return _render(request, "reset_password.html", {
"user": None, "token": token, "errors": errors,
})
user.password_hash = hash_password(password)
user.password_reset_token = None
user.password_reset_expires = None
db.commit()
return _render(request, "message.html", {
"user": None, "title": "Пароль изменён",
"message": "Ваш пароль успешно изменён.",
"link": "/login", "link_text": "Войти",
})