"""
File management API.

Single Responsibility: only handles file CRUD operations within zones.
Separated from zone management (zones.py) — each has its own reason to change.
"""
|
|
| import os |
| import shutil |
| import tempfile |
| import zipfile |
| from datetime import datetime |
| from pathlib import Path |
|
|
| from fastapi import APIRouter, Depends, Form, File, UploadFile, Query, HTTPException |
| from fastapi.responses import FileResponse |
|
|
| from auth import AuthUser, get_current_user |
| from storage import get_zone_path, safe_path, check_zone_owner |
|
|
# Every route in this module is mounted under a single zone's "files" namespace.
router = APIRouter(
    prefix="/api/zones/{zone_name}/files",
    tags=["files"],
)
|
|
|
|
def _check_access(zone_name: str, user: AuthUser):
    """Check that ``user`` may operate on the zone called ``zone_name``.

    Forwards the user's ``sub`` and ``role`` to ``check_zone_owner``; whatever
    that helper raises propagates to the caller. Nothing is returned.
    """
    subject, role = user.sub, user.role
    check_zone_owner(zone_name, subject, role)
|
|
|
|
@router.get("")
def list_files(zone_name: str, path: str = Query(""), user: AuthUser = Depends(get_current_user)):
    """List the entries of a directory inside a zone.

    Args:
        zone_name: Zone taken from the URL path.
        path: Directory path relative to the zone; empty means the zone itself.
        user: Authenticated caller, checked with ``_check_access``.

    Returns:
        A list of dicts with ``name``, ``is_dir``, ``size`` (0 for anything that
        is not a regular file) and ``modified`` (ISO-formatted local time of the
        entry's mtime). Directories come first, then entries are ordered by
        case-insensitive name.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised while handling the
            request, including when the target is not a directory.
    """
    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        target = safe_path(zone_path, path)
        if not target.is_dir():
            raise ValueError("Không phải thư mục")

        entries = []
        for item in target.iterdir():
            # One stat per entry. A dangling symlink makes stat() fail, so fall
            # back to the link's own metadata instead of failing the listing.
            try:
                info = item.stat()
            except OSError:
                try:
                    info = item.lstat()
                except OSError:
                    # Entry disappeared while we were listing; leave it out.
                    continue
            entries.append(
                {
                    "name": item.name,
                    "is_dir": item.is_dir(),
                    "size": info.st_size if item.is_file() else 0,
                    "modified": datetime.fromtimestamp(info.st_mtime).isoformat(),
                }
            )

        # Sort on the values already collected rather than re-querying the
        # filesystem for every comparison key.
        entries.sort(key=lambda entry: (not entry["is_dir"], entry["name"].lower()))
        return entries
    except ValueError as e:
        raise HTTPException(400, str(e))
|
|
|
|
@router.get("/read")
def read_file(zone_name: str, path: str = Query(...), user: AuthUser = Depends(get_current_user)):
    """Return the text content of a file inside a zone.

    The file is decoded as UTF-8; undecodable bytes are replaced rather than
    raising. The response echoes the requested ``path`` next to ``content``.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised while handling the
            request, including when the target is not a regular file.
    """
    try:
        _check_access(zone_name, user)
        file_path = safe_path(get_zone_path(zone_name), path)
        if file_path.is_file():
            text = file_path.read_text(encoding="utf-8", errors="replace")
            return {"content": text, "path": path}
        raise ValueError("File không tồn tại")
    except ValueError as err:
        raise HTTPException(400, str(err))
|
|
|
|
def _remove_temp_file(path):
    """Best-effort removal of a temporary file; a failure to delete is ignored."""
    try:
        os.unlink(path)
    except OSError:
        # Deliberately swallowed: cleanup must never mask the real outcome.
        pass


@router.get("/download")
def download_file(zone_name: str, path: str = Query(...), user: AuthUser = Depends(get_current_user)):
    """Download a file, or a directory packed into a zip archive.

    A regular file is returned as-is. A directory is zipped into a temporary
    archive whose member names are relative to the directory's parent, so the
    directory name is the top-level entry. Only regular files are added.

    The temporary archive is removed if zipping fails, and otherwise scheduled
    for deletion as a background task attached to the response.

    Raises:
        HTTPException: 404 when the target is neither a file nor a directory;
            400 when a ``ValueError`` is raised while handling the request.
    """
    # Function-scope import so this block stays self-contained.
    from fastapi import BackgroundTasks

    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        target = safe_path(zone_path, path)
        if target.is_file():
            return FileResponse(target, filename=target.name)
        if not target.is_dir():
            raise HTTPException(404, "File không tồn tại")

        fd, temp_name = tempfile.mkstemp(prefix=f"{target.name}-", suffix=".zip")
        # mkstemp returns an open descriptor; ZipFile reopens the path itself.
        os.close(fd)
        archive_path = Path(temp_name)
        try:
            with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf:
                for child in target.rglob("*"):
                    if child.is_file():
                        zf.write(child, arcname=child.relative_to(target.parent))
        except Exception:
            # Do not leave a half-written archive behind in the temp directory.
            _remove_temp_file(archive_path)
            raise

        cleanup = BackgroundTasks()
        cleanup.add_task(_remove_temp_file, archive_path)
        return FileResponse(archive_path, filename=f"{target.name}.zip", background=cleanup)
    except ValueError as e:
        raise HTTPException(400, str(e))
