Spaces:
Sleeping
Sleeping
File size: 7,411 Bytes
7815eb4 13f26b4 7aec9cc 7815eb4 13f26b4 7815eb4 7aec9cc 7815eb4 13f26b4 7815eb4 7aec9cc e4662f9 7815eb4 7aec9cc 7815eb4 7aec9cc 7815eb4 13f26b4 7815eb4 7aec9cc 7815eb4 7aec9cc 7815eb4 13f26b4 7815eb4 7aec9cc 7815eb4 13f26b4 7815eb4 7aec9cc 7815eb4 13f26b4 7815eb4 7aec9cc 7815eb4 7aec9cc 7815eb4 13f26b4 7815eb4 7aec9cc 7815eb4 7aec9cc 7815eb4 e4662f9 7815eb4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | import asyncio
from fastapi import APIRouter, Request, UploadFile, File, BackgroundTasks
from fastapi.params import Depends
from fastapi.responses import JSONResponse
# Import service functions
from services.file_validation import sanitize_for_display, validate_document
from services.storage import calculate_file_hash, check_file_size, human_readable_size, save_file
from services.text_extraction import extract_pages_from_pdf, validate_text, clean_txt
from services.chunker import chunk_text
from services.embedding import process_embeddings_background
from services.rate_limiter import limiter
#database imports
from db.qdrant import get_chunk_context_by_index, get_document_chunks_by_uuid, get_qdrant
from db.postgres import get_document_by_hash, get_document_text, get_pg_pool, insert_document, get_document_by_uuid, document_exists, update_document_activity
# Router for all document endpoints; every path below is served under the
# "/documents" prefix and grouped under the "Documents" tag.
route = APIRouter(prefix="/documents", tags=["Documents"])
@route.post("/upload")
@limiter.limit("3/minute")
async def upload(request: Request, file: UploadFile = File(...), background_tasks: BackgroundTasks = None, pg_pool = Depends(get_pg_pool), qdrant = Depends(get_qdrant)):
    """Accept a document upload, extract and chunk its text, and queue embedding.

    Steps, in order: validate the file, enforce the size limit, short-circuit
    on a previously processed duplicate (matched by content hash), save the
    file, extract/clean/validate/chunk its text, store the document row, and
    schedule the embedding job as a background task.

    Args:
        request: Incoming request; required by the rate limiter decorator.
        file: The uploaded document.
        background_tasks: Task queue used to run the embedding job after the
            response is sent.
        pg_pool: Postgres handle provided by ``get_pg_pool``.
        qdrant: Qdrant handle provided by ``get_qdrant``.

    Returns:
        JSONResponse with ``success`` and either ``metadata`` (200) or
        ``error`` (400 for rejected uploads, 500 for database failures).
    """
    invalid_file = {"success": False, "error": "Invalid or unsupported file"}

    # Guard before touching any attribute of `file`; the original dereferenced
    # `file.filename` first, which made the falsy check unreachable.
    if not file:
        return JSONResponse(content=invalid_file, status_code=400)

    # Sanitize before validation so the order the validator sees is unchanged.
    file.filename = sanitize_for_display(file.filename)

    # Check file type
    if not await validate_document(file):
        return JSONResponse(content=invalid_file, status_code=400)

    # Read the upload; a falsy `content` is reported as an over-limit file.
    content_size, content = await check_file_size(file)
    if not content:
        return JSONResponse(content={"success": False, "error": "File size exceeds the limit of 10MB."}, status_code=400)

    # Calculate file hash and check for duplicates
    file_hash = await calculate_file_hash(content)
    try:
        existing_doc = await get_document_by_hash(file_hash, pg_pool)
        if existing_doc:
            # Reuse the existing record only when at least one chunk is already
            # in Qdrant; otherwise fall through and process the file again.
            chunks = await get_document_chunks_by_uuid(existing_doc.get("id"), qdrant, limit=1)
            if len(chunks) > 0:
                return JSONResponse(content={"success": True, "metadata": existing_doc}, status_code=200)
    except Exception as e:
        print(f"Database error: {e}")
        return JSONResponse(content={"success": False, "error": "Database error, Please try again."}, status_code=500)

    # The value returned by save_file is used as the document id from here on.
    document_id = await save_file(content)

    # Extraction runs in a worker thread so it does not block the event loop.
    pages = await asyncio.to_thread(extract_pages_from_pdf, document_id)
    clean_text = clean_txt(pages)
    validated_text = validate_text(clean_text)
    chunks = chunk_text(validated_text)

    # An empty page list used to raise IndexError inside the insert's try block
    # and was reported as a database error; reject it explicitly instead. The
    # check sits after validation so any rejection raised by the text helpers
    # still takes precedence.
    if not pages:
        return JSONResponse(content={"success": False, "error": "No readable pages found in the document."}, status_code=400)
    page_count = pages[-1]['page']

    # Store document metadata and text in the database
    try:
        await insert_document(document_id, file.filename, validated_text, len(chunks), page_count, file_hash, pg_pool)
    except Exception as e:
        print(f"Database error: {e}")
        return JSONResponse(content={"success": False, "error": "Database error, Please try again."}, status_code=500)

    # Embed chunks and store embeddings in the vector database
    print(f"Scheduling background embedding for {document_id}...", flush=True)
    background_tasks.add_task(process_embeddings_background, chunks, document_id, qdrant)

    response = {
        "success": True,
        "metadata": {
            "id": document_id,
            "filename": file.filename,
            "chunk_count": len(chunks),
            "pages": page_count,
            "character_count": len(validated_text),
            "size": human_readable_size(content_size),
        },
    }
    return JSONResponse(content=response)
