kamau1's picture
Enhance profile endpoint to return full user data in one call with optional include parameters
620792e
"""
Document Management Endpoints
Universal document upload and management for all entities
"""
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Form, Request
from sqlalchemy.orm import Session
from typing import List, Optional
from uuid import UUID
from app.api.deps import get_db, get_current_active_user
from app.models.user import User
from app.models.document import Document
from app.models.user_document_link import UserDocumentLink
from app.schemas.document import (
DocumentResponse,
DocumentListResponse,
DocumentUpdateRequest,
UploaderInfo
)
from app.services.media_service import StorageService
from app.services.audit_service import AuditService
from app.core.permissions import require_permission, require_any_permission
from datetime import datetime
import logging
import json
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router collecting every endpoint in this module; all paths below are
# relative to the "/documents" prefix.
router = APIRouter(prefix="/documents", tags=["Document Management"])
# ============================================
# SPECIFIC ROUTES (Must come before generic parameterized routes)
# ============================================
# Current user document endpoints
@router.get("/users/me", response_model=DocumentListResponse)
async def get_my_documents(
    document_type: Optional[str] = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get current user's documents.

    Returns the caller's non-deleted documents (entity_type "user"), newest
    first. Files stored on Supabase get a freshly signed URL, and each
    document is enriched with its uploader's id, name and email when the
    uploader can be found.

    Args:
        document_type: Optional exact-match filter on the document type.
        current_user: Authenticated user resolved by the auth dependency.
        db: Database session.

    Returns:
        DocumentListResponse holding the documents and their count.
    """
    from app.integrations.supabase import SupabaseStorageService

    query = db.query(Document).filter(
        Document.entity_type == "user",
        Document.entity_id == current_user.id,
        Document.deleted_at.is_(None)
    )
    if document_type:
        query = query.filter(Document.document_type == document_type)
    documents = query.order_by(Document.created_at.desc()).all()

    # Look each distinct uploader up once instead of once per document.
    uploaders_by_id = {}
    response_docs = []
    for doc in documents:
        doc_response = DocumentResponse.from_orm(doc)

        # Generate a fresh signed URL for files stored under the supabase:// scheme.
        # file_url / additional_metadata are treated as optional so a sparse row
        # cannot crash the whole listing.
        if doc.storage_provider == 'supabase' and (doc.file_url or '').startswith('supabase://'):
            metadata = doc.additional_metadata or {}
            bucket = metadata.get('bucket')
            path = metadata.get('path')
            if bucket and path:
                doc_response.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600)

        uploader_id = doc.uploaded_by_user_id
        if uploader_id:
            if uploader_id not in uploaders_by_id:
                uploaders_by_id[uploader_id] = db.query(User).filter(User.id == uploader_id).first()
            uploader = uploaders_by_id[uploader_id]
            if uploader:
                doc_response.uploader = UploaderInfo(
                    id=uploader.id,
                    name=uploader.name,
                    email=uploader.email
                )

        response_docs.append(doc_response)

    return DocumentListResponse(
        total=len(response_docs),
        documents=response_docs
    )
@router.get("/users/me/profile-photo", response_model=Optional[DocumentResponse])
async def get_my_profile_photo(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get current user's profile photo.

    Resolves the caller's 'profile_photo' entry in user_document_links and
    returns the linked, non-deleted document. Returns None when there is no
    link or the linked document is missing or deleted.
    """
    link_query = db.query(UserDocumentLink).filter(
        UserDocumentLink.user_id == current_user.id,
        UserDocumentLink.document_link_type == 'profile_photo'
    )
    photo_link = link_query.first()
    if not photo_link:
        return None

    photo = db.query(Document).filter(
        Document.id == photo_link.document_id,
        Document.deleted_at.is_(None)
    ).first()
    if not photo:
        return None

    payload = DocumentResponse.from_orm(photo)

    # Files stored under the supabase:// scheme get a freshly signed URL.
    stored_on_supabase = (
        photo.storage_provider == 'supabase'
        and photo.file_url.startswith('supabase://')
    )
    if stored_on_supabase:
        from app.integrations.supabase import SupabaseStorageService
        bucket = photo.additional_metadata.get('bucket')
        path = photo.additional_metadata.get('path')
        if bucket and path:
            payload.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600)

    # Attach uploader details when the uploader record can be found.
    uploader = None
    if photo.uploaded_by_user_id:
        uploader = db.query(User).filter(User.id == photo.uploaded_by_user_id).first()
    if uploader:
        payload.uploader = UploaderInfo(
            id=uploader.id,
            name=uploader.name,
            email=uploader.email
        )

    return payload
