"""
Document Management Endpoints

Universal document upload and management for all entities.

Route registration order matters in this module: FastAPI matches routes in the
order they are declared, so every route with a literal first segment
(``/users/...``, ``/id/...``, ``/upload``) is registered BEFORE the generic
``/{entity_type}/{entity_id}...`` routes. Otherwise the generic routes capture
requests such as ``GET /documents/id/<uuid>`` or ``GET /documents/users/<uuid>``.
"""
import json
import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Form, Request
from sqlalchemy.orm import Session

from app.api.deps import get_db, get_current_active_user
from app.models.user import User
from app.models.document import Document
from app.models.user_document_link import UserDocumentLink
from app.schemas.document import (
    DocumentResponse,
    DocumentListResponse,
    DocumentUpdateRequest,
    UploaderInfo,
)
from app.services.media_service import StorageService
from app.services.audit_service import AuditService
from app.core.permissions import require_permission, require_any_permission

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/documents", tags=["Document Management"])

# Lifetime (in seconds) of the signed URLs generated for Supabase-stored files.
SIGNED_URL_TTL_SECONDS = 3600

# Roles that may view documents of users sharing their client or contractor.
_ORG_SCOPED_ROLES = frozenset({
    'client_admin',
    'contractor_admin',
    'project_manager',
    'sales_manager',
    'dispatcher',
})


# ============================================
# INTERNAL HELPERS
# ============================================

def _utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _apply_signed_url(response: DocumentResponse, document: Document) -> None:
    """Replace ``response.file_url`` with a fresh signed URL for Supabase files.

    Only documents whose provider is ``'supabase'`` and whose stored URL uses
    the ``supabase://`` scheme are touched, and only when both ``bucket`` and
    ``path`` are present in the document's additional metadata.
    """
    if document.storage_provider != 'supabase':
        return
    if not (document.file_url or "").startswith('supabase://'):
        return

    # Imported lazily, as the original handlers did.
    from app.integrations.supabase import SupabaseStorageService

    # additional_metadata may be NULL in the database; treat that as empty.
    metadata = document.additional_metadata or {}
    bucket = metadata.get('bucket')
    path = metadata.get('path')
    if bucket and path:
        response.file_url = SupabaseStorageService.get_signed_url(
            bucket, path, SIGNED_URL_TTL_SECONDS
        )


def _apply_uploader(
    response: DocumentResponse,
    document: Document,
    db: Session,
    uploader_cache: Optional[Dict] = None,
) -> None:
    """Attach ``UploaderInfo`` to ``response`` when the uploader can be found.

    ``uploader_cache`` maps uploader id -> ``User`` (or ``None``) so that list
    endpoints issue one query per distinct uploader instead of one per document.
    """
    uploader_id = document.uploaded_by_user_id
    if not uploader_id:
        return

    if uploader_cache is not None and uploader_id in uploader_cache:
        uploader = uploader_cache[uploader_id]
    else:
        uploader = db.query(User).filter(User.id == uploader_id).first()
        if uploader_cache is not None:
            uploader_cache[uploader_id] = uploader

    if uploader:
        response.uploader = UploaderInfo(
            id=uploader.id,
            name=uploader.name,
            email=uploader.email
        )


def _to_response(
    document: Document,
    db: Session,
    uploader_cache: Optional[Dict] = None,
) -> DocumentResponse:
    """Build a ``DocumentResponse`` with a fresh signed URL and uploader info."""
    response = DocumentResponse.from_orm(document)
    _apply_signed_url(response, document)
    _apply_uploader(response, document, db, uploader_cache)
    return response


def _to_responses(documents, db: Session) -> List[DocumentResponse]:
    """Convert an iterable of documents, sharing one uploader cache."""
    uploader_cache: Dict = {}
    return [_to_response(doc, db, uploader_cache) for doc in documents]


