Spaces:
Running
Running
| """Tests Sprint 6 — Interface web FastAPI, import HTR-United, HuggingFace, serve CLI. | |
| Classes de tests | |
| ---------------- | |
| TestHTRUnitedEntry (8 tests) — dataclass, as_dict, from_dict, century_str | |
| TestHTRUnitedCatalogue (10 tests) — from_demo, search, get_by_id, available_languages/scripts | |
| TestHTRUnitedSearch (8 tests) — recherche textuelle, filtre langue, script, siècle | |
| TestHTRUnitedImport (4 tests) — import_htr_united_corpus crée les fichiers meta | |
| TestHuggingFaceDataset (7 tests) — dataclass, as_dict, from_dict, hf_url | |
| TestHuggingFaceImporter (10 tests) — search référence, filtres, import | |
| TestHuggingFaceReferenceData (4 tests) — datasets de référence pré-intégrés | |
| TestNormalizationProfiles (8 tests) — profils disponibles via API route | |
| TestFastAPIStatus (3 tests) — GET /api/status | |
| TestFastAPIEngines (8 tests) — GET /api/engines | |
| TestFastAPICorpusBrowse (6 tests) — GET /api/corpus/browse | |
| TestFastAPIReports (6 tests) — GET /api/reports | |
| TestFastAPIHTRUnited (7 tests) — GET /api/htr-united/catalogue + POST import | |
| TestFastAPIHuggingFace (6 tests) — GET /api/huggingface/search + POST import | |
| TestFastAPIBenchmark (8 tests) — POST start, GET status, GET stream, POST cancel | |
| TestFastAPIHTML (5 tests) — GET / retourne HTML valide | |
| TestFastAPIReportServe (4 tests) — GET /reports/{filename} | |
| TestCLIServeCommand (5 tests) — commande picarones serve enregistrée | |
| TestRunnerProgressCallback (5 tests) — progress_callback injecté dans run_benchmark | |
| TestFastAPIModels (10 tests) — GET /api/models/{provider} | |
| """ | |
| from __future__ import annotations | |
| import io | |
| import json | |
| import os | |
| from pathlib import Path | |
| from unittest.mock import patch | |
| import pytest | |
| from click.testing import CliRunner | |
| from fastapi.testclient import TestClient | |
| from PIL import Image as _PILImage | |
def _minimal_image_bytes(fmt: str) -> bytes:
    """Encode a valid 1x1 light-grey RGB image in the Pillow format ``fmt``.

    The result is meant to pass ``validate_image_safe``: the Phase 1
    hardening of the post-rewrite work calls ``Pillow.verify()`` on every
    image extracted from a ZIP, so the former signature-only placeholders
    (``b"\\xff\\xd8\\xff"``) are now rejected. This helper produces the
    smallest real image instead, at fixture set-up time.
    """
    pixel = _PILImage.new("RGB", (1, 1), color=(200, 200, 200))
    with io.BytesIO() as stream:
        pixel.save(stream, fmt)
        return stream.getvalue()
# Minimal valid images, encoded once when the module is imported.
_MINIMAL_PNG_BYTES = _minimal_image_bytes("PNG")
_MINIMAL_JPEG_BYTES = _minimal_image_bytes("JPEG")
| # --------------------------------------------------------------------------- | |
| # Fixtures | |
| # --------------------------------------------------------------------------- | |
@pytest.fixture
def tmp_corpus(tmp_path):
    """Populate ``tmp_path`` with a minimal two-document corpus and return it.

    Each document is a 100x50 grey JPEG ``doc_NN.jpg`` paired with a UTF-8
    ground-truth file ``doc_NN.gt.txt``.

    The ``@pytest.fixture`` decorator is required: the tests request
    ``tmp_corpus`` by argument name, which pytest only resolves for
    registered fixtures.
    """
    from PIL import Image

    for i in range(2):
        stem = f"doc_{i:02d}"
        Image.new("RGB", (100, 50), color=(200, 200, 200)).save(tmp_path / f"{stem}.jpg")
        (tmp_path / f"{stem}.gt.txt").write_text(
            f"Texte vérité terrain {i}", encoding="utf-8",
        )
    return tmp_path
@pytest.fixture
def client():
    """Return a ``TestClient`` wrapping the picarones FastAPI application.

    The app is imported inside the fixture, so it is only loaded when a test
    actually requests the client. Registered with ``@pytest.fixture`` because
    tests request ``client`` by argument name.
    """
    from picarones.interfaces.web.app import app

    return TestClient(app)
@pytest.fixture
def hf_importer():
    """Return a fresh ``HuggingFaceImporter`` built with default arguments.

    Registered with ``@pytest.fixture`` because tests request ``hf_importer``
    by argument name.
    """
    from picarones.adapters.corpus.huggingface import HuggingFaceImporter

    return HuggingFaceImporter()
| # =========================================================================== | |
| # TestHuggingFaceDataset | |
| # =========================================================================== | |
class TestHuggingFaceDataset:
    """Unit tests for ``HuggingFaceDataset`` (from_dict, as_dict, hf_url)."""

    def test_from_dict_basic(self):
        """``from_dict`` carries the supplied values onto the instance."""
        from picarones.adapters.corpus.huggingface import HuggingFaceDataset

        payload = dict(
            dataset_id="test/dataset",
            title="Test Dataset",
            description="A test dataset.",
            language=["French"],
            tags=["ocr", "french"],
            license="cc-by-4.0",
            institution="Test Lab",
            downloads=500,
        )
        dataset = HuggingFaceDataset.from_dict(payload)
        assert dataset.dataset_id == "test/dataset"
        assert dataset.language == ["French"]
        assert dataset.downloads == 500

    def test_as_dict_roundtrip(self):
        """``as_dict`` exposes the values given to the constructor."""
        from picarones.adapters.corpus.huggingface import HuggingFaceDataset

        exported = HuggingFaceDataset(
            dataset_id="a/b",
            title="AB",
            description="desc",
            language=["Latin"],
            tags=["htr"],
        ).as_dict()
        assert exported["dataset_id"] == "a/b"
        assert exported["language"] == ["Latin"]

    def test_hf_url(self):
        """``hf_url`` points at the dataset page on huggingface.co."""
        from picarones.adapters.corpus.huggingface import HuggingFaceDataset

        dataset = HuggingFaceDataset(dataset_id="CATMuS/medieval", title="CATMuS")
        expected_url = "https://huggingface.co/datasets/CATMuS/medieval"
        assert dataset.hf_url == expected_url

    def test_as_dict_has_all_keys(self):
        """``as_dict`` always contains the full set of documented keys."""
        from picarones.adapters.corpus.huggingface import HuggingFaceDataset

        exported = HuggingFaceDataset(dataset_id="x/y", title="XY").as_dict()
        expected_keys = (
            "dataset_id", "title", "description", "language", "tags",
            "license", "size_category", "task", "institution", "downloads",
            "source",
        )
        absent = [key for key in expected_keys if key not in exported]
        # Report the first absent key, in declaration order.
        assert not absent, f"Missing: {absent[0]}"

    def test_default_source(self):
        """``source`` defaults to ``"reference"``."""
        from picarones.adapters.corpus.huggingface import HuggingFaceDataset

        assert HuggingFaceDataset(dataset_id="x/y", title="XY").source == "reference"

    def test_from_dict_uses_id_as_fallback_title(self):
        """Without a title, ``from_dict`` falls back to the dataset id."""
        from picarones.adapters.corpus.huggingface import HuggingFaceDataset

        dataset = HuggingFaceDataset.from_dict({"dataset_id": "owner/repo"})
        assert dataset.title == "owner/repo"

    def test_replace_source_helper(self):
        """``_replace_source`` returns a changed copy and leaves the original alone."""
        from picarones.adapters.corpus.huggingface import HuggingFaceDataset

        original = HuggingFaceDataset(dataset_id="x/y", title="XY", source="reference")
        updated = original._replace_source("api")
        assert updated.source == "api"
        assert original.source == "reference"  # original unchanged
