# Source: llm-ready-data/app/api/v1/vector_stores.py (revision 08240ea)
from __future__ import annotations
import asyncio
import time
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile, status
from app.api.deps import get_vector_store_service, require_auth
from app.core.logger import get_logger
from app.models.domain import ConversionError
from app.models.schemas import (
DeleteRequest,
DeleteResponse,
DocumentIngestRequest,
DocumentIngestResponse,
DocumentIngestUrlRequest,
SearchRequest,
SearchResponse,
VectorStoreCreate,
VectorStoreListResponse,
VectorStoreResponse,
)
from app.services.converter_service import ConverterService
from app.services.text_cleaner_service import TextCleanerService
from app.services.vector_store_service import VectorStoreService
# Router on which the endpoint decorators in this module register their routes.
router = APIRouter()
# Module-level logger, obtained from the project's get_logger helper with this module's name.
logger = get_logger(__name__)
async def _process_pdf_bytes(raw: bytes, source: str, clean_content: bool) -> str | ConversionError:
    """Convert raw PDF bytes to markdown text, optionally cleaning the result.

    Payloads that do not begin with the ``%PDF-`` signature are rejected
    without invoking the converter. The converter and the cleaner are both
    run through the event loop's default executor.

    Args:
        raw: File content to convert.
        source: Label for the payload; forwarded to the converter and
            recorded on the error returned for a bad signature.
        clean_content: Whether to pass the converted markdown through
            ``TextCleanerService.clean``.

    Returns:
        The (optionally cleaned) markdown text, or a ``ConversionError``
        when the signature check fails or the converter returns one.
    """
    if not raw.startswith(b"%PDF-"):
        return ConversionError(
            source=source,
            error_type="ValueError",
            message="Not a valid PDF",
            duration_ms=0,
        )

    event_loop = asyncio.get_running_loop()
    converted = await event_loop.run_in_executor(
        None, ConverterService().convert_stream, raw, source
    )
    if isinstance(converted, ConversionError):
        return converted

    markdown = converted.markdown
    if not clean_content:
        return markdown
    return await event_loop.run_in_executor(None, TextCleanerService().clean, markdown)
