"""
VoiceVault REST API Routes
==========================
All FastAPI endpoints for the custom web frontend.
Singletons are injected once at startup via init_routes().
"""

from __future__ import annotations

import hashlib
import logging
import shutil
import tempfile
import uuid
from pathlib import Path
from typing import Optional

from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from pydantic import BaseModel

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api")

# Singletons injected at startup
_kb_manager = None
_transcriber = None
_answer_chain = None
_db_path: Optional[Path] = None


def init_routes(kb_manager, transcriber, answer_chain, db_path: Path) -> None:
    """Inject pipeline singletons into the routes module.

    Rebinds the module-level ``_kb_manager``, ``_transcriber``,
    ``_answer_chain`` and ``_db_path`` globals that the endpoints below read.
    Until this has been called those globals are ``None``.
    """
    global _kb_manager, _transcriber, _answer_chain, _db_path
    _kb_manager = kb_manager
    _transcriber = transcriber
    _answer_chain = answer_chain
    _db_path = db_path


def _safe_upload_name(filename: Optional[str]) -> str:
    """Return a bare file name that is safe to join onto a temp directory.

    The name comes from the client's multipart request and is therefore
    untrusted: only its final path component is kept, so values such as
    ``../../etc/passwd`` or ``/abs/path`` cannot escape the upload directory.
    A missing, empty or dot-only name falls back to ``upload_<uuid4>``.
    """
    name = Path(filename).name if filename else ""
    # Path("..").name is ".." (and Path(".").name is ""), so reject explicitly.
    if name in ("", ".", ".."):
        return f"upload_{uuid.uuid4()}"
    return name


# ------------------------------------------------------------------ #
# Knowledge Base Management                                          #
# ------------------------------------------------------------------ #


@router.get("/kbs")
async def list_kbs():
    """Return all knowledge bases.

    Each entry is a JSON-serialisable dict built from the objects returned by
    ``_kb_manager.list_kbs()``; ``created_at`` is an ISO-8601 string, or
    ``None`` when the knowledge base has no creation timestamp.
    """
    kbs = _kb_manager.list_kbs()
    return [
        {
            "kb_name": kb.kb_name,
            "display_name": kb.display_name,
            "is_protected": kb.is_protected,
            "doc_count": kb.doc_count,
            "chunk_count": kb.chunk_count,
            "created_at": kb.created_at.isoformat() if kb.created_at else None,
        }
        for kb in kbs
    ]


class CreateKBRequest(BaseModel):
    """Request body for ``POST /api/kbs``."""

    kb_name: str
    display_name: str
    # Optional; forwarded unchanged to ``_kb_manager.create_kb``.
    password: Optional[str] = None


@router.post("/kbs")
async def create_kb(req: CreateKBRequest):
    """Create a new knowledge base.

    Raises:
        HTTPException: 400 when the KB manager raises ``KBManagerError``.
    """
    from voicevault.kb.kb_manager import KBManagerError

    try:
        kb = _kb_manager.create_kb(req.kb_name, req.display_name, req.password)
        return {"ok": True, "kb_name": kb.kb_name, "display_name": kb.display_name}
    except KBManagerError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


@router.delete("/kbs/{kb_name}")
async def delete_kb(kb_name: str):
    """Delete a knowledge base and all its data.

    Raises:
        HTTPException: 404 when the KB manager raises ``KBManagerError``.
    """
    from voicevault.kb.kb_manager import KBManagerError

    # NOTE(review): unlike upload_documents, no password is checked here —
    # confirm whether protected knowledge bases should require one to delete.
    try:
        _kb_manager.delete_kb(kb_name)
        return {"ok": True}
    except KBManagerError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e


@router.post("/kbs/{kb_name}/documents")
async def upload_documents(
    kb_name: str,
    files: list[UploadFile] = File(...),
    password: Optional[str] = Form(None),
):
    """Upload and index documents into a knowledge base.

    The uploads are copied into a fresh temporary directory, handed to
    ``_kb_manager.ingest_documents`` and the directory is removed afterwards
    whether or not ingestion succeeded.

    Returns:
        ``{"ok": True, "reports": [...]}`` with one report dict per entry
        returned by the KB manager.

    Raises:
        HTTPException: 403 when ``verify_password`` rejects the password,
            400 when ingestion raises ``KBManagerError``.
    """
    from voicevault.kb.kb_manager import KBManagerError

    if not _kb_manager.verify_password(kb_name, password):
        raise HTTPException(status_code=403, detail="Invalid password")

    tmp_dir = Path(tempfile.mkdtemp())
    try:
        saved_paths: list[Path] = []
        for file in files:
            # The client controls file.filename; strip any directory parts so
            # the write cannot land outside tmp_dir.
            dest = tmp_dir / _safe_upload_name(file.filename)
            with open(dest, "wb") as f:
                shutil.copyfileobj(file.file, f)
            saved_paths.append(dest)

        reports = _kb_manager.ingest_documents(kb_name, saved_paths, password)
        return {
            "ok": True,
            "reports": [
                {
                    "filename": r.filename,
                    "chunk_count": r.chunk_count,
                    "page_count": r.page_count,
                    "status": r.status,
                    "message": r.message,
                    "duration_ms": r.duration_ms,
                }
                for r in reports
            ],
        }
    except KBManagerError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)


# ------------------------------------------------------------------ #
# Transcription                                                      #
# ------------------------------------------------------------------ #