| # =========================================================================== | |
| # TestHuggingFaceImporter | |
| # =========================================================================== | |
class TestHuggingFaceImporter:
    """Tests for ``HuggingFaceImporter.search`` and ``import_dataset``."""

    def test_search_returns_list(self, hf_importer):
        """A bare search returns a non-empty list."""
        found = hf_importer.search()
        assert isinstance(found, list)
        assert len(found) > 0

    def test_search_reference_datasets(self, hf_importer):
        """At least five reference datasets are returned."""
        found = hf_importer.search(use_reference=True)
        assert len(found) >= 5

    def test_search_query_filter(self, hf_importer):
        """Searching for RIMES returns a dataset matching by title or id."""
        found = hf_importer.search(query="RIMES", use_reference=True)
        assert len(found) >= 1
        assert any(
            "RIMES" in item.title or "rimes" in item.dataset_id.lower()
            for item in found
        )

    def test_search_language_filter(self, hf_importer):
        """Filtering on French yields at least one dataset."""
        found = hf_importer.search(language="French", use_reference=True)
        assert len(found) > 0

    def test_search_tag_filter(self, hf_importer):
        """Filtering on a tag still returns a list (possibly empty)."""
        found = hf_importer.search(tags=["historical"], use_reference=True)
        assert isinstance(found, list)

    def test_search_limit(self, hf_importer):
        """``limit`` caps the number of results."""
        found = hf_importer.search(limit=3)
        assert len(found) <= 3

    def test_search_no_api_fallback(self, hf_importer):
        """The reference datasets are available even without network access."""
        found = hf_importer.search(query="medieval", use_reference=True)
        assert len(found) >= 1

    def test_import_creates_meta(self, tmp_path, hf_importer):
        """Importing a dataset writes the metadata file it reports."""
        outcome = hf_importer.import_dataset(
            "CATMuS/medieval", output_dir=tmp_path, max_samples=5,
        )
        meta_path = Path(outcome["metadata_file"])
        assert meta_path.exists()

    def test_import_meta_content(self, tmp_path, hf_importer):
        """The metadata file records the dataset id and the source."""
        outcome = hf_importer.import_dataset(
            "CATMuS/medieval", output_dir=tmp_path, max_samples=5,
        )
        raw_meta = Path(outcome["metadata_file"]).read_text()
        metadata = json.loads(raw_meta)
        assert metadata["dataset_id"] == "CATMuS/medieval"
        assert metadata["source"] == "huggingface"

    def test_import_returns_dict_keys(self, tmp_path, hf_importer):
        """The import result exposes the four documented keys."""
        outcome = hf_importer.import_dataset("x/y", output_dir=tmp_path, max_samples=5)
        required = ("dataset_id", "output_dir", "files_imported", "metadata_file")
        absent = [key for key in required if key not in outcome]
        assert not absent
| # =========================================================================== | |
| # TestHuggingFaceReferenceData | |
| # =========================================================================== | |
class TestHuggingFaceReferenceData:
    """Sanity checks on the built-in ``_REFERENCE_DATASETS`` table."""

    def test_reference_datasets_loaded(self):
        """The table holds at least five entries."""
        from picarones.adapters.corpus.huggingface import _REFERENCE_DATASETS

        assert len(_REFERENCE_DATASETS) >= 5

    def test_catmus_present(self):
        """At least one dataset id mentions CATMuS (in any letter case)."""
        from picarones.adapters.corpus.huggingface import _REFERENCE_DATASETS

        dataset_ids = [entry["dataset_id"] for entry in _REFERENCE_DATASETS]
        assert any(
            "CATMuS" in dataset_id or "catmus" in dataset_id.lower()
            for dataset_id in dataset_ids
        )

    def test_all_have_required_fields(self):
        """Every entry declares an id, a title and a language."""
        from picarones.adapters.corpus.huggingface import _REFERENCE_DATASETS

        for entry in _REFERENCE_DATASETS:
            for field in ("dataset_id", "title", "language"):
                assert field in entry

    def test_all_are_image_to_text(self):
        """An entry's task, when given, is ``image-to-text``."""
        from picarones.adapters.corpus.huggingface import _REFERENCE_DATASETS

        for entry in _REFERENCE_DATASETS:
            task = entry.get("task", "image-to-text")
            assert task == "image-to-text"
| # =========================================================================== | |
| # TestNormalizationProfiles | |
| # =========================================================================== | |
class TestNormalizationProfiles:
    """Tests for the profiles returned by ``GET /api/normalization/profiles``."""

    _ENDPOINT = "/api/normalization/profiles"

    def _profiles_by_id(self, client):
        """Fetch the profile list and index it by each profile's ``id``."""
        listing = client.get(self._ENDPOINT).json()["profiles"]
        return {profile["id"]: profile for profile in listing}

    def test_api_returns_profiles(self, client):
        """The route answers 200 with at least four profiles."""
        response = client.get(self._ENDPOINT)
        assert response.status_code == 200
        body = response.json()
        assert "profiles" in body
        assert len(body["profiles"]) >= 4

    def test_nfc_profile_present(self, client):
        """The ``nfc`` profile is listed."""
        assert "nfc" in self._profiles_by_id(client)

    def test_medieval_french_present(self, client):
        """The ``medieval_french`` profile is listed."""
        assert "medieval_french" in self._profiles_by_id(client)

    def test_profiles_have_required_fields(self, client):
        """Every profile carries the five expected fields."""
        expected = ("id", "name", "description", "caseless", "diplomatic_rules")
        for profile in client.get(self._ENDPOINT).json()["profiles"]:
            for field in expected:
                assert field in profile

    def test_caseless_profile(self, client):
        """The ``caseless`` profile exists and has its ``caseless`` flag set."""
        by_id = self._profiles_by_id(client)
        assert "caseless" in by_id
        assert by_id["caseless"]["caseless"] is True

    def test_medieval_french_has_diplomatic_rules(self, client):
        """``medieval_french`` reports a positive diplomatic rule count."""
        by_id = self._profiles_by_id(client)
        assert by_id["medieval_french"]["diplomatic_rules"] > 0

    def test_nfc_no_diplomatic_rules(self, client):
        """``nfc`` reports zero diplomatic rules."""
        by_id = self._profiles_by_id(client)
        assert by_id["nfc"]["diplomatic_rules"] == 0

    def test_early_modern_french_present(self, client):
        """The ``early_modern_french`` profile is listed."""
        assert "early_modern_french" in self._profiles_by_id(client)
| # =========================================================================== | |
| # TestFastAPIStatus | |
| # =========================================================================== | |
class TestFastAPIStatus:
    """Tests for ``GET /api/status``."""

    _ENDPOINT = "/api/status"

    def test_status_200(self, client):
        """The status route answers 200."""
        assert client.get(self._ENDPOINT).status_code == 200

    def test_status_has_version(self, client):
        """The payload carries a non-empty ``version``."""
        body = client.get(self._ENDPOINT).json()
        assert "version" in body
        assert body["version"]

    def test_status_ok(self, client):
        """The payload reports ``status == "ok"``."""
        body = client.get(self._ENDPOINT).json()
        assert body["status"] == "ok"