def _list_documents(
    db: Session,
    entity_type: str,
    entity_id,
    document_type: Optional[str] = None,
) -> DocumentListResponse:
    """Return the non-deleted documents of one entity, newest first."""
    query = db.query(Document).filter(
        Document.entity_type == entity_type,
        Document.entity_id == entity_id,
        Document.deleted_at.is_(None)
    )
    if document_type:
        query = query.filter(Document.document_type == document_type)

    documents = query.order_by(Document.created_at.desc()).all()
    response_docs = _to_responses(documents, db)
    return DocumentListResponse(
        total=len(response_docs),
        documents=response_docs
    )


def _get_active_document_or_404(db: Session, document_id: UUID) -> Document:
    """Fetch a non-deleted document by id or raise HTTP 404."""
    document = db.query(Document).filter(
        Document.id == document_id,
        Document.deleted_at.is_(None)
    ).first()
    if not document:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Document not found"
        )
    return document


def _profile_photo_response(db: Session, user_id) -> Optional[DocumentResponse]:
    """Resolve a user's profile photo through ``user_document_links``.

    Returns ``None`` when there is no link or the linked document is deleted.
    """
    doc_link = db.query(UserDocumentLink).filter(
        UserDocumentLink.user_id == user_id,
        UserDocumentLink.document_link_type == 'profile_photo'
    ).first()
    if not doc_link:
        return None

    document = db.query(Document).filter(
        Document.id == doc_link.document_id,
        Document.deleted_at.is_(None)
    ).first()
    if not document:
        return None

    return _to_response(document, db)


def _upsert_user_document_link(
    db: Session,
    user_id,
    document_id,
    link_type: str,
    notes: Optional[str],
) -> None:
    """Point the user's link of ``link_type`` at ``document_id``.

    An existing link is re-pointed (its notes are left untouched); otherwise a
    new link is added to the session. The caller is responsible for committing.
    """
    existing_link = db.query(UserDocumentLink).filter(
        UserDocumentLink.user_id == user_id,
        UserDocumentLink.document_link_type == link_type
    ).first()

    if existing_link:
        existing_link.document_id = document_id
        existing_link.updated_at = _utc_now_iso()
    else:
        db.add(UserDocumentLink(
            user_id=user_id,
            document_id=document_id,
            document_link_type=link_type,
            notes=notes
        ))


def _parse_tags(raw_tags: Optional[str]) -> list:
    """Parse the ``tags`` form field (a JSON array string); HTTP 400 if invalid."""
    if not raw_tags:
        return []
    try:
        parsed = json.loads(raw_tags)
    except (TypeError, ValueError) as exc:  # JSONDecodeError is a ValueError
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="tags must be a JSON array"
        ) from exc
    if parsed is None:
        return []
    if not isinstance(parsed, list):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="tags must be a JSON array"
        )
    return parsed


def _parse_entity_id(raw_entity_id: str) -> UUID:
    """Parse the ``entity_id`` form field; HTTP 400 if it is not a UUID."""
    try:
        return UUID(raw_entity_id)
    except (TypeError, ValueError, AttributeError) as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="entity_id must be a valid UUID"
        ) from exc


def _can_view_user_documents(current_user: User, target_user: User) -> bool:
    """Permission check mirroring ``GET /users/{user_id}``.

    Platform admins and the user themselves always pass; organisation-scoped
    roles pass when they share a client or a contractor with the target user.
    """
    if current_user.role == 'platform_admin':
        return True
    if current_user.id == target_user.id:
        return True
    if current_user.role not in _ORG_SCOPED_ROLES:
        return False

    same_client = bool(
        current_user.client_id
        and current_user.client_id == target_user.client_id
    )
    same_contractor = bool(
        current_user.contractor_id
        and current_user.contractor_id == target_user.contractor_id
    )
    return same_client or same_contractor