@router.post("/transcribe")
async def transcribe_audio(audio: UploadFile = File(...)):
    """Transcribe an audio file using Whisper.

    The upload is written to ``recording<suffix>`` inside a temporary
    directory (only the suffix of the client file name is used, defaulting to
    ``.webm``), passed to ``_transcriber.transcribe`` and then cleaned up.

    Raises:
        HTTPException: 500 carrying the error text when anything fails.
    """
    tmp_dir = Path(tempfile.mkdtemp())
    try:
        suffix = Path(audio.filename or "audio.webm").suffix or ".webm"
        tmp_path = tmp_dir / f"recording{suffix}"
        with open(tmp_path, "wb") as f:
            shutil.copyfileobj(audio.file, f)

        # NOTE(review): this is a plain synchronous call inside an async
        # endpoint; if transcribe() is long-running it presumably stalls the
        # event loop — confirm and consider a threadpool.
        result = _transcriber.transcribe(tmp_path)
        return {
            "transcript": result.transcript,
            "language": result.language,
            "confidence": result.confidence,
            "latency_ms": result.latency_ms,
        }
    except Exception as e:
        logger.exception("Transcription failed")
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)


# ------------------------------------------------------------------ #
# Ask / Q&A                                                          #
# ------------------------------------------------------------------ #


class AskRequest(BaseModel):
    """Request body for ``POST /api/ask``."""

    query: str
    kb_names: list[str]
    # Forwarded unchanged to the answer chain as ``history``.
    history: list[tuple[str, str]] = []


@router.post("/ask")
async def ask(req: AskRequest):
    """Run the full RAG pipeline for a query and return the answer.

    Steps: preprocess the query, retrieve from the selected knowledge bases,
    build the context and citation map, generate the answer, then log the
    query on a best-effort basis (a logging failure only emits a warning).

    Raises:
        HTTPException: 400 when the query is blank or ``kb_names`` is empty.
    """
    from voicevault.asr.query_preprocessor import QueryPreprocessor
    from voicevault.retrieval.context_builder import ContextBuilder
    from voicevault.retrieval.hybrid_retriever import HybridRetriever
    from voicevault.storage import sqlite_store as db_mod
    from voicevault.tts.web_speech import prepare_for_tts
    from ui.components.citation_panel import format_citations_markdown

    if not req.query.strip():
        raise HTTPException(status_code=400, detail="Empty query")
    if not req.kb_names:
        raise HTTPException(status_code=400, detail="No knowledge bases selected")

    pq = QueryPreprocessor().process(req.query)
    # Fall back to the raw query when preprocessing yields nothing usable.
    search_query = pq.processed_query or req.query

    retriever = HybridRetriever(kb_names=req.kb_names)
    results = retriever.retrieve(search_query)
    context, citation_map = ContextBuilder().build(results)

    generation = _answer_chain.generate(
        query=search_query,
        context=context,
        citation_map=citation_map,
        history=req.history,
        query_type=pq.query_type,
    )

    # Log query — the raw query is passed only as a SHA-256 hash.
    # NOTE(review): processed_query is still passed in plain text — confirm
    # log_query does not persist it if raw query text must not be stored.
    try:
        db_mod.log_query(
            db_path=_db_path,
            log_id=str(uuid.uuid4()),
            # A fresh session id is generated for every request.
            session_id=str(uuid.uuid4()),
            kb_names=req.kb_names,
            voice_query_hash=hashlib.sha256(req.query.encode()).hexdigest(),
            processed_query=pq.processed_query,
            query_type=pq.query_type,
            answer_length=len(generation.answer),
            citation_count=len(generation.citations),
            # ASR and retrieval latency are not measured here, so the total
            # reflects the generation latency only.
            latency_asr_ms=0,
            latency_ret_ms=0,
            latency_llm_ms=generation.latency_ms,
            total_latency_ms=generation.latency_ms,
            groq_tokens_used=generation.tokens_used,
        )
    except Exception as log_exc:
        # Deliberately best-effort: a logging failure must not fail the answer.
        logger.warning("Query logging failed (non-critical): %s", log_exc)

    return {
        "answer": generation.answer,
        "citations_markdown": format_citations_markdown(generation.citations),
        "citations": [
            {
                "source_file": c.source_file,
                "page_number": c.page_number,
                "section": c.section,
                "excerpt": c.excerpt,
                "relevance_score": c.relevance_score,
            }
            for c in generation.citations
        ],
        "confidence_level": generation.confidence_level,
        "is_refusal": generation.is_refusal,
        "model_used": generation.model_used,
        "tokens_used": generation.tokens_used,
        "latency_ms": generation.latency_ms,
        "tts_text": prepare_for_tts(generation.answer, generation.is_refusal),
    }


# ------------------------------------------------------------------ #
# Analytics                                                          #
# ------------------------------------------------------------------ #


@router.get("/analytics")
async def get_analytics():
    """Return query statistics and KB inventory.

    ``stats`` is whatever ``get_query_stats`` returns for a 7-day window;
    ``kbs`` lists name, display name and document/chunk counts per KB.
    """
    from voicevault.storage import sqlite_store as db_mod

    stats = db_mod.get_query_stats(_db_path, days=7)
    kbs = _kb_manager.list_kbs()
    return {
        "stats": stats,
        "kbs": [
            {
                "kb_name": kb.kb_name,
                "display_name": kb.display_name,
                "doc_count": kb.doc_count,
                "chunk_count": kb.chunk_count,
            }
            for kb in kbs
        ],
    }