test: add test suite with 65 tests, 73% coverage

- Unit tests: password hashing, notification providers, webhook field parsing
- Integration tests: auth routes (register/login/confirm-email/logout),
  invite flow, Evotor webhooks (/user/create, /user/verify, /user/token),
  admin panel (access control, activate/suspend/delete/reset-password)
- conftest: SQLite in-memory engine, transactional sessions, factory-boy
  factories (UserFactory with UserRoleEnum variants)
- Fix bcrypt: replace passlib (broken on Python 3.14 + bcrypt 5.x) with
  direct bcrypt calls; drop passlib from requirements.txt
- Fix datetime.utcnow() deprecation across routes and tests
- Fix Jinja2 TemplateResponse signature (request as first positional arg)
- Add coverage config to pyproject.toml

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
mguschin
2026-04-28 12:27:42 +03:00
parent 5ead89e0cf
commit fc65e591b3
18 changed files with 882 additions and 31 deletions

View File

@@ -1,4 +1,11 @@
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = ""
addopts = "--tb=short"
[tool.coverage.run]
source = ["web"]
omit = ["web/migrations/*"]
[tool.coverage.report]
show_missing = true

View File

@@ -6,8 +6,7 @@ sqlalchemy==2.0.36
alembic==1.14.0
pymysql==1.1.1
cryptography>=44.0.0
passlib[bcrypt]==1.7.4
bcrypt==4.2.1
bcrypt>=4.2.1
pydantic-settings==2.6.1
httpx==0.28.1
celery[redis]==5.4.0

View File

@@ -1,13 +1,25 @@
import pytest
import factory
from httpx import ASGITransport, AsyncClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from web.database import Base
import web.models # noqa: F401 — ensure all tables are registered on Base.metadata
from web.auth.password import hash_password
from web.database import Base, get_db
from web.main import app
from web.models.user import User, UserRoleEnum, UserStatusEnum
# ── Database fixtures ─────────────────────────────────────────────────────────
@pytest.fixture(scope="session")
def engine():
eng = create_engine("sqlite:///:memory:", echo=False)
eng = create_engine(
"sqlite:///:memory:",
echo=False,
connect_args={"check_same_thread": False},
)
Base.metadata.create_all(eng)
yield eng
Base.metadata.drop_all(eng)
@@ -23,3 +35,69 @@ def db_session(engine):
session.close()
transaction.rollback()
connection.close()
@pytest.fixture
def override_db(db_session):
"""Override FastAPI's get_db dependency with the transactional test session."""
app.dependency_overrides[get_db] = lambda: db_session
yield db_session
app.dependency_overrides.clear()
# ── HTTP client ───────────────────────────────────────────────────────────────
@pytest.fixture
async def client(override_db):
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as c:
yield c
# ── Factories ─────────────────────────────────────────────────────────────────
class UserFactory(factory.alchemy.SQLAlchemyModelFactory):
class Meta:
model = User
sqlalchemy_session_persistence = "commit"
first_name = factory.Faker("first_name")
last_name = factory.Faker("last_name")
email = factory.Sequence(lambda n: f"user{n}@test.com")
phone = factory.Sequence(lambda n: f"+7900{n:07d}")
password_hash = factory.LazyFunction(lambda: hash_password("testpass123"))
status = UserStatusEnum.active
is_email_confirmed = True
role = UserRoleEnum.user
@pytest.fixture
def user_factory(db_session):
UserFactory._meta.sqlalchemy_session = db_session
return UserFactory
@pytest.fixture
def active_user(user_factory):
return user_factory.create()
@pytest.fixture
def admin_user(user_factory):
return user_factory.create(role=UserRoleEnum.admin)
@pytest.fixture
def system_user(user_factory):
return user_factory.create(role=UserRoleEnum.system)
@pytest.fixture
def pending_user(user_factory):
return user_factory.create(
status=UserStatusEnum.pending,
is_email_confirmed=False,
password_hash=None,
)

View File

@@ -0,0 +1,26 @@
from web.auth.password import hash_password, verify_password
def test_hash_is_not_plaintext():
h = hash_password("secret123")
assert h != "secret123"
assert len(h) > 20
def test_verify_correct_password():
h = hash_password("mysecret")
assert verify_password("mysecret", h) is True
def test_verify_wrong_password():
h = hash_password("mysecret")
assert verify_password("wrongpassword", h) is False
def test_two_hashes_differ():
# bcrypt uses random salt — same plaintext produces different hashes
h1 = hash_password("same")
h2 = hash_password("same")
assert h1 != h2
assert verify_password("same", h1)
assert verify_password("same", h2)

