| """
|
| File management API.
|
|
|
| Single Responsibility: only handles file CRUD operations within zones.
|
| Separated from zone management (zones.py) — each has its own reason to change.
|
| """
|
|
|
| import os
|
| import shutil
|
| from datetime import datetime
|
| from pathlib import Path
|
|
|
| from fastapi import APIRouter, Depends, Form, File, UploadFile, Query, HTTPException
|
| from fastapi.responses import FileResponse
|
|
|
| from auth import AuthUser, get_current_user
|
| from storage import get_zone_path, safe_path, check_zone_owner
|
|
|
# All routes declared in this module share the per-zone "files" URL prefix.
router = APIRouter(
    prefix="/api/zones/{zone_name}/files",
    tags=["files"],
)
|
|
|
|
|
def _check_access(zone_name: str, user: AuthUser):
    """Verify that ``user`` is allowed to operate on the zone ``zone_name``.

    Forwards the zone name together with the user's ``sub`` and ``role`` to
    ``check_zone_owner``. Anything that helper raises propagates unchanged;
    its return value is discarded, so this function always returns ``None``.
    """
    subject, role = user.sub, user.role
    check_zone_owner(zone_name, subject, role)
|
|
|
|
|
@router.get("")
def list_files(zone_name: str, path: str = Query(""), user: AuthUser = Depends(get_current_user)):
    """List the direct children of a directory inside a zone.

    Args:
        zone_name: Zone taken from the URL prefix.
        path: Directory to list, relative to the zone root. The default
            empty string is handed to ``safe_path`` unchanged.
        user: Authenticated caller; validated through ``_check_access``.

    Returns:
        A list with one dict per entry, ordered by sorting the entry paths.
        Each dict has ``name``, ``is_dir``, ``size`` (0 for anything that is
        not a regular file) and ``modified`` (naive local-time ISO-8601
        string built from the entry's mtime).

    Raises:
        HTTPException: 400 when any step raises ``ValueError``, which
            includes the case where the resolved path is not a directory.
    """
    from stat import S_ISDIR, S_ISREG

    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        target = safe_path(zone_path, path)
        if not target.is_dir():
            raise ValueError("Không phải thư mục")

        listing = []
        for item in sorted(target.iterdir()):
            try:
                # One stat() per entry: type, size and mtime all come from
                # the same snapshot instead of four separate syscalls.
                info = item.stat()
                is_dir = S_ISDIR(info.st_mode)
                is_file = S_ISREG(info.st_mode)
            except OSError:
                # stat() follows symlinks, so a dangling link (or an entry
                # removed since iterdir()) fails here. Describe the link
                # itself rather than failing the whole listing.
                try:
                    info = item.lstat()
                except FileNotFoundError:
                    # The entry disappeared between iterdir() and now.
                    continue
                is_dir = is_file = False
            listing.append(
                {
                    "name": item.name,
                    "is_dir": is_dir,
                    "size": info.st_size if is_file else 0,
                    "modified": datetime.fromtimestamp(info.st_mtime).isoformat(),
                }
            )
        return listing
    except ValueError as e:
        raise HTTPException(400, str(e))
|
|
|
|
|
@router.get("/read")
def read_file(
    zone_name: str,
    path: str = Query(...),
    user: AuthUser = Depends(get_current_user),
):
    """Return the text content of a single file inside a zone.

    The file is decoded as UTF-8; undecodable bytes are replaced rather than
    raising. The response echoes the requested ``path`` next to the content.

    Raises:
        HTTPException: 400 when any step raises ``ValueError``, which
            includes the case where the resolved path is not a regular file.
    """
    try:
        _check_access(zone_name, user)
        resolved = safe_path(get_zone_path(zone_name), path)
        if resolved.is_file():
            text = resolved.read_text(encoding="utf-8", errors="replace")
            return {"content": text, "path": path}
        raise ValueError("File không tồn tại")
    except ValueError as exc:
        raise HTTPException(400, str(exc))