# ============================================
# UNIVERSAL DOCUMENT ENDPOINTS
# ============================================
@router.post("/upload", response_model=DocumentResponse, status_code=status.HTTP_201_CREATED)
@require_permission("upload_documents")
async def upload_document(
    file: UploadFile = File(...),
    entity_type: str = Form(...),
    entity_id: str = Form(...),
    document_type: str = Form(...),
    document_category: Optional[str] = Form(None),
    description: Optional[str] = Form(None),
    tags: Optional[str] = Form("[]"),  # JSON string of array
    is_public: bool = Form(False),
    force_provider: Optional[str] = Form(None),  # Optional: 'cloudinary' or 'supabase'
    request: Request = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Universal document upload endpoint with smart fallback

    Default routing (as implemented by StorageService):
    - Images/videos → Cloudinary (with Supabase fallback)
    - Documents → Supabase Storage

    Features:
    - Automatic fallback if primary provider fails
    - Optional provider override via force_provider parameter
    - Supports all entity types: user, project, ticket, client, contractor, etc.

    For entity_type 'user', the user_document_links row for this document
    type is created or repointed at the new document.

    Parameters:
    - file: The uploaded file; must carry a filename.
    - entity_type / entity_id: Owner of the document; entity_id must be a UUID.
    - document_type, document_category, description: Stored with the document.
    - tags: JSON-encoded array of tags (empty or "null" means no tags).
    - is_public: Passed through to the storage service.
    - force_provider: Optional override ('cloudinary' or 'supabase')

    Returns:
        DocumentResponse for the stored document, including uploader info.

    Raises:
        HTTPException 400: missing filename, invalid force_provider, tags that
            are not a JSON array, or an entity_id that is not a UUID.
        HTTPException 500: any other failure during upload or bookkeeping.
    """
    from datetime import timezone

    # Validate client input up front so malformed requests are reported as
    # 400 instead of being swallowed by the generic 500 handler below.
    invalid_tags = HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="tags must be a JSON-encoded array"
    )
    try:
        tags_list = json.loads(tags) if tags else []
    except ValueError:  # json.JSONDecodeError is a ValueError
        raise invalid_tags
    if tags_list is None:
        tags_list = []
    if not isinstance(tags_list, list):
        raise invalid_tags

    # Validate file
    if not file.filename:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No file provided"
        )

    # Validate force_provider if provided
    if force_provider and force_provider not in ['cloudinary', 'supabase']:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="force_provider must be 'cloudinary' or 'supabase'"
        )

    try:
        entity_uuid = UUID(entity_id)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="entity_id must be a valid UUID"
        )

    try:
        # Upload file
        document = await StorageService.upload_file(
            file=file,
            entity_type=entity_type,
            entity_id=entity_uuid,
            document_type=document_type,
            document_category=document_category,
            description=description,
            tags=tags_list,
            is_public=is_public,
            uploaded_by_user_id=current_user.id,
            db=db,
            force_provider=force_provider
        )

        # Create user_document_link for user documents
        if entity_type == 'user':
            # Use the parsed UUID, matching how the other endpoints in this
            # module query user_document_links.
            existing_link = db.query(UserDocumentLink).filter(
                UserDocumentLink.user_id == entity_uuid,
                UserDocumentLink.document_link_type == document_type
            ).first()
            if existing_link:
                # Update existing link to point to new document
                existing_link.document_id = document.id
                # Timezone-aware UTC timestamp, same form as update_document_metadata.
                existing_link.updated_at = datetime.now(timezone.utc).isoformat()
            else:
                # Create new link
                db.add(UserDocumentLink(
                    user_id=entity_uuid,
                    document_id=document.id,
                    document_link_type=document_type,
                    notes=description
                ))
            db.commit()
            logger.info(f"Created user_document_link for {document_type}")

        # Audit log
        AuditService.log_action(
            db=db,
            action='create',
            entity_type='document',
            entity_id=str(document.id),
            description=f"Uploaded document: {file.filename} for {entity_type}:{entity_id}",
            user=current_user,
            request=request,
            additional_metadata={
                'file_name': file.filename,
                'file_type': file.content_type,
                'storage_provider': document.storage_provider
            }
        )

        # Get uploader info
        uploader = db.query(User).filter(User.id == document.uploaded_by_user_id).first()
        response = DocumentResponse.from_orm(document)
        if uploader:
            response.uploader = UploaderInfo(
                id=uploader.id,
                name=uploader.name,
                email=uploader.email
            )
        return response
    except HTTPException:
        raise
    except Exception as e:
        # Discard any half-finished transaction before reporting the failure.
        db.rollback()
        logger.exception(f"Upload failed: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Upload failed: {str(e)}"
        ) from e
@router.get("/{entity_type}/{entity_id}", response_model=DocumentListResponse)
@require_permission("view_documents")
async def get_entity_documents(
    entity_type: str,
    entity_id: UUID,
    document_type: Optional[str] = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get all documents for a specific entity

    Returns the entity's non-deleted documents, newest first, with fresh
    signed URLs for Supabase-hosted files and uploader info where available.

    Args:
        entity_type: Entity kind the documents belong to (e.g. "user").
        entity_id: UUID of the entity.
        document_type: Optional exact-match filter on the document type.
        current_user: Authenticated user resolved by the auth dependency.
        db: Database session.

    Returns:
        DocumentListResponse holding the documents and their count.
    """
    # NOTE(review): this two-segment wildcard route is declared before
    # GET /id/{document_id} and GET /users/{user_id}. Routes are matched in
    # declaration order, so those later GET routes look unreachable (they
    # would be handled here with entity_type "id" / "users") — confirm and
    # consider moving this route below them.
    from app.integrations.supabase import SupabaseStorageService

    query = db.query(Document).filter(
        Document.entity_type == entity_type,
        Document.entity_id == entity_id,
        Document.deleted_at.is_(None)
    )
    if document_type:
        query = query.filter(Document.document_type == document_type)
    documents = query.order_by(Document.created_at.desc()).all()

    # Look each distinct uploader up once instead of once per document.
    uploaders_by_id = {}
    response_docs = []
    for doc in documents:
        doc_response = DocumentResponse.from_orm(doc)

        # Generate a fresh signed URL for files stored under the supabase:// scheme.
        # file_url / additional_metadata are treated as optional so a sparse row
        # cannot crash the whole listing.
        if doc.storage_provider == 'supabase' and (doc.file_url or '').startswith('supabase://'):
            metadata = doc.additional_metadata or {}
            bucket = metadata.get('bucket')
            path = metadata.get('path')
            if bucket and path:
                doc_response.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600)

        uploader_id = doc.uploaded_by_user_id
        if uploader_id:
            if uploader_id not in uploaders_by_id:
                uploaders_by_id[uploader_id] = db.query(User).filter(User.id == uploader_id).first()
            uploader = uploaders_by_id[uploader_id]
            if uploader:
                doc_response.uploader = UploaderInfo(
                    id=uploader.id,
                    name=uploader.name,
                    email=uploader.email
                )

        response_docs.append(doc_response)

    return DocumentListResponse(
        total=len(response_docs),
        documents=response_docs
    )