View File

@@ -0,0 +1,47 @@
import logging
import pytest
from web.notifications.console import ConsoleEmailProvider, ConsoleSMSProvider
from web.notifications.registry import get_email_provider, get_sms_provider
def test_console_email_logs(caplog):
provider = ConsoleEmailProvider()
with caplog.at_level(logging.INFO, logger="web.notifications.console"):
provider.send("user@example.com", "Тест", '<a href="http://example.com/link">click</a>')
assert "user@example.com" in caplog.text
assert "Тест" in caplog.text
assert "http://example.com/link" in caplog.text
def test_console_sms_logs(caplog):
provider = ConsoleSMSProvider()
with caplog.at_level(logging.INFO, logger="web.notifications.console"):
provider.send("+79001234567", "Ваш код: 1234")
assert "+79001234567" in caplog.text
assert "Ваш код: 1234" in caplog.text
def test_registry_returns_console_email(monkeypatch):
monkeypatch.setattr("web.notifications.registry.settings.EMAIL_PROVIDER", "console")
provider = get_email_provider()
assert isinstance(provider, ConsoleEmailProvider)
def test_registry_returns_console_sms(monkeypatch):
monkeypatch.setattr("web.notifications.registry.settings.SMS_PROVIDER", "console")
provider = get_sms_provider()
assert isinstance(provider, ConsoleSMSProvider)
def test_registry_unknown_email_provider_raises(monkeypatch):
monkeypatch.setattr("web.notifications.registry.settings.EMAIL_PROVIDER", "sendgrid")
with pytest.raises(ValueError, match="sendgrid"):
get_email_provider()
def test_registry_unknown_sms_provider_raises(monkeypatch):
monkeypatch.setattr("web.notifications.registry.settings.SMS_PROVIDER", "twilio")
with pytest.raises(ValueError, match="twilio"):
get_sms_provider()

181
tests/test_routes_admin.py Normal file
View File

