Spaces:
Sleeping
Sleeping
| """ | |
| Document Management Endpoints | |
| Universal document upload and management for all entities | |
| """ | |
| from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Form, Request | |
| from sqlalchemy.orm import Session | |
| from typing import List, Optional | |
| from uuid import UUID | |
| from app.api.deps import get_db, get_current_active_user | |
| from app.models.user import User | |
| from app.models.document import Document | |
| from app.models.user_document_link import UserDocumentLink | |
| from app.schemas.document import ( | |
| DocumentResponse, | |
| DocumentListResponse, | |
| DocumentUpdateRequest, | |
| UploaderInfo | |
| ) | |
| from app.services.media_service import StorageService | |
| from app.services.audit_service import AuditService | |
| from app.core.permissions import require_permission, require_any_permission | |
| from datetime import datetime | |
| import logging | |
| import json | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(prefix="/documents", tags=["Document Management"]) | |
| # ============================================ | |
| # SPECIFIC ROUTES (Must come before generic parameterized routes) | |
| # ============================================ | |
| # Current user document endpoints | |
| async def get_my_documents( | |
| document_type: Optional[str] = None, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Get current user's documents""" | |
| query = db.query(Document).filter( | |
| Document.entity_type == "user", | |
| Document.entity_id == current_user.id, | |
| Document.deleted_at == None | |
| ) | |
| if document_type: | |
| query = query.filter(Document.document_type == document_type) | |
| documents = query.order_by(Document.created_at.desc()).all() | |
| # Enrich with uploader info and generate fresh signed URLs for Supabase files | |
| from app.integrations.supabase import SupabaseStorageService | |
| response_docs = [] | |
| for doc in documents: | |
| doc_response = DocumentResponse.from_orm(doc) | |
| # Generate fresh signed URL for Supabase files | |
| if doc.storage_provider == 'supabase' and doc.file_url.startswith('supabase://'): | |
| bucket = doc.additional_metadata.get('bucket') | |
| path = doc.additional_metadata.get('path') | |
| if bucket and path: | |
| doc_response.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600) | |
| if doc.uploaded_by_user_id: | |
| uploader = db.query(User).filter(User.id == doc.uploaded_by_user_id).first() | |
| if uploader: | |
| doc_response.uploader = UploaderInfo( | |
| id=uploader.id, | |
| name=uploader.name, | |
| email=uploader.email | |
| ) | |
| response_docs.append(doc_response) | |
| return DocumentListResponse( | |
| total=len(response_docs), | |
| documents=response_docs | |
| ) | |
| async def get_my_profile_photo( | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Get current user's profile photo""" | |
| doc_link = db.query(UserDocumentLink).filter( | |
| UserDocumentLink.user_id == current_user.id, | |
| UserDocumentLink.document_link_type == 'profile_photo' | |
| ).first() | |
| if not doc_link: | |
| return None | |
| document = db.query(Document).filter( | |
| Document.id == doc_link.document_id, | |
| Document.deleted_at == None | |
| ).first() | |
| if not document: | |
| return None | |
| response = DocumentResponse.from_orm(document) | |
| # Generate fresh signed URL for Supabase files | |
| if document.storage_provider == 'supabase' and document.file_url.startswith('supabase://'): | |
| from app.integrations.supabase import SupabaseStorageService | |
| bucket = document.additional_metadata.get('bucket') | |
| path = document.additional_metadata.get('path') | |
| if bucket and path: | |
| response.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600) | |
| if document.uploaded_by_user_id: | |
| uploader = db.query(User).filter(User.id == document.uploaded_by_user_id).first() | |
| if uploader: | |
| response.uploader = UploaderInfo( | |
| id=uploader.id, | |
| name=uploader.name, | |
| email=uploader.email | |
| ) | |
| return response | |
| # ============================================ | |
| # UNIVERSAL DOCUMENT ENDPOINTS | |
| # ============================================ | |
| async def upload_document( | |
| file: UploadFile = File(...), | |
| entity_type: str = Form(...), | |
| entity_id: str = Form(...), | |
| document_type: str = Form(...), | |
| document_category: Optional[str] = Form(None), | |
| description: Optional[str] = Form(None), | |
| tags: Optional[str] = Form("[]"), # JSON string of array | |
| is_public: bool = Form(False), | |
| force_provider: Optional[str] = Form(None), # Optional: 'cloudinary' or 'supabase' | |
| request: Request = None, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Universal document upload endpoint with smart fallback | |
| Default routing: | |
| - Images/videos → Cloudinary (with Supabase fallback) | |
| - Documents → Supabase Storage | |
| Features: | |
| - Automatic fallback if primary provider fails | |
| - Optional provider override via force_provider parameter | |
| - Supports all entity types: user, project, ticket, client, contractor, etc. | |
| Parameters: | |
| - force_provider: Optional override ('cloudinary' or 'supabase') | |
| """ | |
| try: | |
| # Parse tags from JSON string | |
| tags_list = json.loads(tags) if tags else [] | |
| # Validate file | |
| if not file.filename: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="No file provided" | |
| ) | |
| # Validate force_provider if provided | |
| if force_provider and force_provider not in ['cloudinary', 'supabase']: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="force_provider must be 'cloudinary' or 'supabase'" | |
| ) | |
| # Upload file | |
| document = await StorageService.upload_file( | |
| file=file, | |
| entity_type=entity_type, | |
| entity_id=UUID(entity_id), | |
| document_type=document_type, | |
| document_category=document_category, | |
| description=description, | |
| tags=tags_list, | |
| is_public=is_public, | |
| uploaded_by_user_id=current_user.id, | |
| db=db, | |
| force_provider=force_provider | |
| ) | |
| # Create user_document_link for user documents | |
| if entity_type == 'user': | |
| # Check if link already exists for this document type | |
| existing_link = db.query(UserDocumentLink).filter( | |
| UserDocumentLink.user_id == entity_id, | |
| UserDocumentLink.document_link_type == document_type | |
| ).first() | |
| if existing_link: | |
| # Update existing link to point to new document | |
| existing_link.document_id = document.id | |
| existing_link.updated_at = datetime.utcnow().isoformat() | |
| else: | |
| # Create new link | |
| doc_link = UserDocumentLink( | |
| user_id=entity_id, | |
| document_id=document.id, | |
| document_link_type=document_type, | |
| notes=description | |
| ) | |
| db.add(doc_link) | |
| db.commit() | |
| logger.info(f"Created user_document_link for {document_type}") | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='create', | |
| entity_type='document', | |
| entity_id=str(document.id), | |
| description=f"Uploaded document: {file.filename} for {entity_type}:{entity_id}", | |
| user=current_user, | |
| request=request, | |
| additional_metadata={ | |
| 'file_name': file.filename, | |
| 'file_type': file.content_type, | |
| 'storage_provider': document.storage_provider | |
| } | |
| ) | |
| # Get uploader info | |
| uploader = db.query(User).filter(User.id == document.uploaded_by_user_id).first() | |
| response = DocumentResponse.from_orm(document) | |
| if uploader: | |
| response.uploader = UploaderInfo( | |
| id=uploader.id, | |
| name=uploader.name, | |
| email=uploader.email | |
| ) | |
| return response | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Upload failed: {str(e)}") | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"Upload failed: {str(e)}" | |
| ) | |
| async def get_entity_documents( | |
| entity_type: str, | |
| entity_id: UUID, | |
| document_type: Optional[str] = None, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all documents for a specific entity | |
| Optional filtering by document_type | |
| """ | |
| query = db.query(Document).filter( | |
| Document.entity_type == entity_type, | |
| Document.entity_id == entity_id, | |
| Document.deleted_at == None | |
| ) | |
| if document_type: | |
| query = query.filter(Document.document_type == document_type) | |
| documents = query.order_by(Document.created_at.desc()).all() | |
| # Enrich with uploader info and generate fresh signed URLs for Supabase files | |
| from app.integrations.supabase import SupabaseStorageService | |
| response_docs = [] | |
| for doc in documents: | |
| doc_response = DocumentResponse.from_orm(doc) | |
| # Generate fresh signed URL for Supabase files | |
| if doc.storage_provider == 'supabase' and doc.file_url.startswith('supabase://'): | |
| bucket = doc.additional_metadata.get('bucket') | |
| path = doc.additional_metadata.get('path') | |
| if bucket and path: | |
| doc_response.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600) | |
| if doc.uploaded_by_user_id: | |
| uploader = db.query(User).filter(User.id == doc.uploaded_by_user_id).first() | |
| if uploader: | |
| doc_response.uploader = UploaderInfo( | |
| id=uploader.id, | |
| name=uploader.name, | |
| email=uploader.email | |
| ) | |
| response_docs.append(doc_response) | |
| return DocumentListResponse( | |
| total=len(response_docs), | |
| documents=response_docs | |
| ) | |
| async def get_document( | |
| document_id: UUID, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Get single document by ID with fresh signed URL for Supabase files""" | |
| document = db.query(Document).filter( | |
| Document.id == document_id, | |
| Document.deleted_at == None | |
| ).first() | |
| if not document: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="Document not found" | |
| ) | |
| response = DocumentResponse.from_orm(document) | |
| # Generate fresh signed URL for Supabase files | |
| if document.storage_provider == 'supabase' and document.file_url.startswith('supabase://'): | |
| from app.integrations.supabase import SupabaseStorageService | |
| bucket = document.additional_metadata.get('bucket') | |
| path = document.additional_metadata.get('path') | |
| if bucket and path: | |
| # Generate signed URL valid for 1 hour | |
| response.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600) | |
| # Add uploader info | |
| if document.uploaded_by_user_id: | |
| uploader = db.query(User).filter(User.id == document.uploaded_by_user_id).first() | |
| if uploader: | |
| response.uploader = UploaderInfo( | |
| id=uploader.id, | |
| name=uploader.name, | |
| email=uploader.email | |
| ) | |
| return response | |
| async def update_document_metadata( | |
| document_id: UUID, | |
| data: DocumentUpdateRequest, | |
| request: Request, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Update document metadata (not the file itself)""" | |
| document = db.query(Document).filter( | |
| Document.id == document_id, | |
| Document.deleted_at == None | |
| ).first() | |
| if not document: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="Document not found" | |
| ) | |
| # Update fields | |
| for field, value in data.dict(exclude_unset=True).items(): | |
| setattr(document, field, value) | |
| from datetime import datetime, timezone | |
| document.updated_at = datetime.now(timezone.utc).isoformat() | |
| db.commit() | |
| db.refresh(document) | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='update', | |
| entity_type='document', | |
| entity_id=str(document.id), | |
| description=f"Updated document metadata: {document.file_name}", | |
| user=current_user, | |
| request=request | |
| ) | |
| return DocumentResponse.from_orm(document) | |
| # Same permission as upload (can manage own documents) | |
| async def delete_document( | |
| document_id: UUID, | |
| request: Request, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Delete document (soft delete in DB, actual file deletion from storage)""" | |
| document = db.query(Document).filter( | |
| Document.id == document_id, | |
| Document.deleted_at == None | |
| ).first() | |
| if not document: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="Document not found" | |
| ) | |
| # Delete file | |
| success = StorageService.delete_file(document, db) | |
| if not success: | |
| logger.warning(f"Failed to delete file from storage: {document.file_url}") | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='delete', | |
| entity_type='document', | |
| entity_id=str(document.id), | |
| description=f"Deleted document: {document.file_name}", | |
| user=current_user, | |
| request=request | |
| ) | |
| # ============================================ | |
| # CONVENIENCE ENDPOINTS (Shortcuts) | |
| # ============================================ | |
| # Get document | |
| document = db.query(Document).filter( | |
| Document.id == doc_link.document_id, | |
| Document.deleted_at == None | |
| ).first() | |
| if not document: | |
| return None | |
| response = DocumentResponse.from_orm(document) | |
| # Generate fresh signed URL for Supabase files | |
| if document.storage_provider == 'supabase' and document.file_url.startswith('supabase://'): | |
| from app.integrations.supabase import SupabaseStorageService | |
| bucket = document.additional_metadata.get('bucket') | |
| path = document.additional_metadata.get('path') | |
| if bucket and path: | |
| response.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600) | |
| # Add uploader info | |
| if document.uploaded_by_user_id: | |
| uploader = db.query(User).filter(User.id == document.uploaded_by_user_id).first() | |
| if uploader: | |
| response.uploader = UploaderInfo( | |
| id=uploader.id, | |
| name=uploader.name, | |
| email=uploader.email | |
| ) | |
| return response | |
| async def get_user_documents( | |
| user_id: UUID, | |
| document_type: Optional[str] = None, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get another user's documents | |
| Requires permission to view the user (same org or shared projects) | |
| """ | |
| # Check if user exists and current user has permission | |
| target_user = db.query(User).filter( | |
| User.id == user_id, | |
| User.deleted_at == None | |
| ).first() | |
| if not target_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| # Permission check (same logic as GET /users/{user_id}) | |
| has_permission = False | |
| if current_user.role == 'platform_admin': | |
| has_permission = True | |
| elif current_user.id == user_id: | |
| has_permission = True | |
| elif current_user.role in ['client_admin', 'contractor_admin']: | |
| if current_user.client_id and current_user.client_id == target_user.client_id: | |
| has_permission = True | |
| if current_user.contractor_id and current_user.contractor_id == target_user.contractor_id: | |
| has_permission = True | |
| elif current_user.role in ['project_manager', 'sales_manager', 'dispatcher']: | |
| if current_user.client_id and current_user.client_id == target_user.client_id: | |
| has_permission = True | |
| if current_user.contractor_id and current_user.contractor_id == target_user.contractor_id: | |
| has_permission = True | |
| if not has_permission: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="You don't have permission to view this user's documents" | |
| ) | |
| # Get documents | |
| query = db.query(Document).filter( | |
| Document.entity_type == "user", | |
| Document.entity_id == user_id, | |
| Document.deleted_at == None | |
| ) | |
| if document_type: | |
| query = query.filter(Document.document_type == document_type) | |
| documents = query.order_by(Document.created_at.desc()).all() | |
| # Enrich with uploader info and generate fresh signed URLs | |
| from app.integrations.supabase import SupabaseStorageService | |
| response_docs = [] | |
| for doc in documents: | |
| doc_response = DocumentResponse.from_orm(doc) | |
| # Generate fresh signed URL for Supabase files | |
| if doc.storage_provider == 'supabase' and doc.file_url.startswith('supabase://'): | |
| bucket = doc.additional_metadata.get('bucket') | |
| path = doc.additional_metadata.get('path') | |
| if bucket and path: | |
| doc_response.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600) | |
| if doc.uploaded_by_user_id: | |
| uploader = db.query(User).filter(User.id == doc.uploaded_by_user_id).first() | |
| if uploader: | |
| doc_response.uploader = UploaderInfo( | |
| id=uploader.id, | |
| name=uploader.name, | |
| email=uploader.email | |
| ) | |
| response_docs.append(doc_response) | |
| return DocumentListResponse( | |
| total=len(response_docs), | |
| documents=response_docs | |
| ) | |
| async def get_user_profile_photo( | |
| user_id: UUID, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get user's profile photo (convenience endpoint) | |
| Uses user_document_links for fast lookup | |
| """ | |
| # Get profile photo link | |
| doc_link = db.query(UserDocumentLink).filter( | |
| UserDocumentLink.user_id == user_id, | |
| UserDocumentLink.document_link_type == 'profile_photo' | |
| ).first() | |
| if not doc_link: | |
| return None | |
| # Get document | |
| document = db.query(Document).filter( | |
| Document.id == doc_link.document_id, | |
| Document.deleted_at == None | |
| ).first() | |
| if not document: | |
| return None | |
| response = DocumentResponse.from_orm(document) | |
| # Generate fresh signed URL for Supabase files | |
| if document.storage_provider == 'supabase' and document.file_url.startswith('supabase://'): | |
| from app.integrations.supabase import SupabaseStorageService | |
| bucket = document.additional_metadata.get('bucket') | |
| path = document.additional_metadata.get('path') | |
| if bucket and path: | |
| response.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600) | |
| # Add uploader info | |
| if document.uploaded_by_user_id: | |
| uploader = db.query(User).filter(User.id == document.uploaded_by_user_id).first() | |
| if uploader: | |
| response.uploader = UploaderInfo( | |
| id=uploader.id, | |
| name=uploader.name, | |
| email=uploader.email | |
| ) | |
| return response | |
| # ============================================ | |
| # DOCUMENT VERSIONING ENDPOINTS | |
| # ============================================ | |
| async def get_document_versions( | |
| entity_type: str, | |
| entity_id: UUID, | |
| document_type: str, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all versions of a document | |
| Returns documents ordered by version (newest first) | |
| """ | |
| from app.services.media_service import StorageService | |
| documents = StorageService.get_document_versions( | |
| entity_type=entity_type, | |
| entity_id=entity_id, | |
| document_type=document_type, | |
| db=db | |
| ) | |
| # Enrich with uploader info and generate fresh signed URLs | |
| from app.integrations.supabase import SupabaseStorageService | |
| response_docs = [] | |
| for doc in documents: | |
| doc_response = DocumentResponse.from_orm(doc) | |
| # Generate fresh signed URL for Supabase files | |
| if doc.storage_provider == 'supabase' and doc.file_url.startswith('supabase://'): | |
| bucket = doc.additional_metadata.get('bucket') | |
| path = doc.additional_metadata.get('path') | |
| if bucket and path: | |
| doc_response.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600) | |
| if doc.uploaded_by_user_id: | |
| uploader = db.query(User).filter(User.id == doc.uploaded_by_user_id).first() | |
| if uploader: | |
| doc_response.uploader = UploaderInfo( | |
| id=uploader.id, | |
| name=uploader.name, | |
| email=uploader.email | |
| ) | |
| response_docs.append(doc_response) | |
| return response_docs | |
| async def get_latest_document( | |
| entity_type: str, | |
| entity_id: UUID, | |
| document_type: str, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get latest version of a document | |
| """ | |
| from app.services.media_service import StorageService | |
| document = StorageService.get_latest_document( | |
| entity_type=entity_type, | |
| entity_id=entity_id, | |
| document_type=document_type, | |
| db=db | |
| ) | |
| if not document: | |
| return None | |
| response = DocumentResponse.from_orm(document) | |
| # Generate fresh signed URL for Supabase files | |
| if document.storage_provider == 'supabase' and document.file_url.startswith('supabase://'): | |
| from app.integrations.supabase import SupabaseStorageService | |
| bucket = document.additional_metadata.get('bucket') | |
| path = document.additional_metadata.get('path') | |
| if bucket and path: | |
| response.file_url = SupabaseStorageService.get_signed_url(bucket, path, 3600) | |
| # Add uploader info | |
| if document.uploaded_by_user_id: | |
| uploader = db.query(User).filter(User.id == document.uploaded_by_user_id).first() | |
| if uploader: | |
| response.uploader = UploaderInfo( | |
| id=uploader.id, | |
| name=uploader.name, | |
| email=uploader.email | |
| ) | |
| return response | |
| # ============================================ | |
| # PROFILE PHOTO SHORTCUTS | |
| # ============================================ | |
| async def upload_my_profile_photo( | |
| file: UploadFile = File(...), | |
| request: Request = None, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Upload or update current user's profile photo | |
| Automatically creates new version and marks old photo as outdated. | |
| Profile photos are stored in Cloudinary for optimization. | |
| """ | |
| # Validate file is an image | |
| if not file.content_type.startswith('image/'): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Profile photo must be an image" | |
| ) | |
| # Upload with versioning enabled (force Cloudinary for profile photos) | |
| document = await StorageService.upload_file( | |
| file=file, | |
| entity_type="user", | |
| entity_id=current_user.id, | |
| document_type="profile_photo", | |
| document_category="profile", | |
| description="User profile photo", | |
| tags=["profile", "photo"], | |
| is_public=True, | |
| uploaded_by_user_id=current_user.id, | |
| db=db, | |
| force_provider="cloudinary", # Force Cloudinary for profile photos | |
| enable_versioning=True | |
| ) | |
| # Update user_document_link | |
| from app.models.user_document_link import UserDocumentLink | |
| existing_link = db.query(UserDocumentLink).filter( | |
| UserDocumentLink.user_id == current_user.id, | |
| UserDocumentLink.document_link_type == 'profile_photo' | |
| ).first() | |
| if existing_link: | |
| existing_link.document_id = document.id | |
| existing_link.updated_at = datetime.utcnow().isoformat() | |
| else: | |
| doc_link = UserDocumentLink( | |
| user_id=current_user.id, | |
| document_id=document.id, | |
| document_link_type='profile_photo', | |
| notes="Profile photo" | |
| ) | |
| db.add(doc_link) | |
| db.commit() | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='upload', | |
| entity_type='document', | |
| entity_id=str(document.id), | |
| description=f"Profile photo uploaded for user {current_user.id}", | |
| user=current_user, | |
| request=request, | |
| additional_metadata={ | |
| 'document_type': 'profile_photo', | |
| 'version': document.version | |
| } | |
| ) | |
| # Get uploader info | |
| response = DocumentResponse.from_orm(document) | |
| response.uploader = UploaderInfo( | |
| id=current_user.id, | |
| name=current_user.name, | |
| email=current_user.email | |
| ) | |
| return response | |
| async def upload_profile_photo( | |
| user_id: UUID, | |
| file: UploadFile = File(...), | |
| request: Request = None, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Upload or update user profile photo (admin/manager) | |
| Automatically creates new version and marks old photo as outdated. | |
| Profile photos are stored in Cloudinary for optimization. | |
| """ | |
| # Validate file is an image | |
| if not file.content_type.startswith('image/'): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Profile photo must be an image" | |
| ) | |
| # Upload with versioning enabled (force Cloudinary for profile photos) | |
| document = await StorageService.upload_file( | |
| file=file, | |
| entity_type="user", | |
| entity_id=user_id, | |
| document_type="profile_photo", | |
| document_category="profile", | |
| description="User profile photo", | |
| tags=["profile", "photo"], | |
| is_public=True, | |
| uploaded_by_user_id=current_user.id, | |
| db=db, | |
| force_provider="cloudinary", # Force Cloudinary for profile photos | |
| enable_versioning=True | |
| ) | |
| # Update user_document_link | |
| from app.models.user_document_link import UserDocumentLink | |
| existing_link = db.query(UserDocumentLink).filter( | |
| UserDocumentLink.user_id == user_id, | |
| UserDocumentLink.document_link_type == 'profile_photo' | |
| ).first() | |
| if existing_link: | |
| existing_link.document_id = document.id | |
| existing_link.updated_at = datetime.utcnow().isoformat() | |
| else: | |
| doc_link = UserDocumentLink( | |
| user_id=user_id, | |
| document_id=document.id, | |
| document_link_type='profile_photo', | |
| notes="Profile photo" | |
| ) | |
| db.add(doc_link) | |
| db.commit() | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='upload', | |
| entity_type='document', | |
| entity_id=str(document.id), | |
| description=f"Profile photo uploaded for user {user_id}", | |
| user=current_user, | |
| request=request, | |
| additional_metadata={ | |
| 'document_type': 'profile_photo', | |
| 'version': document.version | |
| } | |
| ) | |
| # Get uploader info | |
| response = DocumentResponse.from_orm(document) | |
| response.uploader = UploaderInfo( | |
| id=current_user.id, | |
| name=current_user.name, | |
| email=current_user.email | |
| ) | |
| return response | |
| async def get_my_profile_photo_versions( | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all versions of current user's profile photo | |
| """ | |
| return await get_document_versions( | |
| entity_type="user", | |
| entity_id=current_user.id, | |
| document_type="profile_photo", | |
| current_user=current_user, | |
| db=db | |
| ) | |
| async def get_profile_photo_versions( | |
| user_id: UUID, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get all versions of user's profile photo | |
| """ | |
| return await get_document_versions( | |
| entity_type="user", | |
| entity_id=user_id, | |
| document_type="profile_photo", | |
| current_user=current_user, | |
| db=db | |
| ) | |