Spaces:
Running
Running
| from __future__ import annotations | |
| import asyncio | |
| import time | |
| from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile, status | |
| from app.api.deps import get_vector_store_service, require_auth | |
| from app.core.logger import get_logger | |
| from app.models.domain import ConversionError | |
| from app.models.schemas import ( | |
| DeleteRequest, | |
| DeleteResponse, | |
| DocumentIngestRequest, | |
| DocumentIngestResponse, | |
| DocumentIngestUrlRequest, | |
| SearchRequest, | |
| SearchResponse, | |
| VectorStoreCreate, | |
| VectorStoreListResponse, | |
| VectorStoreResponse, | |
| ) | |
| from app.services.converter_service import ConverterService | |
| from app.services.text_cleaner_service import TextCleanerService | |
| from app.services.vector_store_service import VectorStoreService | |
# Module-level logger obtained from the project's logging helper, keyed by
# this module's import name.
logger = get_logger(__name__)

# Router object for this module. Route registration is not visible in this
# excerpt -- presumably the handlers below are attached elsewhere (TODO confirm).
router = APIRouter()
async def _process_pdf_bytes(
    raw: bytes,
    source: str,
    clean_content: bool,
) -> str | ConversionError:
    """Convert raw PDF bytes to markdown text, optionally cleaning the result.

    The payload must begin with the ``%PDF-`` signature; anything else is
    rejected before the converter is invoked. Conversion and cleaning are both
    dispatched through ``run_in_executor`` with the loop's default executor
    instead of being called inline in the coroutine.

    Args:
        raw: Bytes of the candidate PDF document.
        source: Label for the document (callers in this module pass a filename
            or a URL). It is forwarded to the converter and recorded on the
            signature error.
        clean_content: When true, the converted markdown is passed through
            ``TextCleanerService.clean`` before being returned.

    Returns:
        The markdown text on success, or a ``ConversionError`` when the
        signature check fails or the converter itself returns one.
    """
    # startswith() is already False for inputs shorter than the 5-byte marker,
    # so no separate length check is needed.
    if not raw.startswith(b"%PDF-"):
        return ConversionError(
            source=source,
            error_type="ValueError",
            message="Not a valid PDF",
            duration_ms=0,
        )

    event_loop = asyncio.get_running_loop()
    converted = await event_loop.run_in_executor(
        None, ConverterService().convert_stream, raw, source
    )
    if isinstance(converted, ConversionError):
        return converted

    markdown = converted.markdown
    if not clean_content:
        return markdown
    return await event_loop.run_in_executor(None, TextCleanerService().clean, markdown)