@@ -0,0 +1,181 @@
"""Integration tests for admin panel routes."""
import pytest
from web.models.user import User, UserRoleEnum, UserStatusEnum
def _set_session(client, user_id: int):
"""Inject a session cookie so the client appears logged in as user_id."""
client.cookies.set("session", "") # will be overwritten by actual login
# We inject directly into the app's session via a helper request
# The simplest approach: use the login endpoint to set the real session cookie
return user_id
async def _login(client, user):
"""Log in as user via the /login endpoint to get a real session cookie."""
resp = await client.post("/login", data={
"email": user.email,
"password": "testpass123",
}, follow_redirects=False)
assert resp.status_code == 303, f"Login failed: {resp.text}"
# ── Access control ────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_admin_users_requires_auth(client):
resp = await client.get("/admin/users", follow_redirects=False)
# Unauthenticated → redirect to login
assert resp.status_code in (302, 303, 307)
@pytest.mark.asyncio
async def test_admin_users_requires_admin_role(client, active_user):
await _login(client, active_user)
resp = await client.get("/admin/users", follow_redirects=False)
# Regular user → redirect (not admin)
assert resp.status_code in (302, 303, 307)
@pytest.mark.asyncio
async def test_admin_users_accessible_by_admin(client, admin_user):
await _login(client, admin_user)
resp = await client.get("/admin/users")
assert resp.status_code == 200
assert "Пользователи" in resp.text
@pytest.mark.asyncio
async def test_admin_users_accessible_by_system(client, system_user):
await _login(client, system_user)
resp = await client.get("/admin/users")
assert resp.status_code == 200
# ── User list + filters ───────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_admin_users_shows_all_users(client, admin_user, user_factory):
extra = user_factory.create(email="findme@test.com")
await _login(client, admin_user)
resp = await client.get("/admin/users")
assert resp.status_code == 200
assert extra.email in resp.text
@pytest.mark.asyncio
async def test_admin_users_search_filter(client, admin_user, user_factory):
target = user_factory.create(email="searchable@test.com")
await _login(client, admin_user)
resp = await client.get("/admin/users?search=searchable")
assert resp.status_code == 200
assert target.email in resp.text
# ── User detail ───────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_admin_user_detail(client, admin_user, active_user):
await _login(client, admin_user)
resp = await client.get(f"/admin/users/{active_user.id}")
assert resp.status_code == 200
assert active_user.email in resp.text
@pytest.mark.asyncio
async def test_admin_user_detail_not_found(client, admin_user):
await _login(client, admin_user)
resp = await client.get("/admin/users/99999", follow_redirects=False)
assert resp.status_code in (302, 303, 307)
# ── Activate / suspend ────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_admin_activate_user(client, admin_user, user_factory, override_db):
target = user_factory.create(status=UserStatusEnum.suspended)
await _login(client, admin_user)
resp = await client.post(f"/admin/users/{target.id}/activate", follow_redirects=False)
assert resp.status_code == 303
override_db.refresh(target)
assert target.status == UserStatusEnum.active
@pytest.mark.asyncio
async def test_admin_suspend_user(client, admin_user, active_user, override_db):
await _login(client, admin_user)
resp = await client.post(f"/admin/users/{active_user.id}/suspend", follow_redirects=False)
assert resp.status_code == 303
override_db.refresh(active_user)
assert active_user.status == UserStatusEnum.suspended
# ── Delete (system only) ──────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_admin_delete_user_by_system(client, system_user, user_factory, override_db):
target = user_factory.create()
target_id = target.id
await _login(client, system_user)
resp = await client.post(f"/admin/users/{target_id}/delete", follow_redirects=False)
assert resp.status_code == 303
assert override_db.get(User, target_id) is None
@pytest.mark.asyncio
async def test_admin_delete_blocked_for_admin_role(client, admin_user, active_user, override_db):
target_id = active_user.id
await _login(client, admin_user)
resp = await client.post(f"/admin/users/{target_id}/delete", follow_redirects=False)
assert resp.status_code == 303
# Admin cannot delete — user still exists
assert override_db.get(User, target_id) is not None
# ── Reset password / send invite ──────────────────────────────────────────────
@pytest.mark.asyncio
async def test_admin_reset_password_generates_token(client, admin_user, active_user, override_db):
from unittest.mock import patch
with patch("web.routes.admin.send_email_task") as mock_task:
await _login(client, admin_user)
resp = await client.post(
f"/admin/users/{active_user.id}/reset-password", follow_redirects=False
)
assert resp.status_code == 303
override_db.refresh(active_user)
assert active_user.password_reset_token is not None
mock_task.delay.assert_called_once()
@pytest.mark.asyncio
async def test_admin_send_invite(client, admin_user, active_user, override_db):
from unittest.mock import patch
with patch("web.routes.admin.send_email_task") as mock_task:
await _login(client, admin_user)
resp = await client.post(
f"/admin/users/{active_user.id}/send-invite", follow_redirects=False
)
assert resp.status_code == 303
override_db.refresh(active_user)
assert active_user.invite_token is not None
mock_task.delay.assert_called_once()
# ── Roles page (system only) ──────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_admin_roles_accessible_by_system(client, system_user):
await _login(client, system_user)
resp = await client.get("/admin/roles")
assert resp.status_code == 200
assert "Роли" in resp.text
@pytest.mark.asyncio
async def test_admin_roles_blocked_for_admin(client, admin_user):
await _login(client, admin_user)
resp = await client.get("/admin/roles", follow_redirects=False)
# Admin is redirected away from roles page
assert resp.status_code in (302, 303, 307)

182
tests/test_routes_auth.py Normal file
View File

