Spaces:
Running
Running
| """ | |
| VoiceVault REST API Routes | |
| ========================== | |
| All FastAPI endpoints for the custom web frontend. | |
| Singletons are injected once at startup via init_routes(). | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| import logging | |
| import shutil | |
| import tempfile | |
| import uuid | |
| from pathlib import Path | |
| from typing import Optional | |
| from fastapi import APIRouter, File, Form, HTTPException, UploadFile | |
| from pydantic import BaseModel | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router created with the "/api" prefix.
router = APIRouter(prefix="/api")
# Singletons injected at startup.
# Each one stays None until init_routes() rebinds it; the endpoint functions
# below read these names directly.
_kb_manager = None
_transcriber = None
_answer_chain = None
_db_path: Optional[Path] = None
def init_routes(kb_manager, transcriber, answer_chain, db_path: Path) -> None:
    """Store the shared pipeline objects in this module's globals.

    Rebinds ``_kb_manager``, ``_transcriber``, ``_answer_chain`` and
    ``_db_path`` so the endpoint functions in this module can use them.

    Args:
        kb_manager: Object bound to ``_kb_manager``.
        transcriber: Object bound to ``_transcriber``.
        answer_chain: Object bound to ``_answer_chain``.
        db_path: Value bound to ``_db_path``.
    """
    global _kb_manager, _transcriber, _answer_chain, _db_path
    _kb_manager, _transcriber, _answer_chain, _db_path = (
        kb_manager,
        transcriber,
        answer_chain,
        db_path,
    )
| # ------------------------------------------------------------------ # | |
| # Knowledge Base Management # | |
| # ------------------------------------------------------------------ # | |
async def list_kbs():
    """Return all knowledge bases as a list of plain dicts.

    Each entry carries ``kb_name``, ``display_name``, ``is_protected``,
    ``doc_count``, ``chunk_count`` and ``created_at``. ``created_at`` is the
    result of ``isoformat()`` when the value is truthy, otherwise ``None``.
    """
    rows = []
    for entry in _kb_manager.list_kbs():
        row = {
            "kb_name": entry.kb_name,
            "display_name": entry.display_name,
            "is_protected": entry.is_protected,
            "doc_count": entry.doc_count,
            "chunk_count": entry.chunk_count,
        }
        if entry.created_at:
            row["created_at"] = entry.created_at.isoformat()
        else:
            row["created_at"] = None
        rows.append(row)
    return rows
class CreateKBRequest(BaseModel):
    """Request body for creating a knowledge base.

    The three fields are forwarded, in this order, to
    ``_kb_manager.create_kb`` by ``create_kb``.
    """

    # Name passed as the first argument to the KB manager.
    kb_name: str
    # Name passed as the second argument to the KB manager.
    display_name: str
    # Optional password; None when the client sends none.
    password: Optional[str] = None
async def create_kb(req: CreateKBRequest):
    """Create a knowledge base from the request payload.

    Args:
        req: Payload whose ``kb_name``, ``display_name`` and ``password`` are
            handed to the KB manager.

    Returns:
        A dict with ``ok`` set to ``True`` plus the ``kb_name`` and
        ``display_name`` of the object the manager returned.

    Raises:
        HTTPException: Status 400 carrying the ``KBManagerError`` message.
    """
    from voicevault.kb.kb_manager import KBManagerError

    try:
        created = _kb_manager.create_kb(
            req.kb_name,
            req.display_name,
            req.password,
        )
        payload = {
            "ok": True,
            "kb_name": created.kb_name,
            "display_name": created.display_name,
        }
        return payload
    except KBManagerError as err:
        raise HTTPException(status_code=400, detail=str(err))
async def delete_kb(kb_name: str):
    """Delete the knowledge base called ``kb_name`` through the KB manager.

    Args:
        kb_name: Name handed to ``_kb_manager.delete_kb``.

    Returns:
        ``{"ok": True}`` when the manager call completes.

    Raises:
        HTTPException: Status 404 carrying the ``KBManagerError`` message.
    """
    from voicevault.kb.kb_manager import KBManagerError

    try:
        _kb_manager.delete_kb(kb_name)
    except KBManagerError as err:
        raise HTTPException(status_code=404, detail=str(err))
    return {"ok": True}
def _safe_upload_name(filename: Optional[str]) -> str:
    """Reduce a client-supplied filename to a bare file name.

    Everything up to the last ``/`` or backslash is discarded, so the result
    can never point outside the directory it is joined to. Empty names and
    the special entries ``.`` and ``..`` fall back to ``upload_<uuid>``.
    """
    # Treat both separators as path separators regardless of the host OS.
    leaf = (filename or "").replace("\\", "/").rsplit("/", 1)[-1]
    if leaf in ("", ".", ".."):
        return f"upload_{uuid.uuid4()}"
    return leaf


async def upload_documents(
    kb_name: str,
    files: list[UploadFile] = File(...),
    password: Optional[str] = Form(None),
):
    """Upload and index documents into a knowledge base.

    The uploads are copied into a fresh temporary directory, handed to
    ``_kb_manager.ingest_documents`` and the directory is removed afterwards,
    whether or not ingestion succeeded.

    Args:
        kb_name: Target knowledge base.
        files: Uploaded files to ingest.
        password: Optional password, checked with
            ``_kb_manager.verify_password`` before anything is written.

    Returns:
        A dict with ``ok`` set to ``True`` and one ``reports`` entry per
        report returned by the manager (``filename``, ``chunk_count``,
        ``page_count``, ``status``, ``message``, ``duration_ms``).

    Raises:
        HTTPException: Status 403 when the password check fails; status 400
            carrying the ``KBManagerError`` message when ingestion fails.
    """
    from voicevault.kb.kb_manager import KBManagerError

    if not _kb_manager.verify_password(kb_name, password):
        raise HTTPException(status_code=403, detail="Invalid password")

    tmp_dir = Path(tempfile.mkdtemp())
    try:
        saved_paths: list[Path] = []
        for file in files:
            # The filename comes from the client: keep only its final
            # component so it cannot escape tmp_dir ("../x", "/etc/x").
            dest = tmp_dir / _safe_upload_name(file.filename)
            if dest.exists():
                # Same name uploaded twice in one request. Keep the name but
                # store the file in its own sub-directory so the earlier
                # upload is not overwritten.
                holder = tmp_dir / uuid.uuid4().hex
                holder.mkdir()
                dest = holder / dest.name
            with open(dest, "wb") as f:
                shutil.copyfileobj(file.file, f)
            saved_paths.append(dest)

        reports = _kb_manager.ingest_documents(kb_name, saved_paths, password)
        return {
            "ok": True,
            "reports": [
                {
                    "filename": r.filename,
                    "chunk_count": r.chunk_count,
                    "page_count": r.page_count,
                    "status": r.status,
                    "message": r.message,
                    "duration_ms": r.duration_ms,
                }
                for r in reports
            ],
        }
    except KBManagerError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)
| # ------------------------------------------------------------------ # | |
| # Transcription # | |
| # ------------------------------------------------------------------ # | |
async def transcribe_audio(audio: UploadFile = File(...)):
    """Transcribe an uploaded audio file with the injected transcriber.

    The upload is copied to ``recording<suffix>`` inside a fresh temporary
    directory, where ``<suffix>`` is the suffix of the uploaded filename
    (``.webm`` when there is none). The directory is removed afterwards.

    Args:
        audio: Uploaded audio file.

    Returns:
        A dict with the ``transcript``, ``language``, ``confidence`` and
        ``latency_ms`` attributes of the transcription result.

    Raises:
        HTTPException: Status 500 carrying the message of any exception
            raised while saving or transcribing; the failure is logged first.
    """
    work_dir = Path(tempfile.mkdtemp())
    try:
        uploaded_name = audio.filename or "audio.webm"
        extension = Path(uploaded_name).suffix
        if not extension:
            extension = ".webm"
        recording = work_dir / f"recording{extension}"

        with open(recording, "wb") as sink:
            shutil.copyfileobj(audio.file, sink)

        outcome = _transcriber.transcribe(recording)
        return {
            "transcript": outcome.transcript,
            "language": outcome.language,
            "confidence": outcome.confidence,
            "latency_ms": outcome.latency_ms,
        }
    except Exception as exc:
        logger.exception("Transcription failed")
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
| # ------------------------------------------------------------------ # | |
| # Ask / Q&A # | |
| # ------------------------------------------------------------------ # | |
class AskRequest(BaseModel):
    """Request body consumed by ``ask``."""

    # Question text; ``ask`` rejects it when it is blank after stripping.
    query: str
    # Knowledge bases to search; ``ask`` rejects an empty list.
    kb_names: list[str]
    # Earlier turns, forwarded unchanged to the answer chain as ``history``.
    # presumably (question, answer) pairs; confirm against the answer chain.
    history: list[tuple[str, str]] = []
async def ask(req: AskRequest):
    """Answer a query against the selected knowledge bases.

    Steps, in order: preprocess the query, retrieve with a
    ``HybridRetriever`` over ``req.kb_names``, build the context and citation
    map, generate the answer with ``_answer_chain``, log the query
    (best-effort), and return the response payload.

    Args:
        req: Query text, knowledge-base names and conversation history.

    Returns:
        A dict with the answer, citations (structured and as markdown),
        confidence level, refusal flag, model name, token count, latency and
        the text prepared for TTS.

    Raises:
        HTTPException: Status 400 when the query is blank or no knowledge
            base is selected.
    """
    from voicevault.asr.query_preprocessor import QueryPreprocessor
    from voicevault.retrieval.context_builder import ContextBuilder
    from voicevault.retrieval.hybrid_retriever import HybridRetriever
    from voicevault.storage import sqlite_store as db_mod
    from voicevault.tts.web_speech import prepare_for_tts
    from ui.components.citation_panel import format_citations_markdown

    if not req.query.strip():
        raise HTTPException(status_code=400, detail="Empty query")
    if not req.kb_names:
        raise HTTPException(status_code=400, detail="No knowledge bases selected")

    preprocessed = QueryPreprocessor().process(req.query)
    # Fall back to the raw query when preprocessing yields nothing usable.
    search_query = preprocessed.processed_query or req.query

    hits = HybridRetriever(kb_names=req.kb_names).retrieve(search_query)
    context, citation_map = ContextBuilder().build(hits)

    generation = _answer_chain.generate(
        query=search_query,
        context=context,
        citation_map=citation_map,
        history=req.history,
        query_type=preprocessed.query_type,
    )

    # Query logging is non-critical: any failure is downgraded to a warning.
    # The raw query is passed only as a SHA-256 digest.
    # NOTE(review): the processed query text is passed in clear; confirm this
    # matches the intended "hash only" privacy policy.
    try:
        db_mod.log_query(
            db_path=_db_path,
            log_id=str(uuid.uuid4()),
            session_id=str(uuid.uuid4()),
            kb_names=req.kb_names,
            voice_query_hash=hashlib.sha256(req.query.encode()).hexdigest(),
            processed_query=preprocessed.processed_query,
            query_type=preprocessed.query_type,
            answer_length=len(generation.answer),
            citation_count=len(generation.citations),
            latency_asr_ms=0,
            latency_ret_ms=0,
            latency_llm_ms=generation.latency_ms,
            total_latency_ms=generation.latency_ms,
            groq_tokens_used=generation.tokens_used,
        )
    except Exception as log_exc:
        logger.warning("Query logging failed (non-critical): %s", log_exc)

    answer_text = generation.answer
    citations_markdown = format_citations_markdown(generation.citations)
    citation_rows = []
    for cite in generation.citations:
        citation_rows.append(
            {
                "source_file": cite.source_file,
                "page_number": cite.page_number,
                "section": cite.section,
                "excerpt": cite.excerpt,
                "relevance_score": cite.relevance_score,
            }
        )

    return {
        "answer": answer_text,
        "citations_markdown": citations_markdown,
        "citations": citation_rows,
        "confidence_level": generation.confidence_level,
        "is_refusal": generation.is_refusal,
        "model_used": generation.model_used,
        "tokens_used": generation.tokens_used,
        "latency_ms": generation.latency_ms,
        "tts_text": prepare_for_tts(generation.answer, generation.is_refusal),
    }
| # ------------------------------------------------------------------ # | |
| # Analytics # | |
| # ------------------------------------------------------------------ # | |
async def get_analytics():
    """Return query statistics together with the knowledge-base inventory.

    Returns:
        A dict with ``stats`` (the result of ``get_query_stats`` called with
        ``days=7``) and ``kbs`` (one dict per knowledge base with
        ``kb_name``, ``display_name``, ``doc_count`` and ``chunk_count``).
    """
    from voicevault.storage import sqlite_store as db_mod

    query_stats = db_mod.get_query_stats(_db_path, days=7)

    inventory = []
    for entry in _kb_manager.list_kbs():
        inventory.append(
            {
                "kb_name": entry.kb_name,
                "display_name": entry.display_name,
                "doc_count": entry.doc_count,
                "chunk_count": entry.chunk_count,
            }
        )

    return {"stats": query_stats, "kbs": inventory}