@router.get("/id/{document_id}", response_model=DocumentResponse)
async def get_document(
    document_id: UUID,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get single document by ID with fresh signed URL for Supabase files

    Raises:
        HTTPException 404: no non-deleted document has this ID.
    """
    # NOTE(review): GET /{entity_type}/{entity_id} is declared earlier in this
    # module and also matches "/id/<uuid>", so this handler may never be
    # reached for GET — confirm the intended route order. Unlike that route,
    # this one carries no require_permission decorator.
    record = db.query(Document).filter(
        Document.id == document_id,
        Document.deleted_at.is_(None)
    ).first()
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Document not found"
        )

    payload = DocumentResponse.from_orm(record)

    # Files stored under the supabase:// scheme get a signed URL valid for 1 hour.
    stored_on_supabase = (
        record.storage_provider == 'supabase'
        and record.file_url.startswith('supabase://')
    )
    if stored_on_supabase:
        from app.integrations.supabase import SupabaseStorageService
        bucket = record.additional_metadata.get('bucket')
        path = record.additional_metadata.get('path')
        if bucket and path:
            payload.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600)

    # Attach uploader details when the uploader record can be found.
    uploader = None
    if record.uploaded_by_user_id:
        uploader = db.query(User).filter(User.id == record.uploaded_by_user_id).first()
    if uploader:
        payload.uploader = UploaderInfo(
            id=uploader.id,
            name=uploader.name,
            email=uploader.email
        )

    return payload
@router.put("/id/{document_id}", response_model=DocumentResponse)
async def update_document_metadata(
    document_id: UUID,
    data: DocumentUpdateRequest,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update document metadata (not the file itself)

    Only the fields explicitly present in the request body are written; the
    document's updated_at is set to the current UTC time and the change is
    recorded in the audit log.

    Raises:
        HTTPException 404: no non-deleted document has this ID.
    """
    from datetime import timezone

    target = db.query(Document).filter(
        Document.id == document_id,
        Document.deleted_at.is_(None)
    ).first()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Document not found"
        )

    # exclude_unset keeps omitted fields from overwriting stored values.
    changes = data.dict(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    target.updated_at = datetime.now(timezone.utc).isoformat()
    db.commit()
    db.refresh(target)

    # Audit log
    AuditService.log_action(
        db=db,
        action='update',
        entity_type='document',
        entity_id=str(target.id),
        description=f"Updated document metadata: {target.file_name}",
        user=current_user,
        request=request
    )

    return DocumentResponse.from_orm(target)
@router.delete("/id/{document_id}", status_code=status.HTTP_204_NO_CONTENT)
@require_permission("upload_documents")  # Same permission as upload (can manage own documents)
async def delete_document(
    document_id: UUID,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Delete document (soft delete in DB, actual file deletion from storage)

    The deletion itself is delegated to StorageService.delete_file; a failed
    storage deletion is logged as a warning and does not fail the request.
    The action is recorded in the audit log.

    Args:
        document_id: ID of the document to delete.
        request: Incoming request, forwarded to the audit log.
        current_user: Authenticated user resolved by the auth dependency.
        db: Database session.

    Returns:
        None (the route responds with 204 No Content).

    Raises:
        HTTPException 404: no non-deleted document has this ID.
    """
    # NOTE(review): nothing here checks that the document belongs to the
    # caller, although the decorator comment mentions "own documents" —
    # confirm whether require_permission covers that.
    document = db.query(Document).filter(
        Document.id == document_id,
        Document.deleted_at.is_(None)
    ).first()
    if not document:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Document not found"
        )

    # Delete file
    success = StorageService.delete_file(document, db)
    if not success:
        logger.warning(f"Failed to delete file from storage: {document.file_url}")

    # Audit log
    AuditService.log_action(
        db=db,
        action='delete',
        entity_type='document',
        entity_id=str(document.id),
        description=f"Deleted document: {document.file_name}",
        user=current_user,
        request=request
    )

    # A leftover fragment of a profile-photo getter used to follow here; it
    # referenced an undefined `doc_link` and made every delete fail after the
    # file was already removed. A 204 response carries no body.
    return None


# ============================================
# CONVENIENCE ENDPOINTS (Shortcuts)
# ============================================
@router.get("/users/{user_id}", response_model=DocumentListResponse)
async def get_user_documents(
    user_id: UUID,
    document_type: Optional[str] = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get another user's documents

    Access is granted to platform admins, to the user themselves, and to
    client/contractor admins, project managers, sales managers and
    dispatchers who share the target user's client or contractor.

    Args:
        user_id: ID of the user whose documents are requested.
        document_type: Optional exact-match filter on the document type.
        current_user: Authenticated user resolved by the auth dependency.
        db: Database session.

    Returns:
        DocumentListResponse holding the documents (newest first) and their count.

    Raises:
        HTTPException 404: the target user does not exist or is deleted.
        HTTPException 403: the caller may not view this user's documents.
    """
    # NOTE(review): GET /{entity_type}/{entity_id} is declared earlier in this
    # module and also matches "/users/<uuid>", so this handler may never be
    # reached — confirm the intended route order.
    from app.integrations.supabase import SupabaseStorageService

    # Check if user exists
    target_user = db.query(User).filter(
        User.id == user_id,
        User.deleted_at.is_(None)
    ).first()
    if not target_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    # Permission check. The admin and manager roles share one rule: same
    # client or same contractor as the target user.
    org_scoped_roles = (
        'client_admin', 'contractor_admin',
        'project_manager', 'sales_manager', 'dispatcher',
    )
    if current_user.role == 'platform_admin' or current_user.id == user_id:
        has_permission = True
    elif current_user.role in org_scoped_roles:
        same_client = bool(current_user.client_id) and current_user.client_id == target_user.client_id
        same_contractor = bool(current_user.contractor_id) and current_user.contractor_id == target_user.contractor_id
        has_permission = same_client or same_contractor
    else:
        has_permission = False

    if not has_permission:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to view this user's documents"
        )

    # Get documents
    query = db.query(Document).filter(
        Document.entity_type == "user",
        Document.entity_id == user_id,
        Document.deleted_at.is_(None)
    )
    if document_type:
        query = query.filter(Document.document_type == document_type)
    documents = query.order_by(Document.created_at.desc()).all()

    # Look each distinct uploader up once instead of once per document.
    uploaders_by_id = {}
    response_docs = []
    for doc in documents:
        doc_response = DocumentResponse.from_orm(doc)

        # Generate a fresh signed URL for files stored under the supabase:// scheme.
        # file_url / additional_metadata are treated as optional so a sparse row
        # cannot crash the whole listing.
        if doc.storage_provider == 'supabase' and (doc.file_url or '').startswith('supabase://'):
            metadata = doc.additional_metadata or {}
            bucket = metadata.get('bucket')
            path = metadata.get('path')
            if bucket and path:
                doc_response.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600)

        uploader_id = doc.uploaded_by_user_id
        if uploader_id:
            if uploader_id not in uploaders_by_id:
                uploaders_by_id[uploader_id] = db.query(User).filter(User.id == uploader_id).first()
            uploader = uploaders_by_id[uploader_id]
            if uploader:
                doc_response.uploader = UploaderInfo(
                    id=uploader.id,
                    name=uploader.name,
                    email=uploader.email
                )

        response_docs.append(doc_response)

    return DocumentListResponse(
        total=len(response_docs),
        documents=response_docs
    )