@@ -0,0 +1,182 @@
"""Integration tests for auth routes (register / login / confirm-email / logout)."""
import secrets
from unittest.mock import patch
import pytest
from web.models.user import User, UserStatusEnum
# ── /register ────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_register_get(client):
resp = await client.get("/register")
assert resp.status_code == 200
assert "Регистрация" in resp.text
@pytest.mark.asyncio
@patch("web.routes.auth.send_email_task")
async def test_register_creates_pending_user(mock_task, client, override_db):
resp = await client.post("/register", data={
"first_name": "Иван",
"last_name": "Иванов",
"email": "ivan@test.com",
"phone": "+79001234567",
"password": "password123",
"password_confirm": "password123",
})
assert resp.status_code == 200
assert "Подтвердите" in resp.text
user = override_db.query(User).filter(User.email == "ivan@test.com").first()
assert user is not None
assert user.status == UserStatusEnum.pending
assert user.is_email_confirmed is False
assert user.email_confirm_token is not None
mock_task.delay.assert_called_once()
@pytest.mark.asyncio
@patch("web.routes.auth.send_email_task")
async def test_register_duplicate_email(mock_task, client, active_user):
resp = await client.post("/register", data={
"first_name": "X",
"last_name": "Y",
"email": active_user.email,
"phone": "+79999999999",
"password": "password123",
"password_confirm": "password123",
})
assert resp.status_code == 200
assert "уже существует" in resp.text
mock_task.delay.assert_not_called()
@pytest.mark.asyncio
async def test_register_password_mismatch(client):
resp = await client.post("/register", data={
"email": "new@test.com",
"phone": "+79000000001",
"password": "password123",
"password_confirm": "different",
})
assert resp.status_code == 200
assert "не совпадают" in resp.text
@pytest.mark.asyncio
async def test_register_short_password(client):
resp = await client.post("/register", data={
"email": "new@test.com",
"phone": "+79000000002",
"password": "short",
"password_confirm": "short",
})
assert resp.status_code == 200
assert "минимум 8" in resp.text
# ── /confirm-email ────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_confirm_email_valid_token(client, override_db, user_factory):
token = secrets.token_urlsafe(32)
user = user_factory.create(
is_email_confirmed=False,
email_confirm_token=token,
status=UserStatusEnum.pending,
)
resp = await client.get(f"/confirm-email?token={token}")
assert resp.status_code == 200
assert "подтвержден" in resp.text.lower()
override_db.refresh(user)
assert user.is_email_confirmed is True
assert user.status == UserStatusEnum.active
assert user.email_confirm_token is None
@pytest.mark.asyncio
async def test_confirm_email_invalid_token(client):
resp = await client.get("/confirm-email?token=bogustoken")
assert resp.status_code == 200
assert "Ошибка" in resp.text
@pytest.mark.asyncio
async def test_confirm_email_missing_token(client):
resp = await client.get("/confirm-email")
assert resp.status_code == 200
assert "Ошибка" in resp.text
# ── /login ────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_login_get(client):
resp = await client.get("/login")
assert resp.status_code == 200
assert "Вход" in resp.text
@pytest.mark.asyncio
async def test_login_success(client, active_user):
resp = await client.post("/login", data={
"email": active_user.email,
"password": "testpass123",
}, follow_redirects=False)
assert resp.status_code == 303
assert resp.headers["location"] == "/profile"
@pytest.mark.asyncio
async def test_login_wrong_password(client, active_user):
resp = await client.post("/login", data={
"email": active_user.email,
"password": "wrongpassword",
})
assert resp.status_code == 200
assert "Неверный" in resp.text
@pytest.mark.asyncio
async def test_login_unknown_email(client):
resp = await client.post("/login", data={
"email": "nobody@test.com",
"password": "testpass123",
})
assert resp.status_code == 200
assert "Неверный" in resp.text
@pytest.mark.asyncio
async def test_login_suspended_user(client, user_factory):
user = user_factory.create(status=UserStatusEnum.suspended)
resp = await client.post("/login", data={
"email": user.email,
"password": "testpass123",
})
assert resp.status_code == 200
assert "заблокирован" in resp.text.lower()
@pytest.mark.asyncio
async def test_login_unconfirmed_email(client, user_factory):
user = user_factory.create(is_email_confirmed=False, status=UserStatusEnum.pending)
resp = await client.post("/login", data={
"email": user.email,
"password": "testpass123",
})
assert resp.status_code == 200
assert "подтвердите" in resp.text.lower()
# ── /logout ───────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_logout_redirects(client):
resp = await client.get("/logout", follow_redirects=False)
assert resp.status_code == 303
assert resp.headers["location"] == "/login"

View File