async def _store_profile_photo(
    file: UploadFile,
    user_id,
    request: Optional[Request],
    current_user: User,
    db: Session,
) -> DocumentResponse:
    """Upload a profile photo for ``user_id`` and update its document link.

    Validates that the upload is an image, stores it with versioning enabled
    (forcing the ``cloudinary`` provider), upserts the ``profile_photo`` link,
    commits, writes an audit entry and returns the response with the current
    user as uploader.
    """
    # content_type may be missing on some uploads; treat that as "not an image".
    if not (file.content_type or "").startswith('image/'):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Profile photo must be an image"
        )

    document = await StorageService.upload_file(
        file=file,
        entity_type="user",
        entity_id=user_id,
        document_type="profile_photo",
        document_category="profile",
        description="User profile photo",
        tags=["profile", "photo"],
        is_public=True,
        uploaded_by_user_id=current_user.id,
        db=db,
        force_provider="cloudinary",  # Force Cloudinary for profile photos
        enable_versioning=True
    )

    _upsert_user_document_link(
        db,
        user_id=user_id,
        document_id=document.id,
        link_type='profile_photo',
        notes="Profile photo"
    )
    db.commit()

    AuditService.log_action(
        db=db,
        action='upload',
        entity_type='document',
        entity_id=str(document.id),
        description=f"Profile photo uploaded for user {user_id}",
        user=current_user,
        request=request,
        additional_metadata={
            'document_type': 'profile_photo',
            'version': document.version
        }
    )

    response = DocumentResponse.from_orm(document)
    response.uploader = UploaderInfo(
        id=current_user.id,
        name=current_user.name,
        email=current_user.email
    )
    return response


# ============================================
# CURRENT-USER ROUTES (/users/me...)
# Must come before /users/{user_id} and the generic parameterized routes
# ============================================

