Add user registration and auth web app

FastAPI + Jinja2 + MariaDB web application with registration,
login, profile, password reset, and email confirmation flows.
All UI in Russian. Styled with Evotor brand colors.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
mguschin
2026-03-04 22:01:58 +03:00
parent 053fa62395
commit 951c12a208
26 changed files with 929 additions and 0 deletions

0
web/routes/__init__.py Normal file
View File

123
web/routes/auth.py Normal file
View File

@@ -0,0 +1,123 @@
import uuid
from fastapi import APIRouter, Request, Depends
from fastapi.responses import RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from web.auth import hash_password, verify_password, get_current_user
from web.config import settings
from web.database import get_db
from web.models import User
from web.schemas import validate_registration, validate_login
router = APIRouter()
templates = Jinja2Templates(directory="web/templates")
@router.get("/register")
def register_form(request: Request, user: User | None = Depends(get_current_user)):
if user:
return RedirectResponse("/profile", 303)
return templates.TemplateResponse("register.html", {"request": request, "user": None})
@router.post("/register")
async def register_submit(request: Request, db: Session = Depends(get_db)):
form = await request.form()
data = dict(form)
errors = validate_registration(data)
if not errors:
existing = db.query(User).filter(
(User.email == data["email"].strip()) | (User.phone == data["phone"].strip())
).first()
if existing:
if existing.email == data["email"].strip():
errors.append("Пользователь с таким email уже существует")
else:
errors.append("Пользователь с таким телефоном уже существует")
if errors:
return templates.TemplateResponse("register.html", {
"request": request, "user": None, "errors": errors, "form": data,
})
token = uuid.uuid4().hex
user = User(
first_name=data["first_name"].strip(),
last_name=data["last_name"].strip(),
email=data["email"].strip(),
phone=data["phone"].strip(),
password_hash=hash_password(data["password"]),
email_confirm_token=token,
)
db.add(user)
db.commit()
confirm_url = f"{settings.BASE_URL}/confirm-email?token={token}"
print("=" * 40)
print("ПОДТВЕРЖДЕНИЕ EMAIL")
print(f"Пользователь: {user.email}")
print(f"Ссылка: {confirm_url}")
print("=" * 40)
return templates.TemplateResponse("confirm_email.html", {"request": request, "user": None})
@router.get("/confirm-email")
def confirm_email(request: Request, token: str, db: Session = Depends(get_db)):
user = db.query(User).filter(User.email_confirm_token == token).first()
if not user:
return templates.TemplateResponse("message.html", {
"request": request, "user": None,
"title": "Ошибка", "message": "Неверная или устаревшая ссылка.",
})
user.is_email_confirmed = True
user.email_confirm_token = None
db.commit()
return templates.TemplateResponse("email_confirmed.html", {"request": request, "user": None})
@router.get("/login")
def login_form(request: Request, user: User | None = Depends(get_current_user)):
if user:
return RedirectResponse("/profile", 303)
return templates.TemplateResponse("login.html", {"request": request, "user": None})
@router.post("/login")
async def login_submit(request: Request, db: Session = Depends(get_db)):
form = await request.form()
data = dict(form)
errors = validate_login(data)
if errors:
return templates.TemplateResponse("login.html", {
"request": request, "user": None, "errors": errors, "form": data,
})
user = db.query(User).filter(User.email == data["email"].strip()).first()
if not user or not verify_password(data["password"], user.password_hash):
return templates.TemplateResponse("login.html", {
"request": request, "user": None,
"errors": ["Неверный email или пароль"], "form": data,
})
if not user.is_email_confirmed:
return templates.TemplateResponse("login.html", {
"request": request, "user": None,
"errors": ["Пожалуйста, подтвердите ваш email"], "form": data,
})
request.session["user_id"] = user.id
return RedirectResponse("/profile", 303)
@router.get("/logout")
def logout(request: Request):
request.session.clear()
return RedirectResponse("/login", 303)

55
web/routes/profile.py Normal file
View File