@@ -0,0 +1,196 @@
"""Integration tests for Evotor webhook endpoints."""
import json
from unittest.mock import patch
import pytest
from web.models.connections import EvotorConnection
from web.models.user import User, UserStatusEnum
WEBHOOK_SECRET = "test-secret-abc"
def auth_headers(secret=WEBHOOK_SECRET):
return {"Authorization": f"Bearer {secret}"}
# ── /user/create ──────────────────────────────────────────────────────────────
@pytest.mark.asyncio
@patch("web.routes.evotor_webhooks.send_email_task")
async def test_user_create_new_user(mock_task, client, override_db, monkeypatch):
monkeypatch.setattr("web.routes.evotor_webhooks.settings.EVOTOR_WEBHOOK_SECRET", WEBHOOK_SECRET)
monkeypatch.setattr("web.routes.evotor_webhooks.settings.INVITE_EXPIRE_HOURS", 48)
monkeypatch.setattr("web.routes.evotor_webhooks.settings.BASE_URL", "http://test")
payload = {
"userId": "evo-001",
"customField": json.dumps({"email": "newuser@test.com", "phone": "+79001234501"}),
}
resp = await client.post("/user/create", json=payload, headers=auth_headers())
assert resp.status_code == 200
data = resp.json()
assert data["userId"] == "evo-001"
assert "token" in data
assert len(data["token"]) > 10
user = override_db.query(User).filter(User.evotor_user_id == "evo-001").first()
assert user is not None
assert user.status == UserStatusEnum.pending
assert user.invite_token is not None
assert user.password_hash is None
conn = override_db.query(EvotorConnection).filter(
EvotorConnection.evotor_user_id == "evo-001"
).first()
assert conn is not None
assert conn.api_token == data["token"]
mock_task.delay.assert_called_once()
@pytest.mark.asyncio
@patch("web.routes.evotor_webhooks.send_email_task")
async def test_user_create_links_existing_user_by_email(mock_task, client, override_db, user_factory, monkeypatch):
monkeypatch.setattr("web.routes.evotor_webhooks.settings.EVOTOR_WEBHOOK_SECRET", WEBHOOK_SECRET)
monkeypatch.setattr("web.routes.evotor_webhooks.settings.INVITE_EXPIRE_HOURS", 48)
monkeypatch.setattr("web.routes.evotor_webhooks.settings.BASE_URL", "http://test")
existing = user_factory.create(email="existing@test.com")
assert existing.evotor_user_id is None
payload = {
"userId": "evo-link-001",
"customField": json.dumps({"email": "existing@test.com"}),
}
resp = await client.post("/user/create", json=payload, headers=auth_headers())
assert resp.status_code == 200
override_db.refresh(existing)
assert existing.evotor_user_id == "evo-link-001"
@pytest.mark.asyncio
async def test_user_create_wrong_secret(client, monkeypatch):
monkeypatch.setattr("web.routes.evotor_webhooks.settings.EVOTOR_WEBHOOK_SECRET", WEBHOOK_SECRET)
resp = await client.post("/user/create", json={"userId": "x"}, headers={"Authorization": "Bearer wrong"})
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_user_create_missing_user_id(client, monkeypatch):
monkeypatch.setattr("web.routes.evotor_webhooks.settings.EVOTOR_WEBHOOK_SECRET", "")
resp = await client.post("/user/create", json={"customField": "{}"})
assert resp.status_code == 400
@pytest.mark.asyncio
@patch("web.routes.evotor_webhooks.send_email_task")
async def test_user_create_no_secret_dev_mode(mock_task, client, override_db, monkeypatch):
monkeypatch.setattr("web.routes.evotor_webhooks.settings.EVOTOR_WEBHOOK_SECRET", "")
monkeypatch.setattr("web.routes.evotor_webhooks.settings.INVITE_EXPIRE_HOURS", 48)
monkeypatch.setattr("web.routes.evotor_webhooks.settings.BASE_URL", "http://test")
resp = await client.post("/user/create", json={"userId": "evo-dev-001"})
assert resp.status_code == 200
assert resp.json()["userId"] == "evo-dev-001"
# ── /user/verify ──────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_user_verify_success(client, override_db, user_factory, monkeypatch):
monkeypatch.setattr("web.routes.evotor_webhooks.settings.EVOTOR_WEBHOOK_SECRET", WEBHOOK_SECRET)
user = user_factory.create(evotor_user_id="evo-verify-001")
conn = EvotorConnection(
user_id=user.id,
evotor_user_id="evo-verify-001",
access_token="evotor-access-token",
api_token="my-api-token-xyz",
)
override_db.add(conn)
override_db.commit()
resp = await client.post("/user/verify", json={
"userId": "evo-verify-001",
"username": user.email,
"password": "testpass123",
}, headers=auth_headers())
assert resp.status_code == 200
data = resp.json()
assert data["userId"] == "evo-verify-001"
assert "token" in data
@pytest.mark.asyncio
async def test_user_verify_wrong_password(client, user_factory, monkeypatch):
monkeypatch.setattr("web.routes.evotor_webhooks.settings.EVOTOR_WEBHOOK_SECRET", WEBHOOK_SECRET)
user = user_factory.create()
resp = await client.post("/user/verify", json={
"userId": "x",
"username": user.email,
"password": "wrongpass",
}, headers=auth_headers())
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_user_verify_suspended(client, user_factory, monkeypatch):
monkeypatch.setattr("web.routes.evotor_webhooks.settings.EVOTOR_WEBHOOK_SECRET", WEBHOOK_SECRET)
user = user_factory.create(status=UserStatusEnum.suspended)
resp = await client.post("/user/verify", json={
"userId": "x",
"username": user.email,
"password": "testpass123",
}, headers=auth_headers())
assert resp.status_code == 403
@pytest.mark.asyncio
async def test_user_verify_no_password_hash(client, user_factory, monkeypatch):
monkeypatch.setattr("web.routes.evotor_webhooks.settings.EVOTOR_WEBHOOK_SECRET", WEBHOOK_SECRET)
user = user_factory.create(password_hash=None)
resp = await client.post("/user/verify", json={
"userId": "x",
"username": user.email,
"password": "anything",
}, headers=auth_headers())
assert resp.status_code == 401
# ── /user/token ───────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_user_token_updates_connection(client, override_db, user_factory, monkeypatch):
monkeypatch.setattr("web.routes.evotor_webhooks.settings.EVOTOR_WEBHOOK_SECRET", WEBHOOK_SECRET)
user = user_factory.create(evotor_user_id="evo-token-001")
old_conn = EvotorConnection(
user_id=user.id,
evotor_user_id="evo-token-001",
access_token="old-token",
api_token="api-tok",
)
override_db.add(old_conn)
override_db.commit()
resp = await client.post("/user/token", json={
"userId": "evo-token-001",
"token": "new-evotor-token-xyz",
}, headers=auth_headers())
assert resp.status_code == 200
assert resp.json() == {}
override_db.refresh(old_conn)
assert old_conn.access_token == "new-evotor-token-xyz"
assert old_conn.is_online is True
@pytest.mark.asyncio
async def test_user_token_unknown_user(client, monkeypatch):
monkeypatch.setattr("web.routes.evotor_webhooks.settings.EVOTOR_WEBHOOK_SECRET", WEBHOOK_SECRET)
resp = await client.post("/user/token", json={
"userId": "does-not-exist",
"token": "some-token",
}, headers=auth_headers())
assert resp.status_code == 404