@router.get("/users/me", response_model=DocumentListResponse)
async def get_my_documents(
    document_type: Optional[str] = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """Get current user's documents"""
    return _list_documents(db, "user", current_user.id, document_type)


@router.get("/users/me/profile-photo", response_model=Optional[DocumentResponse])
async def get_my_profile_photo(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """Get current user's profile photo"""
    return _profile_photo_response(db, current_user.id)


@router.post("/users/me/profile-photo", response_model=DocumentResponse, status_code=status.HTTP_201_CREATED)
async def upload_my_profile_photo(
    file: UploadFile = File(...),
    request: Request = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Upload or update current user's profile photo

    Automatically creates new version and marks old photo as outdated.
    Profile photos are stored in Cloudinary for optimization.
    """
    return await _store_profile_photo(file, current_user.id, request, current_user, db)


@router.get("/users/me/profile-photo/versions", response_model=List[DocumentResponse])
async def get_my_profile_photo_versions(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get all versions of current user's profile photo
    """
    return await get_document_versions(
        entity_type="user",
        entity_id=current_user.id,
        document_type="profile_photo",
        current_user=current_user,
        db=db
    )


# ============================================
# UNIVERSAL DOCUMENT ENDPOINTS
# ============================================

@router.post("/upload", response_model=DocumentResponse, status_code=status.HTTP_201_CREATED)
@require_permission("upload_documents")
async def upload_document(
    file: UploadFile = File(...),
    entity_type: str = Form(...),
    entity_id: str = Form(...),
    document_type: str = Form(...),
    document_category: Optional[str] = Form(None),
    description: Optional[str] = Form(None),
    tags: Optional[str] = Form("[]"),  # JSON string of array
    is_public: bool = Form(False),
    force_provider: Optional[str] = Form(None),  # Optional: 'cloudinary' or 'supabase'
    request: Request = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Universal document upload endpoint with smart fallback

    Default routing:
    - Images/videos → Cloudinary (with Supabase fallback)
    - Documents → Supabase Storage

    Features:
    - Automatic fallback if primary provider fails
    - Optional provider override via force_provider parameter
    - Supports all entity types: user, project, ticket, client, contractor, etc.

    Parameters:
    - force_provider: Optional override ('cloudinary' or 'supabase')

    Errors:
    - 400 when no file name is given, ``tags`` is not a JSON array,
      ``entity_id`` is not a UUID, or ``force_provider`` is not recognised
    - 500 when the upload itself fails
    """
    try:
        # Input validation: all of these are client errors (400), not 500s.
        tags_list = _parse_tags(tags)

        if not file.filename:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="No file provided"
            )

        if force_provider and force_provider not in ['cloudinary', 'supabase']:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="force_provider must be 'cloudinary' or 'supabase'"
            )

        entity_uuid = _parse_entity_id(entity_id)

        # Upload file
        document = await StorageService.upload_file(
            file=file,
            entity_type=entity_type,
            entity_id=entity_uuid,
            document_type=document_type,
            document_category=document_category,
            description=description,
            tags=tags_list,
            is_public=is_public,
            uploaded_by_user_id=current_user.id,
            db=db,
            force_provider=force_provider
        )

        # User documents are additionally indexed in user_document_links,
        # one link per (user, document_type), always pointing at the newest.
        if entity_type == 'user':
            _upsert_user_document_link(
                db,
                user_id=entity_uuid,
                document_id=document.id,
                link_type=document_type,
                notes=description
            )
            db.commit()
            logger.info(f"Created user_document_link for {document_type}")

        # Audit log
        AuditService.log_action(
            db=db,
            action='create',
            entity_type='document',
            entity_id=str(document.id),
            description=f"Uploaded document: {file.filename} for {entity_type}:{entity_id}",
            user=current_user,
            request=request,
            additional_metadata={
                'file_name': file.filename,
                'file_type': file.content_type,
                'storage_provider': document.storage_provider
            }
        )

        response = DocumentResponse.from_orm(document)
        _apply_uploader(response, document, db)
        return response

    except HTTPException:
        raise
    except Exception as e:
        # Boundary handler: log with traceback, surface as a 500.
        logger.exception("Upload failed: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Upload failed: {str(e)}"
        ) from e


# ============================================
# SINGLE-DOCUMENT ROUTES (/id/{document_id})
# Must come before the generic /{entity_type}/{entity_id} route
# ============================================

@router.get("/id/{document_id}", response_model=DocumentResponse)
async def get_document(
    document_id: UUID,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """Get single document by ID with fresh signed URL for Supabase files"""
    # NOTE(review): any authenticated user can read any document by id here;
    # confirm whether a permission/ownership check is intended.
    document = _get_active_document_or_404(db, document_id)
    return _to_response(document, db)


@router.put("/id/{document_id}", response_model=DocumentResponse)
async def update_document_metadata(
    document_id: UUID,
    data: DocumentUpdateRequest,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """Update document metadata (not the file itself)"""
    # NOTE(review): no permission/ownership check is applied here; confirm
    # whether one is intended.
    document = _get_active_document_or_404(db, document_id)

    # Only fields the client actually sent are written.
    for field, value in data.dict(exclude_unset=True).items():
        setattr(document, field, value)

    document.updated_at = _utc_now_iso()

    db.commit()
    db.refresh(document)

    # Audit log
    AuditService.log_action(
        db=db,
        action='update',
        entity_type='document',
        entity_id=str(document.id),
        description=f"Updated document metadata: {document.file_name}",
        user=current_user,
        request=request
    )

    return DocumentResponse.from_orm(document)


@router.delete("/id/{document_id}", status_code=status.HTTP_204_NO_CONTENT)
@require_permission("upload_documents")  # Same permission as upload (can manage own documents)
async def delete_document(
    document_id: UUID,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """Delete document (soft delete in DB, actual file deletion from storage)"""
    # NOTE(review): no ownership check is visible here despite the decorator
    # comment mentioning "own documents"; confirm.
    document = _get_active_document_or_404(db, document_id)

    # Deletion is delegated to the storage service; a storage failure is
    # logged but does not fail the request (best effort, as before).
    success = StorageService.delete_file(document, db)
    if not success:
        logger.warning(f"Failed to delete file from storage: {document.file_url}")

    # Audit log
    AuditService.log_action(
        db=db,
        action='delete',
        entity_type='document',
        entity_id=str(document.id),
        description=f"Deleted document: {document.file_name}",
        user=current_user,
        request=request
    )


# ============================================
# PER-USER CONVENIENCE ENDPOINTS (/users/{user_id}...)
# Must come before the generic /{entity_type}/{entity_id}... routes
# ============================================

@router.get("/users/{user_id}", response_model=DocumentListResponse)
async def get_user_documents(
    user_id: UUID,
    document_type: Optional[str] = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get another user's documents

    Requires permission to view the user (same org or shared projects)
    """
    target_user = db.query(User).filter(
        User.id == user_id,
        User.deleted_at.is_(None)
    ).first()
    if not target_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    # Permission check (same logic as GET /users/{user_id})
    if not _can_view_user_documents(current_user, target_user):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to view this user's documents"
        )

    return _list_documents(db, "user", user_id, document_type)


@router.get("/users/{user_id}/profile-photo", response_model=Optional[DocumentResponse])
async def get_user_profile_photo(
    user_id: UUID,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get user's profile photo (convenience endpoint)

    Uses user_document_links for fast lookup
    """
    return _profile_photo_response(db, user_id)


@router.post("/users/{user_id}/profile-photo", response_model=DocumentResponse, status_code=status.HTTP_201_CREATED)
async def upload_profile_photo(
    user_id: UUID,
    file: UploadFile = File(...),
    request: Request = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Upload or update user profile photo (admin/manager)

    Automatically creates new version and marks old photo as outdated.
    Profile photos are stored in Cloudinary for optimization.
    """
    # NOTE(review): the docstring says admin/manager, but nothing here
    # restricts who may replace another user's photo; confirm and add a check.
    return await _store_profile_photo(file, user_id, request, current_user, db)


@router.get("/users/{user_id}/profile-photo/versions", response_model=List[DocumentResponse])
async def get_profile_photo_versions(
    user_id: UUID,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get all versions of user's profile photo
    """
    return await get_document_versions(
        entity_type="user",
        entity_id=user_id,
        document_type="profile_photo",
        current_user=current_user,
        db=db
    )


# ============================================
# GENERIC ENTITY ROUTES (/{entity_type}/{entity_id}...)
# Registered last: they match any path with the same number of segments
# ============================================

@router.get("/{entity_type}/{entity_id}", response_model=DocumentListResponse)
@require_permission("view_documents")
async def get_entity_documents(
    entity_type: str,
    entity_id: UUID,
    document_type: Optional[str] = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get all documents for a specific entity

    Optional filtering by document_type
    """
    return _list_documents(db, entity_type, entity_id, document_type)


# ============================================
# DOCUMENT VERSIONING ENDPOINTS
# ============================================

@router.get("/{entity_type}/{entity_id}/{document_type}/versions", response_model=List[DocumentResponse])
async def get_document_versions(
    entity_type: str,
    entity_id: UUID,
    document_type: str,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get all versions of a document

    Returns documents ordered by version (newest first)
    """
    documents = StorageService.get_document_versions(
        entity_type=entity_type,
        entity_id=entity_id,
        document_type=document_type,
        db=db
    )
    return _to_responses(documents, db)


@router.get("/{entity_type}/{entity_id}/{document_type}/latest", response_model=Optional[DocumentResponse])
async def get_latest_document(
    entity_type: str,
    entity_id: UUID,
    document_type: str,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get latest version of a document
    """
    document = StorageService.get_latest_document(
        entity_type=entity_type,
        entity_id=entity_id,
        document_type=document_type,
        db=db
    )
    if not document:
        return None

    return _to_response(document, db)