| # =========================================================================== | |
| # TestFastAPIEngines | |
| # =========================================================================== | |
class TestFastAPIEngines:
    """Tests for ``GET /api/engines``."""

    _ENDPOINT = "/api/engines"

    def _payload(self, client):
        """Return the decoded JSON body of the engines route."""
        return client.get(self._ENDPOINT).json()

    def test_engines_200(self, client):
        """The engines route answers 200."""
        assert client.get(self._ENDPOINT).status_code == 200

    def test_engines_has_engines_key(self, client):
        """The payload has an ``engines`` key."""
        assert "engines" in self._payload(client)

    def test_engines_has_llms_key(self, client):
        """The payload has an ``llms`` key."""
        assert "llms" in self._payload(client)

    def test_engines_list_not_empty(self, client):
        """At least one engine is listed."""
        engines = self._payload(client)["engines"]
        assert len(engines) > 0

    def test_llms_list_not_empty(self, client):
        """At least one LLM is listed."""
        llms = self._payload(client)["llms"]
        assert len(llms) > 0

    def test_tesseract_in_engines(self, client):
        """``tesseract`` is among the engine ids."""
        engine_ids = [engine["id"] for engine in self._payload(client)["engines"]]
        assert "tesseract" in engine_ids

    def test_ollama_in_llms(self, client):
        """``ollama`` is among the LLM ids."""
        llm_ids = [llm["id"] for llm in self._payload(client)["llms"]]
        assert "ollama" in llm_ids

    def test_engine_has_required_fields(self, client):
        """Every engine entry carries id, label, available and status."""
        expected = ("id", "label", "available", "status")
        for engine in self._payload(client)["engines"]:
            for field in expected:
                assert field in engine
| # =========================================================================== | |
| # TestFastAPICorpusBrowse | |
| # =========================================================================== | |
class TestFastAPICorpusBrowse:
    """Tests for ``GET /api/corpus/browse``.

    The directory to browse is passed through ``params=`` so the test client
    URL-encodes it. Interpolating a filesystem path straight into the query
    string breaks as soon as the temporary path contains a space, ``&``,
    ``#`` or ``+``.
    """

    @staticmethod
    def _browse(client, path):
        """Request a listing of ``path`` and return the raw response."""
        return client.get("/api/corpus/browse", params={"path": str(path)})

    def test_browse_current_dir(self, client):
        """Browsing ``.`` answers 200."""
        assert self._browse(client, ".").status_code == 200

    def test_browse_has_required_keys(self, client):
        """The payload exposes ``current_path`` and ``items``."""
        body = self._browse(client, ".").json()
        assert "current_path" in body
        assert "items" in body

    def test_browse_items_are_dirs(self, client, tmp_path):
        """An empty directory yields an empty item list."""
        response = self._browse(client, tmp_path)
        assert response.status_code == 200
        assert response.json()["items"] == []

    def test_browse_with_corpus(self, client, tmp_corpus):
        """The corpus directory shows up when its parent is browsed."""
        response = self._browse(client, tmp_corpus.parent)
        assert response.status_code == 200
        names = [item["name"] for item in response.json()["items"]]
        assert tmp_corpus.name in names

    def test_browse_404_for_nonexistent(self, client):
        """A path that does not exist answers 404."""
        response = self._browse(client, "/nonexistent/path/xyz")
        assert response.status_code == 404

    def test_browse_corpus_gt_count(self, client, tmp_corpus):
        """The corpus directory reports at least its two ground-truth files."""
        response = self._browse(client, tmp_corpus.parent)
        directories = {
            item["name"]: item for item in response.json()["items"] if item["is_dir"]
        }
        # NOTE(review): the guard makes this test pass vacuously when the
        # corpus directory is not listed; kept as in the original because
        # ``test_browse_with_corpus`` already asserts its presence.
        if tmp_corpus.name in directories:
            assert directories[tmp_corpus.name]["gt_count"] >= 2
| # =========================================================================== | |
| # TestFastAPIReports | |
| # =========================================================================== | |
class TestFastAPIReports:
    """Tests for ``GET /api/reports``.

    ``reports_dir`` is sent through ``params=`` so the filesystem path is
    URL-encoded by the client rather than interpolated raw into the query.
    """

    @staticmethod
    def _list_reports(client, reports_dir=None):
        """Call the reports route, optionally restricted to ``reports_dir``."""
        params = None if reports_dir is None else {"reports_dir": str(reports_dir)}
        return client.get("/api/reports", params=params)

    def test_reports_200(self, client):
        """The reports route answers 200."""
        assert self._list_reports(client).status_code == 200

    def test_reports_has_reports_key(self, client):
        """The payload has a ``reports`` key."""
        assert "reports" in self._list_reports(client).json()

    def test_reports_returns_list(self, client):
        """``reports`` is a list."""
        assert isinstance(self._list_reports(client).json()["reports"], list)

    def test_reports_finds_existing_html(self, client, tmp_path):
        """An HTML file present in ``reports_dir`` is listed."""
        # Create a dummy HTML report.
        (tmp_path / "test_rapport.html").write_text(
            "<html><body>Test rapport</body></html>", encoding="utf-8",
        )
        reports = self._list_reports(client, tmp_path).json()["reports"]
        assert any(rep["filename"] == "test_rapport.html" for rep in reports)

    def test_report_entry_has_fields(self, client, tmp_path):
        """A listed report carries filename, path, size_kb, modified and url."""
        (tmp_path / "my_report.html").write_text("<html></html>", encoding="utf-8")
        reports = self._list_reports(client, tmp_path).json()["reports"]
        matching = [rep for rep in reports if rep["filename"] == "my_report.html"]
        # Fail with a readable message instead of a bare StopIteration.
        assert matching, "my_report.html missing from the report listing"
        entry = matching[0]
        for field in ("filename", "path", "size_kb", "modified", "url"):
            assert field in entry

    def test_reports_dir_outside_roots_rejected(self, client):
        """P0 — confinement: a ``reports_dir`` outside the allowed roots
        returns 403 (no arbitrary filesystem enumeration) instead of
        listing ``/etc``."""
        response = self._list_reports(client, "/etc")
        assert response.status_code == 403
        assert "autoris" in response.json()["detail"].lower()