View File

@@ -0,0 +1,95 @@
"""Integration tests for the Evotor invite flow (/invite)."""
import secrets
from datetime import datetime, timezone, timedelta
from unittest.mock import patch
import pytest
from web.models.user import User, UserStatusEnum
@pytest.mark.asyncio
async def test_invite_get_valid_token(client, override_db, user_factory):
token = secrets.token_urlsafe(32)
user = user_factory.create(
invite_token=token,
invite_expires=datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=48),
password_hash=None,
status=UserStatusEnum.pending,
is_email_confirmed=False,
)
resp = await client.get(f"/invite?token={token}")
assert resp.status_code == 200
assert "Завершение регистрации" in resp.text or "ЭВОСИНК" in resp.text
@pytest.mark.asyncio
async def test_invite_get_expired_token(client, user_factory):
token = secrets.token_urlsafe(32)
user_factory.create(
invite_token=token,
invite_expires=datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=1),
password_hash=None,
status=UserStatusEnum.pending,
)
resp = await client.get(f"/invite?token={token}")
assert resp.status_code == 200
assert "недействительна" in resp.text
@pytest.mark.asyncio
async def test_invite_get_bogus_token(client):
resp = await client.get("/invite?token=notexist")
assert resp.status_code == 200
assert "недействительна" in resp.text
@pytest.mark.asyncio
async def test_invite_post_activates_user(client, override_db, user_factory):
token = secrets.token_urlsafe(32)
user = user_factory.create(
email="invite@test.com",
phone="+79001119999",
invite_token=token,
invite_expires=datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=48),
password_hash=None,
status=UserStatusEnum.pending,
is_email_confirmed=False,
)
resp = await client.post(f"/invite?token={token}", data={
"first_name": "Петр",
"last_name": "Петров",
"email": "invite@test.com",
"phone": "+79001119999",
"password": "newpassword1",
"password_confirm": "newpassword1",
})
assert resp.status_code == 200
assert "активирован" in resp.text.lower()
override_db.refresh(user)
assert user.status == UserStatusEnum.active
assert user.is_email_confirmed is True
assert user.password_hash is not None
assert user.invite_token is None
@pytest.mark.asyncio
async def test_invite_post_password_mismatch(client, user_factory):
token = secrets.token_urlsafe(32)
user_factory.create(
invite_token=token,
invite_expires=datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=48),
password_hash=None,
status=UserStatusEnum.pending,
)
resp = await client.post(f"/invite?token={token}", data={
"first_name": "А",
"last_name": "Б",
"email": "x@test.com",
"phone": "+79001112233",
"password": "password123",
"password_confirm": "different",
})
assert resp.status_code == 200
assert "не совпадают" in resp.text

View File

