Submit a document (PDF, UNESDOC ID, raw text, or URL) for metadata extraction.
GET
/api/v1/status/{{id}}
Check processing status with stage and progress percentage.
GET
/api/v1/result/{{id}}
Retrieve the complete DCAT-AP 3.0 JSON-LD metadata output.
POST
/api/v1/batch
Submit up to 100 documents in a single batch request.
GET
/api/v1/documents
List processed documents with filtering by country, year, or region.
GET
/health
Public health check — returns service status and version. No auth required.
Authentication
All endpoints except /health require an API key via the X-API-Key request header.
Example: curl -H "X-API-Key: your-key" https://<space-url>/api/v1/status/test
"""
return HTMLResponse(content=html)
# Health check endpoint (public)
@app.get(
    "/health",
    response_model=HealthCheck,
    summary="Health check",
    description="Check API health status. Public endpoint, no authentication required.",
    tags=["Health"],
)
async def health_check():
    """Report API liveness.

    Public endpoint (no API key dependency). The component map is a fixed
    literal: it reports that the process is up and answering requests; it
    does not probe storage or the pipeline.
    """
    component_states = {
        "api": "ok",
        "storage": "ok",
        "pipeline": "ready",
    }
    checked_at = datetime.now(timezone.utc)
    return HealthCheck(
        status="healthy",
        version=API_VERSION,
        timestamp=checked_at,
        components=component_states,
    )
# Protected endpoints
@app.post(
    "/api/v1/process",
    response_model=ProcessingResult,
    summary="Submit document for processing",
    description="Submit a single document for metadata extraction.",
    tags=["Documents"],
    responses={
        401: {"model": ErrorResponse, "description": "Invalid API key"},
        422: {"model": ErrorResponse, "description": "Validation error"}
    }
)
async def process_document(
    submission: DocumentSubmission,
    background_tasks: BackgroundTasks,
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage)
):
    """
    Submit a document for processing.

    The document will be processed through:
    1. PDF parsing
    2. Entity extraction (GLiNER2)
    3. Knowledge graph grounding
    4. DCAT-AP formatting

    Returns immediately with status. Use `/status/{document_id}` to check progress.

    Raises:
        HTTPException 413: ``file_content`` encodes more than 50 MiB of PDF.
        HTTPException 409: a document with this id already exists and its
            previous attempt did not fail.
    """
    logger.info(f"Processing request for {submission.document_id} (API key: {api_key.name})")

    # Reject payloads that would exhaust server RAM. The cap is the same
    # 50 MiB binary limit the file_url download path uses, expressed in
    # base64 characters (4 output chars per 3 input bytes, rounded up).
    _MAX_PDF_BYTES = 50 * 1024 * 1024
    _MAX_FILE_CONTENT = 4 * -(-_MAX_PDF_BYTES // 3)
    if submission.file_content and len(submission.file_content) > _MAX_FILE_CONTENT:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="PDF exceeds the 50 MB limit. Use file_url to provide a download link instead.",
        )

    # Any existing status other than FAILED means the document is queued,
    # in progress, or done -- only a failed attempt may be resubmitted.
    existing = storage.get_status(submission.document_id)
    if existing:
        current_status = existing.get("status")
        if current_status != ProcessingStatus.FAILED.value:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Document {submission.document_id} already exists with status: {current_status}"
            )

    # Save initial status before scheduling so /status answers immediately.
    storage.save_status(
        document_id=submission.document_id,
        status=ProcessingStatus.PENDING,
        stage="queued",
        progress=0,
        metadata={
            "languages": submission.languages,
            "document_family": submission.document_family,
            "priority": submission.priority,
            "webhook_url": str(submission.webhook_url) if submission.webhook_url else None
        }
    )

    # Trigger background processing (runs after the response is sent).
    background_tasks.add_task(
        process_document_background,
        submission,
        storage
    )

    # One timestamp so created_at and updated_at agree on a fresh submission.
    now = datetime.now(timezone.utc)
    return ProcessingResult(
        document_id=submission.document_id,
        status=ProcessingStatus.PENDING,
        created_at=now,
        updated_at=now,
        current_stage="queued",
        progress_percent=0
    )
@app.post(
    "/api/v1/batch",
    response_model=BatchResult,
    summary="Submit batch of documents",
    description="Submit multiple documents for batch processing.",
    tags=["Documents"],
    responses={
        401: {"model": ErrorResponse, "description": "Invalid API key"}
    }
)
async def process_batch(
    batch: BatchSubmission,
    background_tasks: BackgroundTasks,
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage)
):
    """
    Submit a batch of documents for processing.
    Maximum 100 documents per batch.

    Applies the same admission rules as ``/api/v1/process``. The whole batch
    is validated before anything is queued, so a rejected batch leaves no
    partially-queued documents behind.

    Raises:
        HTTPException 409: a document_id appears more than once in the batch,
            or already exists with a status other than failed.
        HTTPException 413: a document's ``file_content`` encodes more than
            50 MiB of PDF.
    """
    logger.info(f"Batch submission: {len(batch.documents)} documents (API key: {api_key.name})")

    # 50 MiB binary cap expressed in base64 characters (4 per 3 bytes, rounded up).
    _MAX_PDF_BYTES = 50 * 1024 * 1024
    _MAX_FILE_CONTENT = 4 * -(-_MAX_PDF_BYTES // 3)

    # ── Validation pass: nothing is written until every document passes ──
    seen_ids: set = set()
    for doc in batch.documents:
        if doc.document_id in seen_ids:
            # Two queued runs for one id would race on the same status/result.
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Document {doc.document_id} appears more than once in the batch",
            )
        seen_ids.add(doc.document_id)

        if doc.file_content and len(doc.file_content) > _MAX_FILE_CONTENT:
            raise HTTPException(
                status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                detail=(
                    f"PDF for document {doc.document_id} exceeds the 50 MB limit. "
                    "Use file_url to provide a download link instead."
                ),
            )

        # Only a previously failed document may be resubmitted.
        existing = storage.get_status(doc.document_id)
        if existing and existing.get("status") != ProcessingStatus.FAILED.value:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Document {doc.document_id} already exists with status: {existing.get('status')}",
            )

    # ── Queue pass ──
    batch_id = f"batch_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}"
    doc_ids = [doc.document_id for doc in batch.documents]

    for doc in batch.documents:
        storage.save_status(
            document_id=doc.document_id,
            status=ProcessingStatus.PENDING,
            stage="queued",
            progress=0,
            metadata={
                "batch_id": batch_id,
                "languages": doc.languages,
                "document_family": doc.document_family,
                "priority": doc.priority,
                "webhook_url": str(doc.webhook_url) if doc.webhook_url else None,
            },
        )
        background_tasks.add_task(process_document_background, doc, storage)

    return BatchResult(
        batch_id=batch_id,
        total_documents=len(batch.documents),
        submitted_documents=doc_ids,
        status="submitted"
    )
@app.get(
    "/api/v1/status/{document_id}",
    response_model=ProcessingResult,
    summary="Get processing status",
    description="Check the current processing status of a document.",
    tags=["Documents"],
    responses={
        401: {"model": ErrorResponse, "description": "Invalid API key"},
        404: {"model": ErrorResponse, "description": "Document not found"}
    }
)
async def get_status(
    document_id: str,
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage)
):
    """
    Get the processing status of a document.

    Returns current stage, progress percentage, and any error messages.
    Fields that are missing *or null* in the stored record fall back to:
    status "pending", progress 0, and the current UTC time for timestamps.

    Raises:
        HTTPException 404: no status record exists for ``document_id``.
    """
    status_data = storage.get_status(document_id)
    if not status_data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Document {document_id} not found"
        )

    # Single fallback instant so created_at and updated_at cannot disagree
    # when both are absent.
    now = datetime.now(timezone.utc)

    def _timestamp(key: str) -> datetime:
        # Stored timestamps are ISO strings; a null value must not reach
        # fromisoformat() (it would raise TypeError -> HTTP 500).
        raw = status_data.get(key)
        return datetime.fromisoformat(raw) if raw else now

    return ProcessingResult(
        document_id=document_id,
        status=ProcessingStatus(status_data.get("status") or "pending"),
        created_at=_timestamp("created_at"),
        updated_at=_timestamp("updated_at"),
        current_stage=status_data.get("stage"),
        progress_percent=status_data.get("progress") or 0,
        error_message=status_data.get("error_message")
    )
@app.get(
    "/api/v1/result/{document_id}",
    response_model=DCATResult,
    summary="Get processing result",
    description="Retrieve the DCAT-AP formatted result for a completed document.",
    tags=["Documents"],
    responses={
        401: {"model": ErrorResponse, "description": "Invalid API key"},
        404: {"model": ErrorResponse, "description": "Result not found"},
        409: {"model": ErrorResponse, "description": "Processing not complete"}
    },
)
async def get_result(
    document_id: str,
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage),
):
    """
    Return the stored DCAT-AP result for a document.

    Raises:
        HTTPException 404: no status record, or status is completed but no
            result was stored.
        HTTPException 409: the document exists but is not yet completed.
    """
    record = storage.get_status(document_id)
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Document {document_id} not found",
        )

    current = record.get("status")
    if current != ProcessingStatus.COMPLETED.value:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Document {document_id} processing not complete. Current status: {current}",
        )

    stored = storage.get_result(document_id)
    if stored:
        return stored
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Result for {document_id} not found",
    )
@app.get(
    "/api/v1/documents",
    summary="List documents",
    description="List all documents with optional filtering by country, year, and region.",
    tags=["Documents"]
)
async def list_documents(
    status: ProcessingStatus = None,
    country: Optional[str] = Query(
        None,
        description="Filter by ISO3 code (e.g., FRA, USA, BRA)",
        min_length=3,
        max_length=3,
        pattern=r"^[A-Z]{3}$",
        examples={"france": {"summary": "France", "value": "FRA"}}
    ),
    year: Optional[int] = Query(
        None,
        description="Filter by year (e.g., 2024)",
        ge=1945,
        le=2100,
        examples={"2024": {"summary": "Year 2024", "value": 2024}}
    ),
    region: Optional[str] = Query(
        None,
        description="Filter by UNESCO region (e.g., Africa, Europe)"
    ),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage)
):
    """
    List documents with optional filtering.

    ## Filtering Options
    - **country**: Filter by ISO3 code (e.g., "FRA" for France)
    - **year**: Filter by document year (e.g., 2024)
    - **region**: Filter by UNESCO region (e.g., "Africa", "Europe")
    - **status**: Filter by processing status

    ## Pagination
    - **limit**: Maximum results to return (1-1000)
    - **offset**: Number of results to skip

    ## Examples
    - `/api/v1/documents?country=FRA` - Documents related to France
    - `/api/v1/documents?year=2024` - Documents from 2024
    - `/api/v1/documents?region=Africa` - Documents about Africa
    - `/api/v1/documents?country=USA&year=2024` - US documents from 2024
    """
    # Note: the `status` parameter shadows the HTTP status module inside this
    # function; it is kept because its name is part of the query interface.

    # Fetch every matching document page by page. Filtering happens in
    # memory, so a single capped storage query would silently drop documents
    # beyond the first page and under-report `total`.
    _STORAGE_PAGE = 1000
    documents = []
    storage_offset = 0
    while True:
        page = list(storage.list_documents(status=status, limit=_STORAGE_PAGE, offset=storage_offset) or [])
        documents.extend(page)
        if len(page) < _STORAGE_PAGE:
            break  # short page => storage is exhausted
        storage_offset += _STORAGE_PAGE

    if country:
        country_upper = country.upper()
        documents = [d for d in documents if _document_has_country(d, country_upper)]

    if year:
        documents = [d for d in documents if _document_has_year(d, year)]

    if region:
        # Region matching is a case-insensitive substring match.
        region_lower = region.lower()
        documents = [d for d in documents if _document_has_region(d, region_lower)]

    # Pagination applies to the filtered set; `total` counts all matches.
    total = len(documents)
    documents = documents[offset:offset + limit]

    return {
        "documents": documents,
        "total": total,
        "limit": limit,
        "offset": offset,
        "filters_applied": {
            "country": country,
            "year": year,
            "region": region,
            "status": status.value if status else None
        }
    }
def _document_has_country(document: Dict, iso3: str) -> bool:
"""Check if document has a specific country by ISO3 code."""
# Check in geographical_coverage
geo = document.get("geographical_coverage", {})
if iso3 in geo.get("iso3_codes", []):
return True
# Check in countries list
for country in geo.get("countries", []):
if country.get("iso3") == iso3:
return True
return False
def _document_has_year(document: Dict, year: int) -> bool:
"""Check if document has a specific year."""
# Check in time_coverage
time_cov = document.get("time_coverage", {})
if time_cov.get("year") == year:
return True
if year in time_cov.get("years_mentioned", []):
return True
# Check in temporal_coverage (legacy)
temp = document.get("temporal_coverage", {})
if temp.get("year") == year:
return True
# Check in result metadata
result = document.get("result", {})
if result.get("year") == year:
return True
return False
def _document_has_region(document: Dict, region: str) -> bool:
"""Check if document has a specific region."""
# Check in geographical_coverage
geo = document.get("geographical_coverage", {})
for r in geo.get("regions", []):
region_name = r.get("name", "") if isinstance(r, dict) else str(r)
if region in region_name.lower():
return True
# Check in unesco_regions (legacy)
for r in document.get("unesco_regions", []):
if isinstance(r, str) and region in r.lower():
return True
return False
@app.delete(
    "/api/v1/documents/{document_id}",
    summary="Delete document",
    description="Delete a document and all its results.",
    tags=["Documents"],
)
async def delete_document(
    document_id: str,
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage),
):
    """Remove a document via the storage layer.

    Raises:
        HTTPException 404: storage reports that nothing was deleted.
    """
    if storage.delete_document(document_id):
        return {"message": f"Document {document_id} deleted"}
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Document {document_id} not found",
    )
@app.get(
    "/api/v1/stats",
    summary="Get statistics",
    description="Get system statistics and metrics.",
    tags=["System"],
)
async def get_stats(
    api_key: APIKeyInfo = Depends(verify_api_key),
    storage: LocalJSONStorage = Depends(get_storage),
):
    """Return whatever aggregate statistics the storage layer reports, unchanged."""
    stats = storage.get_stats()
    return stats
# ── Pipeline helpers ────────────────────────────────────────────────────────
def _build_extracted_document(submission: DocumentSubmission):
    """Build a pre-parsed ExtractedDocument from the submission's input.

    Inputs are tried in priority order: ``text_content`` (wrapped as a single
    body section), ``source_url`` (scraped via WebScraper), ``file_url``
    (downloaded PDF), ``file_content`` (base64 PDF). Returns None when none is
    set.

    Raises:
        HTTPException: (400) a generic ``file_url``, or any redirect target it
            leads to, fails the SSRF guard.
        ValueError: UNESDOC download failure, a PDF over 50 MiB, or too many
            redirects on a generic download.
        requests.HTTPError: the generic PDF download returned an error status.
    """
    from src.parsing.pdf_extractor import ExtractedDocument, DocumentSection, PDFExtractor
    from pathlib import Path as _Path
    from urllib.parse import urljoin, urlparse
    import os
    import shutil
    import tempfile

    if submission.text_content:
        # Auto-detect language when caller did not specify
        if submission.languages:
            langs = submission.languages
        else:
            try:
                from langdetect import detect, DetectorFactory
                DetectorFactory.seed = 0  # reproducible detection
                langs = [detect(submission.text_content[:2000])]
            except Exception:
                # Best-effort: detection failure (or missing langdetect) defaults to English.
                langs = ["en"]
        sections = [DocumentSection(
            text=submission.text_content,
            section_type="body",
            page_number=1,
            language=langs[0],
        )]
        return ExtractedDocument(
            document_id=submission.document_id,
            file_path=_Path("."),
            sections=sections,
            languages=langs,
            total_pages=1,
            metadata={},
        )

    if submission.source_url:
        from src.parsing.web_scraper import WebScraper
        scraper = WebScraper()
        return scraper.scrape(str(submission.source_url), submission.document_id)

    if submission.file_url:
        _url = str(submission.file_url)
        logger.info("Downloading PDF from file_url: %s", _url)
        # Route on the parsed hostname, not a substring of the whole URL:
        # "http://internal/?x=unesdoc.unesco.org" must not skip the SSRF guard.
        _host = (urlparse(_url).hostname or "").lower()
        if _host == "unesdoc.unesco.org" or _host.endswith(".unesdoc.unesco.org"):
            # UNESDOC requires MD5 signature auth: x-signature = MD5(url + salt)
            from src.utils.pdf_downloader import UNESDOCPDFDownloader
            _tmp_dir = tempfile.mkdtemp()
            try:
                downloader = UNESDOCPDFDownloader(output_folder=_tmp_dir)
                pdf_path = downloader.download_pdf(_url)
                try:
                    extractor = PDFExtractor()
                    return extractor.extract(pdf_path, submission.document_id)
                finally:
                    if pdf_path and pdf_path.exists():
                        pdf_path.unlink()
            except (ValueError, RuntimeError) as exc:
                logger.error("UNESDOC download failed: %s", exc)
                raise ValueError(str(exc)) from exc
            finally:
                # mkdtemp() directories are never removed automatically.
                shutil.rmtree(_tmp_dir, ignore_errors=True)
        else:
            # Generic PDF URL: SSRF guard on the URL and on every redirect hop,
            # then a streamed download with a size cap.
            if not _is_ssrf_safe(_url):
                raise HTTPException(status_code=400, detail="URL not allowed")
            import requests as _req_lib
            _MAX_PDF = 50 * 1024 * 1024  # 50 MB
            _MAX_REDIRECTS = 5
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
                tmp_path = tmp.name
            try:
                # Follow redirects manually: requests' automatic following would
                # fetch redirect targets (e.g. internal addresses) unchecked.
                _current_url = _url
                for _ in range(_MAX_REDIRECTS + 1):
                    resp = _req_lib.get(_current_url, timeout=30, stream=True, allow_redirects=False)
                    if not resp.is_redirect:
                        break
                    _location = resp.headers.get("location", "")
                    resp.close()
                    _current_url = urljoin(_current_url, _location)
                    if not _is_ssrf_safe(_current_url):
                        raise HTTPException(status_code=400, detail="URL not allowed")
                else:
                    raise ValueError(f"PDF download exceeded {_MAX_REDIRECTS} redirects")

                # `with` releases the streamed connection even on errors.
                with resp:
                    resp.raise_for_status()
                    _size = 0
                    with open(tmp_path, "wb") as _fout:
                        for _chunk in resp.iter_content(65536):
                            _size += len(_chunk)
                            if _size > _MAX_PDF:
                                raise ValueError(f"PDF exceeds {_MAX_PDF // 1024 // 1024} MB limit")
                            _fout.write(_chunk)
                extractor = PDFExtractor()
                return extractor.extract(_Path(tmp_path), submission.document_id)
            finally:
                if os.path.exists(tmp_path):
                    os.unlink(tmp_path)

    if submission.file_content:
        import base64
        logger.info("Decoding base64 PDF for document %s", submission.document_id)
        pdf_bytes = base64.b64decode(submission.file_content)
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
            tmp.write(pdf_bytes)
            tmp_path = tmp.name
        try:
            extractor = PDFExtractor()
            return extractor.extract(_Path(tmp_path), submission.document_id)
        finally:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)

    return None
def _run_pipeline_sync(submission: DocumentSubmission) -> dict:
    """Run the PipelineRunner for *submission* in the calling thread and return its result.

    Exceptions from parsing or the pipeline propagate to the caller.
    Language priority: the caller-supplied ``languages``, then the
    ``languages`` attribute of the pre-parsed document (if any), then ``["en"]``.
    """
    from src.pipeline.runner import PipelineRunner

    extracted = _build_extracted_document(submission)
    detected = getattr(extracted, "languages", None) if extracted else None
    languages = submission.languages or detected or ["en"]

    runner = PipelineRunner(
        document_id=submission.document_id,
        languages=languages,
        extracted_document=extracted,
    )
    return runner.run()
def _dcat_entities(extract: dict, ground: dict) -> list:
    """Collect topical entities: grounded Thesaurus concepts, then ungrounded GLiNER2 hits.

    geo.* / temporal.* labels are excluded because they populate the dedicated
    coverage panels. Sorted by confidence (descending), capped at
    MAX_OUTPUT_ENTITIES.
    """
    from src.api.models import DCATEntity
    from src.config.extraction_rules import MAX_OUTPUT_ENTITIES, THESAURUS_LABEL_BLOCKLIST

    entities: list = []
    seen_uris: set = set()

    # ── Grounded entities (real UNESCO Thesaurus URIs + labels) ──
    for item in ground.get("grounded_entities", []):
        concept = item.get("concept", {})
        entity = item.get("entity", {})
        uri = concept.get("uri", "")
        if not uri or uri in seen_uris:
            continue
        seen_uris.add(uri)
        label = (concept.get("label") or entity.get("text", "")).strip()
        if label.lower() in THESAURUS_LABEL_BLOCKLIST:
            continue
        entity_type = entity.get("label", "")
        if entity_type.startswith(("geo.", "temporal.")):
            continue
        entities.append(DCATEntity(
            uri=uri,
            label=label,
            entity_type=entity_type or None,
            source="GLiNER2+Thesaurus",
            confidence=float(item.get("match_score", entity.get("score", 0.8))),
        ))

    # ── Supplement with ungrounded raw entities (no duplicate text) ──
    known_texts = {e.label.lower() for e in entities}
    for ent in extract.get("entities", [])[:30]:
        text = ent.get("text", "").strip()
        if not text or text.lower() in known_texts:
            continue
        entity_type = ent.get("label", "")
        if entity_type.startswith(("geo.", "temporal.")):
            continue
        known_texts.add(text.lower())
        entities.append(DCATEntity(
            uri=f"http://vocabularies.unesco.org/thesaurus/extracted/{text.replace(' ','_')}",
            label=text,
            entity_type=entity_type or None,
            source="GLiNER2",
            confidence=float(ent.get("score", 0.8)),
        ))

    # Final cap by confidence; limit comes from config/extraction_rules.py.
    entities.sort(key=lambda e: e.confidence, reverse=True)
    return entities[:MAX_OUTPUT_ENTITIES]


def _dcat_sdg_goals(extract: dict, full_text: str) -> list:
    """Return SDG goals from GLiNER2 predictions, else from "SDG N"-style mentions in the text."""
    # Predictions are stored under sdg_predictions with key "sdg" (not "goal").
    goals = [
        {"goal": s.get("sdg", ""), "confidence": float(s.get("confidence", 0.8))}
        for s in extract.get("sdg_predictions", [])
        if s.get("sdg")
    ]
    if goals:
        return goals

    import re as _re
    sdg_pattern = _re.compile(
        r'\b(?:SDG|ODD|Goal|Objectif|Objetivo|ЦУР)\s*(\d{1,2})\b', _re.IGNORECASE
    )
    numbers = sorted({int(m) for m in sdg_pattern.findall(full_text) if 1 <= int(m) <= 17})
    return [{"goal": f"SDG{n}", "confidence": 0.65} for n in numbers]


def _dcat_geo_coverage(extract: dict, full_text: str):
    """Build GeographicalCoverage from geo.* entities, falling back to a CountryNormalizer scan."""
    import re as _re
    from src.api.models import CountryEntity, GeographicalCoverage, RegionEntity
    from src.config.extraction_rules import OUTPUT_CAPS
    from src.utils.country_normalizer import (
        EntityType as _GeoType,
        get_country_normalizer,
        get_geo_thesaurus_index,
    )

    all_ents = extract.get("entities", [])
    country_names = [e["text"] for e in all_ents if e.get("label") == "geo.country" and e.get("text")]
    region_names = [e["text"] for e in all_ents if e.get("label") == "geo.region" and e.get("text")]
    normalizer = get_country_normalizer()

    if not country_names:
        # Fallback: every entity text plus capitalised tokens from the full text.
        # Tokens to reject: stopwords, tech abbreviations that share an ISO2 code,
        # and UNESCO document section words that appear capitalised.
        blocklist = frozenset({
            "the", "an", "this", "that", "these", "those", "its", "their", "our",
            "a", "and", "or", "of", "in", "on", "to", "for", "by", "at", "from",
            "ai", "ml", "ict", "it", "ar", "id", "io", "as", "do",
            "is", "me", "my", "no", "ok", "so", "up", "be",
            "annex", "chapter", "part", "section", "resolution", "decision",
            "note", "report", "table", "figure", "appendix", "document",
        })
        candidates = [e["text"] for e in all_ents if e.get("text")]
        # Require 4+ chars on first word — kills "The", "An", "It", etc.
        candidates += _re.findall(
            r'\b[A-ZÀ-Ÿ][a-zA-ZÀ-ÿ\u0400-\u04FF\u0600-\u06FF\'-]{3,}(?:\s+[A-ZÀ-Ÿ][a-zA-ZÀ-ÿ\u0400-\u04FF\u0600-\u06FF\'-]{2,}){0,2}\b',
            full_text,
        )
        for cand in dict.fromkeys(candidates):  # deduplicate, preserving order
            # The blocklist applies to entity texts too: its short ISO-colliding
            # entries ("ai", "it", ...) can only arrive from there, since
            # capitalised tokens are at least 4 characters long.
            if isinstance(cand, str) and cand.lower() in blocklist:
                continue
            ent = normalizer.normalize(cand)
            if ent.entity_type == _GeoType.COUNTRY and ent.iso3 and ent.confidence >= 0.9:
                country_names.append(cand)
            elif ent.entity_type == _GeoType.REGION and ent.confidence >= 0.9:
                region_names.append(cand)

    geo_index = get_geo_thesaurus_index()
    coverage = normalizer.normalize_multiple(country_names)
    countries = [
        CountryEntity(
            name=c.name, iso3=c.iso3, iso2=c.iso2, confidence=c.confidence,
            thesaurus_uri=geo_index.lookup_any_lang(c.name),
        )
        for c in coverage.countries if c.iso3
    ]
    regions = [
        RegionEntity(
            name=r.name, entity_type="region", confidence=r.confidence,
            thesaurus_uri=geo_index.lookup_any_lang(r.name),
        )
        for r in coverage.regions
    ]
    seen_regions = {r.name.lower() for r in regions}
    for name in dict.fromkeys(region_names):
        if name.lower() not in seen_regions:
            regions.append(RegionEntity(
                name=name, entity_type="region",
                thesaurus_uri=geo_index.lookup_any_lang(name),
            ))
            seen_regions.add(name.lower())

    countries = countries[:OUTPUT_CAPS["max_countries"]]
    regions = regions[:OUTPUT_CAPS["max_regions"]]
    return GeographicalCoverage(
        countries=countries,
        regions=regions,
        iso3_codes=[c.iso3 for c in countries],
    )


def _dcat_time_coverage(extract: dict, full_text: str):
    """Build TimeCoverage from temporal.* entities, falling back to 4-digit years (1950-2099) in the text."""
    import re as _re
    from src.api.models import TimeCoverage
    from src.config.extraction_rules import OUTPUT_CAPS

    all_ents = extract.get("entities", [])

    def _texts(label: str) -> list:
        return [e["text"] for e in all_ents if e.get("label") == label and e.get("text")]

    years: list = []
    for raw in _texts("temporal.year"):
        try:
            years.append(int(raw.strip()))
        except ValueError:
            pass  # non-numeric year mention (e.g. "mid-2020s") — ignore
    if not years:
        years = [int(y) for y in _re.findall(r'\b(19[5-9]\d|20[0-9]\d)\b', full_text)]
    years = sorted(set(years))[:OUTPUT_CAPS["max_years"]]

    sessions = list(dict.fromkeys(_texts("temporal.session")))[:OUTPUT_CAPS["max_sessions"]]
    adoption_texts = _texts("temporal.adoption_date")
    return TimeCoverage(
        year=years[0] if years else None,
        years_mentioned=years,
        sessions=sessions,
        adoption_date=adoption_texts[0] if adoption_texts else None,
    )


def _pipeline_to_dcat(result: dict, doc_id: str) -> DCATResult:
    """Convert a PipelineRunner result dict into a DCATResult.

    Reads ``result["stages"]`` (extract / ground / format), ``full_text``,
    ``detected_languages`` and ``processing_time_seconds``. The input dict is
    not modified. A null ``full_text`` or ``processing_time_seconds`` is
    treated as empty / zero.
    """
    stages = result.get("stages", {})
    extract = stages.get("extract", {})
    ground = stages.get("ground", {})
    fmt = stages.get("format", {})
    full_text = result.get("full_text") or ""

    entities = _dcat_entities(extract, ground)
    sdg_goals = _dcat_sdg_goals(extract, full_text)
    geo_coverage = _dcat_geo_coverage(extract, full_text)
    time_coverage = _dcat_time_coverage(extract, full_text)
    primary_lang = (result.get("detected_languages") or ["en"])[0]

    dcat_metadata = fmt.get("dcat_metadata")
    if dcat_metadata:
        # Copy before overriding so the pipeline result passed in is untouched.
        dcat_metadata = dict(dcat_metadata)
        title = dcat_metadata.get("dcterms:title")
        if isinstance(title, dict):
            # Formatter defaults @language to "en"; use the detected language.
            dcat_metadata["dcterms:title"] = {**title, "@language": primary_lang}
    else:
        dcat_metadata = {
            "@context": ["https://www.w3.org/ns/dcat/v3", "http://purl.org/dc/terms/"],
            "@id": f"http://unesdoc.unesco.org/{doc_id}",
            "dcterms:title": {"@language": primary_lang, "@value": f"Document {doc_id}"},
            "dcterms:subject": [{"@id": e.uri} for e in entities[:10]],
        }

    return DCATResult(
        document_id=doc_id,
        status="completed",
        dcat_metadata=dcat_metadata,
        entities=entities[:20],
        sdg_goals=sdg_goals,
        justifications=[],
        processing_time_ms=(result.get("processing_time_seconds") or 0) * 1000,
        created_at=datetime.now(timezone.utc),
        geographical_coverage=geo_coverage,
        time_coverage=time_coverage,
    )
# ── Background task ──────────────────────────────────────────────────────────
async def process_document_background(
    submission: DocumentSubmission,
    storage: LocalJSONStorage,
):
    """
    Background task: process one submission end-to-end and persist the outcome.

    Writes four fixed progress ticks (parsing -> formatting), then:
    - if the submission carries input (text, source URL, file URL or base64
      PDF), runs the real pipeline in a worker thread, bounded by the shared
      pipeline semaphore and ``_PIPELINE_TIMEOUT``;
    - otherwise stores an empty skeleton DCATResult.

    Bug-fix: result is saved BEFORE status is set to COMPLETED so that a
    concurrent GET /result/{id} never receives 404 on a 'completed' doc.

    Any exception is logged and recorded as a FAILED status with a sanitized
    error message; nothing is re-raised.
    """
    global _PIPELINE_SEMAPHORE
    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    doc_id = submission.document_id
    try:
        # ── Progress ticks (fast, non-blocking) ──────────────────────────
        for proc_status, stage, pct in [
            (ProcessingStatus.PARSING, "parsing", 15),
            (ProcessingStatus.EXTRACTING, "extracting", 40),
            (ProcessingStatus.GROUNDING, "grounding", 65),
            (ProcessingStatus.FORMATTING, "formatting", 85),
        ]:
            storage.save_status(document_id=doc_id, status=proc_status,
                                stage=stage, progress=pct)
            await asyncio.sleep(0.05)

        # ── Real pipeline in thread pool (semaphore-limited + timeout) ──────
        has_input = (submission.text_content or submission.source_url
                     or submission.file_url or submission.file_content)
        if has_input:
            loop = asyncio.get_running_loop()
            # Create the shared semaphore once. A fresh semaphore per call (the
            # previous fallback) would never make any task wait.
            if not _PIPELINE_SEMAPHORE:
                _PIPELINE_SEMAPHORE = asyncio.Semaphore(_MAX_CONCURRENT)
            async with _PIPELINE_SEMAPHORE:
                pool = ThreadPoolExecutor(max_workers=1)
                try:
                    _future = loop.run_in_executor(pool, _run_pipeline_sync, submission)
                    try:
                        pipeline_result = await asyncio.wait_for(_future, timeout=_PIPELINE_TIMEOUT)
                    except asyncio.TimeoutError:
                        raise RuntimeError(
                            f"Pipeline timed out after {int(_PIPELINE_TIMEOUT // 60)} minutes"
                        ) from None
                finally:
                    # Do NOT wait for the worker: exiting `with ThreadPoolExecutor`
                    # calls shutdown(wait=True), which would block the event loop
                    # until a timed-out pipeline thread finished. Python threads
                    # cannot be killed, so a timed-out run keeps going in the
                    # background after the semaphore slot is released.
                    pool.shutdown(wait=False)
            dcat_result = _pipeline_to_dcat(pipeline_result, doc_id)
        else:
            # No input provided – return an empty skeleton result
            from src.api.models import GeographicalCoverage, TimeCoverage
            dcat_result = DCATResult(
                document_id=doc_id,
                status="completed",
                dcat_metadata={
                    "@context": ["https://www.w3.org/ns/dcat/v3"],
                    "@id": f"http://unesdoc.unesco.org/{doc_id}",
                },
                entities=[],
                sdg_goals=[],
                justifications=[],
                processing_time_ms=0.0,
                created_at=datetime.now(timezone.utc),
                geographical_coverage=GeographicalCoverage(),
                time_coverage=TimeCoverage(),
            )

        # ── Save result FIRST, then mark complete (fixes race condition) ──
        storage.save_result(doc_id, dcat_result)
        storage.save_status(document_id=doc_id, status=ProcessingStatus.COMPLETED,
                            stage="completed", progress=100)

        if submission.webhook_url:
            # Webhook delivery is not implemented; only logged.
            logger.info(f"Would send webhook to {submission.webhook_url}")

        logger.info(f"Completed processing {doc_id}")

    except Exception as e:
        logger.error(f"Processing failed for {doc_id}: {e}", exc_info=True)
        # Sanitize error message before storing — never expose secrets or internal paths
        _err_msg = str(e)
        try:
            from src.config import config as _cfg
            if _cfg.UNESDOC_SALT and _cfg.UNESDOC_SALT in _err_msg:
                _err_msg = _err_msg.replace(_cfg.UNESDOC_SALT, "***")
        except Exception:
            # Best-effort: config unavailable -> skip salt redaction.
            pass
        # Strip Python file paths (e.g. /tmp/..., /app/src/...)
        import re as _re
        _err_msg = _re.sub(r"(/[\w./\-]+\.py:\d+)", "[internal]", _err_msg)
        _err_msg = _re.sub(r"(/tmp/[\w./\-]+)", "[tmp]", _err_msg)
        storage.save_status(
            document_id=doc_id,
            status=ProcessingStatus.FAILED,
            error_message=_err_msg,
        )
# Run the application
if __name__ == "__main__":
    import uvicorn

    # Local entry point; reload=True restarts the server on code changes.
    _server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("src.api.main:app", **_server_options)