Ragora-Server / app /api /folders.py
Peterase's picture
feat: nested folder hierarchy with parent_id, root_only filter, recursive delete
5ebe979
"""Folders API β€” full nested folder support (real file manager)."""
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.database import get_db
from app.auth import get_current_user
from app.models import User, Folder, Document
from typing import Optional, List
import logging
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Every route registered on this router gets the "/folders" URL prefix and
# the "folders" tag.
router = APIRouter(prefix="/folders", tags=["folders"])
# Request body accepted by create_folder (POST /folders/).
class CreateFolderRequest(BaseModel):
    # Folder name; create_folder strips surrounding whitespace before storing.
    name: str
    # Id of the folder to create under.
    parent_id: Optional[str] = None  # None = root level
# Request body accepted by rename_folder (PATCH /folders/{folder_id}).
class RenameFolderRequest(BaseModel):
    # New folder name; rename_folder strips surrounding whitespace before storing.
    name: str
# Request body accepted by move_document_to_folder
# (POST /folders/{folder_id}/move-document).
class MoveDocumentRequest(BaseModel):
    # Id of the document to move; the destination folder comes from the URL path.
    document_id: str
# Request body accepted by move_folder (PATCH /folders/{folder_id}/move).
class MoveFolderRequest(BaseModel):
    # Id of the destination parent folder.
    parent_id: Optional[str] = None  # None = move to root
