| """Document management API endpoints.""" |
|
|
| from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, File, status |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from src.db.postgres.connection import get_db |
| from src.document.document_service import document_service |
| from src.knowledge.processing_service import knowledge_processor |
| from src.storage.az_blob.az_blob import blob_storage |
| from src.middlewares.logging import get_logger, log_execution |
| from src.middlewares.rate_limit import limiter |
| from pydantic import BaseModel |
| from typing import List |
|
|
# Module-level logger for these endpoints, obtained under the name "document_api".
logger = get_logger("document_api")

# Router for the document endpoints; every route below is served under
# the /api/v1 prefix and carries the "Documents" tag.
router = APIRouter(
    prefix="/api/v1",
    tags=["Documents"],
)
|
|
|
|
class DocumentResponse(BaseModel):
    """Response schema describing a single document in the listing endpoint."""

    # Identifier of the document record.
    id: str
    # File name stored on the document record.
    filename: str
    # Status string stored on the document record.
    status: str
    # Size of the file; the listing endpoint substitutes 0 when the record has none.
    file_size: int
    # File type string stored on the document record.
    file_type: str
    # Creation timestamp as text; the listing endpoint fills it from ``.isoformat()``.
    created_at: str
|
|
|
|
@router.get("/documents/{user_id}", response_model=List[DocumentResponse])
@log_execution(logger)
async def list_documents(
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    """Return all documents of one user as ``DocumentResponse`` items.

    Args:
        user_id: Taken from the URL path; passed to the document service.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A list with one ``DocumentResponse`` per record returned by
        ``document_service.get_user_documents``.
    """

    def _to_response(record):
        # A falsy file_size (e.g. None) is reported as 0; created_at is
        # rendered through its own isoformat() method.
        return DocumentResponse(
            id=record.id,
            filename=record.filename,
            status=record.status,
            file_size=record.file_size or 0,
            file_type=record.file_type,
            created_at=record.created_at.isoformat(),
        )

    records = await document_service.get_user_documents(db, user_id)
    return [_to_response(record) for record in records]
|
|
|
|
@router.post("/document/upload")
@limiter.limit("10/minute")
@log_execution(logger)
async def upload_document(
    request: Request,
    file: UploadFile = File(...),
    user_id: str = None,
    db: AsyncSession = Depends(get_db)
):
    """Upload a document to blob storage and create its database record.

    Request validation (user id, file name, extension) runs before the file
    content is read, so invalid requests are rejected with a 400 without
    buffering the upload and without being rewritten into a 500.

    Args:
        request: Incoming request. Not read in the body; presumably required
            by the ``limiter.limit`` decorator -- confirm before removing.
        file: The uploaded file.
        user_id: Owner of the document; required despite the ``None`` default.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``status``, ``message`` and a ``data`` dict holding the
        new document's ``id``, ``filename`` and ``status``.

    Raises:
        HTTPException: 400 when ``user_id`` is missing, the upload has no
            file name, or the extension is not pdf/docx/txt; 500 when
            reading, storing or recording the document fails.
    """
    if not user_id:
        raise HTTPException(
            status_code=400,
            detail="user_id is required"
        )

    filename = file.filename
    if not filename:
        # Without this guard a missing file name raised TypeError further
        # down and surfaced as a 500.
        raise HTTPException(
            status_code=400,
            detail="Uploaded file must have a filename"
        )

    # The extension is the text after the last dot; names without any dot
    # are treated as plain text.
    _, dot, extension = filename.rpartition(".")
    file_type = extension.lower() if dot else "txt"

    if file_type not in ("pdf", "docx", "txt"):
        raise HTTPException(
            status_code=400,
            detail="Unsupported file type. Supported: pdf, docx, txt"
        )

    try:
        content = await file.read()
        file_size = len(content)

        blob_name = await blob_storage.upload_file(content, filename, user_id)

        # NOTE(review): if this insert fails the blob uploaded above is left
        # in storage; no cleanup call is visible from this module.
        document = await document_service.create_document(
            db=db,
            user_id=user_id,
            filename=filename,
            blob_name=blob_name,
            file_size=file_size,
            file_type=file_type
        )

        return {
            "status": "success",
            "message": "Document uploaded successfully",
            "data": {
                "id": document.id,
                "filename": document.filename,
                "status": document.status
            }
        }

    except HTTPException:
        # Deliberate HTTP errors keep their own status code instead of
        # being rewrapped as a generic 500 below.
        raise
    except Exception as e:
        logger.error(f"Upload failed for user {user_id}", error=str(e))
        # NOTE(review): the exception text is returned to the client, which
        # may expose internal details -- consider a generic message.
        raise HTTPException(
            status_code=500,
            detail=f"Upload failed: {str(e)}"
        ) from e
|
|
|
|
@router.delete("/document/delete")
@log_execution(logger)
async def delete_document(
    document_id: str,
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    """Delete a document after checking that it exists and belongs to the user.

    Args:
        document_id: Identifier of the document to delete.
        user_id: Must equal the ``user_id`` stored on the document.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``status`` and ``message`` when the deletion succeeds.

    Raises:
        HTTPException: 404 when no document is found, 403 when the document
            belongs to a different user, 500 when the service reports that
            the deletion did not succeed.
    """
    record = await document_service.get_document(db, document_id)

    if not record:
        raise HTTPException(status_code=404, detail="Document not found")

    if record.user_id != user_id:
        raise HTTPException(status_code=403, detail="Access denied")

    deleted = await document_service.delete_document(db, document_id)

    # Guard clause: a falsy result from the service is reported as a failure.
    if not deleted:
        raise HTTPException(status_code=500, detail="Failed to delete document")

    return {"status": "success", "message": "Document deleted successfully"}
|
|
|
|
@router.post("/document/process")
@log_execution(logger)
async def process_document(
    document_id: str,
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    """Process document and ingest to vector index.

    The document status is set to "processing" before the knowledge
    processor runs, to "completed" afterwards, and to "failed" (together
    with the error text) when any of these steps raises.

    Args:
        document_id: Identifier of the document to process.
        user_id: Must equal the ``user_id`` stored on the document.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``status``, ``message`` and a ``data`` dict holding the
        ``document_id`` and the ``chunks_processed`` value returned by the
        knowledge processor.

    Raises:
        HTTPException: 404 when no document is found, 403 when the document
            belongs to a different user, 500 when processing fails.
    """
    document = await document_service.get_document(db, document_id)

    if not document:
        raise HTTPException(
            status_code=404,
            detail="Document not found"
        )

    if document.user_id != user_id:
        raise HTTPException(
            status_code=403,
            detail="Access denied"
        )

    try:
        await document_service.update_document_status(db, document_id, "processing")

        chunks_count = await knowledge_processor.process_document(document, db)

        await document_service.update_document_status(db, document_id, "completed")

    except Exception as e:
        logger.error(f"Processing failed for document {document_id}", error=str(e))
        try:
            await document_service.update_document_status(
                db, document_id, "failed", str(e)
            )
        except Exception as status_error:
            # Recording the failure is best effort: if it raises too, the
            # original processing error must still be the one reported.
            logger.error(
                f"Could not mark document {document_id} as failed",
                error=str(status_error)
            )
        raise HTTPException(
            status_code=500,
            detail=f"Processing failed: {str(e)}"
        ) from e

    return {
        "status": "success",
        "message": "Document processed successfully",
        "data": {
            "document_id": document_id,
            "chunks_processed": chunks_count
        }
    }
|
|