| """ |
| FastAPI app for the FIPI scraper service. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from datetime import datetime |
| import logging |
| import os |
| from pathlib import Path |
| import re |
| import ssl |
| from typing import Dict, List, Optional |
| from urllib.parse import parse_qs, urlparse |
|
|
| from bs4 import BeautifulSoup |
| from dotenv import load_dotenv |
| from fastapi import BackgroundTasks, FastAPI, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
| import httpx |
|
|
| from models import ( |
| CheckAnswerRequest, |
| CheckAnswerResponse, |
| ErrorResponse, |
| HealthResponse, |
| ScrapeRequest, |
| ScrapeResponse, |
| StatsResponse, |
| TaskResponse, |
| ) |
| from scraper import FIPIScraper |
|
|
# Resolve paths relative to this file so the .env is found regardless of CWD.
BASE_DIR = Path(__file__).resolve().parent
load_dotenv(BASE_DIR / ".env")


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Defaults for the optional Supabase integration; overwritten by the
# try/except import block below when the client package is importable.
SUPABASE_AVAILABLE = False
SupabaseClient = None
|
|
# Supabase is an optional dependency: the service still starts (in a degraded
# mode) when the client cannot be imported or fails at import time.
try:
    from supabase_client import SupabaseClient


    SUPABASE_AVAILABLE = True
except ImportError as exc:
    logger.warning("Supabase client import failed: %s", exc)
except Exception as exc:
    # Import-time side effects in supabase_client may raise other errors.
    logger.warning("Supabase client init failed: %s", exc)
|
|
|
|
app = FastAPI(
    title="AI Scraper FIPI",
    description="Collects, stores and validates FIPI tasks.",
    version="1.1.1-proof-20260317",
)


# NOTE(review): wildcard origins combined with allow_credentials=True is
# disallowed by the CORS spec for credentialed requests — confirm whether
# credentialed cross-origin calls are actually required here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Module-level service state, initialized in startup_event and mutated by the
# refresh helpers below.
scraper: Optional[FIPIScraper] = None
supabase_client: Optional[SupabaseClient] = None
last_auto_refresh_at: Optional[datetime] = None
refresh_in_progress = False
|
|
|
|
# NOTE(review): @app.on_event is deprecated in newer FastAPI in favor of
# lifespan handlers — confirm the pinned FastAPI version before migrating.
@app.on_event("startup")
async def startup_event() -> None:
    """Initialize the scraper and (optionally) the Supabase client.

    The scraper is always created; Supabase is attached only when the client
    package imported successfully, both env vars are present, and a
    connectivity probe succeeds. All failures leave ``supabase_client`` as
    ``None`` so endpoints degrade via ``_require_supabase``.
    """
    global scraper, supabase_client


    scraper = FIPIScraper(base_url=os.getenv("FIPI_BASE_URL", "https://fipi.ru"))
    logger.info("FIPIScraper initialized")


    if not SUPABASE_AVAILABLE:
        logger.info("Supabase disabled")
        return


    supabase_url = os.getenv("SUPABASE_URL")
    supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
    if not supabase_url or not supabase_key:
        logger.warning("SUPABASE_URL or SUPABASE_SERVICE_KEY missing")
        return


    try:
        client = SupabaseClient(url=supabase_url, key=supabase_key)
        # Only publish the client globally after a successful availability probe.
        if await client.is_available():
            supabase_client = client
            logger.info("Supabase client connected")
        else:
            logger.error("Supabase is unavailable")
    except Exception as exc:
        logger.error("Supabase startup error: %s", exc)
|
|
|
|
def _require_supabase() -> SupabaseClient:
    """Return the connected Supabase client, or fail the request with 503."""
    client = supabase_client
    if not client:
        raise HTTPException(status_code=503, detail="Supabase is not configured")
    return client
|
|
|
|
def _require_scraper() -> FIPIScraper:
    """Return the initialized scraper, or fail the request with 503."""
    service = scraper
    if not service:
        raise HTTPException(status_code=503, detail="Scraper is not configured")
    return service
|
|
|
|
| def _can_check_answer(task: Dict) -> bool: |
| if task.get("source_kind") == "dynamic_bank" and task.get("task_guid"): |
| return True |
|
|
| source_url = task.get("source_url", "") |
| parsed = urlparse(source_url) |
| query = parse_qs(parsed.query) |
| project_guid = (query.get("proj") or [None])[0] |
| question_id = (query.get("qid") or [None])[0] |
|
|
| return parsed.path.endswith("/questions.php") and bool(project_guid and (question_id or task.get("task_guid"))) |
|
|
|
|
def _serialize_task(task: Dict) -> Dict:
    """Return a shallow copy of *task* with the ``can_check_answer`` flag added."""
    return {**task, "can_check_answer": _can_check_answer(task)}
|
|
|
|
async def _persist_tasks(tasks: List[Dict]) -> Dict[str, int]:
    """Insert each task into Supabase, counting new rows vs. duplicates."""
    client = _require_supabase()
    counts = {"saved": 0, "duplicates": 0}
    for task in tasks:
        inserted = await client.insert_task(task)
        counts["saved" if inserted else "duplicates"] += 1
    return counts
|
|
|
|
async def _collect_tasks(subject: str = "russian", *, include_official_archives: bool = True) -> List[Dict]:
    """Scrape tasks for *subject*, optionally including the official archives."""
    service = _require_scraper()
    if not include_official_archives:
        # Fast path: only the dynamic question bank.
        return await service.scrape_dynamic_bank(subject=subject)
    return await service.scrape_tasks(subject=subject, include_official_archives=True)
|
|
|
|
async def _refresh_tasks(subject: str = "russian", *, include_official_archives: bool = True) -> Dict[str, int]:
    """Scrape and persist tasks, returning scraped/saved/duplicate counts."""
    collected = await _collect_tasks(
        subject=subject,
        include_official_archives=include_official_archives,
    )
    counts = await _persist_tasks(collected)
    return {"scraped": len(collected), "saved": counts["saved"], "duplicates": counts["duplicates"]}
|
|
|
|
def _needs_task_refresh(tasks: List[Dict]) -> bool:
    """Return True when the stored task set is empty, too small, or lacks
    dynamic-bank / answer-checkable entries."""
    if not tasks:
        return True


    has_dynamic = any(item.get("source_kind") == "dynamic_bank" for item in tasks)
    has_checkable = any(_can_check_answer(item) for item in tasks)
    # Floor of 10 so a misconfigured env var cannot disable the size check.
    minimum_total = max(10, int(os.getenv("SCRAPER_MIN_READY_TASKS", "40")))


    if not has_dynamic or not has_checkable:
        return True


    return len(tasks) < minimum_total
|
|
|
|
def _needs_dynamic_bank_refresh(tasks: List[Dict]) -> bool:
    """Return True when no task is from the dynamic bank or none is checkable."""
    if not tasks:
        return True


    has_dynamic = any(item.get("source_kind") == "dynamic_bank" for item in tasks)
    has_checkable = any(_can_check_answer(item) for item in tasks)
    return not (has_dynamic and has_checkable)
|
|
|
|
def _is_refresh_running() -> bool:
    """Return True while a background refresh (set by _run_refresh) is active."""
    return refresh_in_progress
|
|
|
|
async def _run_refresh(subject: str = "russian", include_official_archives: bool = True) -> None:
    """Execute one refresh cycle, holding the in-progress flag for its duration.

    Errors are logged rather than raised: this runs as a FastAPI background
    task with no caller to report to.
    """
    global refresh_in_progress


    refresh_in_progress = True
    try:
        stats = await _refresh_tasks(
            subject=subject,
            include_official_archives=include_official_archives,
        )
    except Exception as exc:
        logger.error("Background refresh failed: %s", exc)
    else:
        logger.info(
            "Background refresh finished: scraped=%s saved=%s duplicates=%s include_archives=%s",
            stats["scraped"],
            stats["saved"],
            stats["duplicates"],
            include_official_archives,
        )
    finally:
        # Always clear the flag so future refreshes are not blocked.
        refresh_in_progress = False
|
|
|
|
def _schedule_refresh(
    background_tasks: BackgroundTasks,
    subject: str = "russian",
    *,
    include_official_archives: bool = True,
) -> bool:
    """Queue a background refresh unless one is already running.

    Updates the auto-refresh timestamp and returns True when scheduled.
    """
    global last_auto_refresh_at


    if not _is_refresh_running():
        last_auto_refresh_at = datetime.utcnow()
        background_tasks.add_task(_run_refresh, subject, include_official_archives)
        return True
    return False
|
|
|
|
async def _ensure_tasks_available(background_tasks: BackgroundTasks, subject: str = "russian") -> List[Dict]:
    """Return stored tasks, triggering scrapes/refreshes when the set is lacking.

    Decision order:
      1. Healthy, sufficiently large set -> return as-is.
      2. Empty table -> run a blocking dynamic-bank scrape now, then schedule a
         full refresh in the background.
      3. Missing dynamic/checkable entries -> schedule a targeted dynamic refresh.
      4. Otherwise (set too small) -> schedule a full refresh, subject to a
         cooldown window.
    In cases 3 and 4 the (possibly stale) existing tasks are returned immediately.
    """
    global last_auto_refresh_at


    client = _require_supabase()
    existing = await client.get_all_tasks()
    if existing and not _needs_task_refresh(existing):
        return existing


    if not existing:
        logger.info("Tasks table is empty, running initial dynamic scrape")
        # Blocking scrape: there is nothing to serve until it finishes.
        last_auto_refresh_at = datetime.utcnow()
        await _refresh_tasks(subject=subject, include_official_archives=False)
        refreshed = await client.get_all_tasks()
        if refreshed and _schedule_refresh(background_tasks, subject, include_official_archives=True):
            logger.info("Scheduled full refresh after initial dynamic scrape")
        return refreshed or existing


    if _needs_dynamic_bank_refresh(existing):
        if _is_refresh_running():
            logger.info("Dynamic bank refresh is already running, returning existing tasks")
            return existing


        if _schedule_refresh(background_tasks, subject, include_official_archives=False):
            logger.info("Tasks are missing dynamic/checkable entries, scheduled targeted dynamic refresh")
        else:
            logger.info("Unable to schedule targeted dynamic refresh, returning existing tasks")
        return existing


    # Full refreshes are rate-limited; floor of 1 minute guards against a
    # misconfigured env var.
    cooldown_minutes = max(1, int(os.getenv("SCRAPER_AUTO_REFRESH_COOLDOWN_MINUTES", "30")))
    if last_auto_refresh_at and (datetime.utcnow() - last_auto_refresh_at).total_seconds() < cooldown_minutes * 60:
        logger.info("Skipping auto refresh because cooldown is active")
        return existing


    if _schedule_refresh(background_tasks, subject, include_official_archives=True):
        logger.info("Existing tasks are stale or incomplete, scheduled background refresh")
    else:
        logger.info("Refresh is already running, returning existing tasks")


    return existing
|
|
|
|
| def _normalize_answer(answer: str) -> str: |
| return re.sub(r"\s+", "", answer.strip()).upper() |
|
|
|
|
async def _resolve_task_guid(task: Dict) -> Optional[str]:
    """Return the FIPI GUID for *task*, scraping its source page when missing.

    Returns None for tasks that do not support answer checking, when the
    page cannot be fetched, or when no GUID input is present in the markup.
    """
    stored_guid = task.get("task_guid")
    if stored_guid:
        return stored_guid


    if not _can_check_answer(task):
        return None


    page = await _require_scraper().fetch_page(task.get("source_url", ""))
    if not page:
        return None


    markup = BeautifulSoup(page, "lxml")
    field = markup.select_one("form[id^='checkform'] input[name='guid']")
    if field is None:
        return None
    return field.get("value") or None
|
|
|
|
async def _check_task_answer(task: Dict, answer: str) -> CheckAnswerResponse:
    """Submit *answer* for *task* to FIPI's solve.php and interpret the result.

    Raises HTTPException 400 for unsupported tasks or empty answers, 500 when
    the project/task GUIDs cannot be resolved, and 502 on unexpected FIPI
    output.
    """
    if not _can_check_answer(task):
        raise HTTPException(status_code=400, detail="This task does not support answer checking")


    normalized = _normalize_answer(answer)
    if not normalized:
        raise HTTPException(status_code=400, detail="Answer is empty")


    # Project GUID comes from the task's source URL; the task GUID may need a
    # page scrape (see _resolve_task_guid).
    parsed = urlparse(task["source_url"])
    query = parse_qs(parsed.query)
    project_guid = (query.get("proj") or [None])[0]
    task_guid = await _resolve_task_guid(task)


    if not project_guid or not task_guid:
        raise HTTPException(status_code=500, detail="Unable to resolve FIPI task metadata")


    # solve.php lives in the same directory as the task page.
    solve_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path.rsplit('/', 1)[0]}/solve.php"
    # NOTE(review): certificate verification is disabled here — presumably to
    # work around the FIPI host's TLS setup; confirm this is intentional.
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE


    async with httpx.AsyncClient(
        headers=_require_scraper().headers,
        timeout=45.0,
        verify=ssl_context,
        follow_redirects=True,
        trust_env=False,
    ) as client:
        # GET the task page first (presumably to obtain session cookies —
        # confirm), then POST the answer.
        page_response = await client.get(task["source_url"])
        page_response.raise_for_status()
        response = await client.post(
            solve_url,
            data={
                "guid": task_guid,
                "answer": normalized,
                "ajax": "1",
                "proj": project_guid,
            },
            headers={"Referer": task["source_url"]},
        )
        response.raise_for_status()


    # NOTE(review): raise_for_status() above already raised on HTTP errors, so
    # this truthiness guard is likely dead — confirm httpx.Response.__bool__.
    if not response:
        raise HTTPException(status_code=502, detail="FIPI answer check failed")


    # FIPI replies with a bare status digit in the response body.
    status_code = response.text.strip()
    status_map = {
        "0": ("not_solved", False, "Не решено"),
        "1": ("solved", True, "Решено"),
        "2": ("incorrect", False, "Неверно"),
        "3": ("correct", True, "Верно"),
    }
    if status_code not in status_map:
        raise HTTPException(status_code=502, detail=f"Unexpected FIPI response: {status_code}")


    status_label, is_correct, message = status_map[status_code]
    return CheckAnswerResponse(
        success=True,
        is_correct=is_correct,
        status_code=status_label,
        status_label=message,
        submitted_answer=answer,
        normalized_answer=normalized,
        message=message,
    )
|
|
|
|
@app.get("/api/health", response_model=HealthResponse)
async def health_check() -> HealthResponse:
    """Report overall status plus per-dependency availability flags."""
    supabase_ok = False
    if supabase_client:
        try:
            supabase_ok = await supabase_client.is_available()
        except Exception:
            supabase_ok = False


    services = {
        "api": True,
        "scraper": scraper is not None,
        "supabase": supabase_ok,
    }


    # api + scraper are critical; supabase only degrades the service.
    if all(services.values()):
        status = "healthy"
    elif services["api"] and services["scraper"]:
        status = "degraded"
    else:
        status = "unhealthy"


    return HealthResponse(status=status, timestamp=datetime.utcnow(), services=services)
|
|
|
|
@app.get("/api/tasks", response_model=List[TaskResponse])
async def get_all_tasks(background_tasks: BackgroundTasks) -> List[TaskResponse]:
    """Return every stored task, triggering a refresh when the set looks stale."""
    stored = await _ensure_tasks_available(background_tasks)
    return [TaskResponse(**_serialize_task(item)) for item in stored]
|
|
|
|
@app.get("/api/tasks/latest", response_model=List[TaskResponse])
async def get_latest_tasks(limit: int = 10) -> List[TaskResponse]:
    """Return the *limit* most recently scraped tasks."""
    client = _require_supabase()
    recent = await client.get_latest_tasks(limit=limit)
    return [TaskResponse(**_serialize_task(item)) for item in recent]
|
|
|
|
# NOTE(review): FastAPI matches routes in declaration order, so this
# parameterized route captures any later single-segment /api/tasks/<x> route
# (see the /api/tasks/search handler declared below) — confirm and reorder.
@app.get("/api/tasks/{task_id}", response_model=TaskResponse)
async def get_task(task_id: int) -> TaskResponse:
    """Return a single task by its numeric id, or 404 when absent."""
    task = await _require_supabase().get_task_by_id(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return TaskResponse(**_serialize_task(task))
|
|
|
|
@app.post("/api/tasks/{task_id}/check-answer", response_model=CheckAnswerResponse)
async def check_task_answer(task_id: int, request: CheckAnswerRequest) -> CheckAnswerResponse:
    """Validate the submitted answer for a stored task against FIPI."""
    record = await _require_supabase().get_task_by_id(task_id)
    if not record:
        raise HTTPException(status_code=404, detail="Task not found")
    return await _check_task_answer(record, request.answer)
|
|
|
|
@app.get("/api/tasks/type/{task_type}", response_model=List[TaskResponse])
async def get_tasks_by_type(task_type: str) -> List[TaskResponse]:
    """Return all stored tasks carrying the given type label."""
    matching = await _require_supabase().get_tasks_by_type(task_type)
    return [TaskResponse(**_serialize_task(item)) for item in matching]
|
|
|
|
# NOTE(review): this route is registered after /api/tasks/{task_id} (the
# get_task handler above), and FastAPI matches routes in declaration order,
# so GET /api/tasks/search is captured by {task_id} and fails int validation
# with a 422. Declaring this handler before get_task would fix it — confirm.
@app.get("/api/tasks/search", response_model=List[TaskResponse])
async def search_tasks(q: str) -> List[TaskResponse]:
    """Search stored tasks by the free-text query *q*."""
    tasks = await _require_supabase().search_tasks(q)
    return [TaskResponse(**_serialize_task(task)) for task in tasks]
|
|
|
|
@app.post("/api/scrape", response_model=ScrapeResponse)
async def scrape_tasks(request: ScrapeRequest) -> ScrapeResponse:
    """Run a scrape on demand and persist the results.

    Request modes, in priority order:
      * ``urls``  — scrape each listed page individually;
      * ``query`` — scrape via the scraper's search;
      * neither   — scrape the dynamic bank, or everything including official
        archives when ``full_refresh`` is set.
    Returns scraped/saved/duplicate counts; unexpected failures become 500s.
    """
    client = _require_supabase()
    service = _require_scraper()


    try:
        tasks_scraped = 0
        tasks_saved = 0
        duplicates_skipped = 0


        if request.urls:
            # Per-URL mode: fetch and parse each page, skipping unparseable ones.
            for url in request.urls:
                html = await service.fetch_page(url)
                task = service.parse_task_page(html, url) if html else None
                if not task:
                    continue
                tasks_scraped += 1
                result = await client.insert_task(task)
                if result:
                    tasks_saved += 1
                else:
                    duplicates_skipped += 1
        elif request.query:
            tasks = await service.search_tasks(request.query)
            tasks_scraped = len(tasks)
            persisted = await _persist_tasks(tasks)
            tasks_saved = persisted["saved"]
            duplicates_skipped = persisted["duplicates"]
        else:
            if request.full_refresh:
                tasks = await service.scrape_tasks(
                    subject=request.subject or "russian",
                    include_official_archives=True,
                )
            else:
                tasks = await service.scrape_dynamic_bank(subject=request.subject or "russian")
            tasks_scraped = len(tasks)
            persisted = await _persist_tasks(tasks)
            tasks_saved = persisted["saved"]
            duplicates_skipped = persisted["duplicates"]


        return ScrapeResponse(
            success=True,
            tasks_scraped=tasks_scraped,
            tasks_saved=tasks_saved,
            duplicates_skipped=duplicates_skipped,
            message=(
                f"Processed {tasks_scraped} tasks. "
                f"Saved: {tasks_saved}, duplicates: {duplicates_skipped}"
            ),
        )
    except HTTPException:
        # Re-raise deliberate HTTP errors (e.g. 503 from the _require_* helpers).
        raise
    except Exception as exc:
        logger.error("Scrape error: %s", exc)
        raise HTTPException(status_code=500, detail=f"Scrape error: {exc}")
|
|
|
|
@app.get("/api/stats", response_model=StatsResponse)
async def get_stats() -> StatsResponse:
    """Aggregate task counts and the timestamp of the most recent scrape."""
    client = _require_supabase()
    stats = await client.get_stats()
    newest = await client.get_latest_tasks(limit=1)
    return StatsResponse(
        total_tasks=stats.get("total", 0),
        by_type=stats.get("by_type", {}),
        last_scrape=newest[0].get("scraped_at") if newest else None,
    )
|
|
|
|
@app.get("/", tags=["root"])
async def root() -> Dict[str, str]:
    """Service banner: name, version, and where to find the API docs."""
    return dict(
        message="AI Scraper FIPI API proof-20260317",
        version="1.1.1-proof-20260317",
        docs="/docs",
    )
|
|
|
|
@app.exception_handler(Exception)
async def global_exception_handler(request, exc) -> JSONResponse:
    """Last-resort handler: log the failure and return a structured 500 body."""
    logger.error("Unhandled exception: %s", exc)
    body = ErrorResponse(
        error="Internal Server Error",
        detail=str(exc),
        timestamp=datetime.utcnow(),
    )
    return JSONResponse(status_code=500, content=body.model_dump(mode="json"))
|
|
|
|
# Run the API directly with uvicorn when this module is executed as a script.
if __name__ == "__main__":
    import uvicorn


    uvicorn.run(app, host="0.0.0.0", port=8000)
|
|