| from pathlib import Path |
| import shutil |
|
|
| from pluto.doc_index import ChunkMeta, DocIndex |
|
|
|
|
| def test_doc_index_persists_failed_processing_state(): |
| base_dir = Path(__file__).resolve().parent / "output" |
| base_dir.mkdir(parents=True, exist_ok=True) |
| temp_dir = base_dir / "doc-index-test" |
| shutil.rmtree(temp_dir, ignore_errors=True) |
| temp_dir.mkdir(parents=True, exist_ok=True) |
|
|
| try: |
| persist_path = temp_dir / "doc_index.json" |
| index = DocIndex(persist_path=persist_path) |
| index.register_doc( |
| doc_id="paper", |
| filename="paper.md", |
| chunks=["chunk text"], |
| chunk_meta=[ChunkMeta(chunk_id="C0", chunk_type="text", mode="MODE_REASONING")], |
| ) |
|
|
| index.mark_processing("paper") |
| index.mark_failed("paper", "Invalid API key") |
|
|
| reloaded = DocIndex(persist_path=persist_path) |
| assert reloaded.get_status("paper") == "failed" |
| assert reloaded.get_last_error("paper") == "Invalid API key" |
| assert not reloaded.is_processed("paper") |
| finally: |
| shutil.rmtree(temp_dir, ignore_errors=True) |
|
|