|
|
|
|
|
@router.get("/download")
def download_file(
    zone_name: str,
    path: str = Query(...),
    user: AuthUser = Depends(get_current_user),
):
    """Send a file from a zone as a download.

    The response is a ``FileResponse`` whose download filename is the last
    component of the resolved path.

    Raises:
        HTTPException: 404 when the resolved path is not a regular file;
            400 when any step raises ``ValueError``.
    """
    try:
        _check_access(zone_name, user)
        resolved = safe_path(get_zone_path(zone_name), path)
        if resolved.is_file():
            return FileResponse(resolved, filename=resolved.name)
        # HTTPException is not a ValueError, so it bypasses the handler below.
        raise HTTPException(404, "File không tồn tại")
    except ValueError as exc:
        raise HTTPException(400, str(exc))
|
|
|
|
|
@router.post("/write")
def write_file(zone_name: str, path: str = Form(...), content: str = Form(...), user: AuthUser = Depends(get_current_user)):
    """Create or overwrite a UTF-8 text file inside a zone.

    Missing parent directories are created first.

    Args:
        zone_name: Zone taken from the URL prefix.
        path: File path relative to the zone root.
        content: Text to store; written with UTF-8 encoding.
        user: Authenticated caller; validated through ``_check_access``.

    Returns:
        ``{"ok": True}`` on success.

    Raises:
        HTTPException: 400 when any step raises ``ValueError``, when the
            path names an existing directory, or when a component of the
            parent path exists but is not a directory.
    """
    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        target = safe_path(zone_path, path)
        # write_text() on a directory raises IsADirectoryError, which used to
        # surface as an unhandled 500; reject it as a client error instead.
        if target.is_dir():
            raise ValueError("Đường dẫn là thư mục, không thể ghi file")
        try:
            target.parent.mkdir(parents=True, exist_ok=True)
        except (FileExistsError, NotADirectoryError):
            # exist_ok only tolerates existing *directories*; a regular file
            # somewhere in the parent chain still raises.
            raise ValueError("Đường dẫn cha không phải thư mục") from None
        target.write_text(content, encoding="utf-8")
        return {"ok": True}
    except ValueError as e:
        raise HTTPException(400, str(e))
|
|
|
|
|
@router.post("/mkdir")
def create_folder(zone_name: str, path: str = Form(...), user: AuthUser = Depends(get_current_user)):
    """Create a directory (and any missing parents) inside a zone.

    Creating a directory that already exists is not an error.

    Args:
        zone_name: Zone taken from the URL prefix.
        path: Directory path relative to the zone root.
        user: Authenticated caller; validated through ``_check_access``.

    Returns:
        ``{"ok": True}`` on success.

    Raises:
        HTTPException: 400 when any step raises ``ValueError``, or when the
            path (or one of its parents) already exists as a non-directory.
    """
    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        target = safe_path(zone_path, path)
        try:
            target.mkdir(parents=True, exist_ok=True)
        except (FileExistsError, NotADirectoryError):
            # exist_ok suppresses the error only when the existing path is a
            # directory; an existing file used to escape as a 500.
            raise ValueError("Đường dẫn đã tồn tại và không phải thư mục") from None
        return {"ok": True}
    except ValueError as e:
        raise HTTPException(400, str(e))