| # =========================================================================== | |
| # TestFastAPIHTRUnited | |
| # =========================================================================== | |
class TestFastAPIHTRUnited:
    """Tests for the HTR-United catalogue and import routes."""

    _CATALOGUE = "/api/htr-united/catalogue"
    _IMPORT = "/api/htr-united/import"

    def test_catalogue_200(self, client):
        """The catalogue route answers 200."""
        assert client.get(self._CATALOGUE).status_code == 200

    def test_catalogue_has_entries(self, client):
        """The catalogue lists at least four entries."""
        body = client.get(self._CATALOGUE).json()
        assert "entries" in body
        assert len(body["entries"]) >= 4

    def test_catalogue_has_filters(self, client):
        """The payload advertises the available languages and scripts."""
        body = client.get(self._CATALOGUE).json()
        for key in ("available_languages", "available_scripts"):
            assert key in body

    def test_catalogue_search_query(self, client):
        """A text query answers 200 even when nothing matches."""
        response = client.get("/api/htr-united/catalogue?query=médiéval")
        assert response.status_code == 200
        body = response.json()
        assert body["total"] >= 0  # Can be 0 if no match — no error

    def test_catalogue_search_language(self, client):
        """With a language filter, every entry lists a French language."""
        response = client.get("/api/htr-united/catalogue?language=French")
        assert response.status_code == 200
        for entry in response.json()["entries"]:
            assert any("french" in lang.lower() for lang in entry["language"])

    def test_import_valid_entry(self, client, tmp_path):
        """Importing a catalogue entry answers 200 and echoes ``entry_id``."""
        # Pick the first entry with a NON-EMPTY id. Phase 4.4 of the
        # post-rewrite work: the router now uses ``from_remote()`` with a
        # fallback; in remote mode some entries may carry an empty ``id``
        # (the remote YAML schema evolves), so filter for an importable id.
        catalogue = client.get(self._CATALOGUE).json()["entries"]
        importable = [entry for entry in catalogue if entry.get("id")]
        if not importable:
            pytest.skip("Catalogue HTR-United sans entrée avec id non-vide")
        entry_id = importable[0]["id"]

        canned_result = {
            "entry_id": entry_id,
            "title": "Test",
            "output_dir": str(tmp_path),
            "files_imported": 0,
            "metadata_file": str(tmp_path / "meta.json"),
        }
        request_body = {
            "entry_id": entry_id,
            "output_dir": str(tmp_path),
            "max_samples": 5,
        }
        # Patch ``import_htr_united_corpus`` to avoid a real network
        # download (which can take 30s+ per file).
        with patch(
            "picarones.adapters.corpus.htr_united.import_htr_united_corpus",
        ) as mock_import:
            mock_import.return_value = canned_result
            response = client.post(self._IMPORT, json=request_body)
        assert response.status_code == 200, response.text
        assert "entry_id" in response.json()

    def test_import_invalid_entry(self, client, tmp_path):
        """Importing an unknown entry id answers 404."""
        request_body = {
            "entry_id": "this-does-not-exist-xyz",
            "output_dir": str(tmp_path),
            "max_samples": 5,
        }
        response = client.post(self._IMPORT, json=request_body)
        assert response.status_code == 404
| # =========================================================================== | |
| # TestFastAPIHuggingFace | |
| # =========================================================================== | |
class TestFastAPIHuggingFace:
    """Tests for the HuggingFace search and import routes."""

    _SEARCH = "/api/huggingface/search"
    _IMPORT = "/api/huggingface/import"

    def test_search_200(self, client):
        """The search route answers 200."""
        assert client.get(self._SEARCH).status_code == 200

    def test_search_has_datasets(self, client):
        """A bare search returns at least one dataset."""
        body = client.get(self._SEARCH).json()
        assert "datasets" in body
        assert body["total"] >= 1

    def test_search_with_query(self, client):
        """A text query answers 200 with a list of datasets."""
        response = client.get("/api/huggingface/search?query=RIMES")
        assert response.status_code == 200
        assert isinstance(response.json()["datasets"], list)

    def test_search_with_language(self, client):
        """A language filter answers 200."""
        response = client.get("/api/huggingface/search?language=French")
        assert response.status_code == 200

    def test_import_creates_meta(self, client, tmp_path):
        """The import writes the metadata file it reports."""
        request_body = {
            "dataset_id": "CATMuS/medieval",
            "output_dir": str(tmp_path),
            "split": "train",
            "max_samples": 5,
        }
        response = client.post(self._IMPORT, json=request_body)
        assert response.status_code == 200
        meta_path = Path(response.json()["metadata_file"])
        assert meta_path.exists()

    def test_import_returns_keys(self, client, tmp_path):
        """The import result exposes the four documented keys."""
        request_body = {"dataset_id": "test/dataset", "output_dir": str(tmp_path)}
        response = client.post(self._IMPORT, json=request_body)
        assert response.status_code == 200
        for key in ("dataset_id", "output_dir", "files_imported", "metadata_file"):
            assert key in response.json()
| # =========================================================================== | |
| # TestFastAPIBenchmark | |
| # =========================================================================== | |
class TestFastAPIBenchmark:
    """Tests for the benchmark job routes (run, status, cancel, stream)."""

    # Phase 4.2 code-quality audit (2026-05): every test was migrated from
    # ``/api/benchmark/start`` (legacy v1.x, removed in v2.0) to
    # ``/api/benchmark/run`` with the ``competitors`` format.
    def _tesseract_competitor(self):
        """Return the single-competitor description used by every run."""
        return {"name": "tesseract", "engine_name": "tesseract"}

    def _run(self, client, corpus_path):
        """POST a benchmark run for ``corpus_path`` and return the response."""
        request_body = {
            "corpus_path": corpus_path,
            "competitors": [self._tesseract_competitor()],
        }
        return client.post("/api/benchmark/run", json=request_body)

    def _started_job_id(self, client, tmp_corpus):
        """Start a run on the temporary corpus and return its job id."""
        return self._run(client, str(tmp_corpus)).json()["job_id"]

    def test_start_missing_corpus(self, client):
        """A run on a missing corpus path answers 400."""
        response = self._run(client, "/nonexistent/path/xyz")
        assert response.status_code == 400

    def test_start_valid_corpus(self, client, tmp_corpus):
        """A valid run answers 200 with a pending or running job."""
        response = self._run(client, str(tmp_corpus))
        assert response.status_code == 200
        body = response.json()
        assert "job_id" in body
        assert body["status"] in ("pending", "running")

    def test_status_nonexistent_job(self, client):
        """Status of an unknown job answers 404."""
        response = client.get("/api/benchmark/nonexistent-job-id/status")
        assert response.status_code == 404

    def test_status_valid_job(self, client, tmp_corpus):
        """Status of a started job echoes its id and reports progress."""
        job_id = self._started_job_id(client, tmp_corpus)
        response = client.get(f"/api/benchmark/{job_id}/status")
        assert response.status_code == 200
        body = response.json()
        assert body["job_id"] == job_id
        for key in ("status", "progress"):
            assert key in body

    def test_cancel_nonexistent_job(self, client):
        """Cancelling an unknown job answers 404."""
        response = client.post("/api/benchmark/nonexistent-id/cancel")
        assert response.status_code == 404

    def test_cancel_valid_job(self, client, tmp_corpus):
        """Cancelling a started job answers 200."""
        job_id = self._started_job_id(client, tmp_corpus)
        response = client.post(f"/api/benchmark/{job_id}/cancel")
        assert response.status_code == 200

    def test_job_status_fields(self, client, tmp_corpus):
        """The status payload carries every expected field."""
        job_id = self._started_job_id(client, tmp_corpus)
        body = client.get(f"/api/benchmark/{job_id}/status").json()
        expected = (
            "job_id", "status", "progress", "total_docs", "processed_docs",
            "output_path",
        )
        absent = [key for key in expected if key not in body]
        # Report the first absent key, in declaration order.
        assert not absent, f"Missing key: {absent[0]}"

    def test_stream_nonexistent_job(self, client):
        """Streaming an unknown job answers 404."""
        response = client.get("/api/benchmark/nonexistent-id/stream")
        assert response.status_code == 404
