"""
pluto/server.py - FastAPI server bridging the pipeline and the web UI.

Endpoints:
    POST   /api/run                      - start a pipeline run, return the final JSON
    POST   /api/upload                   - upload files to the corpus
    POST   /api/compare                  - benchmark Pluto against a single-model baseline
    GET    /api/corpus                   - list corpus documents
    DELETE /api/corpus/{doc_id}          - delete a corpus document
    GET    /api/stream                   - SSE stream of pipeline progress
    GET    /api/result                   - latest pipeline result for a session
    GET    /api/session-context/{doc_id} - recent compressed session context
    GET    /api/doc-status/{doc_id}      - document understanding status
    GET    /api/cache/stats              - extraction cache statistics
    GET    /                             - serve the frontend dashboard
"""
|
|
from __future__ import annotations

import asyncio
import json
import shutil
import tempfile
from functools import partial
from pathlib import Path
from typing import Any
from uuid import uuid4

from fastapi import FastAPI, File, Request, UploadFile
from fastapi.encoders import jsonable_encoder
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles

from pluto.pipeline import PipelineRunner
from pluto.extraction_cache import ExtractionCache
from pluto.doc_index import DocIndex

app = FastAPI(title="Pluto Pipeline", version="1.0.0")
|
|
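# In-memory session state: each session gets a progress queue that feeds its
# SSE stream, and the latest pipeline result is retained for /api/result until
# a delayed cleanup task evicts both.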
session_queues: dict[str, asyncio.Queue] = {}
session_results: dict[str, dict] = {}
session_cleanup_tasks: dict[str, asyncio.Task] = {}
SESSION_CLEANUP_DELAY_SECONDS = 300
|
|
FRONTEND_DIR = Path(__file__).parent.parent / "frontend"
CORPUS_DIR = Path(__file__).parent.parent / "corpus"
OUTPUT_DIR = Path(__file__).parent.parent / "output"
|
|
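# Module-level singletons shared across requests; the DocIndex persists to a
# JSON sidecar inside the corpus directory.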
_extraction_cache = ExtractionCache(str(CORPUS_DIR))
_doc_index = DocIndex(persist_path=CORPUS_DIR / ".doc_index.json")
|
|
|
|
def _docs_currently_understanding(doc_index: DocIndex) -> list[str]:
    """Return doc_ids still running background understanding."""
    return sorted(
        doc["doc_id"]
        for doc in doc_index.list_docs()
        if doc.get("processing_status") == "understanding" and not doc.get("is_processed")
    )
|
|
|
|
def _normalize_selected_doc_ids(raw_value: Any) -> list[str]:
    if not isinstance(raw_value, list):
        return []
    seen: set[str] = set()
    selected_doc_ids: list[str] = []
    for raw_doc_id in raw_value:
        doc_id = str(raw_doc_id or "").strip()
        if not doc_id or doc_id in seen:
            continue
        seen.add(doc_id)
        selected_doc_ids.append(doc_id)
    return selected_doc_ids
|
|
|
|
def _normalize_detail_level(raw_value: Any) -> str:
    return "detailed" if str(raw_value or "").strip().lower() == "detailed" else "standard"
|
|
|
|
def _processing_docs_for_scope(doc_index: DocIndex, selected_doc_ids: list[str] | None = None) -> list[str]:
    processing_docs = _docs_currently_understanding(doc_index)
    selected_doc_set = set(selected_doc_ids or [])
    if not selected_doc_set:
        return processing_docs
    return [doc_id for doc_id in processing_docs if doc_id in selected_doc_set]
|
|
|
|
def _json_safe(value: Any) -> Any:
    """Normalize Pydantic models and other rich objects into JSON-safe values."""
    return jsonable_encoder(value)
|
|
|
|
def _normalize_session_id(raw_value: Any) -> str:
    session_id = str(raw_value or "").strip()
    return session_id or str(uuid4())
|
|
|
|
def _get_session_queue(session_id: str) -> asyncio.Queue:
    cleanup_task = session_cleanup_tasks.pop(session_id, None)
    if cleanup_task:
        cleanup_task.cancel()

    queue = session_queues.get(session_id)
    if queue is None:
        queue = asyncio.Queue()
        session_queues[session_id] = queue
    return queue
|
|
|
|
def _schedule_session_cleanup(session_id: str, queue: asyncio.Queue) -> None:
    cleanup_task = session_cleanup_tasks.pop(session_id, None)
    if cleanup_task:
        cleanup_task.cancel()

    async def cleanup_later() -> None:
        try:
            await asyncio.sleep(SESSION_CLEANUP_DELAY_SECONDS)
            if session_queues.get(session_id) is queue:
                session_queues.pop(session_id, None)
                session_results.pop(session_id, None)
        except asyncio.CancelledError:
            pass
        finally:
            # `task` is bound below before the loop first schedules this coroutine
            if session_cleanup_tasks.get(session_id) is task:
                session_cleanup_tasks.pop(session_id, None)

    task = asyncio.create_task(cleanup_later())
    session_cleanup_tasks[session_id] = task
|
|
|
|
def _session_doc_id(selected_doc_ids: list[str], result_data: dict | None = None) -> str:
    if selected_doc_ids:
        return selected_doc_ids[0]
    trace = (result_data or {}).get("trace_summary", {})
    docs_opened = trace.get("docs_opened", []) if isinstance(trace, dict) else []
    if docs_opened:
        return str(docs_opened[0])
    return "corpus"
|
|
|
|
# Keep references to fire-and-forget compression tasks so the event loop
# cannot garbage-collect them before they finish.
_compression_tasks: set[asyncio.Task] = set()


def _schedule_session_compression(session_id: str) -> None:
    result_data = session_results.get(session_id)
    if not result_data:
        return

    doc_id = str(result_data.get("doc_id") or "corpus")

    async def compress_later() -> None:
        from pluto.session_memory import compress_session

        await asyncio.to_thread(compress_session, session_id, doc_id, result_data, CORPUS_DIR)

    task = asyncio.create_task(compress_later())
    _compression_tasks.add(task)
    task.add_done_callback(_compression_tasks.discard)
|
|
|
|
@app.on_event("startup")
async def startup_reindex():
    """On server start, index any corpus files not already in the DocIndex."""
    import logging

    from pluto.ingest import _split_into_chunks, _classify_and_tag_chunks
    from pluto.doc_index import ChunkMeta

    logger = logging.getLogger("pluto")
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)

    for md_file in sorted(CORPUS_DIR.glob("*.md")):
        doc_id = md_file.stem
        if _doc_index.has_doc(doc_id):
            continue

        logger.info(f"Re-indexing existing corpus file: {doc_id}")
        try:
            content = md_file.read_text(encoding="utf-8", errors="replace")
            chunks = _split_into_chunks(content)
            chunk_meta_list = _classify_and_tag_chunks(chunks)
            meta_objects = [
                ChunkMeta(
                    chunk_id=m["chunk_id"],
                    chunk_type=m["chunk_type"],
                    mode=m["mode"],
                    header=m["header"],
                )
                for m in chunk_meta_list
            ]
            _doc_index.register_doc(
                doc_id=doc_id,
                filename=md_file.name,
                chunks=chunks,
                chunk_meta=meta_objects,
            )
            _doc_index.set_overview(
                doc_id,
                "Preloaded corpus document re-indexed at startup; no generated overview is available yet.",
            )
        except Exception as e:
            logger.warning(f"Failed to re-index {doc_id}: {e}")

    logger.info(f"DocIndex ready: {len(_doc_index.list_docs())} documents indexed")
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def index():
    html_path = FRONTEND_DIR / "index.html"
    return html_path.read_text(encoding="utf-8")
|
|
|
|
|
|
@app.post("/api/run")
async def run_pipeline(request: Request):
    """Run the full pipeline for a user query."""
    body = await request.json()
    query = body.get("query", "")
    corpus_dir = body.get("corpus_dir", str(CORPUS_DIR))
    selected_doc_ids = _normalize_selected_doc_ids(body.get("selected_doc_ids"))
    detail_level = _normalize_detail_level(body.get("detail_level"))
    session_id = _normalize_session_id(body.get("session_id"))
    query_timestamp = body.get("query_timestamp")
    prev_query = body.get("prev_query", "")
    prev_query_timestamp = body.get("prev_query_timestamp")
    prev_session_id = str(body.get("prev_session_id") or "").strip()

    if not query:
        return JSONResponse({"error": "No query provided", "session_id": session_id}, status_code=400)

    progress_queue = _get_session_queue(session_id)
    doc_id = _session_doc_id(selected_doc_ids)
    prior_session_context = []
    if selected_doc_ids:
        from pluto.session_memory import list_session_context
        prior_session_context = list_session_context(doc_id, CORPUS_DIR)

    _capture_behavioral_signals(
        query=query,
        query_timestamp=query_timestamp,
        prev_query=prev_query,
        prev_query_timestamp=prev_query_timestamp,
        prev_session_id=prev_session_id,
        fallback_session_id=session_id,
    )

    processing_docs = _processing_docs_for_scope(_doc_index, selected_doc_ids)
    if processing_docs:
        return JSONResponse(
            {
                "error": "Please wait for document understanding to finish before running a query.",
                "processing_docs": processing_docs,
                "session_id": session_id,
            },
            status_code=409,
            headers={"Cache-Control": "no-store"},
        )

    # Drain any stale progress events left over from a previous run
    while not progress_queue.empty():
        try:
            progress_queue.get_nowait()
        except asyncio.QueueEmpty:
            break

    loop = asyncio.get_running_loop()

    def progress_callback(stage: str, data: dict):
        # runner.run executes in a worker thread; asyncio.Queue is not
        # thread-safe, so marshal the event back onto the loop thread.
        loop.call_soon_threadsafe(progress_queue.put_nowait, _json_safe({"stage": stage, **data}))

    runner = PipelineRunner(
        corpus_dir=corpus_dir,
        output_dir=str(OUTPUT_DIR),
        doc_index=_doc_index,
        prior_session_context=prior_session_context,
    )
    runner.on_progress(progress_callback)

    try:
        result = await loop.run_in_executor(
            None,
            partial(
                runner.run,
                query,
                selected_doc_ids=selected_doc_ids,
                detail_level=detail_level,
            ),
        )
        session_results[session_id] = result.model_dump()

        cache_stats = runner.cache.stats()
        session_results[session_id]["cache_hits"] = cache_stats["hits"]
        session_results[session_id]["cache_misses"] = cache_stats["misses"]
        session_results[session_id]["session_id"] = session_id
        session_results[session_id]["query"] = query
        session_results[session_id]["doc_id"] = _session_doc_id(selected_doc_ids, session_results[session_id])

        await progress_queue.put({"stage": "done", "status": "complete", "session_id": session_id})

        return JSONResponse(session_results[session_id])

    except Exception as e:
        import traceback
        err_msg = str(e)
        traceback.print_exc()

        try:
            await progress_queue.put(
                {"stage": "error", "status": "failed", "detail": err_msg, "session_id": session_id}
            )
        except Exception:
            pass

        # Returned with HTTP 200 so the frontend can surface the error payload
        # through its normal result-handling path.
        return JSONResponse(
            {"error": f"Pipeline error: {err_msg}", "session_id": session_id},
            status_code=200,
        )
|
|
|
|
| @app.get("/api/stream") |
| async def stream_progress(session_id: str): |
| """SSE stream of pipeline progress events.""" |
| progress_queue = _get_session_queue(session_id) |
|
|
| async def event_generator(): |
| |
| |
| yield f"data: {json.dumps({'stage': 'connected', 'session_id': session_id})}\n\n" |
|
|
| |
| try: |
| while True: |
| try: |
| event = await asyncio.wait_for(progress_queue.get(), timeout=120.0) |
| yield f"data: {json.dumps(_json_safe(event))}\n\n" |
| if event.get("stage") in ("done", "error"): |
| if event.get("stage") == "done": |
| _schedule_session_compression(session_id) |
| break |
| except asyncio.TimeoutError: |
| yield f"data: {json.dumps({'stage': 'heartbeat', 'session_id': session_id})}\n\n" |
| finally: |
| _schedule_session_cleanup(session_id, progress_queue) |
|
|
| return StreamingResponse( |
| event_generator(), |
| media_type="text/event-stream", |
| headers={ |
| "Cache-Control": "no-cache", |
| "Connection": "keep-alive", |
| "X-Accel-Buffering": "no", |
| }, |
| ) |
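# A minimal SSE consumer sketch (assumes `httpx` is installed; it is not used
# by this module itself):
#
#   import httpx, json
#   with httpx.stream("GET", "http://localhost:8000/api/stream",
#                     params={"session_id": "abc"}, timeout=None) as resp:
#       for line in resp.iter_lines():
#           if line.startswith("data: "):
#               print(json.loads(line[len("data: "):]))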
|
|
|
|
| @app.get("/api/result") |
| async def get_result(session_id: str): |
| """Return the latest pipeline result for a session.""" |
| result = session_results.get(session_id) |
| if result: |
| return JSONResponse(result) |
| return JSONResponse({"error": "No result yet", "session_id": session_id}, status_code=404) |
|
|
|
|
| @app.get("/api/session-context/{doc_id}") |
| async def get_session_context(doc_id: str): |
| """Return recent compressed session context for a document.""" |
| from pluto.session_memory import list_session_context |
|
|
| sessions = list_session_context(doc_id, CORPUS_DIR, limit=10) |
| return JSONResponse({"doc_id": doc_id, "sessions": sessions}, headers={"Cache-Control": "no-store"}) |
|
|
|
|
| @app.post("/api/compare") |
| async def benchmark_compare(request: Request): |
| """Run benchmark: Pluto vs Single Model Baseline.""" |
| from benchmark.compare import ComparisonRunner |
| |
| body = await request.json() |
| query = body.get("query", "") |
| selected_doc_ids = _normalize_selected_doc_ids(body.get("selected_doc_ids")) |
| detail_level = _normalize_detail_level(body.get("detail_level")) |
| |
| if not query: |
| return JSONResponse({"error": "No query provided"}, status_code=400) |
|
|
| processing_docs = _processing_docs_for_scope(_doc_index, selected_doc_ids) |
| if processing_docs: |
| return JSONResponse( |
| { |
| "error": "Please wait for document understanding to finish before running the benchmark.", |
| "processing_docs": processing_docs, |
| }, |
| status_code=409, |
| headers={"Cache-Control": "no-store"}, |
| ) |
|
|
| try: |
| runner = ComparisonRunner(str(CORPUS_DIR), doc_index=_doc_index) |
| results = runner.compare( |
| query, |
| selected_doc_ids=selected_doc_ids, |
| detail_level=detail_level, |
| ) |
| return JSONResponse(results, headers={"Cache-Control": "no-store"}) |
| except Exception as e: |
| return JSONResponse( |
| {"error": f"Benchmark error: {e}"}, |
| status_code=200, |
| headers={"Cache-Control": "no-store"}, |
| ) |
|
|
|
|
def _capture_behavioral_signals(
    query: str,
    query_timestamp: Any,
    prev_query: str,
    prev_query_timestamp: Any,
    prev_session_id: str,
    fallback_session_id: str,
) -> None:
    from pluto.signal_logger import check_prior_reference, check_rephrase, log_signal, query_hash

    referenced_session_id = prev_session_id or fallback_session_id

    if prev_query and prev_query_timestamp is not None and query_timestamp is not None:
        try:
            # Frontend timestamps arrive in milliseconds; convert to seconds
            delta_seconds = (float(query_timestamp) - float(prev_query_timestamp)) / 1000.0
        except (TypeError, ValueError):
            delta_seconds = -1
        if check_rephrase(query, prev_query, delta_seconds):
            log_signal(referenced_session_id, query_hash(prev_query), "rephrase_fail")

    if check_prior_reference(query):
        log_signal(referenced_session_id, query_hash(query), "prior_reference")
|
|
|
|
ALLOWED_EXTENSIONS = {".pdf", ".docx", ".doc", ".txt", ".md", ".markdown"}
|
|
|
|
| @app.post("/api/upload") |
| async def upload_files(files: list[UploadFile] = File(...)): |
| """Upload one or more files to the corpus.""" |
| from pluto.ingest import ingest_file |
|
|
| results = [] |
| errors = [] |
|
|
| for file in files: |
| ext = Path(file.filename or "").suffix.lower() |
| if ext not in ALLOWED_EXTENSIONS: |
| errors.append({"filename": file.filename, "error": f"Unsupported type: {ext}"}) |
| continue |
|
|
| |
| tmp_dir = tempfile.mkdtemp() |
| try: |
| tmp_path = Path(tmp_dir) / (file.filename or "upload") |
| with open(tmp_path, "wb") as f: |
| content = await file.read() |
| f.write(content) |
|
|
| info = ingest_file(tmp_path, str(CORPUS_DIR), doc_index=_doc_index) |
|
|
| |
| doc_id = info["doc_id"] |
| if not _doc_index.is_processed(doc_id): |
| _doc_index.mark_processing(doc_id) |
| import threading |
| def _bg_understand(did): |
| try: |
| from pluto.stages.understand import run_understand |
| from pluto.tracer import Tracer |
| tracer = Tracer() |
| print(f" [SERVER] Starting background Phase A for {did}...") |
| run_understand(did, _doc_index, tracer) |
| from pluto.doc_summary import generate_doc_summary |
| generate_doc_summary(did, CORPUS_DIR) |
| print(f" [SERVER] Background Phase A COMPLETE for {did}") |
| except BaseException as e: |
| import traceback |
| print(f" [CRITICAL] Background Phase A failed for {did}: {e}") |
| _doc_index.mark_failed(did, str(e)) |
| traceback.print_exc() |
| |
| |
| threading.Thread(target=_bg_understand, args=(doc_id,), daemon=True).start() |
| info["understanding"] = "in_progress" |
| else: |
| info["understanding"] = "complete" |
|
|
| results.append(info) |
| except Exception as e: |
| errors.append({"filename": file.filename, "error": str(e)}) |
| finally: |
| shutil.rmtree(tmp_dir, ignore_errors=True) |
|
|
| return JSONResponse({ |
| "uploaded": results, |
| "errors": errors, |
| "corpus_size": len(list(CORPUS_DIR.glob("*.md"))), |
| }) |
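# Example upload (a sketch; the multipart field name must be `files`, matching
# the UploadFile parameter above):
#
#   curl -F 'files=@report.pdf' -F 'files=@notes.md' localhost:8000/api/upload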
|
|
|
|
| @app.get("/api/doc-status/{doc_id}") |
| async def doc_status(doc_id: str): |
| """Check if a document has been fully understood (Phase A complete).""" |
| if not _doc_index.has_doc(doc_id): |
| return JSONResponse( |
| {"doc_id": doc_id, "status": "not_found"}, |
| status_code=404, |
| headers={"Cache-Control": "no-store"}, |
| ) |
| status = _doc_index.get_effective_status(doc_id) |
| return JSONResponse({ |
| "doc_id": doc_id, |
| "status": status, |
| "has_overview": bool(_doc_index.get_overview(doc_id)), |
| "chunk_count": _doc_index.get_chunk_count(doc_id), |
| "error": _doc_index.get_last_error(doc_id), |
| }, headers={"Cache-Control": "no-store"}) |
|
|
|
|
| @app.get("/api/cache/stats") |
| async def cache_stats(): |
| """Return extraction cache statistics.""" |
| return JSONResponse(_extraction_cache.stats()) |
|
|
|
|
| @app.get("/api/corpus") |
| async def list_corpus(): |
| """List all documents in the corpus.""" |
| CORPUS_DIR.mkdir(parents=True, exist_ok=True) |
| docs = [] |
| for f in sorted(CORPUS_DIR.glob("*.md")): |
| doc_id = f.stem |
| has_doc = _doc_index.has_doc(doc_id) |
| display_name = _doc_index.get_filename(doc_id) if has_doc else "" |
| docs.append({ |
| "doc_id": doc_id, |
| "filename": display_name or f.name, |
| "stored_filename": f.name, |
| "size": f.stat().st_size, |
| "chunk_count": _doc_index.get_chunk_count(doc_id) if has_doc else 0, |
| "processing_status": _doc_index.get_effective_status(doc_id) if has_doc else "not_found", |
| "is_processed": _doc_index.is_processed(doc_id) if has_doc else False, |
| }) |
| return JSONResponse({"documents": docs, "total": len(docs)}, headers={"Cache-Control": "no-store"}) |
|
|
|
|
@app.delete("/api/corpus/{doc_id}")
async def delete_corpus_doc(doc_id: str):
    """Delete a document from the corpus."""
    target = CORPUS_DIR / f"{doc_id}.md"
    if target.exists():
        target.unlink()
        _doc_index.remove_doc(doc_id)
        removed = _extraction_cache.invalidate_doc(doc_id)
        _extraction_cache.save()
        return JSONResponse({"deleted": doc_id, "cache_entries_cleared": removed})
    return JSONResponse({"error": f"Document {doc_id} not found"}, status_code=404)
|
|
|
|
if FRONTEND_DIR.exists():
    app.mount("/static", StaticFiles(directory=str(FRONTEND_DIR)), name="static")
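
# For development the server can also be launched via the uvicorn CLI (a
# sketch; assumes the package is importable as `pluto` from the working
# directory):
#
#   uvicorn pluto.server:app --reload --port 8000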
|
|
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
|
|