| """ |
| FastAPI Application with Swagger Documentation |
| |
| UNESCO Metadata Pipeline API |
| - Automatic Swagger UI at /docs |
| - ReDoc at /redoc |
| - OpenAPI schema at /openapi.json |
| """ |
|
|
| import ipaddress |
| import logging |
| import os |
| import socket |
| from contextlib import asynccontextmanager |
| from datetime import datetime, timezone |
| from typing import Optional, Dict, Any, List |
| from urllib.parse import urlparse |
|
|
| from fastapi import FastAPI, Depends, HTTPException, status, BackgroundTasks, Query |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse, HTMLResponse |
|
|
| from src.api.models import ( |
| DocumentSubmission, |
| BatchSubmission, |
| ProcessingResult, |
| DCATResult, |
| BatchResult, |
| HealthCheck, |
| ErrorResponse, |
| ProcessingStatus |
| ) |
| from src.api.auth import verify_api_key, verify_api_key_optional, APIKeyInfo, setup_demo_key |
| from src.api.storage import get_storage, LocalJSONStorage |
|
|
# Configure the root logger once at import time: INFO level, timestamped records.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by every handler and helper in this file.
logger = logging.getLogger(__name__)
|
|
| |
# Title, description and version handed to the FastAPI constructor below.
API_TITLE = "UNESCO Metadata Pipeline API"
# Markdown text passed to FastAPI as the API description.
# The authentication sentence excludes /health, which is declared as a public
# endpoint, and the processing-flow arrows are real "→" characters.
API_DESCRIPTION = """
## Multi-Lingual Metadata Extraction Pipeline

Extracts DCAT-AP 3.0 compliant metadata from UNESCO documents using:
- **GLiNER2**: Entity extraction & SDG classification
- **Graph RAG**: UNESCO Thesaurus grounding
- **GGUF**: Quantized LLM for formatting

### Authentication
All endpoints except `/health` require API key authentication via the `X-API-Key` header.

```
X-API-Key: demo-key-12345
```

### Processing Flow
1. Submit document → `POST /api/v1/process`
2. Check status → `GET /api/v1/status/{document_id}`
3. Retrieve result → `GET /api/v1/result/{document_id}`

### Webhooks (Future)
Configure `webhook_url` in submission to receive completion notifications.
"""


API_VERSION = "2.0.0"
|
|
| |
|
|
# Networks that must never be reached through a user-supplied URL.
# Checked in addition to the ipaddress classification flags in _is_ssrf_safe().
_PRIVATE_NETS = [
    ipaddress.ip_network(n) for n in (
        "0.0.0.0/8",
        "127.0.0.0/8", "10.0.0.0/8", "172.16.0.0/12",
        "192.168.0.0/16", "169.254.0.0/16", "100.64.0.0/10",
        "::1/128", "fc00::/7", "fe80::/10",
    )
]


def _is_ssrf_safe(url: str) -> bool:
    """Return True only if *url* is http(s) and every resolved address is public.

    The check fails closed: a malformed URL, a missing host, a non-HTTP scheme,
    a resolution failure, or any single private/loopback/link-local/reserved
    address among the resolved records makes the function return False.

    Args:
        url: The URL supplied by the client.

    Returns:
        True when the URL may be fetched, False otherwise.

    NOTE: this validates the addresses at check time only. A hostname whose DNS
    answer changes between this check and the actual request (DNS rebinding)
    is not covered here.
    """
    try:
        parsed = urlparse(url)
        host = parsed.hostname
        # An empty host would otherwise resolve to the unspecified address.
        if parsed.scheme not in ("http", "https") or not host:
            return False
        # getaddrinfo returns every A and AAAA record, not just the first IPv4 one.
        records = socket.getaddrinfo(host, None)
    except (OSError, ValueError):
        # socket.gaierror is an OSError; UnicodeError is a ValueError.
        return False

    if not records:
        return False

    for record in records:
        raw_address = record[4][0].split("%", 1)[0]  # drop any IPv6 scope id
        try:
            addr = ipaddress.ip_address(raw_address)
        except ValueError:
            return False
        # Treat ::ffff:a.b.c.d exactly like the embedded IPv4 address.
        mapped = getattr(addr, "ipv4_mapped", None)
        if mapped is not None:
            addr = mapped
        if (
            addr.is_private
            or addr.is_loopback
            or addr.is_link_local
            or addr.is_multicast
            or addr.is_reserved
            or addr.is_unspecified
            or any(addr in net for net in _PRIVATE_NETS)
        ):
            return False
    return True