| # =========================================================================== | |
| # TestFastAPIHTML | |
| # =========================================================================== | |
class TestFastAPIHTML:
    """Tests for the HTML page served at ``GET /``."""

    def test_root_200(self, client):
        """The root page answers 200."""
        assert client.get("/").status_code == 200

    def test_root_is_html(self, client):
        """The root page is served with an HTML content type."""
        content_type = client.get("/").headers["content-type"]
        assert "text/html" in content_type

    def test_html_has_picarones_title(self, client):
        """The page mentions Picarones."""
        page = client.get("/").text
        assert "Picarones" in page

    def test_html_has_nav_sections(self, client):
        """The page mentions each navigation section, in any letter case."""
        lowered = client.get("/").text.lower()
        absent = [
            section
            for section in ("benchmark", "reports", "engines", "import")
            if section not in lowered
        ]
        assert not absent

    def test_html_has_french_content(self, client):
        """The page contains the French word "moteurs", in any letter case."""
        # Matching "Moteurs" verbatim implies matching "moteurs" after
        # lowercasing, so the single lowercase check covers both spellings.
        assert "moteurs" in client.get("/").text.lower()
| # =========================================================================== | |
| # TestFastAPIReportServe | |
| # =========================================================================== | |
class TestFastAPIReportServe:
    """Tests for ``GET /reports/{filename}``.

    Tests that need a report on disk write it into ``tmp_path`` and switch
    the working directory there with ``monkeypatch.chdir``, which restores
    the previous directory at teardown. Presumably the route resolves report
    names relative to the current working directory — confirm against the
    route implementation.
    """

    def test_serve_nonexistent_report(self, client):
        """An unknown report name answers 404."""
        response = client.get("/reports/nonexistent_report.html")
        assert response.status_code == 404

    def test_serve_existing_report(self, client, tmp_path, monkeypatch):
        """A report present in the current directory is served with 200."""
        monkeypatch.chdir(tmp_path)
        (tmp_path / "test_serve.html").write_text(
            "<html><body>Test</body></html>", encoding="utf-8",
        )
        response = client.get("/reports/test_serve.html")
        assert response.status_code == 200

    def test_serve_non_html_rejected(self, client):
        """A non-HTML extension (here ``.py``) answers 404."""
        response = client.get("/reports/malicious.py")
        assert response.status_code == 404

    def test_serve_report_content_type(self, client, tmp_path, monkeypatch):
        """A served report is delivered with an HTML content type."""
        monkeypatch.chdir(tmp_path)
        (tmp_path / "report_ct.html").write_text(
            "<html><body>Content</body></html>", encoding="utf-8",
        )
        response = client.get("/reports/report_ct.html")
        # NOTE(review): the guard keeps the original best-effort behaviour;
        # the content type is only checked when the report was served.
        if response.status_code == 200:
            assert "html" in response.headers.get("content-type", "").lower()
| # =========================================================================== | |
| # TestCLIServeCommand | |
| # =========================================================================== | |
class TestCLIServeCommand:
    """Tests for the ``serve`` sub-command of the picarones CLI."""

    @staticmethod
    def _serve_help():
        """Invoke ``serve --help`` through Click's runner and return the result."""
        from picarones.interfaces.cli import cli

        return CliRunner().invoke(cli, ["serve", "--help"])

    def test_serve_command_registered(self):
        """``serve`` is registered on the CLI group."""
        from picarones.interfaces.cli import cli

        assert "serve" in cli.commands

    def test_serve_help_text(self):
        """The help exits cleanly and mentions serve or localhost."""
        outcome = self._serve_help()
        assert outcome.exit_code == 0
        help_text = outcome.output.lower()
        assert "serve" in help_text or "localhost" in help_text

    def test_serve_default_port_in_help(self):
        """The help mentions port 8000."""
        assert "8000" in self._serve_help().output

    def test_serve_help_has_port_option(self):
        """The help documents the ``--port`` option."""
        assert "--port" in self._serve_help().output

    def test_serve_missing_uvicorn_exits_gracefully(self):
        """The command exists and can be invoked (help only)."""
        # With uvicorn installed, a real invocation would start the server,
        # so only the help output is checked here — not the server start-up.
        assert self._serve_help().exit_code == 0
| # =========================================================================== | |
| # TestRunnerProgressCallback | |
| # =========================================================================== | |
class TestRunnerProgressCallback:
    """Tests for the ``progress_callback`` hook of the benchmark runner."""

    @staticmethod
    def _execute_preset_signature():
        """Return the signature of ``RunOrchestrator.execute_preset``."""
        import inspect
        from picarones.app.services import RunOrchestrator

        return inspect.signature(RunOrchestrator.execute_preset)

    def test_callback_signature_accepted(self):
        """Phase B3-final — ``RunOrchestrator.execute_preset`` accepts a
        ``progress_callback`` keyword argument."""
        assert "progress_callback" in self._execute_preset_signature().parameters

    def test_callback_is_optional(self):
        """``progress_callback`` is optional (its default value is None)."""
        parameter = self._execute_preset_signature().parameters["progress_callback"]
        assert parameter.default is None

    def _make_mock_adapter(self, name: str = "mock"):
        """Sprint H.2.b — build the canonical ``BaseOCRAdapter`` mock."""
        from picarones.adapters.ocr.base import BaseOCRAdapter
        from picarones.domain.artifacts import Artifact, ArtifactType

        class _MockAdapter(BaseOCRAdapter):
            """Adapter that writes a fixed text file for each document."""

            def __init__(self, n: str) -> None:
                self._n = n

            # NOTE(review): if ``BaseOCRAdapter`` declares ``name`` as a
            # property, this override needs ``@property`` — confirm against
            # the base class (a decorator may have been lost here).
            def name(self) -> str:
                return self._n

            def execute(self, inputs, params, context):
                from pathlib import Path

                document_id = context.document_id
                workspace = Path(context.workspace_uri)
                workspace.mkdir(parents=True, exist_ok=True)
                target = workspace / f"{document_id}_mock.txt"
                target.write_text("texte mock", encoding="utf-8")
                raw_text = Artifact(
                    id=f"{document_id}:{self._n}:raw_text",
                    document_id=document_id,
                    type=ArtifactType.RAW_TEXT,
                    produced_by_step="ocr",
                    uri=str(target),
                )
                return {ArtifactType.RAW_TEXT: raw_text}

        return _MockAdapter(name)

    def _run_with_callback(self, tmp_corpus, callback, adapter_name="mock"):
        """Load the corpus, run it through one mock adapter, return both."""
        from picarones.evaluation.corpus import load_corpus_from_directory
        from tests._migration_helpers import run_via_orchestrator

        corpus = load_corpus_from_directory(str(tmp_corpus))
        outcome = run_via_orchestrator(
            corpus,
            [self._make_mock_adapter(adapter_name)],
            progress_callback=callback,
        )
        return corpus, outcome

    def test_callback_called_with_mock_engine(self, tmp_corpus):
        """The callback is invoked once per document."""
        calls = []

        def my_callback(engine_name, doc_idx, doc_id):
            calls.append((engine_name, doc_idx, doc_id))

        corpus, _ = self._run_with_callback(tmp_corpus, my_callback)
        assert len(calls) == len(corpus), f"Expected {len(corpus)} calls, got {len(calls)}"

    def test_callback_receives_engine_name(self, tmp_corpus):
        """The callback receives the engine name."""
        seen_names = []

        def my_callback(engine_name, doc_idx, doc_id):
            seen_names.append(engine_name)

        self._run_with_callback(tmp_corpus, my_callback, "test_engine_name")
        assert all(seen == "test_engine_name" for seen in seen_names)

    def test_callback_exception_does_not_crash(self, tmp_corpus):
        """An exception raised by the callback does not crash the benchmark."""

        def bad_callback(engine_name, doc_idx, doc_id):
            raise RuntimeError("Callback error!")

        _, outcome = self._run_with_callback(tmp_corpus, bad_callback)
        assert outcome is not None
