"""HTTP routes for vector stores: create/list/get/delete, document ingestion, search."""

from __future__ import annotations

import asyncio
import time

from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile, status

from app.api.deps import get_vector_store_service, require_auth
from app.core.logger import get_logger
from app.models.domain import ConversionError
from app.models.schemas import (
    DeleteRequest,
    DeleteResponse,
    DocumentIngestRequest,
    DocumentIngestResponse,
    DocumentIngestUrlRequest,
    SearchRequest,
    SearchResponse,
    VectorStoreCreate,
    VectorStoreListResponse,
    VectorStoreResponse,
)
from app.services.converter_service import ConverterService
from app.services.text_cleaner_service import TextCleanerService
from app.services.vector_store_service import VectorStoreService

router = APIRouter()
logger = get_logger(__name__)

# Leading bytes every payload must carry before it is handed to the converter.
_PDF_MAGIC = b"%PDF-"


def _require_store(vector_store_service: VectorStoreService, store_id: str):
    """Return the record for *store_id*, or raise a 404 ``HTTPException``.

    The record is whatever ``vector_store_service.get_store`` returns; a
    ``None`` result is treated as "store does not exist".
    """
    record = vector_store_service.get_store(store_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"Vector store {store_id} not found")
    return record


def _ingest_failure(store_id: str, doc_id: str, error: str) -> DocumentIngestResponse:
    """Build the ``success=False`` ingest response shared by all ingest routes."""
    return DocumentIngestResponse(
        success=False,
        vector_store_id=store_id,
        doc_id=doc_id,
        chunks_ingested=0,
        time_ms=0,
        error=error,
    )


async def _process_pdf_bytes(raw: bytes, source: str, clean_content: bool) -> str | ConversionError:
    """Convert raw PDF bytes to markdown text, optionally cleaning the result.

    Args:
        raw: The candidate PDF payload.
        source: Label passed to the converter and recorded on errors
            (callers pass a filename or a URL).
        clean_content: When true, the markdown is passed through
            ``TextCleanerService.clean`` before being returned.

    Returns:
        The markdown text, or a ``ConversionError`` when the payload does not
        start with the PDF magic bytes or the converter itself reports one.
    """
    if not raw.startswith(_PDF_MAGIC):
        return ConversionError(
            source=source,
            error_type="ValueError",
            message="Not a valid PDF",
            duration_ms=0,
        )

    loop = asyncio.get_running_loop()
    converter = ConverterService()
    # The converter and cleaner are synchronous calls; run them in the default
    # executor so the event loop is not blocked while they work.
    result = await loop.run_in_executor(None, converter.convert_stream, raw, source)
    if isinstance(result, ConversionError):
        return result

    text = result.markdown
    if clean_content:
        text_cleaner = TextCleanerService()
        text = await loop.run_in_executor(None, text_cleaner.clean, text)
    return text