@router.get("/users/{user_id}/profile-photo", response_model=Optional[DocumentResponse])
async def get_user_profile_photo(
    user_id: UUID,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get user's profile photo (convenience endpoint)

    Looks the photo up through the user's 'profile_photo' entry in
    user_document_links. Returns None when there is no link or the linked
    document is missing or deleted.
    """
    # Resolve the profile photo link for the requested user.
    photo_link = db.query(UserDocumentLink).filter(
        UserDocumentLink.user_id == user_id,
        UserDocumentLink.document_link_type == 'profile_photo'
    ).first()
    if not photo_link:
        return None

    # Load the linked document, ignoring deleted ones.
    photo = db.query(Document).filter(
        Document.id == photo_link.document_id,
        Document.deleted_at.is_(None)
    ).first()
    if not photo:
        return None

    payload = DocumentResponse.from_orm(photo)

    # Files stored under the supabase:// scheme get a freshly signed URL.
    stored_on_supabase = (
        photo.storage_provider == 'supabase'
        and photo.file_url.startswith('supabase://')
    )
    if stored_on_supabase:
        from app.integrations.supabase import SupabaseStorageService
        bucket = photo.additional_metadata.get('bucket')
        path = photo.additional_metadata.get('path')
        if bucket and path:
            payload.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600)

    # Attach uploader details when the uploader record can be found.
    uploader = None
    if photo.uploaded_by_user_id:
        uploader = db.query(User).filter(User.id == photo.uploaded_by_user_id).first()
    if uploader:
        payload.uploader = UploaderInfo(
            id=uploader.id,
            name=uploader.name,
            email=uploader.email
        )

    return payload
# ============================================
# DOCUMENT VERSIONING ENDPOINTS
# ============================================
@router.get("/{entity_type}/{entity_id}/{document_type}/versions", response_model=List[DocumentResponse])
async def get_document_versions(
    entity_type: str,
    entity_id: UUID,
    document_type: str,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get all versions of a document

    The versions come from StorageService.get_document_versions and are
    returned in the order it provides (described as newest first). Each one
    gets a fresh signed URL if stored on Supabase, plus uploader info where
    available.

    Args:
        entity_type: Entity kind the document belongs to.
        entity_id: UUID of the entity.
        document_type: Document type whose versions are requested.
        current_user: Authenticated user resolved by the auth dependency.
        db: Database session.

    Returns:
        List of DocumentResponse, one per version.
    """
    # NOTE(review): this wildcard route is declared before
    # /users/me/profile-photo/versions and /users/{user_id}/profile-photo/versions
    # and matches both paths, so those handlers may never be reached over
    # HTTP — confirm the intended route order.
    from app.integrations.supabase import SupabaseStorageService

    documents = StorageService.get_document_versions(
        entity_type=entity_type,
        entity_id=entity_id,
        document_type=document_type,
        db=db
    )

    # Look each distinct uploader up once instead of once per version.
    uploaders_by_id = {}
    response_docs = []
    for doc in documents:
        doc_response = DocumentResponse.from_orm(doc)

        # Generate a fresh signed URL for files stored under the supabase:// scheme.
        # file_url / additional_metadata are treated as optional so a sparse row
        # cannot crash the whole listing.
        if doc.storage_provider == 'supabase' and (doc.file_url or '').startswith('supabase://'):
            metadata = doc.additional_metadata or {}
            bucket = metadata.get('bucket')
            path = metadata.get('path')
            if bucket and path:
                doc_response.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600)

        uploader_id = doc.uploaded_by_user_id
        if uploader_id:
            if uploader_id not in uploaders_by_id:
                uploaders_by_id[uploader_id] = db.query(User).filter(User.id == uploader_id).first()
            uploader = uploaders_by_id[uploader_id]
            if uploader:
                doc_response.uploader = UploaderInfo(
                    id=uploader.id,
                    name=uploader.name,
                    email=uploader.email
                )

        response_docs.append(doc_response)

    return response_docs