|
|
|
|
| |
|
|
| import asyncio as _asyncio |
|
|
# Size of the pipeline semaphore (env MAX_CONCURRENT_PIPELINES, default 3).
_MAX_CONCURRENT = int(os.environ.get("MAX_CONCURRENT_PIPELINES", "3"))
# Timeout in seconds (env PIPELINE_TIMEOUT_SECONDS, default 20 minutes).
# NOTE(review): presumably applied per pipeline run; the consumer is not in this part of the file.
_PIPELINE_TIMEOUT = float(os.environ.get("PIPELINE_TIMEOUT_SECONDS", str(20 * 60)))
# Left as None at import time; lifespan() assigns the real semaphore on startup.
_PIPELINE_SEMAPHORE: Optional[_asyncio.Semaphore] = None
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup work before the app serves requests and log on shutdown.

    Startup: logs the version, calls setup_demo_key(), creates the module-level
    pipeline semaphore sized by _MAX_CONCURRENT, and eagerly calls the geo
    thesaurus index and country normalizer getters (presumably to build them
    before the first request instead of during it).
    """
    global _PIPELINE_SEMAPHORE

    logger.info(f"Starting {API_TITLE} v{API_VERSION}")
    setup_demo_key()
    _PIPELINE_SEMAPHORE = _asyncio.Semaphore(_MAX_CONCURRENT)

    from src.utils.country_normalizer import get_geo_thesaurus_index, get_country_normalizer
    for warm_up in (get_geo_thesaurus_index, get_country_normalizer):
        warm_up()

    yield

    logger.info(f"Shutting down {API_TITLE}")
|
|
|
|
| |
| import os |
# Production mode is selected by setting UNESDOC_PIPELINE_ENV=production.
is_production = os.getenv("UNESDOC_PIPELINE_ENV") == "production"


# Swagger UI and ReDoc are switched off in production; the raw OpenAPI schema
# stays reachable in every environment.
docs_url = None if is_production else "/docs"
redoc_url = None if is_production else "/redoc"
openapi_url = "/openapi.json"
|
|
| |
# The ASGI application object; every route below is registered on it.
app = FastAPI(
    # Startup/shutdown hooks.
    lifespan=lifespan,

    # Metadata published in the OpenAPI schema.
    title=API_TITLE,
    version=API_VERSION,
    description=API_DESCRIPTION,
    license_info={
        "name": "Apache 2.0",
        "url": "https://www.apache.org/licenses/LICENSE-2.0"
    },
    contact={
        "name": "UNESCO Metadata Team",
        "url": "https://github.com/unesco/metadata-pipeline"
    },

    # Documentation routes (the interactive ones are None in production).
    openapi_url=openapi_url,
    docs_url=docs_url,
    redoc_url=redoc_url,
)
|
|
| |
# Cross-origin access is open to any origin because the API is authenticated
# with the X-API-Key request header rather than cookies.
# allow_credentials is deliberately False: combining wildcard origins with
# credentialed requests would let any website issue cookie-bearing requests,
# and the CORS documentation requires explicit origins when credentials are on.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
| |
@app.exception_handler(Exception)
async def generic_exception_handler(request, exc):
    """Convert any otherwise-unhandled exception into a JSON 500 response.

    The full exception and traceback are always written to the log. The
    exception text is only echoed back to the client outside production, so
    internal paths, hostnames or query fragments are not disclosed to callers
    of a production deployment.

    Args:
        request: The incoming request (unused).
        exc: The exception that escaped the route handler.

    Returns:
        A JSONResponse with status 500 and an ErrorResponse body.
    """
    # Pass the exception object itself so the traceback is logged even if no
    # exception is currently being handled when this coroutine runs.
    logger.error("Unhandled exception: %s", exc, exc_info=exc)

    detail = "An unexpected error occurred." if is_production else str(exc)
    body = ErrorResponse(
        error="Internal server error",
        detail=detail,
    ).model_dump(mode="json")  # JSON-safe values only (e.g. datetimes as strings)

    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content=body,
    )
|
|
|
|
| |
@app.get("/", include_in_schema=False)
async def landing_page():
    """Serve the UNESCO-styled HTML landing page.

    The primary button links to Swagger UI outside production and to the raw
    OpenAPI schema in production, where the interactive docs are disabled.

    Non-ASCII glyphs are written as HTML character references and literal
    ``<``, ``>`` and ``&`` in the text are escaped, so the page renders the
    same regardless of how this source file is encoded and the
    ``<space-url>`` placeholder is shown instead of being parsed as a tag.

    Returns:
        HTMLResponse containing the complete page.
    """
    docs_href = "/docs" if not is_production else "/openapi.json"
    docs_label = "Open API Docs (Swagger)" if not is_production else "OpenAPI Schema"
    # CSS and literal braces are doubled ({{ }}) because this is an f-string.
    html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>UNESCO Metadata Pipeline</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap" rel="stylesheet">
<style>
*{{box-sizing:border-box;margin:0;padding:0}}
body{{font-family:'Inter',sans-serif;background:#F1F4F6;color:#212121}}
header{{background:linear-gradient(135deg,#0E4280 0%,#0077D4 100%);color:#fff;padding:52px 32px;text-align:center}}
.logo-badge{{display:inline-block;background:rgba(255,255,255,.15);border:2px solid rgba(255,255,255,.3);border-radius:8px;padding:7px 18px;font-size:11px;font-weight:700;letter-spacing:2.5px;text-transform:uppercase;margin-bottom:18px}}
header h1{{font-size:2.1rem;font-weight:700;margin-bottom:12px;line-height:1.2}}
header p{{font-size:1rem;opacity:.87;max-width:560px;margin:0 auto 28px;line-height:1.65}}
.btn{{display:inline-block;padding:12px 26px;border-radius:6px;font-size:.9rem;font-weight:600;text-decoration:none;transition:all .2s;cursor:pointer}}
.btn-white{{background:#fff;color:#0077D4}}.btn-white:hover{{background:#B2D6F2}}
.btn-outline{{background:transparent;color:#fff;border:1.5px solid rgba(255,255,255,.55);margin-left:12px}}.btn-outline:hover{{background:rgba(255,255,255,.12)}}
main{{max-width:960px;margin:0 auto;padding:48px 24px 32px}}
.badges{{display:flex;flex-wrap:wrap;gap:8px;justify-content:center;margin-bottom:44px}}
.badge{{background:#fff;border:1px solid #D5DADD;border-radius:20px;padding:4px 14px;font-size:.78rem;color:#4C5054;font-weight:500}}
.badge-blue{{background:#B2D6F2;border-color:#0077D4;color:#0E4280}}
.section{{margin-bottom:40px}}
.section h2{{font-size:1rem;font-weight:700;color:#0E4280;margin-bottom:14px;padding-bottom:8px;border-bottom:2px solid #B2D6F2;text-transform:uppercase;letter-spacing:.8px}}
.pipeline{{display:flex;flex-wrap:wrap;background:#fff;border:1px solid #D5DADD;border-radius:8px;overflow:hidden}}
.step{{flex:1;min-width:110px;padding:18px 10px;text-align:center;border-right:1px solid #D5DADD}}
.step:last-child{{border-right:none}}
.step-num{{width:30px;height:30px;background:#0077D4;color:#fff;border-radius:50%;display:flex;align-items:center;justify-content:center;font-size:.75rem;font-weight:700;margin:0 auto 8px}}
.step-label{{font-size:.77rem;font-weight:600;color:#212121;margin-bottom:2px}}
.step-sub{{font-size:.67rem;color:#7F888F}}
.grid{{display:grid;grid-template-columns:repeat(auto-fill,minmax(280px,1fr));gap:16px}}
.card{{background:#fff;border-radius:8px;border:1px solid #D5DADD;padding:20px;transition:box-shadow .2s,border-color .2s}}
.card:hover{{box-shadow:0 4px 14px rgba(0,119,212,.13);border-color:#0077D4}}
.method{{display:inline-block;padding:2px 8px;border-radius:4px;font-size:.68rem;font-weight:700;letter-spacing:.4px;margin-bottom:8px}}
.post{{background:#4FB293;color:#fff}}.get{{background:#0077D4;color:#fff}}
.path{{font-family:monospace;font-size:.85rem;color:#115A9E;margin-bottom:6px;font-weight:500}}
.desc{{font-size:.82rem;color:#4C5054;line-height:1.55}}
.auth-box{{background:#fff;border:1px solid #D5DADD;border-left:4px solid #0077D4;border-radius:0 6px 6px 0;padding:16px 20px;font-size:.87rem;line-height:1.7}}
code{{background:#0E4280;color:#B2D6F2;padding:2px 7px;border-radius:4px;font-family:monospace;font-size:.84em}}
footer{{text-align:center;padding:28px;font-size:.78rem;color:#7F888F;border-top:1px solid #D5DADD}}
footer a{{color:#0077D4;text-decoration:none}}
@media(max-width:600px){{header h1{{font-size:1.5rem}}.btn-outline{{margin-left:0;margin-top:10px}}}}
</style>
</head>
<body>
<header>
<div class="logo-badge">&#127963;&#65039; UNESCO</div>
<h1>Metadata Pipeline API</h1>
<p>Multi-lingual DCAT-AP 3.0 metadata extraction from UNESCO documents &mdash; powered by GLiNER2, Graph RAG &amp; quantized LLMs.</p>
<a href="{docs_href}" class="btn btn-white">{docs_label}</a>
<a href="/health" class="btn btn-outline">Health Check</a>
</header>

<main>
<div class="badges">
<span class="badge badge-blue">DCAT-AP 3.0</span>
<span class="badge badge-blue">SDG Classification</span>
<span class="badge">5 Languages</span>
<span class="badge">UNESCO Thesaurus</span>
<span class="badge">GLiNER2</span>
<span class="badge">Graph RAG</span>
<span class="badge">Apache 2.0</span>
</div>

<div class="section">
<h2>Processing Pipeline</h2>
<div class="pipeline">
<div class="step"><div class="step-num">1</div><div class="step-label">Parsing</div><div class="step-sub">PDF / text / URL</div></div>
<div class="step"><div class="step-num">2</div><div class="step-label">Extraction</div><div class="step-sub">GLiNER2 NER</div></div>
<div class="step"><div class="step-num">3</div><div class="step-label">Grounding</div><div class="step-sub">Graph RAG CoE</div></div>
<div class="step"><div class="step-num">4</div><div class="step-label">Validation</div><div class="step-sub">Thesaurus URIs</div></div>
<div class="step"><div class="step-num">5</div><div class="step-label">Aggregation</div><div class="step-sub">Hi-Transformer</div></div>
<div class="step"><div class="step-num">6</div><div class="step-label">Formatting</div><div class="step-sub">LLM &rarr; JSON-LD</div></div>
<div class="step"><div class="step-num">7</div><div class="step-label">Validation</div><div class="step-sub">Threshold guard</div></div>
</div>
</div>

<div class="section">
<h2>Key Endpoints</h2>
<div class="grid">
<div class="card"><span class="method post">POST</span><div class="path">/api/v1/process</div><div class="desc">Submit a document (PDF, UNESDOC ID, raw text, or URL) for metadata extraction.</div></div>
<div class="card"><span class="method get">GET</span><div class="path">/api/v1/status/{{id}}</div><div class="desc">Check processing status with stage and progress percentage.</div></div>
<div class="card"><span class="method get">GET</span><div class="path">/api/v1/result/{{id}}</div><div class="desc">Retrieve the complete DCAT-AP 3.0 JSON-LD metadata output.</div></div>
<div class="card"><span class="method post">POST</span><div class="path">/api/v1/batch</div><div class="desc">Submit up to 100 documents in a single batch request.</div></div>
<div class="card"><span class="method get">GET</span><div class="path">/api/v1/documents</div><div class="desc">List processed documents with filtering by country, year, or region.</div></div>
<div class="card"><span class="method get">GET</span><div class="path">/health</div><div class="desc">Public health check &mdash; returns service status and version. No auth required.</div></div>
</div>
</div>

<div class="section">
<h2>Authentication</h2>
<div class="auth-box">
All endpoints except <code>/health</code> require an API key via the <code>X-API-Key</code> request header.<br>
Example: <code>curl -H "X-API-Key: your-key" https://&lt;space-url&gt;/api/v1/status/test</code>
</div>
</div>
</main>

<footer>
UNESCO Metadata Pipeline v{API_VERSION} &middot;
<a href="https://huggingface.co/fastino/gliner2-base-v1">GLiNER2 model</a> &middot;
<a href="https://huggingface.co/Qwen/Qwen3.5-2B">Qwen3.5-2B</a> &middot;
Apache 2.0
</footer>
</body>
</html>"""
    return HTMLResponse(content=html)
