"""Folders API — full nested folder support (real file manager)."""
import logging
from typing import Dict, List, Optional, Set

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.database import get_db
from app.auth import get_current_user
from app.models import User, Folder, Document

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/folders", tags=["folders"])


# Body of POST /folders/.
class CreateFolderRequest(BaseModel):
    name: str
    parent_id: Optional[str] = None  # None = root level


# Body of PATCH /folders/{folder_id}.
class RenameFolderRequest(BaseModel):
    name: str


# Body of POST /folders/{folder_id}/move-document.
class MoveDocumentRequest(BaseModel):
    document_id: str


# Body of PATCH /folders/{folder_id}/move.
class MoveFolderRequest(BaseModel):
    parent_id: Optional[str] = None  # None = move to root


# ── Create ────────────────────────────────────────────────────────────────────

@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_folder(
    request: CreateFolderRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create a folder. Pass parent_id to create a subfolder.

    Responds 400 if the name is blank or already used by a sibling, and 404
    if parent_id does not name a folder in the caller's organisation.
    """
    name = _clean_name(request.name)

    # Validate parent exists and belongs to same org
    if request.parent_id:
        parent = db.query(Folder).filter(
            Folder.id == request.parent_id,
            Folder.org_id == current_user.org_id,
        ).first()
        if not parent:
            raise HTTPException(status_code=404, detail="Parent folder not found")

    # Duplicate name check within same parent
    if _name_taken(db, current_user.org_id, name, request.parent_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"A folder named '{request.name}' already exists here",
        )

    folder = Folder(
        name=name,
        org_id=current_user.org_id,
        parent_id=request.parent_id,
    )
    db.add(folder)
    db.commit()
    db.refresh(folder)
    return _folder_response(folder, db)


# ── List ──────────────────────────────────────────────────────────────────────

@router.get("/")
async def list_folders(
    parent_id: Optional[str] = None,  # None = root level; pass id for children
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    List folders.
    - GET /folders/ → root-level folders
    - GET /folders/?parent_id=X → subfolders of X
    """
    query = db.query(Folder).filter(
        Folder.org_id == current_user.org_id,
        Folder.parent_id == parent_id,
    ).order_by(Folder.created_at.desc())
    return [_folder_response(f, db) for f in query.all()]


# NOTE: registered before "/{folder_id}" so that "tree" is not captured as a
# folder id by the parameterised route.
@router.get("/tree")
async def folder_tree(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the full folder tree (all folders with nesting info)."""
    all_folders = db.query(Folder).filter(
        Folder.org_id == current_user.org_id
    ).order_by(Folder.created_at.desc()).all()

    # Group once by parent so each node finds its children without rescanning
    # the whole list; insertion order keeps the created_at ordering per level.
    children_by_parent: Dict[Optional[str], List[Folder]] = {}
    for f in all_folders:
        children_by_parent.setdefault(f.parent_id, []).append(f)

    def build_tree(parent_id: Optional[str]) -> List[dict]:
        return [
            {
                **_folder_response(f, db),
                "children": build_tree(f.id),
            }
            for f in children_by_parent.get(parent_id, [])
        ]

    return build_tree(None)


# ── Get single ────────────────────────────────────────────────────────────────

@router.get("/{folder_id}")
async def get_folder(
    folder_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Get folder details + its direct documents + subfolders."""
    folder = _get_or_404(folder_id, current_user.org_id, db)

    subfolders = db.query(Folder).filter(
        Folder.parent_id == folder_id,
        Folder.org_id == current_user.org_id,
    ).order_by(Folder.created_at.desc()).all()

    documents = db.query(Document).filter(
        Document.folder_id == folder_id,
        Document.org_id == current_user.org_id,
    ).order_by(Document.created_at.desc()).all()

    # Build breadcrumb path
    path = _build_path(folder, db)

    return {
        **_folder_response(folder, db),
        "path": path,
        "subfolders": [_folder_response(f, db) for f in subfolders],
        "documents": [_doc_response(d) for d in documents],
    }


# ── Rename ────────────────────────────────────────────────────────────────────

@router.patch("/{folder_id}")
async def rename_folder(
    folder_id: str,
    request: RenameFolderRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Rename a folder.

    Responds 400 if the new name is blank or already used by a sibling, and
    404 if the folder is not found in the caller's organisation.
    """
    folder = _get_or_404(folder_id, current_user.org_id, db)
    name = _clean_name(request.name)

    # Duplicate check within same parent (the folder itself is excluded so
    # re-submitting the current name is not an error).
    if _name_taken(db, current_user.org_id, name, folder.parent_id, exclude_id=folder_id):
        raise HTTPException(400, detail=f"A folder named '{request.name}' already exists here")

    folder.name = name
    db.commit()
    db.refresh(folder)
    return _folder_response(folder, db)


# ── Move folder ───────────────────────────────────────────────────────────────

@router.patch("/{folder_id}/move")
async def move_folder(
    folder_id: str,
    request: MoveFolderRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Move a folder to a different parent (or root).

    Responds 400 if the target is the folder itself, one of its descendants,
    or already contains a folder with the same name; 404 if either folder is
    not found in the caller's organisation.
    """
    folder = _get_or_404(folder_id, current_user.org_id, db)

    if request.parent_id:
        if request.parent_id == folder_id:
            raise HTTPException(400, detail="Cannot move a folder into itself")

        # Confirm the target is visible to this org before walking its ancestry.
        parent = db.query(Folder).filter(
            Folder.id == request.parent_id,
            Folder.org_id == current_user.org_id,
        ).first()
        if not parent:
            raise HTTPException(404, detail="Target folder not found")

        # Prevent moving into own descendant
        if _is_descendant(folder_id, request.parent_id, db):
            raise HTTPException(400, detail="Cannot move a folder into its own subfolder")

    # Keep the same unique-name-per-parent rule that create/rename enforce.
    if _name_taken(db, current_user.org_id, folder.name, request.parent_id, exclude_id=folder_id):
        raise HTTPException(400, detail=f"A folder named '{folder.name}' already exists here")

    folder.parent_id = request.parent_id
    db.commit()
    db.refresh(folder)
    return _folder_response(folder, db)


# ── Move document into folder ─────────────────────────────────────────────────

@router.post("/{folder_id}/move-document")
async def move_document_to_folder(
    folder_id: str,
    request: MoveDocumentRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Move a document into the given folder.

    Responds 404 if the folder or the document is not found in the caller's
    organisation.
    """
    # Called only for its 404 side effect; the folder row itself is not needed.
    _get_or_404(folder_id, current_user.org_id, db)

    doc = db.query(Document).filter(
        Document.id == request.document_id,
        Document.org_id == current_user.org_id,
    ).first()
    if not doc:
        raise HTTPException(404, detail="Document not found")

    doc.folder_id = folder_id
    db.commit()
    return {"message": "Document moved", "document_id": request.document_id, "folder_id": folder_id}


# ── Delete ────────────────────────────────────────────────────────────────────

@router.delete("/{folder_id}")
async def delete_folder(
    folder_id: str,
    force: bool = False,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Delete a folder.
    - force=false (default): fails if folder has documents or subfolders
    - force=true: recursively deletes all subfolders and moves documents to root
    """
    folder = _get_or_404(folder_id, current_user.org_id, db)

    if force:
        # _recursive_delete removes this folder as well and commits, so there
        # is nothing left to delete here.
        _recursive_delete(folder_id, current_user.org_id, db)
        return {"message": "Folder deleted"}

    doc_count = db.query(Document).filter(Document.folder_id == folder_id).count()
    sub_count = db.query(Folder).filter(Folder.parent_id == folder_id).count()
    if doc_count > 0 or sub_count > 0:
        raise HTTPException(
            400,
            detail=f"Folder is not empty ({doc_count} files, {sub_count} subfolders). Use ?force=true."
        )

    db.delete(folder)
    db.commit()
    return {"message": "Folder deleted"}


# ── Helpers ───────────────────────────────────────────────────────────────────

def _clean_name(raw_name: str) -> str:
    """Return raw_name with surrounding whitespace removed.

    Raises:
        HTTPException: 400 if nothing remains after stripping.
    """
    name = raw_name.strip()
    if not name:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Folder name cannot be empty",
        )
    return name


def _name_taken(
    db: Session,
    org_id: str,
    name: str,
    parent_id: Optional[str],
    exclude_id: Optional[str] = None,
) -> bool:
    """Return True if a folder called name already exists under parent_id in org_id.

    exclude_id, when given, is a folder id to ignore (the folder being renamed
    or moved).
    """
    query = db.query(Folder).filter(
        Folder.org_id == org_id,
        Folder.name == name,
        Folder.parent_id == parent_id,
    )
    if exclude_id is not None:
        query = query.filter(Folder.id != exclude_id)
    return query.first() is not None


def _get_or_404(folder_id: str, org_id: str, db: Session) -> Folder:
    """Return the folder with this id in org_id, or raise a 404 HTTPException."""
    f = db.query(Folder).filter(Folder.id == folder_id, Folder.org_id == org_id).first()
    if not f:
        raise HTTPException(404, detail="Folder not found")
    return f


def _folder_response(folder: Folder, db: Session) -> dict:
    """Serialise a folder, including counts of its direct documents and subfolders.

    Issues two COUNT queries per call.
    """
    doc_count = db.query(Document).filter(Document.folder_id == folder.id).count()
    sub_count = db.query(Folder).filter(Folder.parent_id == folder.id).count()
    return {
        "id": folder.id,
        "name": folder.name,
        "parent_id": folder.parent_id,
        "document_count": doc_count,
        "subfolder_count": sub_count,
        "created_at": folder.created_at.isoformat(),
    }


def _doc_response(doc: Document) -> dict:
    """Serialise a document to the dict shape returned by the API."""
    return {
        "id": doc.id,
        "name": doc.name,
        "size": doc.size,
        "chunks": doc.chunks,
        "status": doc.status,
        "folder_id": doc.folder_id,
        "created_at": doc.created_at.isoformat(),
    }


def _build_path(folder: Folder, db: Session) -> List[dict]:
    """Build breadcrumb path from root to this folder.

    Returns a list of {"id", "name"} dicts, outermost ancestor first. The walk
    stops at a folder without parent_id, at a parent that cannot be loaded, or
    if a folder id repeats (which would otherwise loop forever).
    """
    crumbs: List[dict] = []
    seen: Set[str] = set()
    current: Optional[Folder] = folder
    while current and current.id not in seen:
        seen.add(current.id)
        crumbs.append({"id": current.id, "name": current.name})
        if not current.parent_id:
            break
        current = db.query(Folder).filter(Folder.id == current.parent_id).first()
    # Collected leaf-to-root; callers expect root-to-leaf.
    crumbs.reverse()
    return crumbs


def _is_descendant(ancestor_id: str, candidate_id: str, db: Session) -> bool:
    """Check if candidate_id is a descendant of ancestor_id.

    Walks parent links upward from candidate_id. Returns False when the chain
    ends without meeting ancestor_id, or if a folder id repeats.
    """
    seen: Set[str] = set()
    current = db.query(Folder).filter(Folder.id == candidate_id).first()
    while current and current.parent_id and current.id not in seen:
        if current.parent_id == ancestor_id:
            return True
        seen.add(current.id)
        current = db.query(Folder).filter(Folder.id == current.parent_id).first()
    return False


def _delete_subtree(folder_id: str, org_id: str, db: Session) -> None:
    """Delete folder_id and everything below it without committing.

    Documents directly in each deleted folder get folder_id set to None.
    """
    # Move documents to root
    db.query(Document).filter(Document.folder_id == folder_id).update({"folder_id": None})

    # Recurse into subfolders
    children = db.query(Folder).filter(Folder.parent_id == folder_id, Folder.org_id == org_id).all()
    for child in children:
        _delete_subtree(child.id, org_id, db)

    # Delete this folder
    folder = db.query(Folder).filter(Folder.id == folder_id).first()
    if folder:
        db.delete(folder)
        # Flush now so each child's DELETE is issued before its parent's.
        db.flush()


def _recursive_delete(folder_id: str, org_id: str, db: Session) -> None:
    """Recursively delete folder and all subfolders; move docs to root.

    The whole subtree is removed in one transaction and committed once at the
    end, so an error part-way through does not leave earlier deletions
    committed by this function.
    """
    _delete_subtree(folder_id, org_id, db)
    db.commit()