ishaq101's picture
[NOTICKET] Demo agentic agent
bef5e76
"""Document management API endpoints."""
from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, File, status
from sqlalchemy.ext.asyncio import AsyncSession
from src.db.postgres.connection import get_db
from src.document.document_service import document_service
from src.knowledge.processing_service import knowledge_processor
from src.storage.az_blob.az_blob import blob_storage
from src.middlewares.logging import get_logger, log_execution
from src.middlewares.rate_limit import limiter
from pydantic import BaseModel
from typing import List
logger = get_logger("document_api")
router = APIRouter(prefix="/api/v1", tags=["Documents"])
class DocumentResponse(BaseModel):
id: str
filename: str
status: str
file_size: int
file_type: str
created_at: str
@router.get("/documents/{user_id}", response_model=List[DocumentResponse])
@log_execution(logger)
async def list_documents(
user_id: str,
db: AsyncSession = Depends(get_db)
):
"""List all documents for a user."""
documents = await document_service.get_user_documents(db, user_id)
return [
DocumentResponse(
id=doc.id,
filename=doc.filename,
status=doc.status,
file_size=doc.file_size or 0,
file_type=doc.file_type,
created_at=doc.created_at.isoformat()
)
for doc in documents
]
@router.post("/document/upload")
@limiter.limit("10/minute")
@log_execution(logger)
async def upload_document(
request: Request,
file: UploadFile = File(...),
user_id: str = None,
db: AsyncSession = Depends(get_db)
):
"""Upload a document."""
if not user_id:
raise HTTPException(
status_code=400,
detail="user_id is required"
)
try:
# Read file content
content = await file.read()
file_size = len(content)
# Get file type
filename = file.filename
file_type = filename.split('.')[-1].lower() if '.' in filename else 'txt'
if file_type not in ['pdf', 'docx', 'txt']:
raise HTTPException(
status_code=400,
detail="Unsupported file type. Supported: pdf, docx, txt"
)
# Upload to blob storage
blob_name = await blob_storage.upload_file(content, filename, user_id)
# Create document record
document = await document_service.create_document(
db=db,
user_id=user_id,
filename=filename,
blob_name=blob_name,
file_size=file_size,
file_type=file_type
)
return {
"status": "success",
"message": "Document uploaded successfully",
"data": {
"id": document.id,
"filename": document.filename,
"status": document.status
}
}
except Exception as e:
logger.error(f"Upload failed for user {user_id}", error=str(e))
raise HTTPException(
status_code=500,
detail=f"Upload failed: {str(e)}"
)
@router.delete("/document/delete")
@log_execution(logger)
async def delete_document(
document_id: str,
user_id: str,
db: AsyncSession = Depends(get_db)
):
"""Delete a document."""
document = await document_service.get_document(db, document_id)
if not document:
raise HTTPException(
status_code=404,
detail="Document not found"
)
if document.user_id != user_id:
raise HTTPException(
status_code=403,
detail="Access denied"
)
success = await document_service.delete_document(db, document_id)
if success:
return {"status": "success", "message": "Document deleted successfully"}
else:
raise HTTPException(
status_code=500,
detail="Failed to delete document"
)
@router.post("/document/process")
@log_execution(logger)
async def process_document(
document_id: str,
user_id: str,
db: AsyncSession = Depends(get_db)
):
"""Process document and ingest to vector index."""
document = await document_service.get_document(db, document_id)
if not document:
raise HTTPException(
status_code=404,
detail="Document not found"
)
if document.user_id != user_id:
raise HTTPException(
status_code=403,
detail="Access denied"
)
try:
# Update status to processing
await document_service.update_document_status(db, document_id, "processing")
# Process document
chunks_count = await knowledge_processor.process_document(document, db)
# Update status to completed
await document_service.update_document_status(db, document_id, "completed")
return {
"status": "success",
"message": "Document processed successfully",
"data": {
"document_id": document_id,
"chunks_processed": chunks_count
}
}
except Exception as e:
logger.error(f"Processing failed for document {document_id}", error=str(e))
await document_service.update_document_status(
db, document_id, "failed", str(e)
)
raise HTTPException(
status_code=500,
detail=f"Processing failed: {str(e)}"
)