|
|
|
|
| |
@app.get(
    "/health",
    response_model=HealthCheck,
    summary="Health check",
    description="Check API health status. Public endpoint, no authentication required.",
    tags=["Health"]
)
async def health_check():
    """Report the service as healthy together with its version and the current UTC time.

    The component states are fixed values; no dependency is probed here.
    """
    component_states = {
        "api": "ok",
        "storage": "ok",
        "pipeline": "ready",
    }
    checked_at = datetime.now(timezone.utc)
    return HealthCheck(
        status="healthy",
        version=API_VERSION,
        timestamp=checked_at,
        components=component_states,
    )
|
|
|
|
| |
@app.post(
    "/api/v1/process",
    response_model=ProcessingResult,
    summary="Submit document for processing",
    description="Submit a single document for metadata extraction.",
    tags=["Documents"],
    responses={
        401: {"model": ErrorResponse, "description": "Invalid API key"},
        422: {"model": ErrorResponse, "description": "Validation error"}
    }
)
async def process_document(
    submission: DocumentSubmission,
    background_tasks: BackgroundTasks,
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage)
):
    """
    Submit a document for processing.

    The document will be processed through:
    1. PDF parsing
    2. Entity extraction (GLiNER2)
    3. Knowledge graph grounding
    4. DCAT-AP formatting

    Returns immediately with status. Use `/status/{document_id}` to check progress.

    Raises:
        HTTPException 413: the base64 `file_content` is longer than the allowed size.
        HTTPException 409: the document already exists and has not failed.
    """
    logger.info(f"Processing request for {submission.document_id} (API key: {api_key.name})")

    # The limit is measured on the base64 text, not on the decoded bytes.
    _MAX_FILE_CONTENT = 67_000_000
    if submission.file_content and len(submission.file_content) > _MAX_FILE_CONTENT:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="PDF exceeds the 50 MB limit. Use file_url to provide a download link instead.",
        )

    # Only a previously FAILED document may be resubmitted under the same id.
    existing = storage.get_status(submission.document_id)
    if existing:
        current_status = existing.get("status")
        if current_status != ProcessingStatus.FAILED.value:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Document {submission.document_id} already exists with status: {current_status}"
            )

    storage.save_status(
        document_id=submission.document_id,
        status=ProcessingStatus.PENDING,
        stage="queued",
        progress=0,
        metadata={
            "languages": submission.languages,
            "document_family": submission.document_family,
            "priority": submission.priority,
            "webhook_url": str(submission.webhook_url) if submission.webhook_url else None
        }
    )

    # The pipeline runs after the response has been sent.
    background_tasks.add_task(
        process_document_background,
        submission,
        storage
    )

    # One timestamp for both fields so a brand-new record reports
    # created_at == updated_at.
    queued_at = datetime.now(timezone.utc)
    return ProcessingResult(
        document_id=submission.document_id,
        status=ProcessingStatus.PENDING,
        created_at=queued_at,
        updated_at=queued_at,
        current_stage="queued",
        progress_percent=0
    )
