File size: 5,399 Bytes
daaa6ed
 
 
 
 
 
 
 
 
 
 
 
36ce73b
daaa6ed
 
36ce73b
 
daaa6ed
 
 
 
36ce73b
 
 
 
 
daaa6ed
36ce73b
daaa6ed
36ce73b
daaa6ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36ce73b
daaa6ed
36ce73b
daaa6ed
 
 
 
 
 
 
 
 
 
36ce73b
daaa6ed
36ce73b
daaa6ed
 
 
 
 
 
 
 
 
 
36ce73b
daaa6ed
36ce73b
daaa6ed
 
 
 
 
 
 
 
 
 
36ce73b
daaa6ed
36ce73b
daaa6ed
 
 
 
 
 
 
 
 
36ce73b
daaa6ed
36ce73b
daaa6ed
 
 
 
 
 
 
 
 
 
 
36ce73b
daaa6ed
36ce73b
daaa6ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36ce73b
daaa6ed
36ce73b
daaa6ed
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""

File management API.



Single Responsibility: only handles file CRUD operations within zones.

Separated from zone management (zones.py) — each has its own reason to change.

"""

import os
import shutil
from datetime import datetime
from pathlib import Path

from fastapi import APIRouter, Depends, Form, File, UploadFile, Query, HTTPException
from fastapi.responses import FileResponse

from auth import AuthUser, get_current_user
from storage import get_zone_path, safe_path, check_zone_owner

# Router for the file endpoints: every route registered on it is served under
# /api/zones/{zone_name}/files and tagged "files".
router = APIRouter(prefix="/api/zones/{zone_name}/files", tags=["files"])


def _check_access(zone_name: str, user: AuthUser):
    """Verify that *user* is allowed to operate on the zone *zone_name*.

    The decision is delegated to ``check_zone_owner``, which receives the
    user's ``sub`` and ``role``; anything that helper raises propagates to
    the caller unchanged.
    """
    subject, role = user.sub, user.role
    check_zone_owner(zone_name, subject, role)


@router.get("")
def list_files(zone_name: str, path: str = Query(""), user: AuthUser = Depends(get_current_user)):
    """List the entries of a directory inside a zone.

    Args:
        zone_name: Zone taken from the URL prefix.
        path: Directory to list, relative to the zone (empty string by default).
        user: Authenticated user injected by ``get_current_user``.

    Returns:
        A list of dicts sorted by path, each with ``name``, ``is_dir``,
        ``size`` (0 for anything that is not a regular file) and ``modified``
        (ISO-8601 string built from the entry's mtime).

    Raises:
        HTTPException: 400 when the target is not a directory, or when one of
            the access/path helpers raises ``ValueError``.
    """
    from stat import S_ISDIR, S_ISREG

    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        target = safe_path(zone_path, path)
        if not target.is_dir():
            raise ValueError("Không phải thư mục")
        entries = []
        for item in sorted(target.iterdir()):
            # One stat per entry instead of four (is_dir/is_file/stat/stat).
            try:
                info = item.stat()
            except OSError:
                # Dangling symlink: describe the link itself rather than
                # failing the whole listing with a 500.
                try:
                    info = item.lstat()
                except OSError:
                    # Entry disappeared between iterdir() and stat(); skip it.
                    continue
            entries.append(
                {
                    "name": item.name,
                    "is_dir": S_ISDIR(info.st_mode),
                    "size": info.st_size if S_ISREG(info.st_mode) else 0,
                    "modified": datetime.fromtimestamp(info.st_mtime).isoformat(),
                }
            )
        return entries
    except ValueError as e:
        raise HTTPException(400, str(e)) from e


@router.get("/read")
def read_file(zone_name: str, path: str = Query(...), user: AuthUser = Depends(get_current_user)):
    """Return the text content of a file inside a zone.

    The file is decoded as UTF-8 with undecodable bytes replaced. The response
    is a dict with ``content`` and the requested ``path``. A ``ValueError``
    (missing file, or raised by the access/path helpers) is reported as
    HTTP 400.
    """
    try:
        _check_access(zone_name, user)
        resolved = safe_path(get_zone_path(zone_name), path)
        if resolved.is_file():
            text = resolved.read_text(encoding="utf-8", errors="replace")
            return {"content": text, "path": path}
        raise ValueError("File không tồn tại")
    except ValueError as err:
        raise HTTPException(400, str(err))


@router.get("/download")
def download_file(zone_name: str, path: str = Query(...), user: AuthUser = Depends(get_current_user)):
    """Send a file from a zone as a download.

    Responds with a ``FileResponse`` named after the file. A missing file
    yields HTTP 404; a ``ValueError`` from the access/path helpers yields
    HTTP 400.
    """
    try:
        _check_access(zone_name, user)
        resolved = safe_path(get_zone_path(zone_name), path)
        if resolved.is_file():
            return FileResponse(resolved, filename=resolved.name)
        # Not a ValueError, so it passes through the handler below untouched.
        raise HTTPException(404, "File không tồn tại")
    except ValueError as err:
        raise HTTPException(400, str(err))


@router.post("/write")
def write_file(zone_name: str, path: str = Form(...), content: str = Form(...), user: AuthUser = Depends(get_current_user)):
    """Create or overwrite a text file inside a zone.

    Args:
        zone_name: Zone taken from the URL prefix.
        path: File path relative to the zone; missing parent directories are
            created.
        content: Text to write, encoded as UTF-8.
        user: Authenticated user injected by ``get_current_user``.

    Returns:
        ``{"ok": True}`` on success.

    Raises:
        HTTPException: 400 when ``path`` points at an existing directory, or
            when one of the access/path helpers raises ``ValueError``.
    """
    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        target = safe_path(zone_path, path)
        # Writing text to a directory would raise IsADirectoryError and
        # surface as a 500; report it as a client error instead.
        if target.is_dir():
            raise ValueError("Đường dẫn là thư mục, không thể ghi file")
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
        return {"ok": True}
    except ValueError as e:
        raise HTTPException(400, str(e)) from e


@router.post("/mkdir")
def create_folder(zone_name: str, path: str = Form(...), user: AuthUser = Depends(get_current_user)):
    """Create a directory (and any missing parents) inside a zone.

    Creating a directory that already exists is a no-op.

    Args:
        zone_name: Zone taken from the URL prefix.
        path: Directory path relative to the zone.
        user: Authenticated user injected by ``get_current_user``.

    Returns:
        ``{"ok": True}`` on success.

    Raises:
        HTTPException: 400 when ``path`` already exists but is not a
            directory, or when one of the access/path helpers raises
            ``ValueError``.
    """
    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        target = safe_path(zone_path, path)
        # mkdir(exist_ok=True) still raises FileExistsError when the path is
        # an existing non-directory; turn that into a 400 instead of a 500.
        if target.exists() and not target.is_dir():
            raise ValueError("Đã tồn tại file cùng tên")
        target.mkdir(parents=True, exist_ok=True)
        return {"ok": True}
    except ValueError as e:
        raise HTTPException(400, str(e)) from e


@router.post("/upload")
async def upload_file(zone_name: str, path: str = Form(""), file: UploadFile = File(...), user: AuthUser = Depends(get_current_user)):
    """Store an uploaded file inside a zone.

    Args:
        zone_name: Zone taken from the URL prefix.
        path: Destination directory relative to the zone (empty by default).
        file: The uploaded file; only the final component of its client-supplied
            name is used.
        user: Authenticated user injected by ``get_current_user``.

    Returns:
        ``{"ok": True, "path": <stored path relative to the zone>}``.

    Raises:
        HTTPException: 400 when the upload has no usable file name, or when a
            ``ValueError`` is raised by the access/path helpers.
    """
    chunk_size = 1024 * 1024
    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        # The file name is client-controlled and may be missing. Keep only the
        # last component so directory parts or an absolute name cannot
        # override ``path`` inside os.path.join.
        filename = os.path.basename(file.filename or "")
        if not filename:
            raise ValueError("Tên file không hợp lệ")
        dest = safe_path(zone_path, os.path.join(path, filename))
        dest.parent.mkdir(parents=True, exist_ok=True)
        # Copy in chunks so a large upload is never held in memory at once.
        with dest.open("wb") as out:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                out.write(chunk)
        try:
            relative = dest.relative_to(zone_path)
        except ValueError:
            # NOTE(review): safe_path appears to return a resolved path while
            # zone_path may not be resolved; retry against the resolved root
            # so a successful upload is not reported as a failure.
            relative = dest.relative_to(zone_path.resolve())
        return {"ok": True, "path": str(relative)}
    except ValueError as e:
        raise HTTPException(400, str(e)) from e


@router.delete("")
def delete_file(zone_name: str, path: str = Query(...), user: AuthUser = Depends(get_current_user)):
    """Delete a file or a whole directory tree inside a zone.

    The zone's own root directory is never deleted. Returns ``{"ok": True}``
    on success. A ``ValueError`` (root directory, missing target, or raised by
    the access/path helpers) is reported as HTTP 400.
    """
    try:
        _check_access(zone_name, user)
        zone_root = get_zone_path(zone_name)
        victim = safe_path(zone_root, path)
        if victim == zone_root.resolve():
            raise ValueError("Không thể xoá thư mục gốc zone")
        if victim.is_dir():
            shutil.rmtree(victim)
            return {"ok": True}
        if victim.is_file():
            victim.unlink()
            return {"ok": True}
        raise ValueError("File/thư mục không tồn tại")
    except ValueError as err:
        raise HTTPException(400, str(err))


@router.post("/rename")
def rename_file(zone_name: str, old_path: str = Form(...), new_name: str = Form(...), user: AuthUser = Depends(get_current_user)):
    """Rename a file or directory inside a zone, keeping it in the same folder.

    Args:
        zone_name: Zone taken from the URL prefix.
        old_path: Existing path relative to the zone.
        new_name: New name, joined onto the parent directory of ``old_path``.
        user: Authenticated user injected by ``get_current_user``.

    Returns:
        ``{"ok": True}`` on success.

    Raises:
        HTTPException: 400 when the source is the zone root, the source does
            not exist, the destination already exists as a different entry,
            or one of the access/path helpers raises ``ValueError``.
    """
    try:
        _check_access(zone_name, user)
        zone_path = get_zone_path(zone_name)
        source = safe_path(zone_path, old_path)
        # Same root guard as delete_file: the zone directory itself is managed
        # elsewhere and must not be renamed through the file API.
        if source == zone_path.resolve():
            raise ValueError("Không thể đổi tên thư mục gốc zone")
        if not source.exists():
            raise ValueError("File/thư mục nguồn không tồn tại")
        dest = safe_path(zone_path, str(Path(old_path).parent / new_name))
        # Path.rename silently replaces an existing file on POSIX; refuse
        # rather than destroy data. samefile() still allows renaming an entry
        # onto itself (e.g. a case-only change on a case-insensitive FS).
        if dest.exists() and not dest.samefile(source):
            raise ValueError("Tên đích đã tồn tại")
        source.rename(dest)
        return {"ok": True}
    except ValueError as e:
        raise HTTPException(400, str(e)) from e