@router.get("/{entity_type}/{entity_id}/{document_type}/latest", response_model=Optional[DocumentResponse])
async def get_latest_document(
    entity_type: str,
    entity_id: UUID,
    document_type: str,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get latest version of a document

    Delegates the lookup to StorageService.get_latest_document and returns
    None when it finds nothing. Otherwise the document is returned with a
    fresh signed URL (Supabase files) and uploader info where available.
    """
    from app.services.media_service import StorageService

    latest = StorageService.get_latest_document(
        entity_type=entity_type,
        entity_id=entity_id,
        document_type=document_type,
        db=db
    )
    if not latest:
        return None

    payload = DocumentResponse.from_orm(latest)

    # Files stored under the supabase:// scheme get a freshly signed URL.
    stored_on_supabase = (
        latest.storage_provider == 'supabase'
        and latest.file_url.startswith('supabase://')
    )
    if stored_on_supabase:
        from app.integrations.supabase import SupabaseStorageService
        bucket = latest.additional_metadata.get('bucket')
        path = latest.additional_metadata.get('path')
        if bucket and path:
            payload.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600)

    # Attach uploader details when the uploader record can be found.
    uploader = None
    if latest.uploaded_by_user_id:
        uploader = db.query(User).filter(User.id == latest.uploaded_by_user_id).first()
    if uploader:
        payload.uploader = UploaderInfo(
            id=uploader.id,
            name=uploader.name,
            email=uploader.email
        )

    return payload
# ============================================
# PROFILE PHOTO SHORTCUTS
# ============================================
@router.post("/users/me/profile-photo", response_model=DocumentResponse, status_code=status.HTTP_201_CREATED)
async def upload_my_profile_photo(
    file: UploadFile = File(...),
    request: Request = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Upload or update current user's profile photo

    The file is uploaded through StorageService with versioning enabled and
    Cloudinary forced as the provider; the caller's 'profile_photo' entry in
    user_document_links is then created or repointed at the new document and
    the upload is recorded in the audit log.

    Args:
        file: The image to upload; its content type must start with "image/".
        request: Incoming request, forwarded to the audit log.
        current_user: Authenticated user resolved by the auth dependency.
        db: Database session.

    Returns:
        DocumentResponse for the new photo, with the caller as uploader.

    Raises:
        HTTPException 400: the file is not declared as an image.
    """
    from datetime import timezone

    # Validate file is an image. content_type may be absent on an upload;
    # treat that as "not an image" instead of crashing on None.
    if not (file.content_type or '').startswith('image/'):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Profile photo must be an image"
        )

    # Upload with versioning enabled (force Cloudinary for profile photos)
    document = await StorageService.upload_file(
        file=file,
        entity_type="user",
        entity_id=current_user.id,
        document_type="profile_photo",
        document_category="profile",
        description="User profile photo",
        tags=["profile", "photo"],
        is_public=True,
        uploaded_by_user_id=current_user.id,
        db=db,
        force_provider="cloudinary",  # Force Cloudinary for profile photos
        enable_versioning=True
    )

    # Update user_document_link
    existing_link = db.query(UserDocumentLink).filter(
        UserDocumentLink.user_id == current_user.id,
        UserDocumentLink.document_link_type == 'profile_photo'
    ).first()
    if existing_link:
        existing_link.document_id = document.id
        # Timezone-aware UTC timestamp, same form as update_document_metadata.
        existing_link.updated_at = datetime.now(timezone.utc).isoformat()
    else:
        db.add(UserDocumentLink(
            user_id=current_user.id,
            document_id=document.id,
            document_link_type='profile_photo',
            notes="Profile photo"
        ))
    db.commit()

    # Audit log
    AuditService.log_action(
        db=db,
        action='upload',
        entity_type='document',
        entity_id=str(document.id),
        description=f"Profile photo uploaded for user {current_user.id}",
        user=current_user,
        request=request,
        additional_metadata={
            'document_type': 'profile_photo',
            'version': document.version
        }
    )

    # The uploader is always the caller here, so no extra lookup is needed.
    response = DocumentResponse.from_orm(document)
    response.uploader = UploaderInfo(
        id=current_user.id,
        name=current_user.name,
        email=current_user.email
    )
    return response