|
|
|
|
@app.post(
    "/api/v1/batch",
    response_model=BatchResult,
    summary="Submit batch of documents",
    description="Submit multiple documents for batch processing.",
    tags=["Documents"],
    responses={
        401: {"model": ErrorResponse, "description": "Invalid API key"}
    }
)
async def process_batch(
    batch: BatchSubmission,
    background_tasks: BackgroundTasks,
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage)
):
    """
    Submit a batch of documents for processing.

    Maximum 100 documents per batch.

    A document is queued only when it is not already known, or when its
    previous run FAILED -- the same rule the single-document endpoint applies.
    Documents that are skipped (already present, or repeated inside the batch)
    are logged and left out of `submitted_documents`; `total_documents` still
    reports how many were received.
    """
    import uuid

    logger.info(f"Batch submission: {len(batch.documents)} documents (API key: {api_key.name})")

    # Second-resolution timestamps collide for batches submitted in the same
    # second, so a short random suffix keeps the id unique.
    stamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
    batch_id = f"batch_{stamp}_{uuid.uuid4().hex[:8]}"

    accepted_ids = []
    seen_in_batch = set()

    for doc in batch.documents:
        if doc.document_id in seen_in_batch:
            logger.warning("Batch %s: duplicate document id %s skipped", batch_id, doc.document_id)
            continue
        seen_in_batch.add(doc.document_id)

        # Never reset a pending, running or completed document back to PENDING.
        existing = storage.get_status(doc.document_id)
        if existing and existing.get("status") != ProcessingStatus.FAILED.value:
            logger.warning(
                "Batch %s: document %s already exists with status %s; skipped",
                batch_id, doc.document_id, existing.get("status"),
            )
            continue

        storage.save_status(
            document_id=doc.document_id,
            status=ProcessingStatus.PENDING,
            metadata={"batch_id": batch_id}
        )
        background_tasks.add_task(process_document_background, doc, storage)
        accepted_ids.append(doc.document_id)

    return BatchResult(
        batch_id=batch_id,
        total_documents=len(batch.documents),
        submitted_documents=accepted_ids,
        status="submitted"
    )
|
|
|
|
@app.get(
    "/api/v1/status/{document_id}",
    response_model=ProcessingResult,
    summary="Get processing status",
    description="Check the current processing status of a document.",
    tags=["Documents"],
    responses={
        401: {"model": ErrorResponse, "description": "Invalid API key"},
        404: {"model": ErrorResponse, "description": "Document not found"}
    }
)
async def get_status(
    document_id: str,
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage)
):
    """
    Look up the stored status record of a document.

    The response carries the status, current stage, progress percentage and
    any error message. Responds with 404 when no record exists.
    """
    record = storage.get_status(document_id)
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Document {document_id} not found"
        )

    # Missing fields fall back to "pending" and to the current UTC time.
    state = ProcessingStatus(record.get("status", "pending"))
    created_raw = record.get("created_at", datetime.now(timezone.utc).isoformat())
    created = datetime.fromisoformat(created_raw)
    updated_raw = record.get("updated_at", datetime.now(timezone.utc).isoformat())
    updated = datetime.fromisoformat(updated_raw)

    return ProcessingResult(
        document_id=document_id,
        status=state,
        created_at=created,
        updated_at=updated,
        current_stage=record.get("stage"),
        progress_percent=record.get("progress", 0),
        error_message=record.get("error_message")
    )
|
|
|
|
@app.get(
    "/api/v1/result/{document_id}",
    response_model=DCATResult,
    summary="Get processing result",
    description="Retrieve the DCAT-AP formatted result for a completed document.",
    tags=["Documents"],
    responses={
        401: {"model": ErrorResponse, "description": "Invalid API key"},
        404: {"model": ErrorResponse, "description": "Result not found"},
        409: {"model": ErrorResponse, "description": "Processing not complete"}
    }
)
async def get_result(
    document_id: str,
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage)
):
    """
    Return the stored DCAT-AP result of a document.

    Responds with 404 when the document or its result is unknown and with 409
    while the document's status is anything other than completed.
    """
    record = storage.get_status(document_id)
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Document {document_id} not found"
        )

    current_state = record.get("status")
    if current_state != ProcessingStatus.COMPLETED.value:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Document {document_id} processing not complete. Current status: {current_state}"
        )

    payload = storage.get_result(document_id)
    if payload:
        return payload

    # Status says completed but nothing was stored for it.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Result for {document_id} not found"
    )
|
|
|
|
@app.get(
    "/api/v1/documents",
    summary="List documents",
    description="List all documents with optional filtering by country, year, and region.",
    tags=["Documents"]
)
async def list_documents(
    status: Optional[ProcessingStatus] = None,
    country: Optional[str] = Query(
        None,
        description="Filter by ISO3 code (e.g., FRA, USA, BRA)",
        min_length=3,
        max_length=3,
        pattern=r"^[A-Z]{3}$",
        examples=["FRA"],
    ),
    year: Optional[int] = Query(
        None,
        description="Filter by year (e.g., 2024)",
        ge=1945,
        le=2100,
        examples=[2024],
    ),
    region: Optional[str] = Query(
        None,
        description="Filter by UNESCO region (e.g., Africa, Europe)"
    ),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage)
):
    """
    List documents with optional filtering.

    ## Filtering Options
    - **country**: Filter by ISO3 code (e.g., "FRA" for France)
    - **year**: Filter by document year (e.g., 2024)
    - **region**: Filter by UNESCO region (e.g., "Africa", "Europe")
    - **status**: Filter by processing status

    ## Pagination
    - **limit**: Maximum results to return (1-1000)
    - **offset**: Number of results to skip

    ## Examples
    - `/api/v1/documents?country=FRA` - Documents related to France
    - `/api/v1/documents?year=2024` - Documents from 2024
    - `/api/v1/documents?region=Africa` - Documents about Africa
    - `/api/v1/documents?country=USA&year=2024` - US documents from 2024
    """
    # Fetch first, then filter and paginate in memory.
    # NOTE(review): only the first 1000 stored documents are ever considered,
    # so `total` and any offset past that window are bounded by it; lifting the
    # cap depends on what storage.list_documents accepts -- confirm.
    documents = storage.list_documents(status=status, limit=1000, offset=0)

    if country:
        country_upper = country.upper()
        documents = [
            d for d in documents
            if _document_has_country(d, country_upper)
        ]

    if year is not None:
        documents = [
            d for d in documents
            if _document_has_year(d, year)
        ]

    if region:
        region_lower = region.lower()
        documents = [
            d for d in documents
            if _document_has_region(d, region_lower)
        ]

    # `total` counts matches before the requested page is cut out.
    total = len(documents)
    documents = documents[offset:offset + limit]

    return {
        "documents": documents,
        "total": total,
        "limit": limit,
        "offset": offset,
        "filters_applied": {
            "country": country,
            "year": year,
            "region": region,
            "status": status.value if status else None
        }
    }