|
|
|
|
@router.post("/write")
def write_file(zone_name: str, path: str = Form(...), content: str = Form(...), user: AuthUser = Depends(get_current_user)):
    """Create or overwrite a text file inside a zone.

    Missing parent directories are created. ``content`` is written as UTF-8.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised while handling the
            request, including when the target is an existing directory or a
            parent component of the path is not a directory.
    """
    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        target = safe_path(zone_path, path)
        # Writing onto a directory would otherwise surface as an unhandled
        # IsADirectoryError (HTTP 500).
        if target.is_dir():
            raise ValueError("Đường dẫn là thư mục, không thể ghi file")
        try:
            target.parent.mkdir(parents=True, exist_ok=True)
        except (FileExistsError, NotADirectoryError) as exc:
            # A component of the parent path exists but is a regular file.
            raise ValueError("Thư mục cha không hợp lệ") from exc
        target.write_text(content, encoding="utf-8")
        return {"ok": True}
    except ValueError as e:
        raise HTTPException(400, str(e))
|
|
|
|
@router.post("/mkdir")
def create_folder(zone_name: str, path: str = Form(...), user: AuthUser = Depends(get_current_user)):
    """Create a directory (and any missing parents) inside a zone.

    Succeeds without change when the directory already exists.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised while handling the
            request, including when the path or one of its parents already
            exists as a non-directory.
    """
    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        target = safe_path(zone_path, path)
        try:
            target.mkdir(parents=True, exist_ok=True)
        except (FileExistsError, NotADirectoryError) as exc:
            # exist_ok only tolerates an existing *directory*; a regular file
            # in the way is a client error, not a server fault.
            raise ValueError("Đường dẫn đã tồn tại và không phải thư mục") from exc
        return {"ok": True}
    except ValueError as e:
        raise HTTPException(400, str(e))
|
|
|
|
@router.post("/upload")
async def upload_file(zone_name: str, path: str = Form(""), file: UploadFile = File(...), user: AuthUser = Depends(get_current_user)):
    """Store an uploaded file inside a zone.

    The destination is ``path`` joined with the upload's filename. Missing
    parent directories are created and an existing file is overwritten. The
    upload is copied to disk in fixed-size chunks instead of being read into
    memory in one piece.

    Returns:
        ``{"ok": True, "path": <destination relative to the zone path>}``.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised while handling the
            request, including when the upload has no filename or the
            destination is an existing directory.
    """
    chunk_size = 1024 * 1024
    try:
        _check_access(zone_name, user)
        # A missing filename would make os.path.join fail; an empty one would
        # make the destination resolve to the directory itself.
        if not file.filename:
            raise ValueError("Thiếu tên file")
        zone_path = get_zone_path(zone_name)
        dest = safe_path(zone_path, os.path.join(path, file.filename))
        if dest.is_dir():
            raise ValueError("Đường dẫn đích là thư mục")
        dest.parent.mkdir(parents=True, exist_ok=True)
        with dest.open("wb") as out:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                out.write(chunk)
        return {"ok": True, "path": str(dest.relative_to(zone_path))}
    except ValueError as e:
        raise HTTPException(400, str(e))
|
|
|
|
@router.delete("")
def delete_file(zone_name: str, path: str = Query(...), user: AuthUser = Depends(get_current_user)):
    """Delete a file, or a directory with everything in it, inside a zone.

    The zone's own root directory is never deleted.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised while handling the
            request, including when the target is the zone root or is neither
            a file nor a directory.
    """
    try:
        _check_access(zone_name, user)
        zone_root = get_zone_path(zone_name)
        victim = safe_path(zone_root, path)
        if victim == zone_root.resolve():
            raise ValueError("Không thể xoá thư mục gốc zone")

        if victim.is_dir():
            shutil.rmtree(victim)
            return {"ok": True}
        if victim.is_file():
            victim.unlink()
            return {"ok": True}
        raise ValueError("File/thư mục không tồn tại")
    except ValueError as err:
        raise HTTPException(400, str(err))
|
|
|
|
@router.post("/rename")
def rename_file(zone_name: str, old_path: str = Form(...), new_name: str = Form(...), user: AuthUser = Depends(get_current_user)):
    """Rename a file or directory inside a zone.

    The destination is ``new_name`` placed in the parent directory of
    ``old_path``; both ends go through ``safe_path``.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised while handling the
            request, including when the source does not exist, the source is
            the zone root, or a different entry already exists at the
            destination.
    """
    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        source = safe_path(zone_path, old_path)
        if not source.exists():
            raise ValueError("File/thư mục nguồn không tồn tại")
        # Same root guard delete_file applies: the zone directory itself is
        # not a renameable entry.
        if source == zone_path.resolve():
            raise ValueError("Không thể đổi tên thư mục gốc zone")
        dest = safe_path(zone_path, str(Path(old_path).parent / new_name))
        # Path.rename silently replaces an existing file on POSIX. Refuse that,
        # but still allow a rename onto the same underlying entry (e.g. a
        # case-only change). Best-effort: the check and the rename are not atomic.
        if dest.exists() and not dest.samefile(source):
            raise ValueError("File/thư mục đích đã tồn tại")
        source.rename(dest)
        return {"ok": True}
    except ValueError as e:
        raise HTTPException(400, str(e))
|
|