# Deploy backend Docker app (commit e63c592)
from typing import Any, Dict, List
from fastapi import APIRouter, Query
from fastapi.concurrency import run_in_threadpool
from langchain_core.documents import Document
from app.core.config import get_settings
from app.core.logging import get_logger
from app.schemas.documents import (
DocumentsStatsResponse,
NamespaceStat,
UploadTextRequest,
UploadTextResponse,
)
from app.services import chunking as chunking_service
from app.services import dedupe as dedupe_service
from app.services.normalize import make_doc_id, normalize_text, is_valid_document
from app.services.pinecone_store import describe_index_stats, upsert_records
# Module-level logger, named after this module for filtering in log output.
logger = get_logger(__name__)
# All endpoints below are mounted under /documents and grouped under the
# "documents" tag in the generated OpenAPI docs.
router = APIRouter(prefix="/documents", tags=["documents"])
@router.post(
    "/upload-text",
    response_model=UploadTextResponse,
    summary="Upload raw text or Docling output",
    description=(
        "Accepts manual text uploads or Docling-converted content, normalizes and "
        "chunks the text, and upserts it into Pinecone."
    ),
)
async def upload_text(payload: UploadTextRequest) -> UploadTextResponse:
    """Normalize, chunk, dedupe, and upsert one text document into Pinecone.

    Documents that fail the minimum-length validity check are skipped and
    reported back with zero ingested documents/chunks rather than raising.
    """
    settings = get_settings()
    target_namespace = payload.namespace or settings.PINECONE_NAMESPACE

    cleaned = normalize_text(payload.text)
    if not is_valid_document(cleaned):
        # Too short to be worth indexing — log and report zero ingestion.
        logger.info(
            "Skipping manual upload for title='%s' due to insufficient length (len=%d)",
            payload.title,
            len(cleaned),
        )
        return UploadTextResponse(
            namespace=target_namespace,
            source=payload.source,
            ingested_documents=0,
            ingested_chunks=0,
        )

    # Copy caller-supplied metadata so the request payload is never mutated,
    # then overlay the canonical fields used downstream for retrieval.
    doc_metadata: Dict[str, Any] = dict(payload.metadata) if payload.metadata else {}
    doc_url = doc_metadata.get("url", "")
    doc_metadata.update(
        {
            "title": payload.title,
            "source": payload.source,
            "url": doc_url,
            "published": doc_metadata.get("published", ""),
            "doc_id": make_doc_id(source=payload.source, title=payload.title, url=doc_url),
        }
    )

    doc = Document(page_content=cleaned, metadata=doc_metadata)
    chunk_records = dedupe_service.dedupe_records(
        chunking_service.documents_to_records([doc])
    )
    # The Pinecone client is synchronous; run it in a worker thread so the
    # event loop stays free.
    upserted = await run_in_threadpool(upsert_records, target_namespace, chunk_records)

    return UploadTextResponse(
        namespace=target_namespace,
        source=payload.source,
        ingested_documents=1,
        ingested_chunks=upserted,
    )
@router.get(
    "/stats",
    response_model=DocumentsStatsResponse,
    summary="Get document statistics",
    description="Returns vector counts per namespace from the configured Pinecone index.",
)
async def documents_stats(
    namespace: str | None = Query(
        default=None,
        description="Optional namespace filter; if omitted, stats for all namespaces are returned",
    ),
) -> DocumentsStatsResponse:
    """Report per-namespace vector counts from the configured Pinecone index."""
    # The Pinecone stats call is blocking; dispatch it to a worker thread.
    raw = await run_in_threadpool(describe_index_stats, namespace)

    per_namespace: Dict[str, NamespaceStat] = {}
    for name, info in raw.items():
        # Missing vector_count is treated as an empty namespace.
        per_namespace[name] = NamespaceStat(vector_count=info.get("vector_count", 0))

    return DocumentsStatsResponse(namespaces=per_namespace)