|
|
|
|
def _document_has_country(document: Dict, iso3: str) -> bool:
    """Return True if *document* lists the ISO3 code *iso3* in its geographical coverage.

    The code is looked for in ``geographical_coverage.iso3_codes`` and in the
    ``iso3`` field of each entry of ``geographical_coverage.countries``.
    Missing, null or unexpectedly-typed sections count as "no match" instead of
    raising, so one malformed stored record cannot break a listing request.

    Args:
        document: A stored document record.
        iso3: The ISO3 code to look for (compared exactly).
    """
    geo = document.get("geographical_coverage") or {}
    if not isinstance(geo, dict):
        return False

    if iso3 in (geo.get("iso3_codes") or []):
        return True

    return any(
        isinstance(entry, dict) and entry.get("iso3") == iso3
        for entry in (geo.get("countries") or [])
    )
|
|
|
|
def _document_has_year(document: Dict, year: int) -> bool:
    """Return True if *document* is associated with *year*.

    A match is any of: ``time_coverage.year``, membership in
    ``time_coverage.years_mentioned``, ``temporal_coverage.year`` or
    ``result.year``. Missing, null or non-dict sections are treated as empty
    rather than raising.

    Args:
        document: A stored document record.
        year: The year to look for.
    """
    def _section(key: str) -> dict:
        # Normalise absent / null / wrongly-typed sections to an empty dict.
        value = document.get(key)
        return value if isinstance(value, dict) else {}

    time_cov = _section("time_coverage")
    if time_cov.get("year") == year:
        return True
    if year in (time_cov.get("years_mentioned") or []):
        return True

    if _section("temporal_coverage").get("year") == year:
        return True

    return _section("result").get("year") == year
|
|
|
|
def _document_has_region(document: Dict, region: str) -> bool:
    """Return True if *region* is a substring of any region name on *document*.

    Region names are read from ``geographical_coverage.regions`` (entries may
    be dicts with a ``name`` field or plain values) and from the string entries
    of ``unesco_regions``. Names are lower-cased before the substring test, so
    *region* is expected to be lower-case already. Missing or null sections and
    null names are treated as empty rather than raising.

    Args:
        document: A stored document record.
        region: Lower-case text to search for.
    """
    geo = document.get("geographical_coverage") or {}
    geo_regions = (geo.get("regions") or []) if isinstance(geo, dict) else []
    for entry in geo_regions:
        if isinstance(entry, dict):
            region_name = entry.get("name") or ""
        else:
            region_name = entry
        if region in str(region_name).lower():
            return True

    return any(
        isinstance(entry, str) and region in entry.lower()
        for entry in (document.get("unesco_regions") or [])
    )
|
|
|
|
@app.delete(
    "/api/v1/documents/{document_id}",
    summary="Delete document",
    description="Delete a document and all its results.",
    tags=["Documents"]
)
async def delete_document(
    document_id: str,
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage)
):
    """Ask storage to delete the document; respond with 404 if storage reports nothing was deleted."""
    if storage.delete_document(document_id):
        return {"message": f"Document {document_id} deleted"}

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Document {document_id} not found"
    )
|
|
|
|
@app.get(
    "/api/v1/stats",
    summary="Get statistics",
    description="Get system statistics and metrics.",
    tags=["System"]
)
async def get_stats(
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage)
):
    """Return whatever statistics the storage backend reports, unmodified."""
    stats = storage.get_stats()
    return stats
|
|
|
|
| |
|
|
def _download_pdf_checked(url: str, dest_path: str, max_bytes: int, max_redirects: int = 5) -> None:
    """Stream *url* into *dest_path*, validating every redirect hop with the SSRF guard.

    Redirects are followed manually so that a public URL cannot bounce the
    request to a private address after the initial check.

    Raises:
        HTTPException: 400 when the URL, or any redirect target, is not allowed.
        ValueError: when the body exceeds *max_bytes* or the redirect chain is
            too long or malformed.
        requests.HTTPError: on a non-success HTTP status.
    """
    import requests as _req_lib
    from urllib.parse import urljoin

    current = url
    for _ in range(max_redirects + 1):
        if not _is_ssrf_safe(current):
            raise HTTPException(status_code=400, detail="URL not allowed")
        # The context manager releases the connection even on early exit.
        with _req_lib.get(current, timeout=30, stream=True, allow_redirects=False) as resp:
            if resp.is_redirect:
                current = urljoin(current, resp.headers["location"])
                continue
            if 300 <= resp.status_code < 400:
                raise ValueError(f"Unexpected HTTP {resp.status_code} response without a redirect target")
            resp.raise_for_status()
            received = 0
            with open(dest_path, "wb") as fout:
                for chunk in resp.iter_content(65536):
                    received += len(chunk)
                    if received > max_bytes:
                        raise ValueError(f"PDF exceeds {max_bytes // 1024 // 1024} MB limit")
                    fout.write(chunk)
            return
    raise ValueError(f"Too many redirects (more than {max_redirects}) while downloading PDF")