|
|
|
|
|
@router.post("/upload")
async def upload_file(zone_name: str, path: str = Form(""), file: UploadFile = File(...), user: AuthUser = Depends(get_current_user)):
    """Store an uploaded file inside a zone.

    The destination is ``path`` joined with the upload's filename; missing
    parent directories are created. An existing file at the destination is
    overwritten.

    Args:
        zone_name: Zone taken from the URL prefix.
        path: Target directory relative to the zone root (default: root).
        file: The multipart upload.
        user: Authenticated caller; validated through ``_check_access``.

    Returns:
        ``{"ok": True, "path": <destination relative to the zone root>}``.

    Raises:
        HTTPException: 400 when any step raises ``ValueError``, when the
            upload carries no filename, when the destination is a directory,
            or when a parent component exists but is not a directory.
    """
    chunk_size = 1024 * 1024

    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        # A missing filename would make os.path.join() raise TypeError, and an
        # empty one would make the destination the directory itself.
        if not file.filename:
            raise ValueError("Tên file không hợp lệ")
        dest = safe_path(zone_path, os.path.join(path, file.filename))
        if dest.is_dir():
            raise ValueError("Đường dẫn là thư mục, không thể ghi file")

        # Compute the reported path before writing so a failure here cannot
        # happen after the file is already on disk.
        # NOTE(review): delete_file compares against zone_path.resolve(),
        # which suggests safe_path returns resolved paths while zone_path may
        # not be resolved; try the resolved root first, then the raw one.
        try:
            relative = dest.relative_to(zone_path.resolve())
        except ValueError:
            relative = dest.relative_to(zone_path)

        try:
            dest.parent.mkdir(parents=True, exist_ok=True)
        except (FileExistsError, NotADirectoryError):
            raise ValueError("Đường dẫn cha không phải thư mục") from None

        # Copy in bounded chunks so a large upload is never held in memory
        # all at once.
        with dest.open("wb") as out:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                out.write(chunk)
        return {"ok": True, "path": str(relative)}
    except ValueError as e:
        raise HTTPException(400, str(e))
|
|
|
|
|
@router.delete("")
def delete_file(
    zone_name: str,
    path: str = Query(...),
    user: AuthUser = Depends(get_current_user),
):
    """Remove a file, or a directory with everything under it, from a zone.

    The zone root itself can never be deleted through this endpoint.

    Returns:
        ``{"ok": True}`` once the entry has been removed.

    Raises:
        HTTPException: 400 when any step raises ``ValueError``, which covers
            an attempt to delete the zone root and a path that is neither a
            directory nor a regular file.
    """
    try:
        _check_access(zone_name, user)
        root = get_zone_path(zone_name)
        victim = safe_path(root, path)

        # Guard first: the zone root must survive.
        if victim == root.resolve():
            raise ValueError("Không thể xoá thư mục gốc zone")

        if victim.is_dir():
            shutil.rmtree(victim)
            return {"ok": True}
        if victim.is_file():
            victim.unlink()
            return {"ok": True}
        raise ValueError("File/thư mục không tồn tại")
    except ValueError as exc:
        raise HTTPException(400, str(exc))
|
|
|
|
|
@router.post("/rename")
def rename_file(zone_name: str, old_path: str = Form(...), new_name: str = Form(...), user: AuthUser = Depends(get_current_user)):
    """Rename a file or directory inside a zone.

    The destination is ``new_name`` placed in the same parent directory as
    ``old_path``.

    Args:
        zone_name: Zone taken from the URL prefix.
        old_path: Existing entry, relative to the zone root.
        new_name: New name for the entry.
        user: Authenticated caller; validated through ``_check_access``.

    Returns:
        ``{"ok": True}`` on success.

    Raises:
        HTTPException: 400 when any step raises ``ValueError``, when the
            source does not exist or is the zone root, when the destination
            directory is missing, or when a different entry already occupies
            the destination.
    """
    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        source = safe_path(zone_path, old_path)
        if not source.exists():
            raise ValueError("File/thư mục nguồn không tồn tại")
        # Same protection delete_file applies: the zone root is off limits.
        if source == zone_path.resolve():
            raise ValueError("Không thể đổi tên thư mục gốc zone")
        dest = safe_path(zone_path, str(Path(old_path).parent / new_name))
        if not dest.parent.is_dir():
            raise ValueError("Thư mục đích không tồn tại")
        # Path.rename() silently replaces an existing file on POSIX and
        # raises on Windows; refuse up front so nothing is clobbered. The
        # samefile() exemption keeps no-op and case-only renames working.
        # (Best-effort check: another process could still create dest
        # between this test and the rename.)
        if dest.exists() and not dest.samefile(source):
            raise ValueError("Tên đích đã tồn tại")
        source.rename(dest)
        return {"ok": True}
    except ValueError as e:
        raise HTTPException(400, str(e))
|
|
|