""" FastAPI app for the FIPI scraper service. """ from __future__ import annotations from datetime import datetime import logging import os from pathlib import Path import re import ssl from typing import Dict, List, Optional from urllib.parse import parse_qs, urlparse from bs4 import BeautifulSoup from dotenv import load_dotenv from fastapi import BackgroundTasks, FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse import httpx from models import ( CheckAnswerRequest, CheckAnswerResponse, ErrorResponse, HealthResponse, ScrapeRequest, ScrapeResponse, StatsResponse, TaskResponse, ) from scraper import FIPIScraper BASE_DIR = Path(__file__).resolve().parent load_dotenv(BASE_DIR / ".env") logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) SUPABASE_AVAILABLE = False SupabaseClient = None try: from supabase_client import SupabaseClient SUPABASE_AVAILABLE = True except ImportError as exc: logger.warning("Supabase client import failed: %s", exc) except Exception as exc: # pragma: no cover - startup guard logger.warning("Supabase client init failed: %s", exc) app = FastAPI( title="AI Scraper FIPI", description="Collects, stores and validates FIPI tasks.", version="1.1.1-proof-20260317", ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) scraper: Optional[FIPIScraper] = None supabase_client: Optional[SupabaseClient] = None last_auto_refresh_at: Optional[datetime] = None refresh_in_progress = False @app.on_event("startup") async def startup_event() -> None: global scraper, supabase_client scraper = FIPIScraper(base_url=os.getenv("FIPI_BASE_URL", "https://fipi.ru")) logger.info("FIPIScraper initialized") if not SUPABASE_AVAILABLE: logger.info("Supabase disabled") return supabase_url = os.getenv("SUPABASE_URL") supabase_key = os.getenv("SUPABASE_SERVICE_KEY") if not supabase_url or not supabase_key: logger.warning("SUPABASE_URL or SUPABASE_SERVICE_KEY missing") return try: client = SupabaseClient(url=supabase_url, key=supabase_key) if await client.is_available(): supabase_client = client logger.info("Supabase client connected") else: logger.error("Supabase is unavailable") except Exception as exc: # pragma: no cover - startup guard logger.error("Supabase startup error: %s", exc) def _require_supabase() -> SupabaseClient: if not supabase_client: raise HTTPException(status_code=503, detail="Supabase is not configured") return supabase_client def _require_scraper() -> FIPIScraper: if not scraper: raise HTTPException(status_code=503, detail="Scraper is not configured") return scraper def _can_check_answer(task: Dict) -> bool: if task.get("source_kind") == "dynamic_bank" and task.get("task_guid"): return True source_url = task.get("source_url", "") parsed = urlparse(source_url) query = parse_qs(parsed.query) project_guid = (query.get("proj") or [None])[0] question_id = (query.get("qid") or [None])[0] return parsed.path.endswith("/questions.php") and bool(project_guid and (question_id or task.get("task_guid"))) def _serialize_task(task: Dict) -> Dict: payload = dict(task) payload["can_check_answer"] = _can_check_answer(task) return payload async def _persist_tasks(tasks: List[Dict]) -> Dict[str, int]: client = _require_supabase() saved = 0 duplicates = 0 for task in tasks: result = await client.insert_task(task) if result: saved += 1 else: duplicates += 1 return {"saved": saved, "duplicates": duplicates} async def _collect_tasks(subject: str = 
"russian", *, include_official_archives: bool = True) -> List[Dict]: service = _require_scraper() if include_official_archives: return await service.scrape_tasks(subject=subject, include_official_archives=True) return await service.scrape_dynamic_bank(subject=subject) async def _refresh_tasks(subject: str = "russian", *, include_official_archives: bool = True) -> Dict[str, int]: scraped_tasks = await _collect_tasks( subject=subject, include_official_archives=include_official_archives, ) persisted = await _persist_tasks(scraped_tasks) return { "scraped": len(scraped_tasks), "saved": persisted["saved"], "duplicates": persisted["duplicates"], } def _needs_task_refresh(tasks: List[Dict]) -> bool: if not tasks: return True dynamic_count = sum(1 for task in tasks if task.get("source_kind") == "dynamic_bank") checkable_count = sum(1 for task in tasks if _can_check_answer(task)) minimum_total = max(10, int(os.getenv("SCRAPER_MIN_READY_TASKS", "40"))) if dynamic_count == 0 or checkable_count == 0: return True return len(tasks) < minimum_total def _needs_dynamic_bank_refresh(tasks: List[Dict]) -> bool: if not tasks: return True dynamic_count = sum(1 for task in tasks if task.get("source_kind") == "dynamic_bank") checkable_count = sum(1 for task in tasks if _can_check_answer(task)) return dynamic_count == 0 or checkable_count == 0 def _is_refresh_running() -> bool: return refresh_in_progress async def _run_refresh(subject: str = "russian", include_official_archives: bool = True) -> None: global refresh_in_progress try: refresh_in_progress = True refreshed = await _refresh_tasks( subject=subject, include_official_archives=include_official_archives, ) logger.info( "Background refresh finished: scraped=%s saved=%s duplicates=%s include_archives=%s", refreshed["scraped"], refreshed["saved"], refreshed["duplicates"], include_official_archives, ) except Exception as exc: # pragma: no cover - background guard logger.error("Background refresh failed: %s", exc) finally: refresh_in_progress = False def _schedule_refresh( background_tasks: BackgroundTasks, subject: str = "russian", *, include_official_archives: bool = True, ) -> bool: global last_auto_refresh_at if _is_refresh_running(): return False last_auto_refresh_at = datetime.utcnow() background_tasks.add_task(_run_refresh, subject, include_official_archives) return True async def _ensure_tasks_available(background_tasks: BackgroundTasks, subject: str = "russian") -> List[Dict]: global last_auto_refresh_at client = _require_supabase() existing = await client.get_all_tasks() if existing and not _needs_task_refresh(existing): return existing if not existing: logger.info("Tasks table is empty, running initial dynamic scrape") last_auto_refresh_at = datetime.utcnow() await _refresh_tasks(subject=subject, include_official_archives=False) refreshed = await client.get_all_tasks() if refreshed and _schedule_refresh(background_tasks, subject, include_official_archives=True): logger.info("Scheduled full refresh after initial dynamic scrape") return refreshed or existing if _needs_dynamic_bank_refresh(existing): if _is_refresh_running(): logger.info("Dynamic bank refresh is already running, returning existing tasks") return existing if _schedule_refresh(background_tasks, subject, include_official_archives=False): logger.info("Tasks are missing dynamic/checkable entries, scheduled targeted dynamic refresh") else: logger.info("Unable to schedule targeted dynamic refresh, returning existing tasks") return existing cooldown_minutes = max(1, 
int(os.getenv("SCRAPER_AUTO_REFRESH_COOLDOWN_MINUTES", "30"))) if last_auto_refresh_at and (datetime.utcnow() - last_auto_refresh_at).total_seconds() < cooldown_minutes * 60: logger.info("Skipping auto refresh because cooldown is active") return existing if _schedule_refresh(background_tasks, subject, include_official_archives=True): logger.info("Existing tasks are stale or incomplete, scheduled background refresh") else: logger.info("Refresh is already running, returning existing tasks") return existing def _normalize_answer(answer: str) -> str: return re.sub(r"\s+", "", answer.strip()).upper() async def _resolve_task_guid(task: Dict) -> Optional[str]: if task.get("task_guid"): return task["task_guid"] source_url = task.get("source_url", "") if not _can_check_answer(task): return None html = await _require_scraper().fetch_page(source_url) if not html: return None soup = BeautifulSoup(html, "lxml") guid_input = soup.select_one("form[id^='checkform'] input[name='guid']") return guid_input.get("value") if guid_input and guid_input.get("value") else None async def _check_task_answer(task: Dict, answer: str) -> CheckAnswerResponse: if not _can_check_answer(task): raise HTTPException(status_code=400, detail="This task does not support answer checking") normalized = _normalize_answer(answer) if not normalized: raise HTTPException(status_code=400, detail="Answer is empty") parsed = urlparse(task["source_url"]) query = parse_qs(parsed.query) project_guid = (query.get("proj") or [None])[0] task_guid = await _resolve_task_guid(task) if not project_guid or not task_guid: raise HTTPException(status_code=500, detail="Unable to resolve FIPI task metadata") solve_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path.rsplit('/', 1)[0]}/solve.php" ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE async with httpx.AsyncClient( headers=_require_scraper().headers, timeout=45.0, verify=ssl_context, follow_redirects=True, trust_env=False, ) as client: page_response = await client.get(task["source_url"]) page_response.raise_for_status() response = await client.post( solve_url, data={ "guid": task_guid, "answer": normalized, "ajax": "1", "proj": project_guid, }, headers={"Referer": task["source_url"]}, ) response.raise_for_status() if not response: raise HTTPException(status_code=502, detail="FIPI answer check failed") status_code = response.text.strip() status_map = { "0": ("not_solved", False, "Не решено"), "1": ("solved", True, "Решено"), "2": ("incorrect", False, "Неверно"), "3": ("correct", True, "Верно"), } if status_code not in status_map: raise HTTPException(status_code=502, detail=f"Unexpected FIPI response: {status_code}") status_label, is_correct, message = status_map[status_code] return CheckAnswerResponse( success=True, is_correct=is_correct, status_code=status_label, status_label=message, submitted_answer=answer, normalized_answer=normalized, message=message, ) @app.get("/api/health", response_model=HealthResponse) async def health_check() -> HealthResponse: services = { "api": True, "scraper": scraper is not None, "supabase": False, } if supabase_client: try: services["supabase"] = await supabase_client.is_available() except Exception: services["supabase"] = False all_critical_ok = services["api"] and services["scraper"] if all_critical_ok and all(services.values()): status = "healthy" elif all_critical_ok: status = "degraded" else: status = "unhealthy" return HealthResponse(status=status, timestamp=datetime.utcnow(), 
@app.get("/api/health", response_model=HealthResponse)
async def health_check() -> HealthResponse:
    services = {
        "api": True,
        "scraper": scraper is not None,
        "supabase": False,
    }
    if supabase_client:
        try:
            services["supabase"] = await supabase_client.is_available()
        except Exception:
            services["supabase"] = False
    # "degraded" means the API and scraper are up but Supabase is not.
    all_critical_ok = services["api"] and services["scraper"]
    if all_critical_ok and all(services.values()):
        status = "healthy"
    elif all_critical_ok:
        status = "degraded"
    else:
        status = "unhealthy"
    return HealthResponse(status=status, timestamp=datetime.utcnow(), services=services)


@app.get("/api/tasks", response_model=List[TaskResponse])
async def get_all_tasks(background_tasks: BackgroundTasks) -> List[TaskResponse]:
    tasks = await _ensure_tasks_available(background_tasks)
    return [TaskResponse(**_serialize_task(task)) for task in tasks]


@app.get("/api/tasks/latest", response_model=List[TaskResponse])
async def get_latest_tasks(limit: int = 10) -> List[TaskResponse]:
    tasks = await _require_supabase().get_latest_tasks(limit=limit)
    return [TaskResponse(**_serialize_task(task)) for task in tasks]


# Static paths must be registered before /api/tasks/{task_id}; otherwise
# "search" is captured by the int path parameter and rejected with a 422.
@app.get("/api/tasks/search", response_model=List[TaskResponse])
async def search_tasks(q: str) -> List[TaskResponse]:
    tasks = await _require_supabase().search_tasks(q)
    return [TaskResponse(**_serialize_task(task)) for task in tasks]


@app.get("/api/tasks/type/{task_type}", response_model=List[TaskResponse])
async def get_tasks_by_type(task_type: str) -> List[TaskResponse]:
    tasks = await _require_supabase().get_tasks_by_type(task_type)
    return [TaskResponse(**_serialize_task(task)) for task in tasks]


@app.get("/api/tasks/{task_id}", response_model=TaskResponse)
async def get_task(task_id: int) -> TaskResponse:
    task = await _require_supabase().get_task_by_id(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return TaskResponse(**_serialize_task(task))


@app.post("/api/tasks/{task_id}/check-answer", response_model=CheckAnswerResponse)
async def check_task_answer(task_id: int, request: CheckAnswerRequest) -> CheckAnswerResponse:
    task = await _require_supabase().get_task_by_id(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return await _check_task_answer(task, request.answer)


@app.post("/api/scrape", response_model=ScrapeResponse)
async def scrape_tasks(request: ScrapeRequest) -> ScrapeResponse:
    client = _require_supabase()
    service = _require_scraper()
    try:
        tasks_scraped = 0
        tasks_saved = 0
        duplicates_skipped = 0
        if request.urls:
            for url in request.urls:
                html = await service.fetch_page(url)
                task = service.parse_task_page(html, url) if html else None
                if not task:
                    continue
                tasks_scraped += 1
                result = await client.insert_task(task)
                if result:
                    tasks_saved += 1
                else:
                    duplicates_skipped += 1
        elif request.query:
            tasks = await service.search_tasks(request.query)
            tasks_scraped = len(tasks)
            persisted = await _persist_tasks(tasks)
            tasks_saved = persisted["saved"]
            duplicates_skipped = persisted["duplicates"]
        else:
            if request.full_refresh:
                tasks = await service.scrape_tasks(
                    subject=request.subject or "russian",
                    include_official_archives=True,
                )
            else:
                tasks = await service.scrape_dynamic_bank(subject=request.subject or "russian")
            tasks_scraped = len(tasks)
            persisted = await _persist_tasks(tasks)
            tasks_saved = persisted["saved"]
            duplicates_skipped = persisted["duplicates"]
        return ScrapeResponse(
            success=True,
            tasks_scraped=tasks_scraped,
            tasks_saved=tasks_saved,
            duplicates_skipped=duplicates_skipped,
            message=(
                f"Processed {tasks_scraped} tasks. "
                f"Saved: {tasks_saved}, duplicates: {duplicates_skipped}"
            ),
        )
    except HTTPException:
        raise
    except Exception as exc:  # pragma: no cover - endpoint guard
        logger.error("Scrape error: %s", exc)
        raise HTTPException(status_code=500, detail=f"Scrape error: {exc}")
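# Request sketch for POST /api/scrape (field names follow ScrapeRequest as
# used above; payload values are illustrative):
#
#   {"urls": ["https://fipi.ru/bank/questions.php?proj=<proj>&qid=<qid>"]}
#   {"query": "<search text>"}
#   {"subject": "russian", "full_refresh": true}
#   {}        # default: dynamic bank only, "russian" subject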
" f"Saved: {tasks_saved}, duplicates: {duplicates_skipped}" ), ) except HTTPException: raise except Exception as exc: # pragma: no cover - endpoint guard logger.error("Scrape error: %s", exc) raise HTTPException(status_code=500, detail=f"Scrape error: {exc}") @app.get("/api/stats", response_model=StatsResponse) async def get_stats() -> StatsResponse: client = _require_supabase() stats = await client.get_stats() latest = await client.get_latest_tasks(limit=1) last_scrape = latest[0].get("scraped_at") if latest else None return StatsResponse( total_tasks=stats.get("total", 0), by_type=stats.get("by_type", {}), last_scrape=last_scrape, ) @app.get("/", tags=["root"]) async def root() -> Dict[str, str]: return { "message": "AI Scraper FIPI API proof-20260317", "version": "1.1.1-proof-20260317", "docs": "/docs", } @app.exception_handler(Exception) async def global_exception_handler(request, exc) -> JSONResponse: logger.error("Unhandled exception: %s", exc) payload = ErrorResponse( error="Internal Server Error", detail=str(exc), timestamp=datetime.utcnow(), ) return JSONResponse(status_code=500, content=payload.model_dump(mode="json")) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)