def _build_extracted_document(submission: DocumentSubmission):
    """Build a pre-parsed ExtractedDocument from text_content, source_url, file_url, or file_content.

    The inputs are tried in that order and the first one present wins:

    * ``text_content`` -- wrapped in a single "body" section; the language is
      taken from ``submission.languages`` or detected with langdetect
      (falling back to "en").
    * ``source_url`` -- handed to WebScraper.
    * ``file_url`` -- downloaded to a temporary file and parsed with
      PDFExtractor. URLs whose *host* is unesdoc.unesco.org use the UNESDOC
      downloader; every other URL goes through the SSRF guard.
    * ``file_content`` -- base64-decoded to a temporary PDF and parsed.

    Returns:
        The extracted document, or None when the submission carries none of
        the four inputs.

    Raises:
        ValueError: UNESDOC download/extraction failure, oversized download,
            or invalid base64.
        HTTPException: 400 when ``file_url`` points at a disallowed address.
    """
    from src.parsing.pdf_extractor import ExtractedDocument, DocumentSection, PDFExtractor
    from pathlib import Path as _Path
    import os
    import tempfile

    if submission.text_content:
        if submission.languages:
            langs = submission.languages
        else:
            try:
                from langdetect import detect, DetectorFactory
                DetectorFactory.seed = 0  # deterministic detection
                langs = [detect(submission.text_content[:2000])]
            except Exception:
                # Best effort: missing package or undetectable text -> English.
                langs = ["en"]
        sections = [DocumentSection(
            text=submission.text_content,
            section_type="body",
            page_number=1,
            language=langs[0],
        )]
        return ExtractedDocument(
            document_id=submission.document_id,
            file_path=_Path("."),
            sections=sections,
            languages=langs,
            total_pages=1,
            metadata={},
        )

    if submission.source_url:
        # NOTE(review): no SSRF check is applied here; presumably WebScraper
        # validates the target itself -- confirm.
        from src.parsing.web_scraper import WebScraper
        scraper = WebScraper()
        return scraper.scrape(str(submission.source_url), submission.document_id)

    if submission.file_url:
        _url = str(submission.file_url)
        logger.info("Downloading PDF from file_url: %s", _url)

        # Decide on the parsed host, not on a substring of the whole URL:
        # "http://10.0.0.1/?x=unesdoc.unesco.org" must not skip the SSRF guard.
        try:
            _host = (urlparse(_url).hostname or "").lower()
        except ValueError:
            _host = ""

        if _host == "unesdoc.unesco.org" or _host.endswith(".unesdoc.unesco.org"):
            from src.utils.pdf_downloader import UNESDOCPDFDownloader
            try:
                # The scratch directory is removed on exit, success or failure.
                with tempfile.TemporaryDirectory() as _workdir:
                    downloader = UNESDOCPDFDownloader(output_folder=_workdir)
                    pdf_path = downloader.download_pdf(_url)
                    try:
                        extractor = PDFExtractor()
                        return extractor.extract(pdf_path, submission.document_id)
                    finally:
                        if pdf_path and pdf_path.exists():
                            pdf_path.unlink()
            except (ValueError, RuntimeError) as exc:
                logger.error("UNESDOC download failed: %s", exc)
                raise ValueError(str(exc)) from exc
        else:
            _MAX_PDF = 50 * 1024 * 1024
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
                tmp_path = tmp.name
            try:
                _download_pdf_checked(_url, tmp_path, _MAX_PDF)
                extractor = PDFExtractor()
                return extractor.extract(_Path(tmp_path), submission.document_id)
            finally:
                if os.path.exists(tmp_path):
                    os.unlink(tmp_path)

    if submission.file_content:
        import base64
        logger.info("Decoding base64 PDF for document %s", submission.document_id)
        pdf_bytes = base64.b64decode(submission.file_content)
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
            tmp.write(pdf_bytes)
            tmp_path = tmp.name
        try:
            extractor = PDFExtractor()
            return extractor.extract(_Path(tmp_path), submission.document_id)
        finally:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)

    return None
|
|
|
|
def _run_pipeline_sync(submission: DocumentSubmission) -> dict:
    """Build the pre-parsed document, run PipelineRunner on it and return the runner's result.

    Language precedence: the submission's own languages, then the languages of
    the pre-parsed document (if one was built), then ["en"]. Any exception from
    document building or from the runner propagates to the caller.
    """
    from src.pipeline.runner import PipelineRunner

    pre_parsed = _build_extracted_document(submission)

    languages = submission.languages
    if not languages and pre_parsed:
        languages = getattr(pre_parsed, "languages", None)
    if not languages:
        languages = ["en"]

    pipeline = PipelineRunner(
        document_id=submission.document_id,
        languages=languages,
        extracted_document=pre_parsed,
    )
    return pipeline.run()