@router.post(
    "/vector-stores",
    response_model=VectorStoreResponse,
    summary="Create a new vector store",
    status_code=status.HTTP_201_CREATED,
)
async def create_vector_store(
    body: VectorStoreCreate,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> VectorStoreResponse:
    # Creates the store, then reads its stats back so the response carries the
    # embedding dimension, document count, creation time and metadata that the
    # service reports for it.
    #
    # The store is created with an empty-string description when the request
    # has none; the response echoes the request's description unchanged.
    new_store_id, new_app_id = await vector_store_service.create_store(
        name=body.name,
        description=body.description or "",
        metadata=body.metadata,
    )
    store_stats = await vector_store_service.get_store_stats(new_store_id)

    response_fields = {
        "success": True,
        "vector_store_id": new_store_id,
        "app_id": new_app_id,
        "name": body.name,
        "description": body.description,
        "embedding_dimension": store_stats["embedding_dimension"],
        "document_count": store_stats["document_count"],
        "created_at": store_stats["created_at"],
        "metadata": store_stats["metadata"],
    }
    return VectorStoreResponse(**response_fields)
@router.get(
    "/vector-stores",
    response_model=VectorStoreListResponse,
    summary="List all vector stores",
)
async def list_vector_stores(
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> VectorStoreListResponse:
    # Lists every store known to the service, enriching each record with its
    # stats. Stats lookup is best-effort: a store whose stats cannot be loaded
    # is still listed, with app_id falling back to the store id and the
    # embedding dimension / document count reported as 0.
    stores: list[VectorStoreResponse] = []
    for record in vector_store_service.list_stores():
        try:
            stats = await vector_store_service.get_store_stats(record.store_id)
        except Exception as exc:
            # Keep the listing usable, but leave a trace of the failure
            # instead of dropping it silently.
            logger.warning(
                "Could not load stats for store %s: %s", record.store_id, exc
            )
            stats = {}
        stores.append(
            VectorStoreResponse(
                success=True,
                vector_store_id=record.store_id,
                app_id=stats.get("app_id", record.store_id),
                name=record.name,
                description=record.description,
                embedding_dimension=stats.get("embedding_dimension", 0),
                document_count=stats.get("document_count", 0),
                created_at=record.created_at,
                metadata=record.metadata,
            )
        )
    return VectorStoreListResponse(success=True, total=len(stores), stores=stores)
@router.get(
    "/vector-stores/{store_id}",
    response_model=VectorStoreResponse,
    summary="Get vector store details",
)
async def get_vector_store(
    store_id: str,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> VectorStoreResponse:
    # Returns the stats the service reports for one store. A ValueError from
    # the stats lookup is translated into a 404 whose detail is the error text.
    try:
        stats = await vector_store_service.get_store_stats(store_id)
    except ValueError as exc:
        # Chain explicitly so the original error is recorded as the cause.
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    return VectorStoreResponse(
        success=True,
        vector_store_id=stats["store_id"],
        app_id=stats["app_id"],
        name=stats["name"],
        description=stats["description"],
        embedding_dimension=stats["embedding_dimension"],
        document_count=stats["document_count"],
        created_at=stats["created_at"],
        metadata=stats["metadata"],
    )
@router.delete(
    "/vector-stores/{store_id}",
    response_model=VectorStoreResponse,
    summary="Delete a vector store and all its data",
)
async def delete_vector_store(
    store_id: str,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> VectorStoreResponse:
    # Deletes a store after confirming it exists.
    # Responds 404 when the store is unknown and 500 when the service reports
    # that the deletion did not succeed. The success response reuses the
    # deleted record's name and creation time, with an empty app_id and
    # zeroed counts.
    existing = vector_store_service.get_store(store_id)
    if existing is None:
        raise HTTPException(status_code=404, detail=f"Vector store {store_id} not found")

    if not await vector_store_service.delete_store(store_id):
        raise HTTPException(status_code=500, detail="Failed to delete vector store")

    response_fields = {
        "success": True,
        "vector_store_id": store_id,
        "app_id": "",
        "name": existing.name,
        "embedding_dimension": 0,
        "document_count": 0,
        "created_at": existing.created_at,
    }
    return VectorStoreResponse(**response_fields)
@router.post(
    "/vector-stores/{store_id}/documents",
    response_model=DocumentIngestResponse,
    summary="Ingest a document into the vector store with automatic chunking and embedding",
)
async def ingest_document(
    store_id: str,
    body: DocumentIngestRequest,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> DocumentIngestResponse:
    # Ingests the text carried in the request body into an existing store.
    # An unknown store yields a 404. Any exception raised by the service's
    # ingest call is logged and reported in-band as a success=False response
    # (with the exception text in `error`) rather than raised.
    if vector_store_service.get_store(store_id) is None:
        raise HTTPException(status_code=404, detail=f"Vector store {store_id} not found")

    try:
        chunk_count, elapsed_ms = await vector_store_service.ingest_document(
            store_id=store_id,
            doc_id=body.doc_id,
            text=body.text,
            source=body.source,
            metadata=body.metadata,
            chunk_size=body.chunk_size,
            chunk_overlap=body.chunk_overlap,
        )
    except Exception as exc:
        logger.error("Ingest failed for store %s: %s", store_id, exc)
        return DocumentIngestResponse(
            success=False,
            vector_store_id=store_id,
            doc_id=body.doc_id,
            chunks_ingested=0,
            time_ms=0,
            error=str(exc),
        )
    else:
        return DocumentIngestResponse(
            success=True,
            vector_store_id=store_id,
            doc_id=body.doc_id,
            chunks_ingested=chunk_count,
            time_ms=round(elapsed_ms, 3),
        )
@router.post(
    "/vector-stores/{store_id}/documents/upload",
    response_model=DocumentIngestResponse,
    summary="Upload a PDF file and ingest it into the vector store",
)
async def ingest_pdf_document(
    store_id: str,
    file: UploadFile = File(..., description="PDF file to ingest"),
    doc_id: str = Form(..., min_length=1, max_length=256),
    chunk_size: int = Form(512, ge=64, le=4096),
    chunk_overlap: int = Form(64, ge=0, le=512),
    clean_content: bool = Query(True, description="Clean markdown text after PDF conversion"),
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> DocumentIngestResponse:
    # Accepts an uploaded PDF, converts it to text and ingests that text.
    #
    # Raises HTTPException 404 for an unknown store, and 400 when the upload's
    # content type is not "application/pdf" or its filename does not end in
    # ".pdf". Conversion and ingestion failures are reported in-band as a
    # success=False response instead of being raised.

    def _failure(error: str) -> DocumentIngestResponse:
        # Single place that builds the in-band failure payload.
        return DocumentIngestResponse(
            success=False,
            vector_store_id=store_id,
            doc_id=doc_id,
            chunks_ingested=0,
            time_ms=0,
            error=error,
        )

    # Validation lives inside the try so the upload is closed on every exit
    # path, including the 404/400 rejections, not only after a successful read.
    try:
        if vector_store_service.get_store(store_id) is None:
            raise HTTPException(status_code=404, detail=f"Vector store {store_id} not found")
        if file.content_type != "application/pdf":
            raise HTTPException(status_code=400, detail="Only PDF files are accepted")
        filename = file.filename
        if not filename or not filename.lower().endswith(".pdf"):
            raise HTTPException(status_code=400, detail="Only .pdf files are accepted")
        raw = await file.read()
    finally:
        await file.close()

    text = await _process_pdf_bytes(raw, filename, clean_content)
    if isinstance(text, ConversionError):
        return _failure(text.message)

    try:
        chunks, elapsed = await vector_store_service.ingest_document(
            store_id=store_id,
            doc_id=doc_id,
            text=text,
            source=filename,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
    except Exception as exc:
        logger.error("PDF ingest failed for store %s: %s", store_id, exc)
        return _failure(str(exc))

    return DocumentIngestResponse(
        success=True,
        vector_store_id=store_id,
        doc_id=doc_id,
        chunks_ingested=chunks,
        time_ms=round(elapsed, 3),
    )
@router.post(
    "/vector-stores/{store_id}/documents/upload-url",
    response_model=DocumentIngestResponse,
    summary="Ingest a PDF from a URL into the vector store",
)
async def ingest_pdf_url(
    store_id: str,
    body: DocumentIngestUrlRequest,
    clean_content: bool = Query(True, description="Clean markdown text after PDF conversion"),
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> DocumentIngestResponse:
    # Downloads a PDF from the URL in the request body, converts it to text
    # and ingests that text.
    #
    # Raises HTTPException 404 for an unknown store. Download, conversion and
    # ingestion failures are reported in-band as a success=False response
    # instead of being raised.

    def _failure(error: str) -> DocumentIngestResponse:
        # Single place that builds the in-band failure payload.
        return DocumentIngestResponse(
            success=False,
            vector_store_id=store_id,
            doc_id=body.doc_id,
            chunks_ingested=0,
            time_ms=0,
            error=error,
        )

    if vector_store_service.get_store(store_id) is None:
        raise HTTPException(status_code=404, detail=f"Vector store {store_id} not found")

    import httpx

    # NOTE(review): the URL is supplied by the caller and fetched server-side
    # with redirects followed; consider restricting allowed hosts/schemes and
    # capping the response size if this endpoint is reachable by untrusted users.
    try:
        async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
            resp = await client.get(body.url)
            resp.raise_for_status()
            raw = resp.content
    except httpx.HTTPStatusError as exc:
        return _failure(f"Failed to fetch PDF from URL: HTTP {exc.response.status_code}")
    except (httpx.RequestError, httpx.InvalidURL) as exc:
        # httpx.InvalidURL is not a RequestError subclass, so it has to be
        # named explicitly for a malformed URL to get the structured failure.
        return _failure(f"Failed to fetch PDF from URL: {exc}")

    text = await _process_pdf_bytes(raw, body.url, clean_content)
    if isinstance(text, ConversionError):
        return _failure(text.message)

    try:
        chunks, elapsed = await vector_store_service.ingest_document(
            store_id=store_id,
            doc_id=body.doc_id,
            text=text,
            source=body.url,
            chunk_size=body.chunk_size,
            chunk_overlap=body.chunk_overlap,
        )
    except Exception as exc:
        logger.error("PDF URL ingest failed for store %s: %s", store_id, exc)
        return _failure(str(exc))

    return DocumentIngestResponse(
        success=True,
        vector_store_id=store_id,
        doc_id=body.doc_id,
        chunks_ingested=chunks,
        time_ms=round(elapsed, 3),
    )
@router.post(
    "/vector-stores/{store_id}/search",
    response_model=SearchResponse,
    summary="Search the vector store using natural language queries (RAG)",
)
async def search_vector_store(
    store_id: str,
    body: SearchRequest,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> SearchResponse:
    # Runs the request's query against an existing store.
    # An unknown store yields a 404. Any exception raised by the service's
    # search call is logged and reported in-band as a success=False response
    # with an empty result list, rather than raised.
    if vector_store_service.get_store(store_id) is None:
        raise HTTPException(status_code=404, detail=f"Vector store {store_id} not found")

    try:
        hits, elapsed_ms = await vector_store_service.search(
            store_id=store_id,
            query_text=body.query,
            top_k=body.top_k,
            filter_expr=body.filter,
            min_score=body.min_score,
            include_vectors=body.include_vectors,
            include_metadata=body.include_metadata,
        )
    except Exception as exc:
        logger.error("Search failed for store %s: %s", store_id, exc)
        return SearchResponse(
            success=False,
            vector_store_id=store_id,
            query=body.query,
            results=[],
            total_results=0,
            time_ms=0,
            error=str(exc),
        )
    else:
        return SearchResponse(
            success=True,
            vector_store_id=store_id,
            query=body.query,
            results=hits,
            total_results=len(hits),
            time_ms=round(elapsed_ms, 3),
        )
@router.post(
    "/vector-stores/{store_id}/delete",
    response_model=DeleteResponse,
    summary="Delete documents from the vector store by IDs or filter",
)
async def delete_documents(
    store_id: str,
    body: DeleteRequest,
    token: str = Depends(require_auth),
    vector_store_service: VectorStoreService = Depends(get_vector_store_service),
) -> DeleteResponse:
    # Deletes documents from an existing store using the ids and/or filter in
    # the request body, timing the service call with a monotonic clock.
    # An unknown store yields a 404. Any exception raised by the service is
    # logged and reported in-band as a success=False response that still
    # carries the elapsed time.
    if vector_store_service.get_store(store_id) is None:
        raise HTTPException(status_code=404, detail=f"Vector store {store_id} not found")

    start = time.perf_counter()
    try:
        deleted = await vector_store_service.delete_documents(
            store_id=store_id,
            ids=body.ids,
            filter_expr=body.filter,
        )
    except Exception as exc:
        elapsed = (time.perf_counter() - start) * 1000
        # Log like the ingest/search handlers do, so a failed delete leaves a
        # server-side trace.
        logger.error("Delete failed for store %s: %s", store_id, exc)
        return DeleteResponse(
            success=False,
            vector_store_id=store_id,
            deleted_count=0,
            time_ms=round(elapsed, 3),
            error=str(exc),
        )

    elapsed = (time.perf_counter() - start) * 1000
    return DeleteResponse(
        success=True,
        vector_store_id=store_id,
        deleted_count=deleted,
        time_ms=round(elapsed, 3),
    )