# File size: 5,479 Bytes | bef5e76
"""Document management API endpoints."""
from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, File, status
from sqlalchemy.ext.asyncio import AsyncSession
from src.db.postgres.connection import get_db
from src.document.document_service import document_service
from src.knowledge.processing_service import knowledge_processor
from src.storage.az_blob.az_blob import blob_storage
from src.middlewares.logging import get_logger, log_execution
from src.middlewares.rate_limit import limiter
from pydantic import BaseModel
from typing import List
logger = get_logger("document_api")
router = APIRouter(prefix="/api/v1", tags=["Documents"])
class DocumentResponse(BaseModel):
    """Summary of a stored document as returned by the listing endpoint."""

    # Identifier of the document record.
    id: str
    # Name of the uploaded file.
    filename: str
    # Processing state; the process endpoint in this module sets
    # "processing", "completed" or "failed".
    status: str
    # Size in bytes; the listing endpoint substitutes 0 when the record has none.
    file_size: int
    # File type; the upload endpoint derives it from the filename extension.
    file_type: str
    # Creation time; the listing endpoint fills it with ``isoformat()`` output.
    created_at: str
@router.get("/documents/{user_id}", response_model=List[DocumentResponse])
@log_execution(logger)
async def list_documents(
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    """List all documents for a user.

    Args:
        user_id: Owner whose documents are fetched (taken from the URL path).
        db: Database session injected through ``get_db``.

    Returns:
        One ``DocumentResponse`` per record returned by
        ``document_service.get_user_documents``, in the same order.
    """
    records = await document_service.get_user_documents(db, user_id)

    payload = []
    for record in records:
        summary = DocumentResponse(
            id=record.id,
            filename=record.filename,
            status=record.status,
            # A missing (falsy) size is reported as 0.
            file_size=record.file_size or 0,
            file_type=record.file_type,
            # The response model carries the timestamp as a string.
            created_at=record.created_at.isoformat(),
        )
        payload.append(summary)
    return payload
# File extensions accepted by the upload endpoint.
_SUPPORTED_UPLOAD_TYPES = ("pdf", "docx", "txt")


@router.post("/document/upload")
@limiter.limit("10/minute")
@log_execution(logger)
async def upload_document(
    request: Request,
    file: UploadFile = File(...),
    user_id: str = None,
    db: AsyncSession = Depends(get_db)
):
    """Upload a document.

    Validates the request, stores the file content in blob storage and
    creates the matching document record.

    Args:
        request: Incoming request. Not read by this function; presumably
            required by the rate-limit decorator (TODO confirm).
        file: The uploaded file.
        user_id: Owner of the document. Required even though it defaults
            to ``None``; a missing value is rejected with 400.
        db: Database session injected through ``get_db``.

    Returns:
        A dict with ``status``, ``message`` and ``data`` (the new record's
        ``id``, ``filename`` and ``status``).

    Raises:
        HTTPException: 400 when ``user_id`` or the filename is missing or
            the file type is unsupported; 500 when reading the file,
            storing the blob or creating the record fails.
    """
    if not user_id:
        raise HTTPException(
            status_code=400,
            detail="user_id is required"
        )

    filename = file.filename
    # The client may omit the filename; reject that explicitly instead of
    # letting the extension check below fail with a TypeError (-> 500).
    if not filename:
        raise HTTPException(
            status_code=400,
            detail="filename is required"
        )

    # Files without an extension are treated as plain text.
    if "." in filename:
        file_type = filename.rsplit(".", 1)[-1].lower()
    else:
        file_type = "txt"

    # Validate before reading the body and outside the ``try`` below, so
    # the 400 is not converted into a 500 by the generic handler.
    if file_type not in _SUPPORTED_UPLOAD_TYPES:
        raise HTTPException(
            status_code=400,
            detail="Unsupported file type. Supported: pdf, docx, txt"
        )

    try:
        # Read file content
        content = await file.read()
        file_size = len(content)

        # Upload to blob storage
        blob_name = await blob_storage.upload_file(content, filename, user_id)

        # Create document record
        # NOTE(review): if this fails, the blob uploaded above is not
        # removed here -- confirm whether cleanup is handled elsewhere.
        document = await document_service.create_document(
            db=db,
            user_id=user_id,
            filename=filename,
            blob_name=blob_name,
            file_size=file_size,
            file_type=file_type
        )

        return {
            "status": "success",
            "message": "Document uploaded successfully",
            "data": {
                "id": document.id,
                "filename": document.filename,
                "status": document.status
            }
        }
    except HTTPException:
        # Deliberate HTTP errors keep their own status code.
        raise
    except Exception as e:
        logger.error(f"Upload failed for user {user_id}", error=str(e))
        raise HTTPException(
            status_code=500,
            detail=f"Upload failed: {str(e)}"
        ) from e
@router.delete("/document/delete")
@log_execution(logger)
async def delete_document(
    document_id: str,
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    """Delete a document.

    Args:
        document_id: Identifier of the document to remove.
        user_id: Caller's user id; must match the document's ``user_id``.
        db: Database session injected through ``get_db``.

    Returns:
        A dict with ``status`` and ``message`` when the deletion succeeds.

    Raises:
        HTTPException: 404 when the document does not exist, 403 when it
            belongs to another user, 500 when the service reports that
            the deletion did not succeed.
    """
    target = await document_service.get_document(db, document_id)

    if not target:
        raise HTTPException(status_code=404, detail="Document not found")

    # Only the owner may delete the document.
    if target.user_id != user_id:
        raise HTTPException(status_code=403, detail="Access denied")

    deleted = await document_service.delete_document(db, document_id)
    if not deleted:
        raise HTTPException(status_code=500, detail="Failed to delete document")

    return {"status": "success", "message": "Document deleted successfully"}
@router.post("/document/process")
@log_execution(logger)
async def process_document(
    document_id: str,
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    """Process document and ingest to vector index.

    Marks the document as "processing", hands it to
    ``knowledge_processor.process_document`` and marks it "completed" on
    success or "failed" (with the error text) on failure.

    Args:
        document_id: Identifier of the document to process.
        user_id: Caller's user id; must match the document's ``user_id``.
        db: Database session injected through ``get_db``.

    Returns:
        A dict with ``status``, ``message`` and ``data`` (``document_id``
        and ``chunks_processed``, the value returned by the processor).

    Raises:
        HTTPException: 404 when the document does not exist, 403 when it
            belongs to another user, 500 when processing fails.
    """
    document = await document_service.get_document(db, document_id)
    if not document:
        raise HTTPException(
            status_code=404,
            detail="Document not found"
        )

    # Only the owner may trigger processing.
    if document.user_id != user_id:
        raise HTTPException(
            status_code=403,
            detail="Access denied"
        )

    try:
        # Update status to processing
        await document_service.update_document_status(db, document_id, "processing")

        # Process document
        chunks_count = await knowledge_processor.process_document(document, db)

        # Update status to completed
        await document_service.update_document_status(db, document_id, "completed")

        return {
            "status": "success",
            "message": "Document processed successfully",
            "data": {
                "document_id": document_id,
                "chunks_processed": chunks_count
            }
        }
    except Exception as e:
        logger.error(f"Processing failed for document {document_id}", error=str(e))

        # Recording the failure is best effort: if this write fails too, it
        # must not replace the original error reported to the client.
        try:
            await document_service.update_document_status(
                db, document_id, "failed", str(e)
            )
        except Exception as status_error:
            logger.error(
                f"Could not mark document {document_id} as failed",
                error=str(status_error)
            )

        raise HTTPException(
            status_code=500,
            detail=f"Processing failed: {str(e)}"
        ) from e
|