@@ -0,0 +1,40 @@
"""Unit tests for _parse_custom_fields — no DB or HTTP needed."""
import json
from web.routes.evotor_webhooks import _parse_custom_fields
def test_none_returns_empty():
assert _parse_custom_fields(None) == {}
def test_dict_passthrough():
d = {"email": "a@b.com", "phone": "+79001234567"}
assert _parse_custom_fields(d) == d
def test_json_string_parsed():
raw = json.dumps({"email": "a@b.com", "firstName": "Иван"})
result = _parse_custom_fields(raw)
assert result["email"] == "a@b.com"
assert result["firstName"] == "Иван"
def test_plain_string_returns_empty():
# A plain non-JSON string cannot be parsed into fields
assert _parse_custom_fields("just some text") == {}
def test_json_array_returns_empty():
# A JSON array is not a dict — treat as unparseable
assert _parse_custom_fields("[1, 2, 3]") == {}
def test_empty_string_returns_empty():
assert _parse_custom_fields("") == {}
def test_nested_values_preserved():
raw = {"email": "x@y.com", "meta": {"key": "val"}}
result = _parse_custom_fields(raw)
assert result["meta"]["key"] == "val"

View File

@@ -1,11 +1,12 @@
from passlib.context import CryptContext
_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")
import bcrypt
def hash_password(plain: str) -> str:
return _ctx.hash(plain)
return bcrypt.hashpw(plain.encode(), bcrypt.gensalt(rounds=12)).decode()
def verify_password(plain: str, hashed: str) -> bool:
return _ctx.verify(plain, hashed)
try:
return bcrypt.checkpw(plain.encode(), hashed.encode())
except Exception:
return False

View File