| # =========================================================================== | |
| # TestFastAPIModels — GET /api/models/{provider} | |
| # =========================================================================== | |
class TestFastAPIModels:
    """Endpoint checks for ``GET /api/models/{provider}``."""

    def test_models_tesseract_200(self, client):
        response = client.get("/api/models/tesseract")
        assert response.status_code == 200

    def test_models_tesseract_has_models_list(self, client):
        payload = client.get("/api/models/tesseract").json()
        assert "models" in payload
        assert isinstance(payload["models"], list)

    def test_models_tesseract_has_provider_field(self, client):
        payload = client.get("/api/models/tesseract").json()
        assert payload["provider"] == "tesseract"

    def test_models_tesseract_has_languages(self, client):
        # Tesseract is installed in CI, so at least fra or eng must be listed.
        available = client.get("/api/models/tesseract").json()["models"]
        assert len(available) > 0

    def test_models_google_vision_200(self, client):
        response = client.get("/api/models/google_vision")
        assert response.status_code == 200
        payload = response.json()
        listed = payload.get("model_ids", payload["models"])
        assert "document_text_detection" in listed

    def test_models_azure_doc_intel_200(self, client):
        response = client.get("/api/models/azure_doc_intel")
        assert response.status_code == 200
        payload = response.json()
        listed = payload.get("model_ids", payload["models"])
        assert "prebuilt-document" in listed

    def test_models_ollama_200(self, client):
        response = client.get("/api/models/ollama")
        assert response.status_code == 200
        assert isinstance(response.json()["models"], list)

    def test_models_prompts_200(self, client):
        response = client.get("/api/models/prompts")
        assert response.status_code == 200
        prompts = response.json()["models"]
        assert isinstance(prompts, list)
        # 16 prompts ship with the package; the threshold is deliberately loose.
        assert len(prompts) >= 5

    def test_models_prompts_are_txt_files(self, client):
        response = client.get("/api/models/prompts")
        for prompt_name in response.json()["models"]:
            assert prompt_name.endswith(".txt")

    def test_models_openai_no_key_returns_empty(self, client):
        # Without a key the endpoint answers with an empty list or an error field.
        env_without_key = {
            name: value
            for name, value in os.environ.items()
            if name != "OPENAI_API_KEY"
        }
        with patch.dict(os.environ, env_without_key, clear=True):
            response = client.get("/api/models/openai")
        assert response.status_code == 200
        payload = response.json()
        assert payload["models"] == [] or "error" in payload

    def test_models_anthropic_no_key_returns_empty(self, client):
        env_without_key = {
            name: value
            for name, value in os.environ.items()
            if name != "ANTHROPIC_API_KEY"
        }
        with patch.dict(os.environ, env_without_key, clear=True):
            response = client.get("/api/models/anthropic")
        assert response.status_code == 200
        payload = response.json()
        assert payload["models"] == [] or "error" in payload

    def test_models_unknown_provider_404(self, client):
        response = client.get("/api/models/provider_xyz_unknown")
        assert response.status_code == 404

    def test_models_mistral_ocr_no_key_returns_empty(self, client):
        """Without MISTRAL_API_KEY the endpoint returns an empty list plus an error."""
        env_without_key = {
            name: value
            for name, value in os.environ.items()
            if name != "MISTRAL_API_KEY"
        }
        with patch.dict(os.environ, env_without_key, clear=True):
            response = client.get("/api/models/mistral_ocr")
        assert response.status_code == 200
        payload = response.json()
        assert payload["models"] == []
        assert "error" in payload

    def test_models_mistral_ocr_with_key_uses_fallback_on_network_error(self, client):
        """With an unusable key the endpoint serves its fallback models."""
        fake_key = patch.dict(os.environ, {"MISTRAL_API_KEY": "test-key-invalid"})
        network_down = patch(
            "urllib.request.urlopen",
            side_effect=Exception("connection refused"),
        )
        with fake_key, network_down:
            response = client.get("/api/models/mistral_ocr")
        assert response.status_code == 200
        payload = response.json()
        listed = payload.get("model_ids", payload["models"])
        assert isinstance(listed, list)
        assert len(listed) > 0
        # Fallback models must mention pixtral or mistral-ocr; entries may be
        # plain strings or dicts carrying an "id" key.
        flattened = " ".join(
            entry if isinstance(entry, str) else entry.get("id", str(entry))
            for entry in listed
        ).lower()
        assert "pixtral" in flattened or "mistral-ocr" in flattened

    def test_models_mistral_ocr_filters_vision_only(self, client):
        """With a mocked API reply only vision models (pixtral/mistral-ocr) are kept."""
        import json as _json

        fake_response = {
            "data": [
                {"id": "mistral-ocr-latest"},
                {"id": "pixtral-12b-2409"},
                {"id": "pixtral-large-latest"},
                {"id": "mistral-large-latest"},  # text-only LLM, must be dropped
                {"id": "mistral-small-latest"},  # same
            ]
        }

        class _FakeHTTPResponse:
            """Context-manager stand-in for the object returned by urlopen."""

            def read(self):
                return _json.dumps(fake_response).encode()

            def __enter__(self):
                return self

            def __exit__(self, *exc_info):
                pass

        fake_key = patch.dict(os.environ, {"MISTRAL_API_KEY": "test-key"})
        canned_reply = patch(
            "urllib.request.urlopen", return_value=_FakeHTTPResponse()
        )
        with fake_key, canned_reply:
            response = client.get("/api/models/mistral_ocr")
        assert response.status_code == 200
        payload = response.json()
        listed = payload.get("model_ids", payload["models"])
        # Entries may be plain strings or dicts carrying an "id" key.
        ids = [
            entry if isinstance(entry, str) else entry.get("id", str(entry))
            for entry in listed
        ]
        for expected in ("mistral-ocr-latest", "pixtral-12b-2409", "pixtral-large-latest"):
            assert expected in ids
        for excluded in ("mistral-large-latest", "mistral-small-latest"):
            assert excluded not in ids
