import asyncio
import logging

from fastapi import APIRouter, Request, UploadFile, File, BackgroundTasks
from fastapi.params import Depends
from fastapi.responses import JSONResponse

# Import service functions
from services.file_validation import sanitize_for_display, validate_document
from services.storage import calculate_file_hash, check_file_size, human_readable_size, save_file
from services.text_extraction import extract_pages_from_pdf, validate_text, clean_txt
from services.chunker import chunk_text
from services.embedding import process_embeddings_background
from services.rate_limiter import limiter

# Database imports
from db.qdrant import get_chunk_context_by_index, get_document_chunks_by_uuid, get_qdrant
from db.postgres import (
    get_document_by_hash,
    get_document_text,
    get_pg_pool,
    insert_document,
    get_document_by_uuid,
    document_exists,
    update_document_activity,
)

logger = logging.getLogger(__name__)

route = APIRouter(prefix="/documents", tags=["Documents"])

# NOTE(review): no endpoint body reads ``request``; it is presumably required by
# ``limiter.limit`` (slowapi-style limiters locate it by parameter name) — keep it.


def _error(message: str, status_code: int, key: str = "error") -> JSONResponse:
    """Build the ``{"success": False, <key>: message}`` error response used by the endpoints."""
    return JSONResponse(content={"success": False, key: message}, status_code=status_code)


async def _load_document(uuid: str, pg_pool):
    """Record activity on document ``uuid`` and return its row from Postgres (falsy if absent)."""
    await update_document_activity(uuid, pg_pool)
    return await get_document_by_uuid(uuid, pg_pool)


@route.post("/upload")
@limiter.limit("3/minute")
async def upload(
    request: Request,
    file: UploadFile = File(...),
    background_tasks: BackgroundTasks = None,
    pg_pool=Depends(get_pg_pool),
    qdrant=Depends(get_qdrant),
):
    """Ingest an uploaded document and schedule its embedding in the background.

    Steps: validate the file, read it within the size limit, deduplicate by
    content hash, save it, extract and chunk its text, insert the document row,
    then queue ``process_embeddings_background`` for the chunks.

    If a document with the same hash already exists *and* has at least one
    chunk stored in Qdrant, its existing metadata is returned and nothing is
    re-ingested.

    Returns:
        200 with ``{"success": True, "metadata": {...}}`` on success;
        400 for invalid, oversized or text-less files; 500 on database errors.
    """
    # Guard a missing upload before touching its attributes.
    if not file:
        return _error("Invalid or unsupported file", 400)
    file.filename = sanitize_for_display(file.filename)
    if not await validate_document(file):
        return _error("Invalid or unsupported file", 400)

    # Read the upload; a falsy ``content`` means the size limit was exceeded.
    content_size, content = await check_file_size(file)
    if not content:
        return _error("File size exceeds the limit of 10MB.", 400)

    # Deduplicate by content hash. Only short-circuit when the earlier copy has
    # chunks in Qdrant; otherwise fall through and ingest it again.
    file_hash = await calculate_file_hash(content)
    try:
        existing_doc = await get_document_by_hash(file_hash, pg_pool)
        if existing_doc:
            existing_chunks = await get_document_chunks_by_uuid(existing_doc.get("id"), qdrant, limit=1)
            if existing_chunks:
                return JSONResponse(content={"success": True, "metadata": existing_doc}, status_code=200)
    except Exception:
        logger.exception("Database error while checking for a duplicate upload")
        return _error("Database error, Please try again.", 500)

    # Persist the raw bytes; the returned id identifies the document everywhere below.
    document_id = await save_file(content)

    # PDF parsing is synchronous, so run it in a worker thread to keep the event loop free.
    pages = await asyncio.to_thread(extract_pages_from_pdf, document_id)
    if not pages:
        # Without this guard, pages[-1] below raised an unhandled IndexError.
        return _error("Could not extract any text from the document.", 400)
    # The last page's "page" field is used as the document's page count.
    page_count = pages[-1]["page"]

    validated_text = validate_text(clean_txt(pages))
    chunks = chunk_text(validated_text)

    try:
        await insert_document(
            document_id, file.filename, validated_text, len(chunks), page_count, file_hash, pg_pool
        )
    except Exception:
        logger.exception("Database error while inserting document %s", document_id)
        return _error("Database error, Please try again.", 500)

    # Embedding runs after the response is sent; GET /documents/{uuid}/status
    # reports when chunks have appeared in Qdrant.
    logger.info("Scheduling background embedding for %s...", document_id)
    background_tasks.add_task(process_embeddings_background, chunks, document_id, qdrant)

    response = {
        "success": True,
        "metadata": {
            "id": document_id,
            "filename": file.filename,
            "chunk_count": len(chunks),
            "pages": page_count,
            "character_count": len(validated_text),
            "size": human_readable_size(content_size),
        },
    }
    return JSONResponse(content=response)