|
|
|
|
| def _pipeline_to_dcat(result: dict, doc_id: str) -> DCATResult: |
| """Convert PipelineRunner result dict β DCATResult.""" |
| from src.api.models import DCATEntity, GeographicalCoverage, TimeCoverage |
|
|
| stages = result.get("stages", {}) |
| extract = stages.get("extract", {}) |
| ground = stages.get("ground", {}) |
| fmt = stages.get("format", {}) |
|
|
| |
| entities: list[DCATEntity] = [] |
| seen_uris: set[str] = set() |
|
|
| from src.config.extraction_rules import MAX_OUTPUT_ENTITIES, OUTPUT_CAPS, THESAURUS_LABEL_BLOCKLIST |
|
|
| for item in ground.get("grounded_entities", []): |
| concept = item.get("concept", {}) |
| entity = item.get("entity", {}) |
| uri = concept.get("uri", "") |
| if not uri or uri in seen_uris: |
| continue |
| seen_uris.add(uri) |
| label = (concept.get("label") or entity.get("text", "")).strip() |
| if label.lower() in THESAURUS_LABEL_BLOCKLIST: |
| continue |
| |
| |
| |
| entity_type = entity.get("label", "") |
| if entity_type.startswith(("geo.", "temporal.")): |
| continue |
| entities.append(DCATEntity( |
| uri=uri, |
| label=label, |
| entity_type=entity_type or None, |
| source="GLiNER2+Thesaurus", |
| confidence=float(item.get("match_score", entity.get("score", 0.8))), |
| )) |
|
|
| |
| grounded_texts = {e.label.lower() for e in entities} |
| for ent in extract.get("entities", [])[:30]: |
| text = ent.get("text", "").strip() |
| if not text or text.lower() in grounded_texts: |
| continue |
| entity_type = ent.get("label", "") |
| if entity_type.startswith(("geo.", "temporal.")): |
| continue |
| grounded_texts.add(text.lower()) |
| uri = f"http://vocabularies.unesco.org/thesaurus/extracted/{text.replace(' ','_')}" |
| entities.append(DCATEntity( |
| uri=uri, |
| label=text, |
| entity_type=ent.get("label", "") or None, |
| source="GLiNER2", |
| confidence=float(ent.get("score", 0.8)), |
| )) |
|
|
| |
| |
| entities.sort(key=lambda e: e.confidence, reverse=True) |
| entities = entities[:MAX_OUTPUT_ENTITIES] |
|
|
| |
| sdg_goals = [ |
| {"goal": s.get("sdg", ""), "confidence": float(s.get("confidence", 0.8))} |
| for s in extract.get("sdg_predictions", []) |
| if s.get("sdg") |
| ] |
|
|
| |
| _primary_lang = (result.get("detected_languages") or ["en"])[0] |
|
|
| |
| |
| |
| import re as _re |
| from src.utils.country_normalizer import EntityType as _GeoType, get_country_normalizer as _get_norm |
| from src.api.models import CountryEntity as _CountryEntity, RegionEntity as _RegionEntity |
|
|
| _all_ents = extract.get("entities", []) |
| _country_names = [e["text"] for e in _all_ents if e.get("label") == "geo.country" and e.get("text")] |
| _region_names = [e["text"] for e in _all_ents if e.get("label") == "geo.region" and e.get("text")] |
|
|
| if not _country_names: |
| |
| _full_text = result.get("full_text", "") |
| _candidate_texts = [e["text"] for e in _all_ents if e.get("text")] |
| |
| |
| _GEO_BLOCKLIST = frozenset({ |
| "the", "an", "this", "that", "these", "those", "its", "their", "our", |
| "a", "and", "or", "of", "in", "on", "to", "for", "by", "at", "from", |
| "ai", "ml", "ict", "it", "ar", "id", "io", "as", "do", |
| "is", "me", "my", "no", "ok", "so", "up", "be", |
| "annex", "chapter", "part", "section", "resolution", "decision", |
| "note", "report", "table", "figure", "appendix", "document", |
| }) |
| |
| _cap_tokens = _re.findall( |
| r'\b[A-ZΓ-ΕΈ][a-zA-ZΓ-ΓΏ\u0400-\u04FF\u0600-\u06FF\'-]{3,}(?:\s+[A-ZΓ-ΕΈ][a-zA-ZΓ-ΓΏ\u0400-\u04FF\u0600-\u06FF\'-]{2,}){0,2}\b', |
| _full_text |
| ) |
| _candidate_texts += [t for t in _cap_tokens if t.lower() not in _GEO_BLOCKLIST] |
| _norm_fb = _get_norm() |
| for _cand in dict.fromkeys(_candidate_texts): |
| _ent = _norm_fb.normalize(_cand) |
| if _ent.entity_type == _GeoType.COUNTRY and _ent.iso3 and _ent.confidence >= 0.9: |
| _country_names.append(_cand) |
| elif _ent.entity_type == _GeoType.REGION and _ent.confidence >= 0.9: |
| _region_names.append(_cand) |
|
|
| from src.utils.country_normalizer import get_geo_thesaurus_index as _get_geo_idx |
| _geo_idx = _get_geo_idx() |
| _norm = _get_norm() |
| _cov = _norm.normalize_multiple(_country_names) |
| _countries = [ |
| _CountryEntity( |
| name=c.name, iso3=c.iso3, iso2=c.iso2, confidence=c.confidence, |
| thesaurus_uri=_geo_idx.lookup_any_lang(c.name), |
| ) |
| for c in _cov.countries if c.iso3 |
| ] |
| _regions = [ |
| _RegionEntity( |
| name=r.name, entity_type="region", confidence=r.confidence, |
| thesaurus_uri=_geo_idx.lookup_any_lang(r.name), |
| ) |
| for r in _cov.regions |
| ] |
| _seen_regions = {r.name.lower() for r in _regions} |
| for _rn in dict.fromkeys(_region_names): |
| if _rn.lower() not in _seen_regions: |
| _regions.append(_RegionEntity( |
| name=_rn, entity_type="region", |
| thesaurus_uri=_geo_idx.lookup_any_lang(_rn), |
| )) |
| _seen_regions.add(_rn.lower()) |
| _countries = _countries[:OUTPUT_CAPS["max_countries"]] |
| _regions = _regions[:OUTPUT_CAPS["max_regions"]] |
| geo_coverage = GeographicalCoverage( |
| countries=_countries, |
| regions=_regions, |
| iso3_codes=[c.iso3 for c in _countries], |
| ) |
|
|
| |
| |
| |
| _year_texts = [e["text"] for e in _all_ents if e.get("label") == "temporal.year" and e.get("text")] |
| _session_texts = [e["text"] for e in _all_ents if e.get("label") == "temporal.session" and e.get("text")] |
| _adoption_texts = [e["text"] for e in _all_ents if e.get("label") == "temporal.adoption_date" and e.get("text")] |
|
|
| _years: list[int] = [] |
| for _y in _year_texts: |
| try: |
| _years.append(int(_y.strip())) |
| except ValueError: |
| pass |
|
|
| if not _years: |
| |
| _full_text = result.get("full_text", "") |
| _years = [int(y) for y in _re.findall(r'\b(19[5-9]\d|20[0-9]\d)\b', _full_text)] |
| _years = sorted(set(_years)) |
|
|
| _years = _years[:OUTPUT_CAPS["max_years"]] |
| _sessions = list(dict.fromkeys(_session_texts))[:OUTPUT_CAPS["max_sessions"]] |
| time_coverage = TimeCoverage( |
| year=_years[0] if _years else None, |
| years_mentioned=_years, |
| sessions=_sessions, |
| adoption_date=_adoption_texts[0] if _adoption_texts else None, |
| ) |
|
|
| |
| |
| |
| if not sdg_goals: |
| _full_text = result.get("full_text", "") |
| _sdg_pattern = _re.compile( |
| r'\b(?:SDG|ODD|Goal|Objectif|Objetivo|Π¦Π£Π )\s*(\d{1,2})\b', _re.IGNORECASE |
| ) |
| _sdg_nums = sorted({int(m) for m in _sdg_pattern.findall(_full_text) if 1 <= int(m) <= 17}) |
| sdg_goals = [{"goal": f"SDG{n}", "confidence": 0.65} for n in _sdg_nums] |
|
|
| |
| dcat_metadata = fmt.get("dcat_metadata") or { |
| "@context": ["https://www.w3.org/ns/dcat/v3", "http://purl.org/dc/terms/"], |
| "@id": f"http://unesdoc.unesco.org/{doc_id}", |
| "dcterms:title": {"@language": _primary_lang, "@value": f"Document {doc_id}"}, |
| "dcterms:subject": [{"@id": e.uri} for e in entities[:10]], |
| } |
| |
| if isinstance(dcat_metadata.get("dcterms:title"), dict): |
| dcat_metadata["dcterms:title"]["@language"] = _primary_lang |
|
|
| return DCATResult( |
| document_id=doc_id, |
| status="completed", |
| dcat_metadata=dcat_metadata, |
| entities=entities[:20], |
| sdg_goals=sdg_goals, |
| justifications=[], |
| processing_time_ms=result.get("processing_time_seconds", 0) * 1000, |
| created_at=datetime.now(timezone.utc), |
| geographical_coverage=geo_coverage, |
| time_coverage=time_coverage, |
| ) |
|
|
|
|
| |
|
|
async def process_document_background(
    submission: DocumentSubmission,
    storage: LocalJSONStorage,
):
    """
    Background task: run the pipeline for one submission and persist the outcome.

    Steps:
      1. Write the four intermediate statuses (parsing, extracting, grounding,
         formatting) to ``storage``. They are written up-front, before the
         pipeline itself starts.
      2. If the submission carries any input (``text_content``, ``source_url``,
         ``file_url`` or ``file_content``), run ``_run_pipeline_sync`` in a
         worker thread, bounded by ``_PIPELINE_TIMEOUT`` and guarded by the
         pipeline semaphore, then convert the output with ``_pipeline_to_dcat``.
         Otherwise build an empty ``DCATResult`` stub.
      3. Save the result, then mark the document COMPLETED.

    Bug-fix: result is saved BEFORE status is set to COMPLETED so that a
    concurrent GET /result/{id} never receives 404 on a 'completed' doc.

    Bug-fix: the worker pool is shut down with ``wait=False``. Using the pool
    as a context manager calls ``shutdown(wait=True)`` on exit, which blocked
    the event-loop thread until the timed-out pipeline finished, defeating
    the timeout and stalling every other request.

    Args:
        submission: The document to process. Reads ``document_id``, the four
            input fields listed above, and ``webhook_url``.
        storage: Backend used through ``save_status`` and ``save_result``.

    Returns:
        None. The outcome is communicated through ``storage`` only.

    Any ``Exception`` raised while processing is logged and recorded as a
    FAILED status with a sanitised error message instead of being propagated.
    """
    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    doc_id = submission.document_id

    try:
        # Publish the intermediate stages; the short sleep yields to the
        # event loop between writes.
        for proc_status, stage, pct in [
            (ProcessingStatus.PARSING, "parsing", 15),
            (ProcessingStatus.EXTRACTING, "extracting", 40),
            (ProcessingStatus.GROUNDING, "grounding", 65),
            (ProcessingStatus.FORMATTING, "formatting", 85),
        ]:
            storage.save_status(document_id=doc_id, status=proc_status,
                                stage=stage, progress=pct)
            await asyncio.sleep(0.05)

        # Any one of the four input fields is enough to run the real pipeline.
        has_input = (submission.text_content or submission.source_url
                     or submission.file_url or submission.file_content)

        if has_input:
            # We are inside a coroutine, so the running loop is the right one.
            loop = asyncio.get_running_loop()
            # NOTE(review): when _PIPELINE_SEMAPHORE is unset, a fresh
            # semaphore is created per call and therefore does not limit
            # concurrency across tasks.
            _sem = _PIPELINE_SEMAPHORE or asyncio.Semaphore(_MAX_CONCURRENT)
            async with _sem:
                # Deliberately NOT `with ThreadPoolExecutor(...)`: its
                # __exit__ waits for the worker, which would block the event
                # loop after a timeout.
                pool = ThreadPoolExecutor(max_workers=1)
                try:
                    _future = loop.run_in_executor(pool, _run_pipeline_sync, submission)
                    try:
                        pipeline_result = await asyncio.wait_for(
                            _future, timeout=_PIPELINE_TIMEOUT
                        )
                    except asyncio.TimeoutError as exc:
                        raise RuntimeError(
                            f"Pipeline timed out after {int(_PIPELINE_TIMEOUT // 60)} minutes"
                        ) from exc
                finally:
                    # Non-blocking: on success the worker is already idle; on
                    # timeout the thread cannot be interrupted and is left to
                    # finish on its own.
                    # NOTE(review): an abandoned worker keeps running after
                    # the semaphore slot is released.
                    pool.shutdown(wait=False)
            dcat_result = _pipeline_to_dcat(pipeline_result, doc_id)
        else:
            # No input at all: return an empty, well-formed result.
            from src.api.models import GeographicalCoverage, TimeCoverage
            dcat_result = DCATResult(
                document_id=doc_id,
                status="completed",
                dcat_metadata={
                    "@context": ["https://www.w3.org/ns/dcat/v3"],
                    "@id": f"http://unesdoc.unesco.org/{doc_id}",
                },
                entities=[],
                sdg_goals=[],
                justifications=[],
                processing_time_ms=0.0,
                created_at=datetime.now(timezone.utc),
                geographical_coverage=GeographicalCoverage(),
                time_coverage=TimeCoverage(),
            )

        # Order matters: persist the result first, then flip to COMPLETED.
        storage.save_result(doc_id, dcat_result)
        storage.save_status(document_id=doc_id, status=ProcessingStatus.COMPLETED,
                            stage="completed", progress=100)

        if submission.webhook_url:
            # Webhook delivery is not implemented; only the intent is logged.
            logger.info("Would send webhook to %s", submission.webhook_url)

        logger.info("Completed processing %s", doc_id)

    except Exception as e:
        logger.error("Processing failed for %s: %s", doc_id, e, exc_info=True)

        # Sanitise the message before it is stored.
        _err_msg = str(e)
        try:
            from src.config import config as _cfg
            if _cfg.UNESDOC_SALT and _cfg.UNESDOC_SALT in _err_msg:
                _err_msg = _err_msg.replace(_cfg.UNESDOC_SALT, "***")
        except Exception:
            # Best-effort redaction: never let it mask the original failure.
            logger.debug("Could not apply salt redaction to error message",
                         exc_info=True)

        # Replace "<path>.py:<line>" references and /tmp paths with placeholders.
        import re as _re
        _err_msg = _re.sub(r"(/[\w./\-]+\.py:\d+)", "[internal]", _err_msg)
        _err_msg = _re.sub(r"(/tmp/[\w./\-]+)", "[tmp]", _err_msg)
        storage.save_status(
            document_id=doc_id,
            status=ProcessingStatus.FAILED,
            error_message=_err_msg,
        )
|
|
|
|
| |
# Script entry point: start the API under uvicorn when this file is run directly.
if __name__ == "__main__":
    import uvicorn

    # Server settings: listen on all interfaces, port 8000, with auto-reload
    # enabled and info-level logging.
    _server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": True,
        "log_level": "info",
    }

    # The application is passed as an import string rather than as an object.
    uvicorn.run("src.api.main:app", **_server_options)
|
|