@router.post(
    "/vector-stores",
    response_model=VectorStoreResponse,
    summary="Create a new vector store",
    status_code=status.HTTP_201_CREATED,
)
async def create_vector_store(
    body: VectorStoreCreate,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> VectorStoreResponse:
    """Create a vector store and return its details together with its stats."""
    store_id, app_id = await vector_store_service.create_store(
        name=body.name,
        description=body.description or "",
        metadata=body.metadata,
    )
    stats = await vector_store_service.get_store_stats(store_id)
    return VectorStoreResponse(
        success=True,
        vector_store_id=store_id,
        app_id=app_id,
        name=body.name,
        description=body.description,
        embedding_dimension=stats["embedding_dimension"],
        document_count=stats["document_count"],
        created_at=stats["created_at"],
        metadata=stats["metadata"],
    )


@router.get(
    "/vector-stores",
    response_model=VectorStoreListResponse,
    summary="List all vector stores",
)
async def list_vector_stores(
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> VectorStoreListResponse:
    """List every known vector store.

    Stats are fetched per store on a best-effort basis: when the lookup fails
    the store is still listed, with zero counts and its own id as ``app_id``.
    """
    records = vector_store_service.list_stores()
    stores = []
    for r in records:
        try:
            stats = await vector_store_service.get_store_stats(r.store_id)
        except Exception as exc:
            # Deliberately best-effort, but leave a trace instead of hiding the failure.
            logger.warning("Could not load stats for store %s: %s", r.store_id, exc)
            stats = {}
        stores.append(
            VectorStoreResponse(
                success=True,
                vector_store_id=r.store_id,
                app_id=stats.get("app_id", r.store_id),
                name=r.name,
                description=r.description,
                embedding_dimension=stats.get("embedding_dimension", 0),
                document_count=stats.get("document_count", 0),
                created_at=r.created_at,
                metadata=r.metadata,
            )
        )
    return VectorStoreListResponse(success=True, total=len(stores), stores=stores)


@router.get(
    "/vector-stores/{store_id}",
    response_model=VectorStoreResponse,
    summary="Get vector store details",
)
async def get_vector_store(
    store_id: str,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> VectorStoreResponse:
    """Return the details of one vector store.

    A ``ValueError`` raised by the stats lookup is reported as HTTP 404.
    """
    try:
        stats = await vector_store_service.get_store_stats(store_id)
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    return VectorStoreResponse(
        success=True,
        vector_store_id=stats["store_id"],
        app_id=stats["app_id"],
        name=stats["name"],
        description=stats["description"],
        embedding_dimension=stats["embedding_dimension"],
        document_count=stats["document_count"],
        created_at=stats["created_at"],
        metadata=stats["metadata"],
    )


@router.delete(
    "/vector-stores/{store_id}",
    response_model=VectorStoreResponse,
    summary="Delete a vector store and all its data",
)
async def delete_vector_store(
    store_id: str,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> VectorStoreResponse:
    """Delete a vector store.

    Responds with HTTP 404 when the store is unknown and HTTP 500 when the
    service reports that the deletion did not succeed.
    """
    record = _require_store(vector_store_service, store_id)
    ok = await vector_store_service.delete_store(store_id)
    if not ok:
        raise HTTPException(status_code=500, detail="Failed to delete vector store")
    # The store is gone, so counts are reported as zero and app_id as empty.
    return VectorStoreResponse(
        success=True,
        vector_store_id=store_id,
        app_id="",
        name=record.name,
        document_count=0,
        embedding_dimension=0,
        created_at=record.created_at,
    )


@router.post(
    "/vector-stores/{store_id}/documents",
    response_model=DocumentIngestResponse,
    summary="Ingest a document into the vector store with automatic chunking and embedding",
)
async def ingest_document(
    store_id: str,
    body: DocumentIngestRequest,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> DocumentIngestResponse:
    """Ingest plain text into a vector store.

    Responds with HTTP 404 when the store is unknown. Ingestion failures are
    logged and reported in the body with ``success=False``.
    """
    _require_store(vector_store_service, store_id)
    try:
        chunks, elapsed = await vector_store_service.ingest_document(
            store_id=store_id,
            doc_id=body.doc_id,
            text=body.text,
            source=body.source,
            metadata=body.metadata,
            chunk_size=body.chunk_size,
            chunk_overlap=body.chunk_overlap,
        )
    except Exception as exc:
        logger.error("Ingest failed for store %s: %s", store_id, exc)
        return _ingest_failure(store_id, body.doc_id, str(exc))
    return DocumentIngestResponse(
        success=True,
        vector_store_id=store_id,
        doc_id=body.doc_id,
        chunks_ingested=chunks,
        time_ms=round(elapsed, 3),
    )


@router.post(
    "/vector-stores/{store_id}/documents/upload",
    response_model=DocumentIngestResponse,
    summary="Upload a PDF file and ingest it into the vector store",
)
async def ingest_pdf_document(
    store_id: str,
    file: UploadFile = File(..., description="PDF file to ingest"),
    doc_id: str = Form(..., min_length=1, max_length=256),
    chunk_size: int = Form(512, ge=64, le=4096),
    chunk_overlap: int = Form(64, ge=0, le=512),
    clean_content: bool = Query(True, description="Clean markdown text after PDF conversion"),
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> DocumentIngestResponse:
    """Convert an uploaded PDF to text and ingest it into a vector store.

    Responds with HTTP 404 when the store is unknown and HTTP 400 when the
    upload is not declared as ``application/pdf`` or lacks a ``.pdf`` name.
    Conversion and ingestion failures are reported with ``success=False``.
    """
    _require_store(vector_store_service, store_id)
    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Only PDF files are accepted")
    if not file.filename or not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only .pdf files are accepted")

    try:
        raw = await file.read()
    finally:
        # Release the upload's underlying file whether or not the read succeeded.
        await file.close()

    text = await _process_pdf_bytes(raw, file.filename, clean_content)
    if isinstance(text, ConversionError):
        return _ingest_failure(store_id, doc_id, text.message)

    try:
        chunks, elapsed = await vector_store_service.ingest_document(
            store_id=store_id,
            doc_id=doc_id,
            text=text,
            source=file.filename,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
    except Exception as exc:
        logger.error("PDF ingest failed for store %s: %s", store_id, exc)
        return _ingest_failure(store_id, doc_id, str(exc))
    return DocumentIngestResponse(
        success=True,
        vector_store_id=store_id,
        doc_id=doc_id,
        chunks_ingested=chunks,
        time_ms=round(elapsed, 3),
    )


@router.post(
    "/vector-stores/{store_id}/documents/upload-url",
    response_model=DocumentIngestResponse,
    summary="Ingest a PDF from a URL into the vector store",
)
async def ingest_pdf_url(
    store_id: str,
    body: DocumentIngestUrlRequest,
    clean_content: bool = Query(True, description="Clean markdown text after PDF conversion"),
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> DocumentIngestResponse:
    """Download a PDF from a URL, convert it to text and ingest it.

    Responds with HTTP 404 when the store is unknown. Download, conversion and
    ingestion failures are reported in the body with ``success=False``.
    """
    _require_store(vector_store_service, store_id)

    # Local import: within this module httpx is only needed by this endpoint.
    import httpx

    # NOTE(review): the URL is caller-supplied and fetched server-side with
    # redirects followed and no response-size cap. Confirm that SSRF and
    # oversized downloads are mitigated elsewhere (request schema, network policy).
    try:
        async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
            resp = await client.get(body.url)
            resp.raise_for_status()
            raw = resp.content
    except httpx.HTTPStatusError as exc:
        return _ingest_failure(
            store_id,
            body.doc_id,
            f"Failed to fetch PDF from URL: HTTP {exc.response.status_code}",
        )
    except (httpx.RequestError, httpx.InvalidURL) as exc:
        # InvalidURL does not derive from RequestError, so it is listed
        # explicitly; otherwise a malformed URL would surface as a 500.
        return _ingest_failure(store_id, body.doc_id, f"Failed to fetch PDF from URL: {exc}")

    text = await _process_pdf_bytes(raw, body.url, clean_content)
    if isinstance(text, ConversionError):
        return _ingest_failure(store_id, body.doc_id, text.message)

    try:
        chunks, elapsed = await vector_store_service.ingest_document(
            store_id=store_id,
            doc_id=body.doc_id,
            text=text,
            source=body.url,
            chunk_size=body.chunk_size,
            chunk_overlap=body.chunk_overlap,
        )
    except Exception as exc:
        logger.error("PDF URL ingest failed for store %s: %s", store_id, exc)
        return _ingest_failure(store_id, body.doc_id, str(exc))
    return DocumentIngestResponse(
        success=True,
        vector_store_id=store_id,
        doc_id=body.doc_id,
        chunks_ingested=chunks,
        time_ms=round(elapsed, 3),
    )


@router.post(
    "/vector-stores/{store_id}/search",
    response_model=SearchResponse,
    summary="Search the vector store using natural language queries (RAG)",
)
async def search_vector_store(
    store_id: str,
    body: SearchRequest,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> SearchResponse:
    """Run a text query against a vector store.

    Responds with HTTP 404 when the store is unknown. Search failures are
    logged and reported in the body with ``success=False`` and no results.
    """
    _require_store(vector_store_service, store_id)
    try:
        items, elapsed = await vector_store_service.search(
            store_id=store_id,
            query_text=body.query,
            top_k=body.top_k,
            filter_expr=body.filter,
            min_score=body.min_score,
            include_vectors=body.include_vectors,
            include_metadata=body.include_metadata,
        )
    except Exception as exc:
        logger.error("Search failed for store %s: %s", store_id, exc)
        return SearchResponse(
            success=False,
            vector_store_id=store_id,
            query=body.query,
            results=[],
            total_results=0,
            time_ms=0,
            error=str(exc),
        )
    return SearchResponse(
        success=True,
        vector_store_id=store_id,
        query=body.query,
        results=items,
        total_results=len(items),
        time_ms=round(elapsed, 3),
    )


@router.post(
    "/vector-stores/{store_id}/delete",
    response_model=DeleteResponse,
    summary="Delete documents from the vector store by IDs or filter",
)
async def delete_documents(
    store_id: str,
    body: DeleteRequest,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> DeleteResponse:
    """Delete documents from a vector store by ids or by filter.

    Responds with HTTP 404 when the store is unknown. The reported ``time_ms``
    is measured in this handler around the service call, for both the success
    and the failure response.
    """
    _require_store(vector_store_service, store_id)
    start = time.perf_counter()
    try:
        deleted = await vector_store_service.delete_documents(
            store_id=store_id,
            ids=body.ids,
            filter_expr=body.filter,
        )
    except Exception as exc:
        elapsed = (time.perf_counter() - start) * 1000
        # Log like the other handlers do, so failed deletions are not invisible.
        logger.error("Delete failed for store %s: %s", store_id, exc)
        return DeleteResponse(
            success=False,
            vector_store_id=store_id,
            deleted_count=0,
            time_ms=round(elapsed, 3),
            error=str(exc),
        )
    elapsed = (time.perf_counter() - start) * 1000
    return DeleteResponse(
        success=True,
        vector_store_id=store_id,
        deleted_count=deleted,
        time_ms=round(elapsed, 3),
    )