Files
evo-sync/web/health_checker.py
mguschin 379f781e1e feat: add connections dashboard with background health checks
- Add /connections page showing all integrations with online/offline status
- Add background health checker that polls Evotor API every 10 minutes
- Add is_online and last_checked_at fields to evotor_connections table
- Replace Evotor navbar link with unified Connections link
- Redirect connect/disconnect flows to /connections
- Add Alembic migration for new columns

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-06 15:26:49 +03:00

49 lines
1.3 KiB
Python

import asyncio
import logging
from datetime import datetime, timezone

import httpx

from web.database import SessionLocal
from web.models import EvotorConnection
# Module logger. It is bound to the "uvicorn.error" logger name instead of this
# module's own name (presumably so messages appear in uvicorn's log output --
# TODO confirm against the server's logging configuration).
logger = logging.getLogger("uvicorn.error")

# Evotor endpoint that check_evotor_connection requests to probe a token.
EVOTOR_STORES_URL = "https://api.evotor.ru/stores"
async def check_evotor_connection(access_token: str) -> bool:
    """Report whether the Evotor API accepts ``access_token``.

    Sends one GET request to ``EVOTOR_STORES_URL`` with the token as a Bearer
    credential and a 15-second timeout.

    Args:
        access_token: Token placed in the ``Authorization`` header.

    Returns:
        ``True`` only when the response status code is exactly 200;
        ``False`` for any other status or when the request fails.

    Failures are logged and reported as ``False`` rather than raised, so a
    caller checking many connections is not aborted by a single bad one.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                EVOTOR_STORES_URL,
                headers={"Authorization": f"Bearer {access_token}"},
                timeout=15,
            )
    except httpx.HTTPError as exc:
        # Expected failure mode (timeout, DNS, refused connection, ...): a
        # one-line warning is enough. The token is deliberately not logged.
        logger.warning("Evotor health check request failed: %r", exc)
        return False
    except Exception:
        # Anything else is unexpected; keep the best-effort contract but
        # leave a traceback instead of hiding the problem as "offline".
        logger.exception("Unexpected error during Evotor health check")
        return False
    return response.status_code == 200
async def run_health_checks() -> None:
    """Probe every stored Evotor connection and persist the results.

    Loads all ``EvotorConnection`` rows, calls ``check_evotor_connection``
    for each one in turn, and stores the outcome in ``is_online`` together
    with the time of the check in ``last_checked_at``. The updates are
    committed together after the loop.

    Any exception raised inside the ``try`` is logged with a traceback and
    the transaction is rolled back; it is not re-raised. The session is
    closed on every path.
    """
    db = SessionLocal()
    try:
        connections = db.query(EvotorConnection).all()
        for conn in connections:
            conn.is_online = await check_evotor_connection(conn.access_token)
            # datetime.utcnow() is deprecated since Python 3.12. Build the
            # same naive-UTC value from an aware "now" so the value written
            # to the column is unchanged.
            conn.last_checked_at = datetime.now(timezone.utc).replace(tzinfo=None)
        db.commit()
        logger.info("Health checks completed for %d connection(s)", len(connections))
    except Exception:
        logger.exception("Error during health checks")
        db.rollback()
    finally:
        db.close()
async def health_check_loop(interval: int) -> None:
    """Run health checks forever, sleeping ``interval`` seconds between passes.

    Args:
        interval: Delay in seconds passed to ``asyncio.sleep`` after each
            pass. The delay starts when a pass finishes, so the period
            between pass starts is ``interval`` plus the pass duration.

    The coroutine only ends when it is cancelled. An error escaping a single
    pass is logged and the loop continues with the next pass.
    """
    while True:
        try:
            await run_health_checks()
        except Exception:
            # A failure outside run_health_checks' own handling (for example
            # while opening or closing the session) must not end the loop
            # permanently. asyncio.CancelledError derives from BaseException,
            # so cancellation still propagates.
            logger.exception("Health check pass failed; retrying after sleep")
        await asyncio.sleep(interval)