"""
FastAPI app for the FIPI scraper service.
"""
from __future__ import annotations
from datetime import datetime
import logging
import os
from pathlib import Path
import re
import ssl
from typing import Dict, List, Optional
from urllib.parse import parse_qs, urlparse
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import httpx
from models import (
CheckAnswerRequest,
CheckAnswerResponse,
ErrorResponse,
HealthResponse,
ScrapeRequest,
ScrapeResponse,
StatsResponse,
TaskResponse,
)
from scraper import FIPIScraper
BASE_DIR = Path(__file__).resolve().parent
load_dotenv(BASE_DIR / ".env")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
SUPABASE_AVAILABLE = False
SupabaseClient = None
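# Supabase support is optional: if the client module (or its dependencies) cannot be
# imported, the API still starts, and persistence endpoints fail fast with HTTP 503
# via _require_supabase().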
try:
from supabase_client import SupabaseClient
SUPABASE_AVAILABLE = True
except ImportError as exc:
logger.warning("Supabase client import failed: %s", exc)
except Exception as exc: # pragma: no cover - startup guard
logger.warning("Supabase client init failed: %s", exc)
app = FastAPI(
title="AI Scraper FIPI",
description="Collects, stores and validates FIPI tasks.",
version="1.1.1-proof-20260317",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
scraper: Optional[FIPIScraper] = None
supabase_client: Optional[SupabaseClient] = None
last_auto_refresh_at: Optional[datetime] = None
refresh_in_progress = False
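# Startup wiring: the scraper is always created; the Supabase client is attached only
# when both SUPABASE_URL and SUPABASE_SERVICE_KEY are set and the service answers a probe.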
@app.on_event("startup")
async def startup_event() -> None:
global scraper, supabase_client
scraper = FIPIScraper(base_url=os.getenv("FIPI_BASE_URL", "https://fipi.ru"))
logger.info("FIPIScraper initialized")
if not SUPABASE_AVAILABLE:
logger.info("Supabase disabled")
return
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
if not supabase_url or not supabase_key:
logger.warning("SUPABASE_URL or SUPABASE_SERVICE_KEY missing")
return
try:
client = SupabaseClient(url=supabase_url, key=supabase_key)
if await client.is_available():
supabase_client = client
logger.info("Supabase client connected")
else:
logger.error("Supabase is unavailable")
except Exception as exc: # pragma: no cover - startup guard
logger.error("Supabase startup error: %s", exc)
def _require_supabase() -> SupabaseClient:
if not supabase_client:
raise HTTPException(status_code=503, detail="Supabase is not configured")
return supabase_client
def _require_scraper() -> FIPIScraper:
if not scraper:
raise HTTPException(status_code=503, detail="Scraper is not configured")
return scraper
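# A task supports online answer checking if it came from the dynamic bank with a stored
# GUID, or if its source URL points at questions.php with a "proj" GUID plus either a
# "qid" parameter or a stored task_guid.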
def _can_check_answer(task: Dict) -> bool:
if task.get("source_kind") == "dynamic_bank" and task.get("task_guid"):
return True
source_url = task.get("source_url", "")
parsed = urlparse(source_url)
query = parse_qs(parsed.query)
project_guid = (query.get("proj") or [None])[0]
question_id = (query.get("qid") or [None])[0]
return parsed.path.endswith("/questions.php") and bool(project_guid and (question_id or task.get("task_guid")))
def _serialize_task(task: Dict) -> Dict:
payload = dict(task)
payload["can_check_answer"] = _can_check_answer(task)
return payload
async def _persist_tasks(tasks: List[Dict]) -> Dict[str, int]:
client = _require_supabase()
saved = 0
duplicates = 0
for task in tasks:
result = await client.insert_task(task)
if result:
saved += 1
else:
duplicates += 1
return {"saved": saved, "duplicates": duplicates}
async def _collect_tasks(subject: str = "russian", *, include_official_archives: bool = True) -> List[Dict]:
service = _require_scraper()
if include_official_archives:
return await service.scrape_tasks(subject=subject, include_official_archives=True)
return await service.scrape_dynamic_bank(subject=subject)
async def _refresh_tasks(subject: str = "russian", *, include_official_archives: bool = True) -> Dict[str, int]:
scraped_tasks = await _collect_tasks(
subject=subject,
include_official_archives=include_official_archives,
)
persisted = await _persist_tasks(scraped_tasks)
return {
"scraped": len(scraped_tasks),
"saved": persisted["saved"],
"duplicates": persisted["duplicates"],
}
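# Refresh heuristics: a full refresh is needed when the table is empty, holds no
# dynamic-bank or checkable tasks, or has fewer rows than SCRAPER_MIN_READY_TASKS
# (default 40, hard floor 10); the dynamic-only check below uses just the first two
# conditions.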
def _needs_task_refresh(tasks: List[Dict]) -> bool:
if not tasks:
return True
dynamic_count = sum(1 for task in tasks if task.get("source_kind") == "dynamic_bank")
checkable_count = sum(1 for task in tasks if _can_check_answer(task))
minimum_total = max(10, int(os.getenv("SCRAPER_MIN_READY_TASKS", "40")))
if dynamic_count == 0 or checkable_count == 0:
return True
return len(tasks) < minimum_total
def _needs_dynamic_bank_refresh(tasks: List[Dict]) -> bool:
if not tasks:
return True
dynamic_count = sum(1 for task in tasks if task.get("source_kind") == "dynamic_bank")
checkable_count = sum(1 for task in tasks if _can_check_answer(task))
return dynamic_count == 0 or checkable_count == 0
def _is_refresh_running() -> bool:
return refresh_in_progress
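# Only one background refresh runs at a time: refresh_in_progress is the single-flight
# guard, and _schedule_refresh stamps last_auto_refresh_at for the cooldown check.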
async def _run_refresh(subject: str = "russian", include_official_archives: bool = True) -> None:
global refresh_in_progress
try:
refresh_in_progress = True
refreshed = await _refresh_tasks(
subject=subject,
include_official_archives=include_official_archives,
)
logger.info(
"Background refresh finished: scraped=%s saved=%s duplicates=%s include_archives=%s",
refreshed["scraped"],
refreshed["saved"],
refreshed["duplicates"],
include_official_archives,
)
except Exception as exc: # pragma: no cover - background guard
logger.error("Background refresh failed: %s", exc)
finally:
refresh_in_progress = False
def _schedule_refresh(
background_tasks: BackgroundTasks,
subject: str = "russian",
*,
include_official_archives: bool = True,
) -> bool:
global last_auto_refresh_at
if _is_refresh_running():
return False
last_auto_refresh_at = datetime.utcnow()
background_tasks.add_task(_run_refresh, subject, include_official_archives)
return True
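# Serving strategy for GET /api/tasks: return stored rows when they look healthy; on an
# empty table run a quick dynamic-bank scrape inline and queue a full refresh; otherwise
# queue a targeted or full background refresh, honouring the cooldown set by
# SCRAPER_AUTO_REFRESH_COOLDOWN_MINUTES (default 30 minutes).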
async def _ensure_tasks_available(background_tasks: BackgroundTasks, subject: str = "russian") -> List[Dict]:
global last_auto_refresh_at
client = _require_supabase()
existing = await client.get_all_tasks()
if existing and not _needs_task_refresh(existing):
return existing
if not existing:
logger.info("Tasks table is empty, running initial dynamic scrape")
last_auto_refresh_at = datetime.utcnow()
await _refresh_tasks(subject=subject, include_official_archives=False)
refreshed = await client.get_all_tasks()
if refreshed and _schedule_refresh(background_tasks, subject, include_official_archives=True):
logger.info("Scheduled full refresh after initial dynamic scrape")
return refreshed or existing
if _needs_dynamic_bank_refresh(existing):
if _is_refresh_running():
logger.info("Dynamic bank refresh is already running, returning existing tasks")
return existing
if _schedule_refresh(background_tasks, subject, include_official_archives=False):
logger.info("Tasks are missing dynamic/checkable entries, scheduled targeted dynamic refresh")
else:
logger.info("Unable to schedule targeted dynamic refresh, returning existing tasks")
return existing
cooldown_minutes = max(1, int(os.getenv("SCRAPER_AUTO_REFRESH_COOLDOWN_MINUTES", "30")))
if last_auto_refresh_at and (datetime.utcnow() - last_auto_refresh_at).total_seconds() < cooldown_minutes * 60:
logger.info("Skipping auto refresh because cooldown is active")
return existing
if _schedule_refresh(background_tasks, subject, include_official_archives=True):
logger.info("Existing tasks are stale or incomplete, scheduled background refresh")
else:
logger.info("Refresh is already running, returning existing tasks")
return existing
def _normalize_answer(answer: str) -> str:
return re.sub(r"\s+", "", answer.strip()).upper()
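# The FIPI check form embeds the task GUID as a hidden <input name="guid">; when the
# stored row lacks task_guid, re-fetch the task page and read it from that form.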
async def _resolve_task_guid(task: Dict) -> Optional[str]:
if task.get("task_guid"):
return task["task_guid"]
source_url = task.get("source_url", "")
if not _can_check_answer(task):
return None
html = await _require_scraper().fetch_page(source_url)
if not html:
return None
soup = BeautifulSoup(html, "lxml")
guid_input = soup.select_one("form[id^='checkform'] input[name='guid']")
return guid_input.get("value") if guid_input and guid_input.get("value") else None
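# Answer checking mirrors the FIPI widget: GET the task page first so the client carries
# any cookies FIPI sets, then POST guid/answer/proj to solve.php next to questions.php;
# FIPI replies with a single digit that is mapped to a verdict below.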
async def _check_task_answer(task: Dict, answer: str) -> CheckAnswerResponse:
if not _can_check_answer(task):
raise HTTPException(status_code=400, detail="This task does not support answer checking")
normalized = _normalize_answer(answer)
if not normalized:
raise HTTPException(status_code=400, detail="Answer is empty")
parsed = urlparse(task["source_url"])
query = parse_qs(parsed.query)
project_guid = (query.get("proj") or [None])[0]
task_guid = await _resolve_task_guid(task)
if not project_guid or not task_guid:
raise HTTPException(status_code=500, detail="Unable to resolve FIPI task metadata")
solve_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path.rsplit('/', 1)[0]}/solve.php"
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
async with httpx.AsyncClient(
headers=_require_scraper().headers,
timeout=45.0,
verify=ssl_context,
follow_redirects=True,
trust_env=False,
) as client:
page_response = await client.get(task["source_url"])
page_response.raise_for_status()
response = await client.post(
solve_url,
data={
"guid": task_guid,
"answer": normalized,
"ajax": "1",
"proj": project_guid,
},
headers={"Referer": task["source_url"]},
)
response.raise_for_status()
if not response:
raise HTTPException(status_code=502, detail="FIPI answer check failed")
status_code = response.text.strip()
status_map = {
"0": ("not_solved", False, "Не решено"),
"1": ("solved", True, "Решено"),
"2": ("incorrect", False, "Неверно"),
"3": ("correct", True, "Верно"),
}
if status_code not in status_map:
raise HTTPException(status_code=502, detail=f"Unexpected FIPI response: {status_code}")
status_label, is_correct, message = status_map[status_code]
return CheckAnswerResponse(
success=True,
is_correct=is_correct,
status_code=status_label,
status_label=message,
submitted_answer=answer,
normalized_answer=normalized,
message=message,
)
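# /api/health reports per-service status: "healthy" when everything (including Supabase)
# responds, "degraded" when only Supabase is down, "unhealthy" when the scraper itself
# is missing.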
@app.get("/api/health", response_model=HealthResponse)
async def health_check() -> HealthResponse:
services = {
"api": True,
"scraper": scraper is not None,
"supabase": False,
}
if supabase_client:
try:
services["supabase"] = await supabase_client.is_available()
except Exception:
services["supabase"] = False
all_critical_ok = services["api"] and services["scraper"]
if all_critical_ok and all(services.values()):
status = "healthy"
elif all_critical_ok:
status = "degraded"
else:
status = "unhealthy"
return HealthResponse(status=status, timestamp=datetime.utcnow(), services=services)
@app.get("/api/tasks", response_model=List[TaskResponse])
async def get_all_tasks(background_tasks: BackgroundTasks) -> List[TaskResponse]:
tasks = await _ensure_tasks_available(background_tasks)
return [TaskResponse(**_serialize_task(task)) for task in tasks]
@app.get("/api/tasks/latest", response_model=List[TaskResponse])
async def get_latest_tasks(limit: int = 10) -> List[TaskResponse]:
tasks = await _require_supabase().get_latest_tasks(limit=limit)
return [TaskResponse(**_serialize_task(task)) for task in tasks]
@app.get("/api/tasks/{task_id}", response_model=TaskResponse)
async def get_task(task_id: int) -> TaskResponse:
task = await _require_supabase().get_task_by_id(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return TaskResponse(**_serialize_task(task))
@app.post("/api/tasks/{task_id}/check-answer", response_model=CheckAnswerResponse)
async def check_task_answer(task_id: int, request: CheckAnswerRequest) -> CheckAnswerResponse:
task = await _require_supabase().get_task_by_id(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return await _check_task_answer(task, request.answer)
@app.get("/api/tasks/type/{task_type}", response_model=List[TaskResponse])
async def get_tasks_by_type(task_type: str) -> List[TaskResponse]:
tasks = await _require_supabase().get_tasks_by_type(task_type)
return [TaskResponse(**_serialize_task(task)) for task in tasks]
@app.get("/api/tasks/search", response_model=List[TaskResponse])
async def search_tasks(q: str) -> List[TaskResponse]:
tasks = await _require_supabase().search_tasks(q)
return [TaskResponse(**_serialize_task(task)) for task in tasks]
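# /api/scrape supports three modes: explicit task URLs, a free-text search query, or a
# bank scrape (dynamic bank only by default, the full archive set when full_refresh is true).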
@app.post("/api/scrape", response_model=ScrapeResponse)
async def scrape_tasks(request: ScrapeRequest) -> ScrapeResponse:
client = _require_supabase()
service = _require_scraper()
try:
tasks_scraped = 0
tasks_saved = 0
duplicates_skipped = 0
if request.urls:
for url in request.urls:
html = await service.fetch_page(url)
task = service.parse_task_page(html, url) if html else None
if not task:
continue
tasks_scraped += 1
result = await client.insert_task(task)
if result:
tasks_saved += 1
else:
duplicates_skipped += 1
elif request.query:
tasks = await service.search_tasks(request.query)
tasks_scraped = len(tasks)
persisted = await _persist_tasks(tasks)
tasks_saved = persisted["saved"]
duplicates_skipped = persisted["duplicates"]
else:
if request.full_refresh:
tasks = await service.scrape_tasks(
subject=request.subject or "russian",
include_official_archives=True,
)
else:
tasks = await service.scrape_dynamic_bank(subject=request.subject or "russian")
tasks_scraped = len(tasks)
persisted = await _persist_tasks(tasks)
tasks_saved = persisted["saved"]
duplicates_skipped = persisted["duplicates"]
return ScrapeResponse(
success=True,
tasks_scraped=tasks_scraped,
tasks_saved=tasks_saved,
duplicates_skipped=duplicates_skipped,
message=(
f"Processed {tasks_scraped} tasks. "
f"Saved: {tasks_saved}, duplicates: {duplicates_skipped}"
),
)
except HTTPException:
raise
except Exception as exc: # pragma: no cover - endpoint guard
logger.error("Scrape error: %s", exc)
raise HTTPException(status_code=500, detail=f"Scrape error: {exc}")
@app.get("/api/stats", response_model=StatsResponse)
async def get_stats() -> StatsResponse:
client = _require_supabase()
stats = await client.get_stats()
latest = await client.get_latest_tasks(limit=1)
last_scrape = latest[0].get("scraped_at") if latest else None
return StatsResponse(
total_tasks=stats.get("total", 0),
by_type=stats.get("by_type", {}),
last_scrape=last_scrape,
)
@app.get("/", tags=["root"])
async def root() -> Dict[str, str]:
return {
"message": "AI Scraper FIPI API proof-20260317",
"version": "1.1.1-proof-20260317",
"docs": "/docs",
}
@app.exception_handler(Exception)
async def global_exception_handler(request, exc) -> JSONResponse:
logger.error("Unhandled exception: %s", exc)
payload = ErrorResponse(
error="Internal Server Error",
detail=str(exc),
timestamp=datetime.utcnow(),
)
return JSONResponse(status_code=500, content=payload.model_dump(mode="json"))
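# Example local calls (illustrative only: paths come from the routes above, the port from
# the uvicorn call below, and field names from CheckAnswerRequest / ScrapeRequest; the
# task id and answer are placeholders, and other ScrapeRequest fields are assumed optional):
#   curl http://localhost:8000/api/health
#   curl http://localhost:8000/api/tasks
#   curl -X POST http://localhost:8000/api/tasks/1/check-answer \
#        -H "Content-Type: application/json" -d '{"answer": "12"}'
#   curl -X POST http://localhost:8000/api/scrape \
#        -H "Content-Type: application/json" -d '{"subject": "russian"}'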
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)