| | """ |
| | SPARKNET Document API Routes |
| | Endpoints for document upload, processing, and management. |
| | """ |
from fastapi import APIRouter, UploadFile, File, HTTPException, Query, Depends, BackgroundTasks
from fastapi.responses import StreamingResponse
from typing import List, Optional
from pathlib import Path
from datetime import datetime
import hashlib
import shutil
import uuid
import io
import sys

# Make the project root importable so the `api.*` and `src.*` packages used
# below resolve when this module is loaded.
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from api.schemas import (
    DocumentUploadResponse, DocumentResponse, DocumentMetadata,
    DocumentDetailResponse, ChunksResponse, ChunkInfo,
    OCRRegionInfo, LayoutRegionInfo, DocumentStatus,
    IndexRequest, IndexResponse, BatchIndexRequest, BatchIndexResponse
)
from loguru import logger

router = APIRouter()

# In-memory stores for document metadata and processing state.
# They are process-local and not persisted: contents are lost on restart and
# are not shared across worker processes.
_documents = {}
_processing_tasks = {}
SUPPORTED_EXTENSIONS = {
    '.pdf': 'application/pdf',
    '.png': 'image/png',
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.tiff': 'image/tiff',
    '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
    '.txt': 'text/plain',
    '.md': 'text/markdown',
}

UPLOAD_DIR = PROJECT_ROOT / "uploads" / "documents"
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
def generate_doc_id(filename: str, content: bytes) -> str:
    """Generate a unique document ID from a timestamp and a content hash."""
    # Only the first 4 KB are hashed, for speed; the timestamp helps keep IDs
    # for repeated uploads of the same file distinct.
    content_hash = hashlib.md5(content[:4096]).hexdigest()[:8]
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    return f"doc_{timestamp}_{content_hash}"
async def process_document_task(doc_id: str, file_path: Path, file_type: str):
    """Background task to process a document."""
    try:
        logger.info(f"Processing document: {doc_id}")
        _documents[doc_id]["status"] = DocumentStatus.PROCESSING
        # Preferred path: the full document pipeline (OCR, layout analysis, chunking).
        try:
            from src.document.pipeline.processor import DocumentProcessor, PipelineConfig

            config = PipelineConfig(
                ocr_enabled=True,
                layout_enabled=True,
                chunking_enabled=True,
            )
            processor = DocumentProcessor(config)
            result = processor.process(str(file_path))
            chunks = []
            for i, chunk in enumerate(getattr(result, 'chunks', [])):
                chunks.append({
                    "chunk_id": f"{doc_id}_chunk_{i}",
                    "doc_id": doc_id,
                    "text": getattr(chunk, 'text', str(chunk)),
                    "chunk_type": getattr(chunk, 'chunk_type', 'text'),
                    "page_num": getattr(chunk, 'page', 0),
                    "confidence": getattr(chunk, 'confidence', 1.0),
                    "bbox": getattr(chunk, 'bbox', None),
                })

            _documents[doc_id].update({
                "status": DocumentStatus.COMPLETED,
                "raw_text": getattr(result, 'raw_text', ''),
                "chunks": chunks,
                "page_count": getattr(result, 'page_count', 1),
                "ocr_regions": getattr(result, 'ocr_regions', []),
                "layout_regions": getattr(result, 'layout_regions', []),
                "processing_time": getattr(result, 'processing_time', 0.0),
                "updated_at": datetime.now(),
            })

            logger.success(f"Document {doc_id} processed successfully: {len(chunks)} chunks")
        except Exception as proc_error:
            logger.warning(f"Full processor unavailable: {proc_error}, using fallback")

            # Fallback path: lightweight text extraction per file type.
            # page_count is initialised up front so it is always defined, even
            # for file types (e.g. images) with no fallback extractor below.
            raw_text = ""
            page_count = 1
            if file_type == '.pdf':
                try:
                    import fitz
                    doc = fitz.open(str(file_path))
                    for page in doc:
                        raw_text += page.get_text() + "\n"
                    page_count = len(doc)
                    doc.close()
                except Exception as e:
                    logger.error(f"PDF extraction failed: {e}")
                    page_count = 1

            elif file_type in ['.txt', '.md']:
                raw_text = file_path.read_text(errors='ignore')
                page_count = 1
            elif file_type == '.docx':
                try:
                    from docx import Document
                    doc = Document(str(file_path))
                    raw_text = "\n".join([p.text for p in doc.paragraphs])
                    # Rough page estimate: about 3000 characters per page.
                    page_count = max(1, len(raw_text) // 3000)
                except Exception as e:
                    logger.error(f"DOCX extraction failed: {e}")
                    page_count = 1
            elif file_type == '.xlsx':
                try:
                    import pandas as pd
                    df_dict = pd.read_excel(str(file_path), sheet_name=None)
                    for sheet_name, df in df_dict.items():
                        raw_text += f"\n=== Sheet: {sheet_name} ===\n"
                        raw_text += df.to_string() + "\n"
                    page_count = len(df_dict)
                except Exception as e:
                    logger.error(f"XLSX extraction failed: {e}")
                    page_count = 1
            elif file_type == '.pptx':
                try:
                    from pptx import Presentation
                    prs = Presentation(str(file_path))
                    for i, slide in enumerate(prs.slides):
                        raw_text += f"\n=== Slide {i+1} ===\n"
                        for shape in slide.shapes:
                            if hasattr(shape, "text"):
                                raw_text += shape.text + "\n"
                    page_count = len(prs.slides)
                except Exception as e:
                    logger.error(f"PPTX extraction failed: {e}")
                    page_count = 1

            else:
                # Image formats (and anything else) have no text-only fallback;
                # they require the full OCR pipeline above.
                logger.warning(f"No fallback text extraction for {file_type}")
            # Naive fixed-size chunking: 1000-character windows with a
            # 100-character overlap between consecutive chunks.
            chunks = []
            chunk_size = 1000
            text_chunks = [raw_text[i:i + chunk_size] for i in range(0, len(raw_text), chunk_size - 100)]
            for i, text in enumerate(text_chunks):
                if text.strip():
                    chunks.append({
                        "chunk_id": f"{doc_id}_chunk_{i}",
                        "doc_id": doc_id,
                        "text": text.strip(),
                        "chunk_type": "text",
                        "page_num": min(i * chunk_size // 3000 + 1, page_count),
                        "confidence": 1.0,
                        "bbox": None,
                    })
            _documents[doc_id].update({
                "status": DocumentStatus.COMPLETED,
                "raw_text": raw_text,
                "chunks": chunks,
                "page_count": page_count,
                "ocr_regions": [],
                "layout_regions": [],
                "processing_time": 0.0,
                "updated_at": datetime.now(),
            })

            logger.info(f"Document {doc_id} processed with fallback: {len(chunks)} chunks")

    except Exception as e:
        logger.error(f"Document processing failed for {doc_id}: {e}")
        _documents[doc_id]["status"] = DocumentStatus.ERROR
        _documents[doc_id]["error"] = str(e)
| | @router.post("/upload", response_model=DocumentUploadResponse) |
| | async def upload_document( |
| | background_tasks: BackgroundTasks, |
| | file: UploadFile = File(...), |
| | auto_process: bool = Query(True, description="Automatically process after upload"), |
| | auto_index: bool = Query(False, description="Automatically index to RAG after processing"), |
| | ): |
| | """ |
| | Upload a document for processing. |
| | |
| | Supported formats: PDF, PNG, JPG, DOCX, XLSX, PPTX, TXT, MD |
| | """ |
| | |
    if not file.filename:
        raise HTTPException(status_code=400, detail="Uploaded file has no filename")

    file_ext = Path(file.filename).suffix.lower()
    if file_ext not in SUPPORTED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file type: {file_ext}. Supported: {list(SUPPORTED_EXTENSIONS.keys())}"
        )
    content = await file.read()
    if len(content) == 0:
        raise HTTPException(status_code=400, detail="Empty file uploaded")

    doc_id = generate_doc_id(file.filename, content)

    file_path = UPLOAD_DIR / f"{doc_id}{file_ext}"
    with open(file_path, "wb") as f:
        f.write(content)

    _documents[doc_id] = {
        "doc_id": doc_id,
        "filename": file.filename,
        "file_type": file_ext,
        "file_path": str(file_path),
        "status": DocumentStatus.PENDING,
        "raw_text": "",
        "chunks": [],
        "page_count": 0,
        "ocr_regions": [],
        "layout_regions": [],
        "indexed": False,
        "indexed_chunks": 0,
        "processing_time": None,
        "created_at": datetime.now(),
        "updated_at": None,
        "auto_index": auto_index,
    }

    if auto_process:
        background_tasks.add_task(process_document_task, doc_id, file_path, file_ext)
        status = DocumentStatus.PROCESSING
        message = "Document uploaded and processing started"
    else:
        status = DocumentStatus.PENDING
        message = "Document uploaded successfully. Call /process to begin processing."

    _documents[doc_id]["status"] = status
    return DocumentUploadResponse(
        doc_id=doc_id,
        filename=file.filename,
        status=status,
        message=message,
        created_at=_documents[doc_id]["created_at"]
    )
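# Example request (illustrative; host, port, and the router prefix depend on
# how the application mounts this router):
#
#     curl -X POST "http://localhost:8000/documents/upload?auto_process=true" \
#          -F "file=@report.pdf"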
| | @router.get("", response_model=List[DocumentMetadata]) |
| | async def list_documents( |
| | status: Optional[DocumentStatus] = Query(None, description="Filter by status"), |
| | indexed: Optional[bool] = Query(None, description="Filter by indexed status"), |
| | limit: int = Query(50, ge=1, le=200), |
| | offset: int = Query(0, ge=0), |
| | ): |
| | """List all documents with optional filtering.""" |
| | docs = list(_documents.values()) |
| |
|
| | |
| | if status: |
| | docs = [d for d in docs if d["status"] == status] |
| | if indexed is not None: |
| | docs = [d for d in docs if d.get("indexed", False) == indexed] |
| |
|
| | |
| | docs = docs[offset:offset + limit] |
| |
|
| | return [ |
| | DocumentMetadata( |
| | doc_id=d["doc_id"], |
| | filename=d["filename"], |
| | file_type=d["file_type"], |
| | page_count=d.get("page_count", 0), |
| | chunk_count=len(d.get("chunks", [])), |
| | text_length=len(d.get("raw_text", "")), |
| | status=d["status"], |
| | indexed=d.get("indexed", False), |
| | indexed_chunks=d.get("indexed_chunks", 0), |
| | processing_time=d.get("processing_time"), |
| | created_at=d["created_at"], |
| | updated_at=d.get("updated_at"), |
| | ) |
| | for d in docs |
| | ] |
| |
|
| |
|
| | @router.get("/{doc_id}", response_model=DocumentResponse) |
| | async def get_document( |
| | doc_id: str, |
| | include_text: bool = Query(False, description="Include full raw text"), |
| | ): |
| | """Get document by ID.""" |
| | if doc_id not in _documents: |
| | raise HTTPException(status_code=404, detail=f"Document not found: {doc_id}") |
| |
|
| | d = _documents[doc_id] |
| |
|
| | return DocumentResponse( |
| | doc_id=d["doc_id"], |
| | filename=d["filename"], |
| | file_type=d["file_type"], |
| | status=d["status"], |
| | metadata=DocumentMetadata( |
| | doc_id=d["doc_id"], |
| | filename=d["filename"], |
| | file_type=d["file_type"], |
| | page_count=d.get("page_count", 0), |
| | chunk_count=len(d.get("chunks", [])), |
| | text_length=len(d.get("raw_text", "")), |
| | status=d["status"], |
| | indexed=d.get("indexed", False), |
| | indexed_chunks=d.get("indexed_chunks", 0), |
| | processing_time=d.get("processing_time"), |
| | created_at=d["created_at"], |
| | updated_at=d.get("updated_at"), |
| | ), |
| | raw_text=d.get("raw_text") if include_text else None, |
| | preview=d.get("raw_text", "")[:500] if d.get("raw_text") else None, |
| | ) |
| |
|
| |
|
| | @router.get("/{doc_id}/detail", response_model=DocumentDetailResponse) |
| | async def get_document_detail(doc_id: str): |
| | """Get detailed document information including chunks and regions.""" |
| | if doc_id not in _documents: |
| | raise HTTPException(status_code=404, detail=f"Document not found: {doc_id}") |
| |
|
| | d = _documents[doc_id] |
| |
|
| | return DocumentDetailResponse( |
| | doc_id=d["doc_id"], |
| | filename=d["filename"], |
| | status=d["status"], |
| | metadata=DocumentMetadata( |
| | doc_id=d["doc_id"], |
| | filename=d["filename"], |
| | file_type=d["file_type"], |
| | page_count=d.get("page_count", 0), |
| | chunk_count=len(d.get("chunks", [])), |
| | text_length=len(d.get("raw_text", "")), |
| | status=d["status"], |
| | indexed=d.get("indexed", False), |
| | indexed_chunks=d.get("indexed_chunks", 0), |
| | processing_time=d.get("processing_time"), |
| | created_at=d["created_at"], |
| | updated_at=d.get("updated_at"), |
| | ), |
| | chunks=[ChunkInfo(**c) for c in d.get("chunks", [])], |
| | ocr_regions=[OCRRegionInfo(**r) for r in d.get("ocr_regions", []) if isinstance(r, dict)], |
| | layout_regions=[LayoutRegionInfo(**r) for r in d.get("layout_regions", []) if isinstance(r, dict)], |
| | ) |
| |
|
| |
|
| | @router.get("/{doc_id}/chunks", response_model=ChunksResponse) |
| | async def get_document_chunks( |
| | doc_id: str, |
| | page: Optional[int] = Query(None, description="Filter by page number"), |
| | chunk_type: Optional[str] = Query(None, description="Filter by chunk type"), |
| | ): |
| | """Get all chunks for a document.""" |
| | if doc_id not in _documents: |
| | raise HTTPException(status_code=404, detail=f"Document not found: {doc_id}") |
| |
|
| | d = _documents[doc_id] |
| | chunks = d.get("chunks", []) |
| |
|
| | |
| | if page is not None: |
| | chunks = [c for c in chunks if c.get("page_num") == page] |
| | if chunk_type: |
| | chunks = [c for c in chunks if c.get("chunk_type") == chunk_type] |
| |
|
| | return ChunksResponse( |
| | doc_id=doc_id, |
| | total_chunks=len(chunks), |
| | chunks=[ChunkInfo(**c) for c in chunks], |
| | ) |
| |
|
| |
|
| | @router.post("/{doc_id}/process") |
| | async def process_document( |
| | doc_id: str, |
| | background_tasks: BackgroundTasks, |
| | force: bool = Query(False, description="Force reprocessing"), |
| | ): |
| | """Trigger document processing.""" |
| | if doc_id not in _documents: |
| | raise HTTPException(status_code=404, detail=f"Document not found: {doc_id}") |
| |
|
| | d = _documents[doc_id] |
| |
|
| | if d["status"] == DocumentStatus.PROCESSING: |
| | raise HTTPException(status_code=400, detail="Document is already being processed") |
| |
|
| | if d["status"] == DocumentStatus.COMPLETED and not force: |
| | raise HTTPException( |
| | status_code=400, |
| | detail="Document already processed. Use force=true to reprocess." |
| | ) |
| |
|
| | file_path = Path(d["file_path"]) |
| | if not file_path.exists(): |
| | raise HTTPException(status_code=404, detail="Document file not found") |
| |
|
| | background_tasks.add_task(process_document_task, doc_id, file_path, d["file_type"]) |
| | _documents[doc_id]["status"] = DocumentStatus.PROCESSING |
| |
|
| | return {"doc_id": doc_id, "status": "processing", "message": "Processing started"} |
| |
|
| |
|
| | @router.delete("/{doc_id}") |
| | async def delete_document(doc_id: str): |
| | """Delete a document.""" |
| | if doc_id not in _documents: |
| | raise HTTPException(status_code=404, detail=f"Document not found: {doc_id}") |
| |
|
| | d = _documents[doc_id] |
| |
|
| | |
| | file_path = Path(d["file_path"]) |
| | if file_path.exists(): |
| | file_path.unlink() |
| |
|
| | |
| | del _documents[doc_id] |
| |
|
| | return {"doc_id": doc_id, "status": "deleted", "message": "Document deleted successfully"} |
| |
|
| |
|
| | @router.post("/{doc_id}/index", response_model=IndexResponse) |
| | async def index_document(doc_id: str, force_reindex: bool = Query(False)): |
| | """Index a document to the RAG vector store.""" |
| | if doc_id not in _documents: |
| | raise HTTPException(status_code=404, detail=f"Document not found: {doc_id}") |
| |
|
| | d = _documents[doc_id] |
| |
|
| | if d["status"] != DocumentStatus.COMPLETED: |
| | raise HTTPException( |
| | status_code=400, |
| | detail=f"Document not ready for indexing. Current status: {d['status']}" |
| | ) |
| |
|
| | if d.get("indexed") and not force_reindex: |
| | return IndexResponse( |
| | doc_id=doc_id, |
| | status="already_indexed", |
| | chunks_indexed=d.get("indexed_chunks", 0), |
| | message="Document already indexed. Use force_reindex=true to reindex." |
| | ) |
| |
|
    try:
        from src.rag.indexer import DocumentIndexer
        from src.rag.embeddings import get_embedding_model
        from src.rag.store import get_vector_store

        embeddings = get_embedding_model()
        store = get_vector_store()
        indexer = DocumentIndexer(embeddings, store)

        chunks_to_index = d.get("chunks", [])
        indexed_count = 0

        for chunk in chunks_to_index:
            try:
                indexer.index_chunk(
                    text=chunk["text"],
                    document_id=doc_id,
                    chunk_id=chunk["chunk_id"],
                    metadata={
                        "filename": d["filename"],
                        "page_num": chunk.get("page_num"),
                        "chunk_type": chunk.get("chunk_type", "text"),
                    }
                )
                indexed_count += 1
            except Exception as e:
                logger.warning(f"Failed to index chunk {chunk['chunk_id']}: {e}")
| | _documents[doc_id]["indexed"] = True |
| | _documents[doc_id]["indexed_chunks"] = indexed_count |
| | _documents[doc_id]["status"] = DocumentStatus.INDEXED |
| |
|
| | return IndexResponse( |
| | doc_id=doc_id, |
| | status="indexed", |
| | chunks_indexed=indexed_count, |
| | message=f"Successfully indexed {indexed_count} chunks" |
| | ) |
| |
|
| | except Exception as e: |
| | logger.error(f"Indexing failed for {doc_id}: {e}") |
| | raise HTTPException(status_code=500, detail=f"Indexing failed: {str(e)}") |
| |
|
| |
|
| | @router.post("/batch-index", response_model=BatchIndexResponse) |
| | async def batch_index_documents(request: BatchIndexRequest): |
| | """Batch index multiple documents.""" |
| | results = [] |
| | successful = 0 |
| | failed = 0 |
| |
|
| | for doc_id in request.doc_ids: |
| | try: |
| | result = await index_document(doc_id, request.force_reindex) |
| | results.append(result) |
| | if result.status in ["indexed", "already_indexed"]: |
| | successful += 1 |
| | else: |
| | failed += 1 |
        except HTTPException as e:
            results.append(IndexResponse(
                doc_id=doc_id,
                status="error",
                chunks_indexed=0,
                message=e.detail
            ))
            failed += 1

    return BatchIndexResponse(
        total_requested=len(request.doc_ids),
        successful=successful,
        failed=failed,
        results=results
    )
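# Example request body for /batch-index (illustrative; the field names follow
# how BatchIndexRequest is used above):
#
#     {
#         "doc_ids": ["doc_20240101120000_a1b2c3d4"],
#         "force_reindex": false
#     }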
def get_document_store():
    """Get the in-memory document store."""
    return _documents