@@ -67,8 +67,7 @@ from fastapi.exception_handlers import http_exception_handler # noqa: E402
@app.exception_handler(403)
async def forbidden_handler(request: Request, exc: HTTPException) -> HTMLResponse:
return templates.TemplateResponse("message.html", {
"request": request,
return templates.TemplateResponse(request, "message.html", {
"user": None,
"title": "Нет доступа",
"message": "У вас недостаточно прав для просмотра этой страницы.",

View File

@@ -1,5 +1,5 @@
import secrets
from datetime import datetime, timedelta
from datetime import datetime, timezone, timedelta
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse, RedirectResponse
@@ -23,7 +23,7 @@ PAGE_SIZE = 25
def _render(request: Request, template: str, ctx: dict) -> HTMLResponse:
ctx["request"] = request
ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID)
return templates.TemplateResponse(template, ctx)
return templates.TemplateResponse(ctx.pop("request"), template, ctx)
def _admin_user(request: Request, db: Session) -> User:
@@ -139,7 +139,7 @@ async def admin_reset_password(user_id: int, request: Request, db: Session = Dep
if user:
token = secrets.token_urlsafe(32)
user.password_reset_token = token
user.password_reset_expires = datetime.utcnow() + timedelta(
user.password_reset_expires = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(
minutes=settings.PASSWORD_RESET_EXPIRE_MINUTES
)
db.commit()
@@ -159,7 +159,7 @@ async def admin_send_invite(user_id: int, request: Request, db: Session = Depend
if user:
token = secrets.token_urlsafe(32)
user.invite_token = token
user.invite_expires = datetime.utcnow() + timedelta(hours=settings.INVITE_EXPIRE_HOURS)
user.invite_expires = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=settings.INVITE_EXPIRE_HOURS)
db.commit()
invite_url = f"{settings.BASE_URL}/invite?token={token}"
html = (

View File

@@ -19,7 +19,7 @@ router = APIRouter()
def _render(request: Request, template: str, ctx: dict) -> HTMLResponse:
ctx["request"] = request
ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID)
return templates.TemplateResponse(template, ctx)
return templates.TemplateResponse(ctx.pop("request"), template, ctx)
@router.get("/register")

View File

@@ -8,7 +8,7 @@ POST /user/token — Evotor sends us its own API token for the user.
import json
import logging
import secrets
from datetime import datetime, timedelta
from datetime import datetime, timezone, timedelta
from typing import Any
from fastapi import APIRouter, Depends, Request
@@ -64,7 +64,7 @@ def _upsert_evotor_connection(
conn = db.query(EvotorConnection).filter(
EvotorConnection.evotor_user_id == evotor_user_id
).first()
now = datetime.utcnow()
now = datetime.now(timezone.utc).replace(tzinfo=None)
if conn:
conn.api_token = api_token
if user_id is not None:
@@ -120,7 +120,7 @@ async def user_create(request: Request, db: Session = Depends(get_db)):
if user is None and phone:
user = db.query(User).filter(User.phone == phone).first()
now = datetime.utcnow()
now = datetime.now(timezone.utc).replace(tzinfo=None)
if user:
# Link Evotor to existing user
@@ -246,7 +246,7 @@ async def user_token(request: Request, db: Session = Depends(get_db)):
conn = db.query(EvotorConnection).filter(
EvotorConnection.evotor_user_id == evotor_user_id
).first()
now = datetime.utcnow()
now = datetime.now(timezone.utc).replace(tzinfo=None)
if conn:
conn.access_token = evotor_token
conn.is_online = True

View File

@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse, RedirectResponse
@@ -17,7 +17,7 @@ router = APIRouter()
def _render(request: Request, template: str, ctx: dict) -> HTMLResponse:
ctx["request"] = request
ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID)
return templates.TemplateResponse(template, ctx)
return templates.TemplateResponse(ctx.pop("request"), template, ctx)
def _bad_token(request: Request) -> HTMLResponse:
@@ -33,7 +33,7 @@ def _bad_token(request: Request) -> HTMLResponse:
async def invite_get(request: Request, db: Session = Depends(get_db)):
token = request.query_params.get("token", "")
user = db.query(User).filter(User.invite_token == token).first()
if not user or not user.invite_expires or user.invite_expires < datetime.utcnow():
if not user or not user.invite_expires or user.invite_expires < datetime.now(timezone.utc).replace(tzinfo=None):
return _bad_token(request)
return _render(request, "invite.html", {"user": None, "invite_user": user, "token": token})
@@ -42,7 +42,7 @@ async def invite_get(request: Request, db: Session = Depends(get_db)):
async def invite_post(request: Request, db: Session = Depends(get_db)):
token = request.query_params.get("token", "")
invite_user = db.query(User).filter(User.invite_token == token).first()
if not invite_user or not invite_user.invite_expires or invite_user.invite_expires < datetime.utcnow():
if not invite_user or not invite_user.invite_expires or invite_user.invite_expires < datetime.now(timezone.utc).replace(tzinfo=None):
return _bad_token(request)
form = await request.form()

View File

@@ -16,7 +16,7 @@ router = APIRouter()
def _render(request: Request, template: str, ctx: dict) -> HTMLResponse:
ctx["request"] = request
ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID)
return templates.TemplateResponse(template, ctx)
return templates.TemplateResponse(ctx.pop("request"), template, ctx)
@router.get("/profile")

View File

@@ -1,5 +1,5 @@
import secrets
from datetime import datetime, timedelta
from datetime import datetime, timezone, timedelta
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse, RedirectResponse
@@ -18,7 +18,7 @@ router = APIRouter()
def _render(request: Request, template: str, ctx: dict) -> HTMLResponse:
ctx["request"] = request
ctx.setdefault("jivosite_widget_id", settings.JIVOSITE_WIDGET_ID)
return templates.TemplateResponse(template, ctx)
return templates.TemplateResponse(ctx.pop("request"), template, ctx)
@router.get("/forgot-password")
@@ -34,7 +34,7 @@ async def forgot_post(request: Request, db: Session = Depends(get_db)):
if user:
token = secrets.token_urlsafe(32)
user.password_reset_token = token
user.password_reset_expires = datetime.utcnow() + timedelta(
user.password_reset_expires = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(
minutes=settings.PASSWORD_RESET_EXPIRE_MINUTES
)
db.commit()
@@ -54,7 +54,7 @@ async def forgot_post(request: Request, db: Session = Depends(get_db)):
async def reset_get(request: Request, db: Session = Depends(get_db)):
token = request.query_params.get("token", "")
user = db.query(User).filter(User.password_reset_token == token).first()
if not user or not user.password_reset_expires or user.password_reset_expires < datetime.utcnow():
if not user or not user.password_reset_expires or user.password_reset_expires < datetime.now(timezone.utc).replace(tzinfo=None):
return _render(request, "message.html", {
"user": None, "title": "Ссылка недействительна",
"message": "Ссылка для сброса пароля устарела или недействительна.",
@@ -72,7 +72,7 @@ async def reset_post(request: Request, db: Session = Depends(get_db)):
errors = []
user = db.query(User).filter(User.password_reset_token == token).first()
if not user or not user.password_reset_expires or user.password_reset_expires < datetime.utcnow():
if not user or not user.password_reset_expires or user.password_reset_expires < datetime.now(timezone.utc).replace(tzinfo=None):
return _render(request, "message.html", {
"user": None, "title": "Ссылка недействительна",
"message": "Ссылка для сброса пароля устарела.",