@@ -0,0 +1,55 @@
from fastapi import APIRouter, Request, Depends
from fastapi.responses import RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from web.auth import get_current_user
from web.database import get_db
from web.models import User
from web.schemas import validate_profile
router = APIRouter()
templates = Jinja2Templates(directory="web/templates")
@router.get("/profile")
def profile_view(request: Request, user: User | None = Depends(get_current_user)):
if not user:
return RedirectResponse("/login", 303)
return templates.TemplateResponse("profile.html", {"request": request, "user": user})
@router.post("/profile")
async def profile_update(
request: Request,
db: Session = Depends(get_db),
user: User | None = Depends(get_current_user),
):
if not user:
return RedirectResponse("/login", 303)
form = await request.form()
data = dict(form)
errors = validate_profile(data)
if not errors:
existing = db.query(User).filter(
User.phone == data["phone"].strip(), User.id != user.id
).first()
if existing:
errors.append("Пользователь с таким телефоном уже существует")
if errors:
return templates.TemplateResponse("profile.html", {
"request": request, "user": user, "errors": errors, "form": data,
})
user.first_name = data["first_name"].strip()
user.last_name = data["last_name"].strip()
user.phone = data["phone"].strip()
db.commit()
return templates.TemplateResponse("profile.html", {
"request": request, "user": user, "success": "Профиль обновлен",
})

108
web/routes/reset.py Normal file
View File

@@ -0,0 +1,108 @@
import uuid
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Request, Depends
from fastapi.responses import RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from web.auth import hash_password
from web.config import settings
from web.database import get_db
from web.models import User
from web.schemas import validate_reset_password
router = APIRouter()
templates = Jinja2Templates(directory="web/templates")
@router.get("/forgot-password")
def forgot_form(request: Request):
return templates.TemplateResponse("forgot_password.html", {"request": request, "user": None})
@router.post("/forgot-password")
async def forgot_submit(request: Request, db: Session = Depends(get_db)):
form = await request.form()
email = form.get("email", "").strip()
if email:
user = db.query(User).filter(User.email == email).first()
if user:
token = uuid.uuid4().hex
user.password_reset_token = token
user.password_reset_expires = datetime.now(timezone.utc) + timedelta(
minutes=settings.PASSWORD_RESET_EXPIRE_MINUTES
)
db.commit()
reset_url = f"{settings.BASE_URL}/reset-password?token={token}"
print("=" * 40)
print("СБРОС ПАРОЛЯ")
print(f"Пользователь: {user.email}")
print(f"Ссылка: {reset_url}")
print(f"Действительна: {settings.PASSWORD_RESET_EXPIRE_MINUTES} мин.")
print("=" * 40)
return templates.TemplateResponse("message.html", {
"request": request, "user": None,
"title": "Сброс пароля",
"message": "Если аккаунт с таким email существует, ссылка для сброса пароля выведена в консоль сервера.",
})
@router.get("/reset-password")
def reset_form(request: Request, token: str, db: Session = Depends(get_db)):
user = db.query(User).filter(User.password_reset_token == token).first()
if not user or not user.password_reset_expires:
return templates.TemplateResponse("message.html", {
"request": request, "user": None,
"title": "Ошибка", "message": "Неверная или устаревшая ссылка.",
})
if datetime.now(timezone.utc) > user.password_reset_expires.replace(tzinfo=timezone.utc):
return templates.TemplateResponse("message.html", {
"request": request, "user": None,
"title": "Ошибка", "message": "Срок действия ссылки истек.",
})
return templates.TemplateResponse("reset_password.html", {
"request": request, "user": None, "token": token,
})
@router.post("/reset-password")
async def reset_submit(request: Request, token: str, db: Session = Depends(get_db)):
user = db.query(User).filter(User.password_reset_token == token).first()
if not user or not user.password_reset_expires:
return templates.TemplateResponse("message.html", {
"request": request, "user": None,
"title": "Ошибка", "message": "Неверная или устаревшая ссылка.",
})
if datetime.now(timezone.utc) > user.password_reset_expires.replace(tzinfo=timezone.utc):
return templates.TemplateResponse("message.html", {
"request": request, "user": None,
"title": "Ошибка", "message": "Срок действия ссылки истек.",
})
form = await request.form()
data = dict(form)
errors = validate_reset_password(data)
if errors:
return templates.TemplateResponse("reset_password.html", {
"request": request, "user": None, "token": token, "errors": errors,
})
user.password_hash = hash_password(data["password"])
user.password_reset_token = None
user.password_reset_expires = None
db.commit()
return templates.TemplateResponse("message.html", {
"request": request, "user": None,
"title": "Пароль изменен",
"message": "Ваш пароль успешно изменен. Теперь вы можете войти.",
"link": "/login", "link_text": "Войти",
})