@route.get("/{uuid}/status")
@limiter.limit("20/minute")
async def check_processing_status(request: Request, uuid: str, qdrant=Depends(get_qdrant)):
    """Report whether any chunk for ``uuid`` has been saved to Qdrant yet.

    Best-effort: a failed lookup is logged and reported as
    ``{"success": True, "ready": False}`` rather than as an error.
    """
    try:
        # One chunk is enough to know the embedding step has produced output.
        chunks = await get_document_chunks_by_uuid(uuid, qdrant, limit=1)
    except Exception:
        logger.exception("Status check failed for document %s", uuid)
        return JSONResponse(content={"success": True, "ready": False})
    return JSONResponse(content={"success": True, "ready": bool(chunks)})


@route.get("/{uuid}")
@limiter.limit("30/minute")
async def get_document(request: Request, uuid: str, pg_pool=Depends(get_pg_pool)):
    """Return the Postgres record for document ``uuid`` under the ``"document"`` key.

    Records activity on the document first. Returns 404 if it does not exist
    and 500 on database errors.
    """
    # NOTE(review): this endpoint reports errors under "message" while the
    # others use "error"; kept as-is for client compatibility.
    try:
        document = await _load_document(uuid, pg_pool)
    except Exception:
        logger.exception("Database error while fetching document %s", uuid)
        return _error("Database error", 500, key="message")

    if not document:
        return _error("Document not found", 404, key="message")
    return JSONResponse(content={"success": True, "document": document})


@route.get("/{uuid}/info")
@limiter.limit("30/minute")
async def get_document_info(request: Request, uuid: str, pg_pool=Depends(get_pg_pool)):
    """Return the Postgres record for document ``uuid`` under the ``"metadata"`` key.

    Records activity on the document first. Returns 404 if it does not exist
    and 500 on database errors.
    """
    try:
        document = await _load_document(uuid, pg_pool)
    except Exception:
        logger.exception("Database error while fetching info for document %s", uuid)
        return _error("Database error", 500)

    if not document:
        return _error("Document not found", 404)
    return JSONResponse(content={"success": True, "metadata": document})


@route.get("/{uuid}/preview")
@limiter.limit("30/minute")
async def get_document_chunks(
    request: Request, uuid: str, pg_pool=Depends(get_pg_pool), qdrant=Depends(get_qdrant)
):
    """Return the document's stored text (Postgres) together with all of its chunks (Qdrant).

    Returns 404 if the document does not exist and 500 on database errors.
    """
    try:
        if not await document_exists(uuid, pg_pool):
            return _error("Document not found. Please try again.", 404)
        await update_document_activity(uuid, pg_pool)
        chunks = await get_document_chunks_by_uuid(uuid, qdrant)
        text_preview = await get_document_text(uuid, pg_pool)
    except Exception:
        logger.exception("Database error while building preview for document %s", uuid)
        return _error("Database error. Please try again.", 500)

    return JSONResponse(content={"success": True, "text": text_preview, "chunks": chunks})


@route.get("/{uuid}/{chunk_index}/context")
@limiter.limit("30/minute")
async def get_chunk_context(
    request: Request,
    uuid: str,
    chunk_index: int,
    pg_pool=Depends(get_pg_pool),
    qdrant=Depends(get_qdrant),
):
    """Return the text surrounding chunk ``chunk_index`` of document ``uuid``.

    ``get_chunk_context_by_index`` yields character offsets. The text between
    the smallest and largest offset is read back from Postgres and returned
    with those offsets as ``start_char`` / ``end_char``.

    Returns 404 if the document, the offsets or the text cannot be found,
    and 500 on database errors.
    """
    try:
        if not await document_exists(uuid, pg_pool):
            return _error("Document not found. Please try again.", 404)
        await update_document_activity(uuid, pg_pool)

        index_pool = await get_chunk_context_by_index(uuid, chunk_index, qdrant)
        if not index_pool:
            return _error("Chunk context not found. Please try again.", 404)

        start_char = min(index_pool)
        end_char = max(index_pool)
        # Presumably get_document_text(uuid, pool, start, length) — TODO confirm signature.
        context = await get_document_text(uuid, pg_pool, start_char, end_char - start_char)
        if not context:
            return _error("Chunk context not found. Please try again.", 404)
    except Exception:
        logger.exception("Database error while fetching context for %s chunk %s", uuid, chunk_index)
        return _error("Database error. Please try again.", 500)

    return JSONResponse(
        content={
            "success": True,
            "context": {"text": context, "start_char": start_char, "end_char": end_char},
        }
    )