Spaces:
Sleeping
Sleeping
| import asyncio | |
| from fastapi import APIRouter, Request, UploadFile, File, BackgroundTasks | |
| from fastapi.params import Depends | |
| from fastapi.responses import JSONResponse | |
| # Import service functions | |
| from services.file_validation import sanitize_for_display, validate_document | |
| from services.storage import calculate_file_hash, check_file_size, human_readable_size, save_file | |
| from services.text_extraction import extract_pages_from_pdf, validate_text, clean_txt | |
| from services.chunker import chunk_text | |
| from services.embedding import process_embeddings_background | |
| from services.rate_limiter import limiter | |
| #database imports | |
| from db.qdrant import get_chunk_context_by_index, get_document_chunks_by_uuid, get_qdrant | |
| from db.postgres import get_document_by_hash, get_document_text, get_pg_pool, insert_document, get_document_by_uuid, document_exists, update_document_activity | |
# Router for all document endpoints; every path it registers lives under /documents.
route = APIRouter(tags=["Documents"], prefix="/documents")
async def upload(request: Request, file: UploadFile = File(...), background_tasks: BackgroundTasks = None,
                 pg_pool = Depends(get_pg_pool), qdrant = Depends(get_qdrant)):
    """Accept an uploaded document, extract and chunk its text, and schedule embedding.

    Flow:
      1. Validate the upload (``validate_document``) and enforce the size limit
         (``check_file_size``).
      2. Hash the content. If a document with the same hash exists *and* already
         has at least one chunk in Qdrant, return its stored metadata instead of
         processing it again.
      3. Save the file, extract its pages, then clean, validate and chunk the text.
      4. Insert the document row into Postgres and schedule
         ``process_embeddings_background`` as a background task.

    Returns a ``JSONResponse``:
      * 200 ``{"success": True, "metadata": {...}}`` on success or a duplicate hit.
      * 400 for an invalid/unsupported file, a file rejected by the size check, or
        a document with no extractable text.
      * 500 when a database call fails.

    ``request`` is unused in the body; presumably it is required by the rate
    limiter decorator (``limiter``). TODO confirm.
    """
    # Guard before touching ``file.filename`` so a missing upload yields a 400
    # instead of an AttributeError.
    if not file:
        return JSONResponse(content={"success": False, "error": "Invalid or unsupported file"}, status_code=400)
    # Sanitize first (as before), so validate_document still sees the sanitized name.
    file.filename = sanitize_for_display(file.filename)
    if not await validate_document(file):
        return JSONResponse(content={"success": False, "error": "Invalid or unsupported file"}, status_code=400)

    # check_file_size returns falsy content when the upload is rejected.
    content_size, content = await check_file_size(file)
    if not content:
        return JSONResponse(content={"success": False, "error": "File size exceeds the limit of 10MB."}, status_code=400)

    # Deduplicate by content hash. Only short-circuit when the earlier upload has
    # already been embedded (at least one chunk in Qdrant). Otherwise fall through
    # and process it again.
    file_hash = await calculate_file_hash(content)
    try:
        existing_doc = await get_document_by_hash(file_hash, pg_pool)
        if existing_doc:
            existing_chunks = await get_document_chunks_by_uuid(existing_doc.get("id"), qdrant, limit=1)
            if existing_chunks:
                return JSONResponse(content={"success": True, "metadata": existing_doc}, status_code=200)
    except Exception as e:
        print(f"Database error: {e}")
        return JSONResponse(content={"success": False, "error": "Database error, Please try again."}, status_code=500)

    document_id = await save_file(content)

    # Extraction runs via asyncio.to_thread so the blocking call does not stall the event loop.
    pages = await asyncio.to_thread(extract_pages_from_pdf, document_id)
    no_text_error = {"success": False, "error": "No extractable text found in the document."}
    # NOTE(review): the saved file is not removed on the early returns below. Confirm
    # whether a cleanup helper exists in services.storage.
    if not pages:
        # Previously this crashed with IndexError on pages[-1] (unhandled 500).
        return JSONResponse(content=no_text_error, status_code=400)
    clean_text = clean_txt(pages)
    validated_text = validate_text(clean_text)
    if not validated_text:
        return JSONResponse(content=no_text_error, status_code=400)
    chunks = chunk_text(validated_text)
    if not chunks:
        # A document with zero chunks would never be reported "ready" by the status endpoint.
        return JSONResponse(content=no_text_error, status_code=400)
    # Assumes pages are ordered and each carries its page number under "page",
    # so the last entry's number is the page count. TODO confirm.
    page_count = pages[-1]["page"]

    # Persist document metadata and the validated text.
    try:
        await insert_document(document_id, file.filename, validated_text, len(chunks), page_count, file_hash, pg_pool)
    except Exception as e:
        print(f"Database error: {e}")
        return JSONResponse(content={"success": False, "error": "Database error, Please try again."}, status_code=500)

    # Embedding and Qdrant storage happen after the response is sent.
    print(f"Scheduling background embedding for {document_id}...", flush=True)
    background_tasks.add_task(process_embeddings_background, chunks, document_id, qdrant)

    response = {
        "success": True,
        "metadata": {
            "id": document_id,
            "filename": file.filename,
            "chunk_count": len(chunks),
            "pages": page_count,
            "character_count": len(validated_text),
            "size": human_readable_size(content_size),
        },
    }
    return JSONResponse(content=response)