@router.post("/users/{user_id}/profile-photo", response_model=DocumentResponse, status_code=status.HTTP_201_CREATED)
async def upload_profile_photo(
    user_id: UUID,
    file: UploadFile = File(...),
    request: Request = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Upload or update user profile photo (admin/manager)

    The file is uploaded through StorageService with versioning enabled and
    Cloudinary forced as the provider; the target user's 'profile_photo'
    entry in user_document_links is then created or repointed at the new
    document and the upload is recorded in the audit log.

    Args:
        user_id: ID of the user whose photo is being set.
        file: The image to upload; its content type must start with "image/".
        request: Incoming request, forwarded to the audit log.
        current_user: Authenticated user performing the upload.
        db: Database session.

    Returns:
        DocumentResponse for the new photo, with the caller as uploader.

    Raises:
        HTTPException 400: the file is not declared as an image.
    """
    # NOTE(review): despite "(admin/manager)" above, nothing in this handler
    # restricts who may call it or verifies that user_id exists — any
    # authenticated user appears able to replace any user's photo. Confirm
    # and add a check (get_user_documents has a role/organisation check that
    # could serve as a model).
    from datetime import timezone

    # Validate file is an image. content_type may be absent on an upload;
    # treat that as "not an image" instead of crashing on None.
    if not (file.content_type or '').startswith('image/'):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Profile photo must be an image"
        )

    # Upload with versioning enabled (force Cloudinary for profile photos)
    document = await StorageService.upload_file(
        file=file,
        entity_type="user",
        entity_id=user_id,
        document_type="profile_photo",
        document_category="profile",
        description="User profile photo",
        tags=["profile", "photo"],
        is_public=True,
        uploaded_by_user_id=current_user.id,
        db=db,
        force_provider="cloudinary",  # Force Cloudinary for profile photos
        enable_versioning=True
    )

    # Update user_document_link
    existing_link = db.query(UserDocumentLink).filter(
        UserDocumentLink.user_id == user_id,
        UserDocumentLink.document_link_type == 'profile_photo'
    ).first()
    if existing_link:
        existing_link.document_id = document.id
        # Timezone-aware UTC timestamp, same form as update_document_metadata.
        existing_link.updated_at = datetime.now(timezone.utc).isoformat()
    else:
        db.add(UserDocumentLink(
            user_id=user_id,
            document_id=document.id,
            document_link_type='profile_photo',
            notes="Profile photo"
        ))
    db.commit()

    # Audit log
    AuditService.log_action(
        db=db,
        action='upload',
        entity_type='document',
        entity_id=str(document.id),
        description=f"Profile photo uploaded for user {user_id}",
        user=current_user,
        request=request,
        additional_metadata={
            'document_type': 'profile_photo',
            'version': document.version
        }
    )

    # The uploader is the caller, not necessarily the photo's owner.
    response = DocumentResponse.from_orm(document)
    response.uploader = UploaderInfo(
        id=current_user.id,
        name=current_user.name,
        email=current_user.email
    )
    return response
@router.get("/users/me/profile-photo/versions", response_model=List[DocumentResponse])
async def get_my_profile_photo_versions(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get all versions of current user's profile photo

    Thin wrapper: calls get_document_versions for entity type "user", the
    caller's own ID and document type "profile_photo".
    """
    # NOTE(review): /{entity_type}/{entity_id}/{document_type}/versions is
    # declared earlier and also matches this path (with entity_id "me"), so
    # this handler may never be reached over HTTP — confirm the route order.
    versions = await get_document_versions(
        "user",
        current_user.id,
        "profile_photo",
        current_user=current_user,
        db=db
    )
    return versions
@router.get("/users/{user_id}/profile-photo/versions", response_model=List[DocumentResponse])
async def get_profile_photo_versions(
    user_id: UUID,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get all versions of user's profile photo

    Thin wrapper: calls get_document_versions for entity type "user", the
    given user ID and document type "profile_photo".
    """
    # NOTE(review): /{entity_type}/{entity_id}/{document_type}/versions is
    # declared earlier and also matches this path (with entity_type "users"
    # and document_type "profile-photo"), so this handler may never be
    # reached over HTTP — confirm the route order.
    versions = await get_document_versions(
        "user",
        user_id,
        "profile_photo",
        current_user=current_user,
        db=db
    )
    return versions