Spaces:
Sleeping
Sleeping
"""Folders API — full nested folder support (real file manager)."""
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from pydantic import BaseModel | |
| from sqlalchemy.orm import Session | |
| from app.database import get_db | |
| from app.auth import get_current_user | |
| from app.models import User, Folder, Document | |
| from typing import Optional, List | |
| import logging | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router for the folder endpoints; every route is mounted under the /folders prefix.
router = APIRouter(prefix="/folders", tags=["folders"])
class CreateFolderRequest(BaseModel):
    """Request body for creating a folder."""

    # Display name of the new folder.
    name: str
    # Id of the folder to create under.
    parent_id: Optional[str] = None  # None = root level
class RenameFolderRequest(BaseModel):
    """Request body for renaming a folder."""

    # New display name for the folder.
    name: str
class MoveDocumentRequest(BaseModel):
    """Request body for moving a document into a folder."""

    # Id of the document to move.
    document_id: str
class MoveFolderRequest(BaseModel):
    """Request body for moving a folder under a different parent."""

    # Id of the destination folder.
    parent_id: Optional[str] = None  # None = move to root
# ── Create ────────────────────────────────────────────────────────────────────
async def create_folder(
    request: CreateFolderRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create a folder. Pass parent_id to create a subfolder.

    The name is stripped of surrounding whitespace before it is validated
    and stored. An empty-string ``parent_id`` is treated like ``None``
    (root level).

    Returns:
        The serialized folder as produced by ``_folder_response``.

    Raises:
        HTTPException: 400 if the name is empty after stripping or a folder
            with the same name already exists under the same parent; 404 if
            ``parent_id`` does not match a folder in the caller's org.
    """
    name = request.name.strip()
    if not name:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Folder name cannot be empty",
        )

    # Normalise "" to None so a blank id can never be persisted as a parent.
    parent_id = request.parent_id or None

    # Validate parent exists and belongs to same org
    if parent_id is not None:
        parent = db.query(Folder).filter(
            Folder.id == parent_id,
            Folder.org_id == current_user.org_id,
        ).first()
        if not parent:
            raise HTTPException(status_code=404, detail="Parent folder not found")

    # Duplicate name check within same parent
    existing = db.query(Folder).filter(
        Folder.org_id == current_user.org_id,
        Folder.name == name,
        Folder.parent_id == parent_id,
    ).first()
    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"A folder named '{name}' already exists here",
        )

    folder = Folder(
        name=name,
        org_id=current_user.org_id,
        parent_id=parent_id,
    )
    db.add(folder)
    db.commit()
    # Reload the row so DB-generated columns are available for the response.
    db.refresh(folder)
    return _folder_response(folder, db)
# ── List ──────────────────────────────────────────────────────────────────────
async def list_folders(
    parent_id: Optional[str] = None,  # None = root level; pass id for children
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    List the caller's folders at a single level of the hierarchy.

    - GET /folders/             → root-level folders
    - GET /folders/?parent_id=X → subfolders of X

    Only folders in the caller's org are returned, newest first.
    """
    level_folders = (
        db.query(Folder)
        .filter(
            Folder.org_id == current_user.org_id,
            Folder.parent_id == parent_id,
        )
        .order_by(Folder.created_at.desc())
        .all()
    )

    serialized = []
    for item in level_folders:
        serialized.append(_folder_response(item, db))
    return serialized
async def folder_tree(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the full folder tree (all folders with nesting info).

    Every folder in the caller's org is loaded in one query (newest first)
    and nested under its parent. Each node is the ``_folder_response`` dict
    plus a ``"children"`` list built the same way. Folders whose parent is
    not among the loaded rows are not reachable from the root and are
    therefore omitted.
    """
    all_folders = db.query(Folder).filter(
        Folder.org_id == current_user.org_id
    ).order_by(Folder.created_at.desc()).all()

    # Group once by parent so each level is a dict lookup instead of a full
    # rescan of all_folders; list order (newest first) is preserved per group.
    children_by_parent: dict = {}
    for f in all_folders:
        children_by_parent.setdefault(f.parent_id, []).append(f)

    def build_tree(parent_id: Optional[str]) -> List[dict]:
        return [
            {
                **_folder_response(f, db),
                "children": build_tree(f.id),
            }
            for f in children_by_parent.get(parent_id, [])
        ]

    return build_tree(None)
# ── Get single ────────────────────────────────────────────────────────────────
async def get_folder(
    folder_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one folder with its breadcrumb path, direct subfolders and documents.

    Raises a 404 (via ``_get_or_404``) when the folder is not in the
    caller's org. Subfolders and documents are listed newest first.
    """
    org_id = current_user.org_id
    folder = _get_or_404(folder_id, org_id, db)

    child_folders = (
        db.query(Folder)
        .filter(Folder.parent_id == folder_id, Folder.org_id == org_id)
        .order_by(Folder.created_at.desc())
        .all()
    )
    child_docs = (
        db.query(Document)
        .filter(Document.folder_id == folder_id, Document.org_id == org_id)
        .order_by(Document.created_at.desc())
        .all()
    )

    # Breadcrumb trail from the root down to this folder.
    breadcrumbs = _build_path(folder, db)

    response = dict(_folder_response(folder, db))
    response["path"] = breadcrumbs
    response["subfolders"] = [_folder_response(sub, db) for sub in child_folders]
    response["documents"] = [_doc_response(item) for item in child_docs]
    return response
# ── Rename ────────────────────────────────────────────────────────────────────
async def rename_folder(
    folder_id: str,
    request: RenameFolderRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Rename a folder in the caller's org.

    The new name is stripped of surrounding whitespace before it is
    validated and stored.

    Returns:
        The serialized folder as produced by ``_folder_response``.

    Raises:
        HTTPException: 404 if the folder is not found in the caller's org;
            400 if the name is empty after stripping or another folder
            under the same parent already has that name.
    """
    folder = _get_or_404(folder_id, current_user.org_id, db)

    new_name = request.name.strip()
    if not new_name:
        raise HTTPException(400, detail="Folder name cannot be empty")

    # Duplicate check within same parent (excluding the folder itself)
    existing = db.query(Folder).filter(
        Folder.org_id == current_user.org_id,
        Folder.name == new_name,
        Folder.parent_id == folder.parent_id,
        Folder.id != folder_id,
    ).first()
    if existing:
        raise HTTPException(400, detail=f"A folder named '{new_name}' already exists here")

    folder.name = new_name
    db.commit()
    db.refresh(folder)
    return _folder_response(folder, db)
# ── Move folder ───────────────────────────────────────────────────────────────
async def move_folder(
    folder_id: str,
    request: MoveFolderRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Move a folder to a different parent (or root).

    An empty-string ``parent_id`` is treated like ``None`` (move to root).

    Returns:
        The serialized folder as produced by ``_folder_response``.

    Raises:
        HTTPException: 404 if the folder or the target folder is not found
            in the caller's org; 400 if the target is the folder itself or
            one of its descendants, or if the destination already contains
            a folder with the same name.
    """
    folder = _get_or_404(folder_id, current_user.org_id, db)

    # Normalise "" to None so a blank id can never be persisted as a parent.
    target_id = request.parent_id or None

    if target_id is not None:
        if target_id == folder_id:
            raise HTTPException(400, detail="Cannot move a folder into itself")
        # Confirm the target is visible to this org before walking its
        # ancestors, so the hierarchy walk only runs on a known folder.
        parent = db.query(Folder).filter(
            Folder.id == target_id,
            Folder.org_id == current_user.org_id,
        ).first()
        if not parent:
            raise HTTPException(404, detail="Target folder not found")
        # Prevent moving into own descendant
        if _is_descendant(folder_id, target_id, db):
            raise HTTPException(400, detail="Cannot move a folder into its own subfolder")

    # Keep sibling names unique, matching the checks done on create and rename.
    clash = db.query(Folder).filter(
        Folder.org_id == current_user.org_id,
        Folder.name == folder.name,
        Folder.parent_id == target_id,
        Folder.id != folder_id,
    ).first()
    if clash:
        raise HTTPException(400, detail=f"A folder named '{folder.name}' already exists here")

    folder.parent_id = target_id
    db.commit()
    db.refresh(folder)
    return _folder_response(folder, db)
# ── Move document into folder ─────────────────────────────────────────────────
async def move_document_to_folder(
    folder_id: str,
    request: MoveDocumentRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Place a document from the caller's org into the given folder.

    Raises a 404 when either the folder or the document is not found in the
    caller's org. Returns a small confirmation dict on success.
    """
    org_id = current_user.org_id

    # Called only for its 404 side effect; the folder row itself is not needed.
    _get_or_404(folder_id, org_id, db)

    target_doc = (
        db.query(Document)
        .filter(Document.id == request.document_id, Document.org_id == org_id)
        .first()
    )
    if target_doc:
        target_doc.folder_id = folder_id
        db.commit()
        return {
            "message": "Document moved",
            "document_id": request.document_id,
            "folder_id": folder_id,
        }
    raise HTTPException(404, detail="Document not found")
# ── Delete ────────────────────────────────────────────────────────────────────
async def delete_folder(
    folder_id: str,
    force: bool = False,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Remove a folder belonging to the caller's org.

    - force=false (default): refuses with a 400 while the folder still holds
      documents or subfolders
    - force=true: delegates to _recursive_delete, which removes the whole
      subtree and moves its documents to root

    A 404 is raised (via _get_or_404) when the folder is not found.
    """
    folder = _get_or_404(folder_id, current_user.org_id, db)
    deleted = {"message": "Folder deleted"}

    if force:
        # The helper deletes the folder itself and commits.
        _recursive_delete(folder_id, current_user.org_id, db)
        return deleted

    doc_count = db.query(Document).filter(Document.folder_id == folder_id).count()
    sub_count = db.query(Folder).filter(Folder.parent_id == folder_id).count()
    is_empty = doc_count <= 0 and sub_count <= 0
    if not is_empty:
        raise HTTPException(
            400,
            detail=f"Folder is not empty ({doc_count} files, {sub_count} subfolders). Use ?force=true."
        )

    db.delete(folder)
    db.commit()
    return deleted
# ── Helpers ───────────────────────────────────────────────────────────────────
def _get_or_404(folder_id: str, org_id: str, db: Session) -> Folder:
    """Load the folder with this id inside the given org, or raise a 404."""
    match = (
        db.query(Folder)
        .filter(Folder.id == folder_id, Folder.org_id == org_id)
        .first()
    )
    if match:
        return match
    raise HTTPException(404, detail="Folder not found")
def _folder_response(folder: Folder, db: Session) -> dict:
    """Serialize a folder to a plain dict, including child counts.

    Issues two COUNT queries per call: one for documents whose folder_id is
    this folder and one for folders whose parent_id is this folder.
    """
    documents_here = db.query(Document).filter(Document.folder_id == folder.id)
    folders_here = db.query(Folder).filter(Folder.parent_id == folder.id)

    payload = {}
    payload["id"] = folder.id
    payload["name"] = folder.name
    payload["parent_id"] = folder.parent_id
    payload["document_count"] = documents_here.count()
    payload["subfolder_count"] = folders_here.count()
    payload["created_at"] = folder.created_at.isoformat()
    return payload
| def _doc_response(doc: Document) -> dict: | |
| return { | |
| "id": doc.id, "name": doc.name, "size": doc.size, | |
| "chunks": doc.chunks, "status": doc.status, | |
| "folder_id": doc.folder_id, "created_at": doc.created_at.isoformat(), | |
| } | |
| def _build_path(folder: Folder, db: Session) -> List[dict]: | |
| """Build breadcrumb path from root to this folder.""" | |
| path = [] | |
| current = folder | |
| while current: | |
| path.insert(0, {"id": current.id, "name": current.name}) | |
| if current.parent_id: | |
| current = db.query(Folder).filter(Folder.id == current.parent_id).first() | |
| else: | |
| break | |
| return path | |
def _is_descendant(ancestor_id: str, candidate_id: str, db: Session) -> bool:
    """Check if candidate_id is a descendant of ancestor_id.

    Starts at the candidate folder and follows ``parent_id`` links upward,
    one query per step. Returns True as soon as a parent link equals
    ``ancestor_id``; returns False when the chain ends, a row is missing,
    or a folder id repeats (a cycle in the stored hierarchy), so the walk
    always terminates.
    """
    visited = set()
    current = db.query(Folder).filter(Folder.id == candidate_id).first()
    while current and current.parent_id:
        if current.parent_id == ancestor_id:
            return True
        if current.id in visited:
            # Already walked through this folder: the chain loops without
            # ever reaching ancestor_id.
            return False
        visited.add(current.id)
        current = db.query(Folder).filter(Folder.id == current.parent_id).first()
    return False
def _recursive_delete(folder_id: str, org_id: str, db: Session) -> None:
    """Recursively delete folder and all subfolders; move docs to root.

    The whole subtree is processed inside one transaction and committed
    once at the end, so a failure part-way through does not leave a
    half-deleted tree behind. Each level is flushed after its delete so
    that children are removed in the database before their parent.

    Args:
        folder_id: Id of the folder at the top of the subtree to delete.
        org_id: Org used to select the subfolders to recurse into.
        db: Session the work is performed and committed on.
    """

    def _delete_subtree(current_id: str) -> None:
        # Move documents to root
        db.query(Document).filter(Document.folder_id == current_id).update({"folder_id": None})
        # Recurse into subfolders
        children = db.query(Folder).filter(
            Folder.parent_id == current_id, Folder.org_id == org_id
        ).all()
        for child in children:
            _delete_subtree(child.id)
        # Delete this folder
        folder = db.query(Folder).filter(Folder.id == current_id).first()
        if folder:
            db.delete(folder)
            # Emit the DELETE now (still uncommitted) to keep the
            # child-before-parent ordering.
            db.flush()

    _delete_subtree(folder_id)
    db.commit()