@route.get("/{uuid}/status")
@limiter.limit("20/minute")
async def check_processing_status(request: Request, uuid: str, qdrant = Depends(get_qdrant)):
    """Report whether at least one chunk of the document is retrievable from Qdrant.

    The response always carries ``success: True``; a lookup failure is printed
    and reported as ``ready: False`` rather than as an error.
    """
    ready = False
    try:
        # A single chunk is enough to tell that embedding output exists.
        found = await get_document_chunks_by_uuid(uuid, qdrant, limit=1)
        ready = len(found) > 0
    except Exception as e:
        print(str(e))
    return JSONResponse(content={"success": True, "ready": ready})
@route.get("/{uuid}")
@limiter.limit("30/minute")
async def get_document(request: Request, uuid: str, pg_pool = Depends(get_pg_pool)):
    """Return the stored document record for ``uuid``.

    The document's activity is updated before the lookup. Responds with 500
    when either database call raises and 404 when the lookup yields nothing.
    """
    try:
        await update_document_activity(uuid, pg_pool)
        record = await get_document_by_uuid(uuid, pg_pool)
    except Exception as e:
        print(f"Database error: {e}")
        return JSONResponse(content={"success": False, "message": "Database error"}, status_code=500)

    if record:
        return JSONResponse(content={"success": True, "document": record})
    return JSONResponse(content={"success": False, "message": "Document not found"}, status_code=404)
@route.get("/{uuid}/info")
@limiter.limit("30/minute")
async def get_document_info(request: Request, uuid: str, pg_pool = Depends(get_pg_pool)):
    """Return the document record for ``uuid`` under the ``metadata`` key.

    The document's activity is updated before the lookup. Failures are
    reported under ``error``: 500 when a database call raises, 404 when the
    lookup yields nothing.
    """
    try:
        await update_document_activity(uuid, pg_pool)
        metadata = await get_document_by_uuid(uuid, pg_pool)
    except Exception as e:
        print(f"Database error: {e}")
        payload, status = {"success": False, "error": "Database error"}, 500
    else:
        if metadata:
            payload, status = {"success": True, "metadata": metadata}, 200
        else:
            payload, status = {"success": False, "error": "Document not found"}, 404
    return JSONResponse(content=payload, status_code=status)
@route.get("/{uuid}/preview")
@limiter.limit("30/minute")
async def get_document_chunks(request: Request, uuid: str, pg_pool = Depends(get_pg_pool), qdrant = Depends(get_qdrant)):
    """Return the stored text of a document together with its chunks.

    Responds with 404 when the document does not exist and 500 when any
    database or vector-store call raises. On success the document's activity
    is updated before the chunks and text are fetched.
    """
    try:
        known = await document_exists(uuid, pg_pool)
        if known:
            await update_document_activity(uuid, pg_pool)
            chunks = await get_document_chunks_by_uuid(uuid, qdrant)
            text_preview = await get_document_text(uuid, pg_pool)
    except Exception as e:
        print(f"Database error: {e}")
        return JSONResponse(content={"success": False, "error": "Database error. Please try again."}, status_code=500)

    if not known:
        return JSONResponse(content={"success": False, "error": "Document not found. Please try again."}, status_code=404)
    return JSONResponse(content={"success": True, "text": text_preview, "chunks": chunks})
@route.get("/{uuid}/{chunk_index}/context")
@limiter.limit("30/minute")
async def get_chunk_context(request: Request, uuid: str, chunk_index: int, pg_pool = Depends(get_pg_pool), qdrant = Depends(get_qdrant)):
    """Return the document text surrounding the chunk at ``chunk_index``.

    The values returned by ``get_chunk_context_by_index`` are reduced to their
    minimum and maximum, which are reported as ``start_char`` and ``end_char``
    and used to fetch the matching slice of document text.

    Responds with 404 when the document, the chunk context, or the text slice
    is missing, and 500 when any database or vector-store call raises.
    """
    context_missing = {"success": False, "error": "Chunk context not found. Please try again."}
    try:
        if not await document_exists(uuid, pg_pool):
            return JSONResponse(content={"success": False, "error": "Document not found. Please try again."}, status_code=404)
        await update_document_activity(uuid, pg_pool)

        offsets = await get_chunk_context_by_index(uuid, chunk_index, qdrant)
        if not offsets:
            return JSONResponse(content=context_missing, status_code=404)

        start_char, end_char = min(offsets), max(offsets)
        # Third and fourth arguments are the lower bound and the span width;
        # presumably an offset and a length - confirm against get_document_text.
        excerpt = await get_document_text(uuid, pg_pool, start_char, end_char - start_char)
        if not excerpt:
            return JSONResponse(content=context_missing, status_code=404)
    except Exception as e:
        print(f"Database error: {e}")
        return JSONResponse(content={"success": False, "error": "Database error. Please try again."}, status_code=500)

    return JSONResponse(content={"success": True, "context": {"text": excerpt, "start_char": start_char, "end_char": end_char}})