| # =========================================================================== | |
| # TestFastAPIBenchmarkRun — POST /api/benchmark/run | |
| # =========================================================================== | |
class TestFastAPIBenchmarkRun:
    """Request validation and job creation for ``POST /api/benchmark/run``."""

    def test_run_400_missing_corpus(self, client):
        body = {
            "corpus_path": "/nonexistent/path/xyz",
            "competitors": [{"ocr_engine": "tesseract", "ocr_model": "fra"}],
        }
        response = client.post("/api/benchmark/run", json=body)
        assert response.status_code == 400

    def test_run_400_no_competitors(self, client, tmp_corpus):
        body = {
            "corpus_path": str(tmp_corpus),
            "competitors": [],
        }
        response = client.post("/api/benchmark/run", json=body)
        # Pydantic ``min_length=1`` rejects the payload with 422
        # Unprocessable Entity (the standard code for an invalid body).
        assert response.status_code == 422

    def test_run_missing_ocr_engine_accepted(self, client, tmp_corpus):
        """ocr_engine is now optional (empty means corpus post-correction)."""
        body = {
            "corpus_path": str(tmp_corpus),
            "competitors": [{"ocr_model": "fra"}],  # no ocr_engine is valid
        }
        response = client.post("/api/benchmark/run", json=body)
        # Accepted by Pydantic (200); the benchmark itself will fail later
        # because neither ocr_engine nor llm_provider is set.
        assert response.status_code == 200

    def test_run_returns_job_id(self, client, tmp_corpus):
        body = {
            "corpus_path": str(tmp_corpus),
            "competitors": [{"ocr_engine": "tesseract", "ocr_model": "fra"}],
        }
        response = client.post("/api/benchmark/run", json=body)
        assert response.status_code == 200
        payload = response.json()
        assert "job_id" in payload
        assert "status" in payload

    def test_run_job_status_reachable(self, client, tmp_corpus):
        body = {
            "corpus_path": str(tmp_corpus),
            "competitors": [{"ocr_engine": "tesseract", "ocr_model": "fra"}],
        }
        started = client.post("/api/benchmark/run", json=body)
        job_id = started.json()["job_id"]
        status = client.get(f"/api/benchmark/{job_id}/status")
        assert status.status_code == 200
        assert status.json()["job_id"] == job_id

    def test_run_with_named_competitor(self, client, tmp_corpus):
        named = {"name": "Mon Tesseract", "ocr_engine": "tesseract", "ocr_model": "fra"}
        body = {
            "corpus_path": str(tmp_corpus),
            "competitors": [named],
        }
        response = client.post("/api/benchmark/run", json=body)
        assert response.status_code == 200

    def test_run_multiple_competitors(self, client, tmp_corpus):
        body = {
            "corpus_path": str(tmp_corpus),
            "competitors": [
                {"ocr_engine": "tesseract", "ocr_model": "fra"},
                {"ocr_engine": "tesseract", "ocr_model": "eng"},
            ],
        }
        response = client.post("/api/benchmark/run", json=body)
        assert response.status_code == 200

    def test_run_with_output_options(self, client, tmp_corpus, tmp_path):
        body = {
            "corpus_path": str(tmp_corpus),
            "competitors": [{"ocr_engine": "tesseract", "ocr_model": "fra"}],
            "output_dir": str(tmp_path),
            "report_name": "test_run_report",
        }
        response = client.post("/api/benchmark/run", json=body)
        assert response.status_code == 200
| # =========================================================================== | |
| # TestFastAPIEnginesExtended — champs ajoutés dans api_engines() | |
| # =========================================================================== | |
class TestFastAPIEnginesExtended:
    """Checks on the extra fields exposed by ``GET /api/engines``."""

    def test_tesseract_has_langs_field(self, client):
        engines = client.get("/api/engines").json()["engines"]
        tesseract = next(entry for entry in engines if entry["id"] == "tesseract")
        assert "langs" in tesseract
        assert isinstance(tesseract["langs"], list)

    def test_mistral_ocr_in_engines(self, client):
        engines = client.get("/api/engines").json()["engines"]
        engine_ids = [entry["id"] for entry in engines]
        assert "mistral_ocr" in engine_ids

    def test_google_vision_in_engines(self, client):
        engines = client.get("/api/engines").json()["engines"]
        engine_ids = [entry["id"] for entry in engines]
        assert "google_vision" in engine_ids

    def test_azure_doc_intel_in_engines(self, client):
        engines = client.get("/api/engines").json()["engines"]
        engine_ids = [entry["id"] for entry in engines]
        assert "azure_doc_intel" in engine_ids

    def test_cloud_engines_have_key_env(self, client):
        engines = client.get("/api/engines").json()["engines"]
        # Only entries typed "ocr_cloud" are required to carry "key_env".
        cloud_engines = [entry for entry in engines if entry.get("type") == "ocr_cloud"]
        for entry in cloud_engines:
            assert "key_env" in entry

    def test_mistral_llm_label_updated(self, client):
        llms = client.get("/api/engines").json()["llms"]
        mistral = next(entry for entry in llms if entry["id"] == "mistral")
        assert "LLM" in mistral["label"]