# ── Create ────────────────────────────────────────────────────────────────────
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_folder(
    request: CreateFolderRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create a folder. Pass parent_id to create a subfolder.

    The name is stored with surrounding whitespace removed. Names must be
    unique among the folders that share the same parent within the caller's
    organisation.

    Returns:
        The new folder as produced by ``_folder_response``.

    Raises:
        HTTPException 400: the name is blank after stripping, or a sibling
            folder with the same name already exists.
        HTTPException 404: ``parent_id`` does not match a folder in the
            caller's organisation.
    """
    # Normalise once so validation, the duplicate lookup, the stored value
    # and the error message all refer to the same string.
    name = request.name.strip()
    if not name:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Folder name cannot be empty",
        )

    # An empty-string parent_id is falsy, so it would bypass the existence
    # check yet be persisted as "" -- a folder that is neither at root
    # (parent_id IS NULL) nor under any real parent. Treat it as root.
    parent_id = request.parent_id or None

    # Validate parent exists and belongs to same org
    if parent_id is not None:
        parent = db.query(Folder).filter(
            Folder.id == parent_id,
            Folder.org_id == current_user.org_id,
        ).first()
        if not parent:
            raise HTTPException(status_code=404, detail="Parent folder not found")

    # Duplicate name check within same parent
    existing = db.query(Folder).filter(
        Folder.org_id == current_user.org_id,
        Folder.name == name,
        Folder.parent_id == parent_id,
    ).first()
    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"A folder named '{name}' already exists here",
        )

    folder = Folder(
        name=name,
        org_id=current_user.org_id,
        parent_id=parent_id,
    )
    db.add(folder)
    db.commit()
    # Reload so server-generated columns are populated before serialising.
    db.refresh(folder)
    return _folder_response(folder, db)
# ── List ──────────────────────────────────────────────────────────────────────
@router.get("/")
async def list_folders(
    parent_id: Optional[str] = None,  # None = root level; pass id for children
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    List one level of the caller's folder hierarchy, newest first.

    - GET /folders/             → root-level folders
    - GET /folders/?parent_id=X → subfolders of X
    """
    in_callers_org = Folder.org_id == current_user.org_id
    under_requested_parent = Folder.parent_id == parent_id
    rows = (
        db.query(Folder)
        .filter(in_callers_org, under_requested_parent)
        .order_by(Folder.created_at.desc())
        .all()
    )
    return [_folder_response(row, db) for row in rows]
@router.get("/tree")
async def folder_tree(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the full folder tree (all folders with nesting info).

    Returns:
        A list of root-level folder dicts (see ``_folder_response``), each
        with an extra ``"children"`` key holding the same structure for its
        subfolders. Siblings are ordered newest first.
    """
    all_folders = db.query(Folder).filter(
        Folder.org_id == current_user.org_id
    ).order_by(Folder.created_at.desc()).all()

    # Bucket folders by parent in a single pass. Scanning the whole list for
    # every node instead would make tree assembly quadratic in folder count.
    # Appending in query order keeps the newest-first ordering per bucket.
    children_by_parent: dict = {}
    for f in all_folders:
        children_by_parent.setdefault(f.parent_id, []).append(f)

    def build_tree(parent_id: Optional[str]) -> List[dict]:
        # Folders not reachable from a root (parent_id None) are never visited.
        return [
            {
                **_folder_response(f, db),
                "children": build_tree(f.id),
            }
            for f in children_by_parent.get(parent_id, [])
        ]

    return build_tree(None)
# ── Get single ────────────────────────────────────────────────────────────────
@router.get("/{folder_id}")
async def get_folder(
    folder_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one folder together with its breadcrumb, subfolders and documents.

    Only direct children are included; both child lists are newest first.
    Responds with 404 (via ``_get_or_404``) when the folder is not in the
    caller's organisation.
    """
    org_id = current_user.org_id
    folder = _get_or_404(folder_id, org_id, db)

    child_folders = (
        db.query(Folder)
        .filter(Folder.parent_id == folder_id, Folder.org_id == org_id)
        .order_by(Folder.created_at.desc())
        .all()
    )
    child_documents = (
        db.query(Document)
        .filter(Document.folder_id == folder_id, Document.org_id == org_id)
        .order_by(Document.created_at.desc())
        .all()
    )

    # Root-to-here chain of {"id", "name"} entries.
    breadcrumbs = _build_path(folder, db)

    detail = dict(_folder_response(folder, db))
    detail["path"] = breadcrumbs
    detail["subfolders"] = [_folder_response(sub, db) for sub in child_folders]
    detail["documents"] = [_doc_response(doc) for doc in child_documents]
    return detail
# ── Rename ────────────────────────────────────────────────────────────────────
@router.patch("/{folder_id}")
async def rename_folder(
    folder_id: str,
    request: RenameFolderRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Rename a folder; its position in the hierarchy is unchanged.

    The new name is stored with surrounding whitespace removed and must be
    unique among the folder's siblings.

    Returns:
        The updated folder as produced by ``_folder_response``.

    Raises:
        HTTPException 404: the folder is not in the caller's organisation.
        HTTPException 400: the new name is blank after stripping, or another
            folder under the same parent already has that name.
    """
    folder = _get_or_404(folder_id, current_user.org_id, db)

    # Normalise once so the check, the stored value and the message agree.
    new_name = request.name.strip()
    if not new_name:
        raise HTTPException(400, detail="Folder name cannot be empty")

    # Duplicate check within same parent (excluding the folder itself, so
    # renaming a folder to its current name is allowed).
    existing = db.query(Folder).filter(
        Folder.org_id == current_user.org_id,
        Folder.name == new_name,
        Folder.parent_id == folder.parent_id,
        Folder.id != folder_id,
    ).first()
    if existing:
        raise HTTPException(400, detail=f"A folder named '{new_name}' already exists here")

    folder.name = new_name
    db.commit()
    db.refresh(folder)
    return _folder_response(folder, db)
# ── Move folder ───────────────────────────────────────────────────────────────
@router.patch("/{folder_id}/move")
async def move_folder(
    folder_id: str,
    request: MoveFolderRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Move a folder to a different parent (or root).

    Returns:
        The moved folder as produced by ``_folder_response``.

    Raises:
        HTTPException 404: the folder, or the target parent, is not in the
            caller's organisation.
        HTTPException 400: the target is the folder itself or one of its
            descendants, or the destination already contains a folder with
            the same name.
    """
    folder = _get_or_404(folder_id, current_user.org_id, db)

    # An empty-string parent_id is falsy and would skip every check below,
    # yet be persisted as "" rather than NULL. Treat it as "move to root".
    target_parent_id = request.parent_id or None

    if target_parent_id is not None:
        if target_parent_id == folder_id:
            raise HTTPException(400, detail="Cannot move a folder into itself")
        # Confirm the target is visible to the caller before walking its
        # ancestor chain, which is not filtered by organisation.
        parent = db.query(Folder).filter(
            Folder.id == target_parent_id,
            Folder.org_id == current_user.org_id,
        ).first()
        if not parent:
            raise HTTPException(404, detail="Target folder not found")
        # Prevent moving into own descendant
        if _is_descendant(folder_id, target_parent_id, db):
            raise HTTPException(400, detail="Cannot move a folder into its own subfolder")

    # Create and rename both enforce unique names among siblings; enforce the
    # same rule here so a move cannot introduce duplicates.
    clash = db.query(Folder).filter(
        Folder.org_id == current_user.org_id,
        Folder.name == folder.name,
        Folder.parent_id == target_parent_id,
        Folder.id != folder_id,
    ).first()
    if clash:
        raise HTTPException(400, detail=f"A folder named '{folder.name}' already exists here")

    folder.parent_id = target_parent_id
    db.commit()
    db.refresh(folder)
    return _folder_response(folder, db)
# ── Move document into folder ─────────────────────────────────────────────────
@router.post("/{folder_id}/move-document")
async def move_document_to_folder(
    folder_id: str,
    request: MoveDocumentRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Put an existing document into the folder named in the URL.

    Both the folder and the document must belong to the caller's
    organisation; otherwise a 404 is returned for whichever is missing.
    """
    org_id = current_user.org_id

    # Called only for its 404 side effect; the folder row itself is unused.
    _get_or_404(folder_id, org_id, db)

    document = (
        db.query(Document)
        .filter(Document.id == request.document_id, Document.org_id == org_id)
        .first()
    )
    if document:
        document.folder_id = folder_id
        db.commit()
        return {
            "message": "Document moved",
            "document_id": request.document_id,
            "folder_id": folder_id,
        }
    raise HTTPException(404, detail="Document not found")
# ── Delete ────────────────────────────────────────────────────────────────────
@router.delete("/{folder_id}")
async def delete_folder(
    folder_id: str,
    force: bool = False,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Remove a folder owned by the caller's organisation.

    - force=false (default): refuses with 400 when the folder still holds
      documents or subfolders
    - force=true: removes the folder and every subfolder beneath it, moving
      their documents to root
    """
    folder = _get_or_404(folder_id, current_user.org_id, db)

    if force:
        # The helper deletes the folder row itself and commits.
        _recursive_delete(folder_id, current_user.org_id, db)
        return {"message": "Folder deleted"}

    doc_count = db.query(Document).filter(Document.folder_id == folder_id).count()
    sub_count = db.query(Folder).filter(Folder.parent_id == folder_id).count()
    if doc_count or sub_count:
        raise HTTPException(
            400,
            detail=f"Folder is not empty ({doc_count} files, {sub_count} subfolders). Use ?force=true."
        )

    db.delete(folder)
    db.commit()
    return {"message": "Folder deleted"}
# ── Helpers ───────────────────────────────────────────────────────────────────
def _get_or_404(folder_id: str, org_id: str, db: Session) -> Folder:
    """Fetch the folder with this id inside ``org_id`` or raise a 404.

    A folder that exists under a different organisation is reported as
    "not found", exactly like a missing one.
    """
    match = (
        db.query(Folder)
        .filter(Folder.id == folder_id, Folder.org_id == org_id)
        .first()
    )
    if match:
        return match
    raise HTTPException(404, detail="Folder not found")
def _folder_response(folder: Folder, db: Session) -> dict:
    """Serialise a folder row to the dict returned by the folder endpoints.

    Issues two COUNT queries per call: one for documents directly in the
    folder and one for its direct subfolders.
    """
    documents_here = db.query(Document).filter(Document.folder_id == folder.id)
    folders_here = db.query(Folder).filter(Folder.parent_id == folder.id)
    n_documents = documents_here.count()
    n_subfolders = folders_here.count()

    payload = {}
    payload["id"] = folder.id
    payload["name"] = folder.name
    payload["parent_id"] = folder.parent_id
    payload["document_count"] = n_documents
    payload["subfolder_count"] = n_subfolders
    payload["created_at"] = folder.created_at.isoformat()
    return payload
def _doc_response(doc: Document) -> dict:
    """Serialise a document row to a plain dict; ``created_at`` becomes ISO-8601 text."""
    copied_fields = ("id", "name", "size", "chunks", "status", "folder_id")
    payload = {field: getattr(doc, field) for field in copied_fields}
    payload["created_at"] = doc.created_at.isoformat()
    return payload
def _build_path(folder: Folder, db: Session) -> List[dict]:
    """Build breadcrumb path from root to this folder.

    Walks ``parent_id`` links upward, one query per level. The parent lookup
    is by id only (no organisation filter).

    Returns:
        A list of ``{"id", "name"}`` dicts ordered root first, ending with
        ``folder`` itself. If a parent row is missing the path simply starts
        at the highest folder that could be loaded.
    """
    trail = []
    # Ids already emitted. Without this guard a corrupted parent chain that
    # loops back on itself would keep the request spinning forever.
    visited = set()
    current = folder
    while current and current.id not in visited:
        visited.add(current.id)
        trail.append({"id": current.id, "name": current.name})
        if not current.parent_id:
            break
        current = db.query(Folder).filter(Folder.id == current.parent_id).first()
    # Collected leaf-to-root; flip once instead of inserting at the front
    # on every step.
    trail.reverse()
    return trail
def _is_descendant(ancestor_id: str, candidate_id: str, db: Session) -> bool:
    """Check if candidate_id is a descendant of ancestor_id.

    Walks upward from ``candidate_id`` through ``parent_id`` links, one query
    per level, looking for ``ancestor_id``. Lookups are by id only (no
    organisation filter).

    Returns:
        True when ``ancestor_id`` appears among the candidate's ancestors;
        False otherwise, including when the candidate does not exist or
        ``candidate_id == ancestor_id`` (a folder is not its own descendant).
    """
    # Ids already walked; stops the climb if the stored parent chain loops
    # without ever reaching ancestor_id, which would otherwise never end.
    visited = set()
    current = db.query(Folder).filter(Folder.id == candidate_id).first()
    while current and current.parent_id and current.id not in visited:
        if current.parent_id == ancestor_id:
            return True
        visited.add(current.id)
        current = db.query(Folder).filter(Folder.id == current.parent_id).first()
    return False
def _recursive_delete(folder_id: str, org_id: str, db: Session) -> None:
    """Recursively delete folder and all subfolders; move docs to root.

    All changes are staged in the session and committed exactly once at the
    end, so the subtree is removed as a single transaction instead of one
    commit per folder.
    """
    _delete_subtree(folder_id, org_id, db)
    db.commit()


def _delete_subtree(folder_id: str, org_id: str, db: Session) -> None:
    """Stage removal of one folder and everything beneath it, without committing."""
    # Move documents to root
    db.query(Document).filter(Document.folder_id == folder_id).update({"folder_id": None})
    # Recurse into subfolders (only those in the caller's organisation)
    children = db.query(Folder).filter(Folder.parent_id == folder_id, Folder.org_id == org_id).all()
    for child in children:
        _delete_subtree(child.id, org_id, db)
    # Delete this folder
    folder = db.query(Folder).filter(Folder.id == folder_id).first()
    if folder:
        db.delete(folder)
        # Emit the DELETE now, inside the open transaction, so children are
        # always removed before their parent regardless of how the session
        # would otherwise order pending deletes.
        db.flush()