async def check_processing_status(request: Request, uuid: str, qdrant = Depends(get_qdrant)):
    """Report whether background embedding has stored any chunks for document ``uuid``.

    Fetches at most one chunk from Qdrant. Its presence means the document is ready.
    A failing lookup is printed and reported as "not ready". The response keeps
    ``success: True`` in that case, presumably so polling clients simply retry.
    """
    ready = False
    try:
        # One chunk is enough to tell whether embeddings have landed.
        sample = await get_document_chunks_by_uuid(uuid, qdrant, limit=1)
        ready = len(sample) > 0
    except Exception as exc:
        print(str(exc))
    return JSONResponse(content={"success": True, "ready": ready})
async def get_document(request: Request, uuid: str, pg_pool = Depends(get_pg_pool)):
    """Return the stored record for document ``uuid``.

    First records activity on the document (``update_document_activity``), then
    loads it with ``get_document_by_uuid``.

    Responses:
      * 200 ``{"success": True, "document": ...}``
      * 404 ``{"success": False, "message": "Document not found"}``
      * 500 ``{"success": False, "message": "Database error"}``

    NOTE(review): this endpoint reports failures under "message", while its
    siblings use "error". The key is kept as-is because clients may depend on it.
    """
    try:
        await update_document_activity(uuid, pg_pool)
        record = await get_document_by_uuid(uuid, pg_pool)
    except Exception as exc:
        print(f"Database error: {exc}")
        return JSONResponse(content={"success": False, "message": "Database error"}, status_code=500)
    if record:
        return JSONResponse(content={"success": True, "document": record})
    return JSONResponse(content={"success": False, "message": "Document not found"}, status_code=404)
async def get_document_info(request: Request, uuid: str, pg_pool = Depends(get_pg_pool)):
    """Return the metadata row for document ``uuid``.

    Records activity on the document, then loads it with ``get_document_by_uuid``.

    Responses:
      * 200 ``{"success": True, "metadata": ...}``
      * 404 ``{"success": False, "error": "Document not found"}``
      * 500 ``{"success": False, "error": "Database error"}``
    """
    try:
        await update_document_activity(uuid, pg_pool)
        metadata = await get_document_by_uuid(uuid, pg_pool)
    except Exception as exc:
        print(f"Database error: {exc}")
        return JSONResponse(content={"success": False, "error": "Database error"}, status_code=500)

    if not metadata:
        payload, status = {"success": False, "error": "Document not found"}, 404
    else:
        payload, status = {"success": True, "metadata": metadata}, 200
    return JSONResponse(content=payload, status_code=status)
async def get_document_chunks(request: Request, uuid: str, pg_pool = Depends(get_pg_pool), qdrant = Depends(get_qdrant)):
    """Return the chunks stored in Qdrant and the text stored in Postgres for document ``uuid``.

    The existence check runs first, so unknown documents get a 404 before any
    activity update or chunk lookup.

    Responses:
      * 200 ``{"success": True, "text": ..., "chunks": [...]}``. ``text`` is whatever
        ``get_document_text`` returns when called without a range.
      * 404 when the document does not exist.
      * 500 when any database call fails.
    """
    try:
        exists = await document_exists(uuid, pg_pool)
        if not exists:
            return JSONResponse(content={"success": False, "error": "Document not found. Please try again."}, status_code=404)
        await update_document_activity(uuid, pg_pool)
        chunk_list = await get_document_chunks_by_uuid(uuid, qdrant)
        document_text = await get_document_text(uuid, pg_pool)
    except Exception as exc:
        print(f"Database error: {exc}")
        return JSONResponse(content={"success": False, "error": "Database error. Please try again."}, status_code=500)

    payload = {"success": True, "text": document_text, "chunks": chunk_list}
    return JSONResponse(content=payload)
async def get_chunk_context(request: Request, uuid: str, chunk_index: int, pg_pool = Depends(get_pg_pool), qdrant = Depends(get_qdrant)):
    """Return the span of document text surrounding chunk ``chunk_index`` of document ``uuid``.

    ``get_chunk_context_by_index`` yields a collection of positions. Its smallest and
    largest values are reported as ``start_char``/``end_char``, so they are presumably
    character offsets. TODO confirm. The text is fetched as
    ``get_document_text(uuid, pg_pool, start, end - start)``, which assumes the last
    two arguments are (start, length).

    Responses:
      * 200 ``{"success": True, "context": {"text", "start_char", "end_char"}}``
      * 404 when the document, the chunk positions, or the text span are missing.
      * 500 when any database call fails.
    """
    try:
        if not await document_exists(uuid, pg_pool):
            return JSONResponse(content={"success": False, "error": "Document not found. Please try again."}, status_code=404)
        await update_document_activity(uuid, pg_pool)

        offsets = await get_chunk_context_by_index(uuid, chunk_index, qdrant)
        if not offsets:
            return JSONResponse(content={"success": False, "error": "Chunk context not found. Please try again."}, status_code=404)

        # Compute the bounds once and reuse them for the query and the response.
        start_char = min(offsets)
        end_char = max(offsets)
        context = await get_document_text(uuid, pg_pool, start_char, end_char - start_char)
        if not context:
            return JSONResponse(content={"success": False, "error": "Chunk context not found. Please try again."}, status_code=404)
    except Exception as exc:
        print(f"Database error: {exc}")
        return JSONResponse(content={"success": False, "error": "Database error. Please try again."}, status_code=500)

    span = {"text": context, "start_char": start_char, "end_char": end_char}
    return JSONResponse(content={"success": True, "context": span})