| # Section retirée au sprint H.2.d : ``MistralOCREngine`` (legacy) | |
| # n'existe plus. Les tests équivalents pour ``MistralOCRAdapter`` | |
| # (canonique) vivent dans ``tests/adapters/ocr/test_sprint_a14_s32_mistral_ocr_adapter.py`` | |
| # et ``tests/adapters/ocr/test_sprint_a14_s53_mistral_normalize.py``. | |
| # =========================================================================== | |
| # TestFastAPICorpusUpload — POST /api/corpus/upload, GET/DELETE uploads | |
| # =========================================================================== | |
class TestFastAPICorpusUpload:
    """Upload, listing and deletion of corpora through ``/api/corpus``.

    The archive and XML builders below are pytest fixtures: a test method
    receives their return value by naming them as a parameter.  They must
    carry ``@pytest.fixture``; without it pytest cannot resolve the
    parameter and the requesting tests error out.
    """

    # --- Private helpers (not collected by pytest) ---

    @staticmethod
    def _zip_bytes(members):
        """Return the bytes of an in-memory ZIP archive.

        ``members`` maps archive names to their ``str`` or ``bytes``
        content; entries are written in the mapping's iteration order.
        """
        import io
        import zipfile

        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as zf:
            for name, content in members.items():
                zf.writestr(name, content)
        return buf.getvalue()

    @staticmethod
    def _grey_image_bytes(fmt):
        """Return a 10x10 grey RGB image encoded in Pillow format ``fmt``."""
        import io

        from PIL import Image

        buf = io.BytesIO()
        Image.new("RGB", (10, 10), color=(120, 120, 120)).save(buf, format=fmt)
        return buf.getvalue()

    @staticmethod
    def _upload_zip(client, archive, filename="corpus.zip"):
        """POST ``archive`` as a single ZIP part to ``/api/corpus/upload``."""
        return client.post(
            "/api/corpus/upload",
            files=[("files", (filename, archive, "application/zip"))],
        )

    # --- Plain-text ground truth ---

    @pytest.fixture
    def tmp_corpus_zip(self, tmp_path):
        """ZIP holding two image/.gt.txt pairs."""
        return self._zip_bytes({
            "page001.jpg": _MINIMAL_JPEG_BYTES,
            "page001.gt.txt": "Texte de la page 1",
            "page002.png": _MINIMAL_PNG_BYTES,
            "page002.gt.txt": "Texte de la page 2",
        })

    @pytest.fixture
    def tmp_zip_missing_gt(self):
        """ZIP in which one image has no ground-truth file."""
        return self._zip_bytes({
            "page001.jpg": _MINIMAL_JPEG_BYTES,
            "page001.gt.txt": "GT ok",
            "page002.png": _MINIMAL_PNG_BYTES,
        })

    def test_upload_zip_returns_200(self, client, tmp_corpus_zip):
        r = self._upload_zip(client, tmp_corpus_zip)
        assert r.status_code == 200

    def test_upload_zip_doc_count(self, client, tmp_corpus_zip):
        r = self._upload_zip(client, tmp_corpus_zip)
        assert r.json()["doc_count"] == 2

    def test_upload_zip_has_corpus_id(self, client, tmp_corpus_zip):
        d = self._upload_zip(client, tmp_corpus_zip).json()
        assert "corpus_id" in d
        assert "corpus_path" in d

    def test_upload_zip_has_pairs(self, client, tmp_corpus_zip):
        d = self._upload_zip(client, tmp_corpus_zip).json()
        assert len(d["pairs"]) == 2

    def test_upload_zip_missing_gt_reported(self, client, tmp_zip_missing_gt):
        r = self._upload_zip(client, tmp_zip_missing_gt)
        assert r.status_code == 200
        d = r.json()
        assert d["has_missing_gt"] is True
        assert len(d["missing_gt"]) == 1

    def test_upload_individual_files(self, client):
        # Sprint 24: Pillow validation requires a decodable image.
        files = [
            ("files", ("img001.jpg", self._grey_image_bytes("JPEG"), "image/jpeg")),
            ("files", ("img001.gt.txt", b"Texte GT", "text/plain")),
        ]
        r = client.post("/api/corpus/upload", files=files)
        assert r.status_code == 200
        assert r.json()["doc_count"] == 1

    def test_upload_empty_zip_returns_422(self, client):
        archive = self._zip_bytes({"readme.txt": "no images here"})
        r = self._upload_zip(client, archive, filename="empty.zip")
        assert r.status_code == 422

    def test_list_uploads_returns_list(self, client):
        r = client.get("/api/corpus/uploads")
        assert r.status_code == 200
        assert "uploads" in r.json()

    def test_list_uploads_includes_uploaded_corpus(self, client, tmp_corpus_zip):
        self._upload_zip(client, tmp_corpus_zip)
        uploads = client.get("/api/corpus/uploads").json()["uploads"]
        assert len(uploads) >= 1
        assert all("corpus_path" in u for u in uploads)

    def test_delete_corpus(self, client, tmp_corpus_zip):
        corpus_id = self._upload_zip(client, tmp_corpus_zip).json()["corpus_id"]
        del_r = client.delete(f"/api/corpus/uploads/{corpus_id}")
        assert del_r.status_code == 200
        assert del_r.json()["deleted"] == corpus_id

    def test_delete_nonexistent_corpus_returns_404(self, client):
        r = client.delete("/api/corpus/uploads/nonexistent-id-xyz")
        assert r.status_code == 404

    def test_delete_path_traversal_returns_400(self, client):
        # corpus_id containing ".." (without slash — FastAPI strips slashes
        # from path params).  Either a rejection (400) or a miss (404) is
        # accepted.
        r = client.delete("/api/corpus/uploads/..malicious..")
        assert r.status_code in (400, 404)

    # --- ALTO XML ---

    @pytest.fixture
    def alto_xml_bytes(self):
        """Content of a minimal valid ALTO XML file."""
        return (
            b'<?xml version="1.0" encoding="UTF-8"?>'
            b'<alto xmlns="http://www.loc.gov/standards/alto/ns-v4#">'
            b"<Layout><Page><PrintSpace>"
            b"<TextBlock><TextLine>"
            b'<String CONTENT="Bonjour"/>'
            b'<String CONTENT="monde"/>'
            b"</TextLine></TextBlock>"
            b"</PrintSpace></Page></Layout>"
            b"</alto>"
        )

    @pytest.fixture
    def tmp_alto_zip(self, alto_xml_bytes):
        """ZIP holding one image + ALTO XML pair."""
        return self._zip_bytes({
            "page001.png": _MINIMAL_PNG_BYTES,
            "page001.xml": alto_xml_bytes,
        })

    def test_upload_alto_zip_returns_200(self, client, tmp_alto_zip):
        r = self._upload_zip(client, tmp_alto_zip)
        assert r.status_code == 200

    def test_upload_alto_zip_doc_count(self, client, tmp_alto_zip):
        r = self._upload_zip(client, tmp_alto_zip)
        assert r.json()["doc_count"] == 1

    def test_upload_alto_zip_format(self, client, tmp_alto_zip):
        d = self._upload_zip(client, tmp_alto_zip).json()
        assert d["gt_format"] == "ALTO XML"
        assert d["pairs"][0]["gt_format"] == "ALTO XML"

    def test_upload_alto_individual_files(self, client, alto_xml_bytes):
        # Sprint 24: Pillow validation requires a complete PNG.
        files = [
            ("files", ("img001.png", self._grey_image_bytes("PNG"), "image/png")),
            ("files", ("img001.xml", alto_xml_bytes, "application/xml")),
        ]
        r = client.post("/api/corpus/upload", files=files)
        assert r.status_code == 200
        assert r.json()["doc_count"] == 1
        assert r.json()["gt_format"] == "ALTO XML"

    def test_alto_text_extraction(self, alto_xml_bytes):
        """detect_xml_gt extracts the text from an ALTO XML document."""
        from picarones.interfaces.web.corpus_utils import detect_xml_gt as _detect_xml_gt

        result = _detect_xml_gt(alto_xml_bytes)
        assert result is not None
        fmt, text = result
        assert fmt == "ALTO XML"
        assert "Bonjour" in text
        assert "monde" in text

    # --- PAGE XML ---

    @pytest.fixture
    def page_xml_bytes(self):
        """Content of a minimal valid PAGE XML file."""
        return (
            b'<?xml version="1.0" encoding="UTF-8"?>'
            b'<PcGts xmlns="http://schema.primaresearch.org/PAGE/gts/pagecontent/2013-07-15">'
            b"<Page><TextRegion><TextLine>"
            b"<TextEquiv><Unicode>Texte de la ligne</Unicode></TextEquiv>"
            b"</TextLine></TextRegion></Page>"
            b"</PcGts>"
        )

    @pytest.fixture
    def tmp_page_zip(self, page_xml_bytes):
        """ZIP holding one image + PAGE XML pair."""
        return self._zip_bytes({
            "page002.png": _MINIMAL_PNG_BYTES,
            "page002.xml": page_xml_bytes,
        })

    def test_upload_page_zip_returns_200(self, client, tmp_page_zip):
        r = self._upload_zip(client, tmp_page_zip)
        assert r.status_code == 200

    def test_upload_page_zip_format(self, client, tmp_page_zip):
        d = self._upload_zip(client, tmp_page_zip).json()
        assert d["gt_format"] == "PAGE XML"
        assert d["pairs"][0]["gt_format"] == "PAGE XML"

    def test_page_text_extraction(self, page_xml_bytes):
        """detect_xml_gt extracts the text from a PAGE XML document."""
        from picarones.interfaces.web.corpus_utils import detect_xml_gt as _detect_xml_gt

        result = _detect_xml_gt(page_xml_bytes)
        assert result is not None
        fmt, text = result
        assert fmt == "PAGE XML"
        assert "Texte de la ligne" in text

    # --- Plain text ---

    def test_upload_plain_txt_format_reported(self, client, tmp_corpus_zip):
        """A classic .gt.txt corpus must be summarised as 'texte brut'."""
        r = self._upload_zip(client, tmp_corpus_zip)
        assert r.status_code == 200
        assert r.json()["gt_format"] == "texte brut"

    # --- Unknown XML is ignored ---

    def test_unknown_xml_not_valid_pair(self, client):
        """An XML file that is neither ALTO nor PAGE yields no valid pair."""
        unknown_xml = b'<?xml version="1.0"?><root><item>foo</item></root>'
        archive = self._zip_bytes({
            "pageX.png": _MINIMAL_PNG_BYTES,
            "pageX.xml": unknown_xml,
        })
        r = self._upload_zip(client, archive)
        assert r.status_code == 422