async def create_vector_store(
    body: VectorStoreCreate,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> VectorStoreResponse:
    """Create a vector store and return it together with its current stats.

    Args:
        body: Creation payload; ``name``, ``description`` and ``metadata`` are read.
        token: Value supplied by the ``require_auth`` dependency; it is not
            used inside the body.
        vector_store_service: Service used to create the store and read its stats.

    Returns:
        A ``VectorStoreResponse`` with ``success=True``. Name and description
        echo the request, the identifiers come from ``create_store``, and the
        remaining fields are read from ``get_store_stats``.
    """
    created_id, created_app_id = await vector_store_service.create_store(
        name=body.name,
        # The service receives "" when no description was supplied; the
        # response below still echoes the request's original value.
        description=body.description or "",
        metadata=body.metadata,
    )
    store_stats = await vector_store_service.get_store_stats(created_id)

    # Fields copied verbatim from the stats mapping into the response.
    stat_fields = {
        key: store_stats[key]
        for key in ("embedding_dimension", "document_count", "created_at", "metadata")
    }
    return VectorStoreResponse(
        success=True,
        vector_store_id=created_id,
        app_id=created_app_id,
        name=body.name,
        description=body.description,
        **stat_fields,
    )
async def list_vector_stores(
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> VectorStoreListResponse:
    """List every known vector store, enriched with per-store stats.

    Stats are fetched one store at a time. If fetching stats for a store
    fails, that store is still listed with fallback values (``app_id`` falls
    back to the store id, ``embedding_dimension`` and ``document_count`` to 0)
    and the failure is logged.

    Args:
        token: Value supplied by the ``require_auth`` dependency; it is not
            used inside the body.
        vector_store_service: Service providing ``list_stores`` and
            ``get_store_stats``.

    Returns:
        A ``VectorStoreListResponse`` with ``success=True``, the list of
        stores, and ``total`` set to the number of stores returned.
    """
    stores = []
    for record in vector_store_service.list_stores():
        try:
            stats = await vector_store_service.get_store_stats(record.store_id)
        except Exception:
            # Best effort: one unreadable store must not fail the whole
            # listing, but the failure is logged instead of being swallowed.
            logger.warning(
                "Could not load stats for store %s; using fallback values",
                record.store_id,
                exc_info=True,
            )
            stats = {}
        stores.append(
            VectorStoreResponse(
                success=True,
                vector_store_id=record.store_id,
                app_id=stats.get("app_id", record.store_id),
                name=record.name,
                description=record.description,
                embedding_dimension=stats.get("embedding_dimension", 0),
                document_count=stats.get("document_count", 0),
                created_at=record.created_at,
                metadata=record.metadata,
            )
        )
    return VectorStoreListResponse(success=True, total=len(stores), stores=stores)
async def get_vector_store(
    store_id: str,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> VectorStoreResponse:
    """Return the details and stats of a single vector store.

    Args:
        store_id: Identifier of the store to look up.
        token: Value supplied by the ``require_auth`` dependency; it is not
            used inside the body.
        vector_store_service: Service providing ``get_store_stats``.

    Returns:
        A ``VectorStoreResponse`` with ``success=True`` whose fields are all
        read from the stats mapping returned by the service.

    Raises:
        HTTPException: 404 when ``get_store_stats`` raises ``ValueError``; the
            detail is the text of that error.
    """
    try:
        stats = await vector_store_service.get_store_stats(store_id)
    except ValueError as exc:
        # Chain explicitly so the original error is kept as __cause__.
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    return VectorStoreResponse(
        success=True,
        vector_store_id=stats["store_id"],
        app_id=stats["app_id"],
        name=stats["name"],
        description=stats["description"],
        embedding_dimension=stats["embedding_dimension"],
        document_count=stats["document_count"],
        created_at=stats["created_at"],
        metadata=stats["metadata"],
    )
async def delete_vector_store(
    store_id: str,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> VectorStoreResponse:
    """Delete a vector store and return a summary of what was removed.

    Args:
        store_id: Identifier of the store to delete.
        token: Value supplied by the ``require_auth`` dependency; it is not
            used inside the body.
        vector_store_service: Service providing ``get_store`` and ``delete_store``.

    Returns:
        A ``VectorStoreResponse`` with ``success=True`` carrying the deleted
        store's name and creation time, an empty ``app_id`` and zeroed counters.

    Raises:
        HTTPException: 404 when the store does not exist, 500 when the service
            reports that the deletion did not succeed.
    """
    existing = vector_store_service.get_store(store_id)
    if existing is None:
        raise HTTPException(status_code=404, detail=f"Vector store {store_id} not found")

    removed = await vector_store_service.delete_store(store_id)
    if removed:
        # The record was captured before deletion, so its fields are still usable here.
        return VectorStoreResponse(
            success=True,
            vector_store_id=store_id,
            app_id="",
            name=existing.name,
            document_count=0,
            embedding_dimension=0,
            created_at=existing.created_at,
        )
    raise HTTPException(status_code=500, detail="Failed to delete vector store")
async def ingest_document(
    store_id: str,
    body: DocumentIngestRequest,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> DocumentIngestResponse:
    """Ingest a plain-text document into an existing vector store.

    Args:
        store_id: Identifier of the target store.
        body: Ingest payload; ``doc_id``, ``text``, ``source``, ``metadata``,
            ``chunk_size`` and ``chunk_overlap`` are forwarded to the service.
        token: Value supplied by the ``require_auth`` dependency; it is not
            used inside the body.
        vector_store_service: Service providing ``get_store`` and
            ``ingest_document``.

    Returns:
        A ``DocumentIngestResponse``. On success it carries the chunk count and
        the service-reported elapsed time rounded to 3 decimals. If the service
        raises, the error is logged and a ``success=False`` response with the
        error text is returned instead of propagating the exception.

    Raises:
        HTTPException: 404 when the store does not exist.
    """
    if vector_store_service.get_store(store_id) is None:
        raise HTTPException(status_code=404, detail=f"Vector store {store_id} not found")

    try:
        chunk_count, duration = await vector_store_service.ingest_document(
            store_id=store_id,
            doc_id=body.doc_id,
            text=body.text,
            source=body.source,
            metadata=body.metadata,
            chunk_size=body.chunk_size,
            chunk_overlap=body.chunk_overlap,
        )
    except Exception as exc:
        logger.error("Ingest failed for store %s: %s", store_id, exc)
        outcome = DocumentIngestResponse(
            success=False,
            vector_store_id=store_id,
            doc_id=body.doc_id,
            chunks_ingested=0,
            time_ms=0,
            error=str(exc),
        )
    else:
        outcome = DocumentIngestResponse(
            success=True,
            vector_store_id=store_id,
            doc_id=body.doc_id,
            chunks_ingested=chunk_count,
            time_ms=round(duration, 3),
        )
    return outcome
async def ingest_pdf_document(
    store_id: str,
    file: UploadFile = File(..., description="PDF file to ingest"),
    doc_id: str = Form(..., min_length=1, max_length=256),
    chunk_size: int = Form(512, ge=64, le=4096),
    chunk_overlap: int = Form(64, ge=0, le=512),
    clean_content: bool = Query(True, description="Clean markdown text after PDF conversion"),
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> DocumentIngestResponse:
    """Ingest an uploaded PDF file into an existing vector store.

    The upload is read fully into memory, converted to markdown via
    ``_process_pdf_bytes`` and then handed to the service's ``ingest_document``.

    Args:
        store_id: Identifier of the target store.
        file: Uploaded file; must be declared as ``application/pdf`` and have
            a name ending in ``.pdf`` (case-insensitive).
        doc_id: Identifier to store the document under.
        chunk_size: Chunk size forwarded to the service.
        chunk_overlap: Chunk overlap forwarded to the service.
        clean_content: Whether the converted markdown is cleaned before ingest.
        token: Value supplied by the ``require_auth`` dependency; it is not
            used inside the body.
        vector_store_service: Service providing ``get_store`` and
            ``ingest_document``.

    Returns:
        A ``DocumentIngestResponse``. Conversion errors and exceptions raised by
        the service are reported as ``success=False`` with the error text
        rather than being raised.

    Raises:
        HTTPException: 404 when the store does not exist, 400 when the content
            type or the file name is not a PDF's.
    """

    def _failure(message: str) -> DocumentIngestResponse:
        # Shared shape of every non-raising failure result of this handler.
        return DocumentIngestResponse(
            success=False,
            vector_store_id=store_id,
            doc_id=doc_id,
            chunks_ingested=0,
            time_ms=0,
            error=message,
        )

    if vector_store_service.get_store(store_id) is None:
        raise HTTPException(status_code=404, detail=f"Vector store {store_id} not found")
    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Only PDF files are accepted")
    if not (file.filename and file.filename.lower().endswith(".pdf")):
        raise HTTPException(status_code=400, detail="Only .pdf files are accepted")

    # Close the upload whether or not the read succeeds.
    try:
        payload = await file.read()
    finally:
        await file.close()

    converted = await _process_pdf_bytes(payload, file.filename, clean_content)
    if isinstance(converted, ConversionError):
        return _failure(converted.message)

    try:
        chunk_count, duration = await vector_store_service.ingest_document(
            store_id=store_id,
            doc_id=doc_id,
            text=converted,
            source=file.filename,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
    except Exception as exc:
        logger.error("PDF ingest failed for store %s: %s", store_id, exc)
        return _failure(str(exc))

    return DocumentIngestResponse(
        success=True,
        vector_store_id=store_id,
        doc_id=doc_id,
        chunks_ingested=chunk_count,
        time_ms=round(duration, 3),
    )
async def ingest_pdf_url(
    store_id: str,
    body: DocumentIngestUrlRequest,
    clean_content: bool = Query(True, description="Clean markdown text after PDF conversion"),
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> DocumentIngestResponse:
    """Download a PDF from a URL and ingest it into an existing vector store.

    The document is fetched with a 30 second timeout (redirects are
    followed), converted to markdown via ``_process_pdf_bytes`` and then handed
    to the service's ``ingest_document``.

    Args:
        store_id: Identifier of the target store.
        body: Ingest payload; ``url``, ``doc_id``, ``chunk_size`` and
            ``chunk_overlap`` are read.
        clean_content: Whether the converted markdown is cleaned before ingest.
        token: Value supplied by the ``require_auth`` dependency; it is not
            used inside the body.
        vector_store_service: Service providing ``get_store`` and
            ``ingest_document``.

    Returns:
        A ``DocumentIngestResponse``. Download failures (error status,
        transport error, malformed URL), conversion errors and exceptions
        raised by the service are all reported as ``success=False`` with an
        error text rather than being raised.

    Raises:
        HTTPException: 404 when the store does not exist.
    """
    import httpx

    def _failure(message: str) -> DocumentIngestResponse:
        # Shared shape of every non-raising failure result of this handler.
        return DocumentIngestResponse(
            success=False,
            vector_store_id=store_id,
            doc_id=body.doc_id,
            chunks_ingested=0,
            time_ms=0,
            error=message,
        )

    if vector_store_service.get_store(store_id) is None:
        raise HTTPException(status_code=404, detail=f"Vector store {store_id} not found")

    # NOTE(review): body.url is caller-supplied, redirects are followed and the
    # whole response body is held in memory. Consider restricting reachable
    # hosts and capping the download size -- not changed here.
    try:
        async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
            resp = await client.get(body.url)
            resp.raise_for_status()
            raw = resp.content
    except httpx.HTTPStatusError as exc:
        return _failure(f"Failed to fetch PDF from URL: HTTP {exc.response.status_code}")
    except (httpx.RequestError, httpx.InvalidURL) as exc:
        # InvalidURL is not part of the RequestError hierarchy; without it a
        # malformed URL would escape as an unhandled server error.
        return _failure(f"Failed to fetch PDF from URL: {exc}")

    converted = await _process_pdf_bytes(raw, body.url, clean_content)
    if isinstance(converted, ConversionError):
        return _failure(converted.message)

    try:
        chunks, elapsed = await vector_store_service.ingest_document(
            store_id=store_id,
            doc_id=body.doc_id,
            text=converted,
            source=body.url,
            chunk_size=body.chunk_size,
            chunk_overlap=body.chunk_overlap,
        )
    except Exception as exc:
        logger.error("PDF URL ingest failed for store %s: %s", store_id, exc)
        return _failure(str(exc))

    return DocumentIngestResponse(
        success=True,
        vector_store_id=store_id,
        doc_id=body.doc_id,
        chunks_ingested=chunks,
        time_ms=round(elapsed, 3),
    )
async def search_vector_store(
    store_id: str,
    body: SearchRequest,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> SearchResponse:
    """Run a text query against an existing vector store.

    Args:
        store_id: Identifier of the store to search.
        body: Search payload; ``query``, ``top_k``, ``filter``, ``min_score``,
            ``include_vectors`` and ``include_metadata`` are forwarded to the
            service.
        token: Value supplied by the ``require_auth`` dependency; it is not
            used inside the body.
        vector_store_service: Service providing ``get_store`` and ``search``.

    Returns:
        A ``SearchResponse``. On success it carries the hits, their count and
        the service-reported elapsed time rounded to 3 decimals. If the service
        raises, the error is logged and a ``success=False`` response with an
        empty result list and the error text is returned instead.

    Raises:
        HTTPException: 404 when the store does not exist.
    """
    if vector_store_service.get_store(store_id) is None:
        raise HTTPException(status_code=404, detail=f"Vector store {store_id} not found")

    try:
        hits, duration = await vector_store_service.search(
            store_id=store_id,
            query_text=body.query,
            top_k=body.top_k,
            filter_expr=body.filter,
            min_score=body.min_score,
            include_vectors=body.include_vectors,
            include_metadata=body.include_metadata,
        )
    except Exception as exc:
        logger.error("Search failed for store %s: %s", store_id, exc)
        outcome = SearchResponse(
            success=False,
            vector_store_id=store_id,
            query=body.query,
            results=[],
            total_results=0,
            time_ms=0,
            error=str(exc),
        )
    else:
        outcome = SearchResponse(
            success=True,
            vector_store_id=store_id,
            query=body.query,
            results=hits,
            total_results=len(hits),
            time_ms=round(duration, 3),
        )
    return outcome
async def delete_documents(
    store_id: str,
    body: DeleteRequest,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> DeleteResponse:
    """Delete documents from an existing vector store by id and/or filter.

    Args:
        store_id: Identifier of the store to delete from.
        body: Delete payload; ``ids`` and ``filter`` are forwarded to the service.
        token: Value supplied by the ``require_auth`` dependency; it is not
            used inside the body.
        vector_store_service: Service providing ``get_store`` and
            ``delete_documents``.

    Returns:
        A ``DeleteResponse`` with the number of deleted documents and the
        wall-clock duration of the service call in milliseconds (rounded to 3
        decimals). If the service raises, the error is logged and a
        ``success=False`` response with ``deleted_count=0`` and the error text
        is returned instead of propagating the exception.

    Raises:
        HTTPException: 404 when the store does not exist.
    """
    if vector_store_service.get_store(store_id) is None:
        raise HTTPException(status_code=404, detail=f"Vector store {store_id} not found")

    started = time.perf_counter()
    try:
        deleted = await vector_store_service.delete_documents(
            store_id=store_id,
            ids=body.ids,
            filter_expr=body.filter,
        )
    except Exception as exc:
        # Take the timing before logging so the log call is not counted.
        elapsed_ms = (time.perf_counter() - started) * 1000
        # Log like the sibling handlers do; previously this failure was only
        # visible to the caller.
        logger.error("Delete failed for store %s: %s", store_id, exc)
        return DeleteResponse(
            success=False,
            vector_store_id=store_id,
            deleted_count=0,
            time_ms=round(elapsed_ms, 3),
            error=str(exc),
        )

    elapsed_ms = (time.perf_counter() - started) * 1000
    return DeleteResponse(
        success=True,
        vector_store_id=store_id,
        deleted_count=deleted,
        time_ms=round(elapsed_ms, 3),
    )