File size: 13,157 Bytes
0053eb2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97872c7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
import os
import shutil
import aiofiles
from fastapi import (
    FastAPI, UploadFile, File, HTTPException, Request, Depends, Form, APIRouter, status
)
import datetime
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse, Response
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pathlib import Path
import logging # Standard logging
import datetime # For admin logic timestamps
import math # For admin logic format_bytes
from typing import List, Optional # For admin logic type hinting

# --- Configuration ---
# Every working directory is anchored at the folder that contains this module.
BASE_DIR = Path(__file__).resolve().parent
FILES_DIR = BASE_DIR.joinpath("uploaded_files")  # uploads are written to and served from here
STATIC_DIR = BASE_DIR.joinpath("static")
TEMPLATES_DIR = BASE_DIR.joinpath("templates")  # root folder handed to Jinja2Templates

# Make sure each directory exists at import time (missing parents are created too).
for _required_dir in (FILES_DIR, STATIC_DIR, TEMPLATES_DIR):
    _required_dir.mkdir(parents=True, exist_ok=True)
del _required_dir  # keep the loop variable out of the module namespace

# --- Logging Configuration (Console Only) ---
# With no handler configured anywhere, Python falls back to its "last resort" handler,
# which only emits WARNING and above -- every logger.info() in this module would be
# silently dropped. basicConfig() attaches a console (stderr) handler to the root logger
# and is a no-op when the root logger already has handlers, so an embedding process that
# configured logging itself keeps its own setup.
# The root logger's level is deliberately left untouched: records are filtered by the
# level of the logger that created them, so only "file_app" is opened up to INFO.
logging.basicConfig(format="%(asctime)s %(levelname)s [%(name)s] %(message)s")

logger = logging.getLogger("file_app")
logger.setLevel(logging.INFO)
# No file handler; propagation stays enabled so records reach the root console handler.

logger.info("Application starting up... Logging to console.")

# FastAPI application object; public routes are registered on it directly and the admin
# routes are attached through `admin_router`.
app = FastAPI(title="File Uploader/Downloader")

# Setup Jinja2 templates
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))

# --- Admin Panel Configuration & Logic ---
# Credentials and the session secret can be overridden through environment variables of
# the same name, so a deployment does not have to rely on values committed to source
# control. The fallbacks are the original demo values and are NOT safe for production.
ADMIN_EMAIL = os.environ.get("ADMIN_EMAIL", "admin@example.com")
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "adminpassword")
ADMIN_SESSION_COOKIE_NAME = "admin_session_id"
# Static value stored in the admin cookie after login; anyone who learns it is treated as
# the admin, so it must be replaced with a long random string outside of demos.
ADMIN_SESSION_SECRET_VALUE = os.environ.get(
    "ADMIN_SESSION_SECRET_VALUE",
    "my_super_secret_admin_session_value_for_demo_only_change_me",
)

# All admin endpoints live under the /admin prefix.
admin_router = APIRouter(
    prefix="/admin",
    tags=["Admin"],
)

# --- Admin Helper Functions ---
def get_file_info(file_path: Path):
    """Return display metadata for one uploaded file.

    Args:
        file_path: Path of the directory entry to describe.

    Returns:
        A dict with the keys ``name``, ``size_bytes``, ``size_human``, ``modified_at``
        and ``url``, or ``None`` when the path is not a regular file or its metadata
        could not be read (the error is logged, not raised).
    """
    try:
        if not file_path.is_file():
            return None
        stat_info = file_path.stat()
        # Convert with an explicit UTC timezone. A naive fromtimestamp() returns the
        # server's *local* time, which made the "UTC" suffix below wrong on any host
        # whose timezone is not UTC.
        modified = datetime.datetime.fromtimestamp(
            stat_info.st_mtime, tz=datetime.timezone.utc
        )
        return {
            "name": file_path.name,
            "size_bytes": stat_info.st_size,
            "size_human": format_bytes(stat_info.st_size),
            "modified_at": modified.strftime('%Y-%m-%d %H:%M:%S UTC'),
            "url": f"/download/{file_path.name}"
        }
    except Exception as e:
        # Best effort: one unreadable entry must not break the whole admin listing.
        logger.error(f"Admin: Error getting file info for {file_path}: {e}")
        return None

def format_bytes(bytes_val, decimals=2):
    """Format a byte count as a human-readable string using 1024-based units.

    Args:
        bytes_val: Size in bytes. Values below 1024 (including fractional and
            negative values) are reported in ``Bytes`` without scaling.
        decimals: Number of digits after the decimal point; negative values are
            treated as 0.

    Returns:
        A string such as ``'1.50 KB'``. Zero is rendered as ``'0 Bytes'``. Values
        beyond the largest known unit stay expressed in ``PB``.
    """
    if bytes_val == 0:
        return '0 Bytes'
    k = 1024
    dm = decimals if decimals >= 0 else 0
    sizes = ('Bytes', 'KB', 'MB', 'GB', 'TB', 'PB')

    # Pick the unit with exact comparisons instead of floor(log(x) / log(1024)):
    # the logarithm form yields a negative index for 0 < x < 1 (wrapping around to
    # 'PB') and depends on floating-point rounding at exact unit boundaries.
    exponent = 0
    while exponent < len(sizes) - 1 and bytes_val >= k ** (exponent + 1):
        exponent += 1

    return f"{bytes_val / (k ** exponent):.{dm}f} {sizes[exponent]}"

# get_log_lines function removed as file logging is removed.

# --- Admin Authentication Dependencies ---
async def get_current_admin(request: Request):
    """Resolve the admin identity for the current request from the session cookie.

    Returns:
        * ``{"email": ADMIN_EMAIL}`` when the session cookie carries the expected secret.
        * A ``RedirectResponse`` (307) to the login page when the caller is not
          authenticated and is requesting anything other than the login/auth routes.
          The redirect is *returned*, not raised, so route handlers using this
          dependency must check for it themselves.
        * ``None`` when the caller is unauthenticated on the login/auth routes.
    """
    import secrets  # function-local: only this check needs it

    session_cookie = request.cookies.get(ADMIN_SESSION_COOKIE_NAME)
    # Constant-time comparison so response timing does not reveal how many leading
    # characters of a guessed cookie value are correct. Both sides are encoded to
    # bytes because compare_digest() rejects non-ASCII str arguments.
    if session_cookie is not None and secrets.compare_digest(
        session_cookie.encode("utf-8"), ADMIN_SESSION_SECRET_VALUE.encode("utf-8")
    ):
        return {"email": ADMIN_EMAIL}

    current_path = request.url.path
    login_url = app.url_path_for("admin_login_page")
    # The login form and its POST target must stay reachable without a session.
    if current_path not in [login_url, app.url_path_for("admin_authenticate")]:
        logger.info(f"Admin: Unauthorized access attempt to {current_path}, redirecting to login.")
        return RedirectResponse(url=login_url, status_code=status.HTTP_307_TEMPORARY_REDIRECT)
    return None

async def verify_admin_auth(request: Request):
    """Strict admin guard that raises instead of returning a redirect.

    Delegates to ``get_current_admin`` and turns its "not logged in" outcomes into
    exceptions, so the body of a protected handler never runs for anonymous callers.

    Returns:
        The admin identity dict produced by ``get_current_admin``.

    Raises:
        HTTPException: 303 with a ``Location`` header pointing at the login page when
            ``get_current_admin`` asked for a redirect; 401 when it produced neither
            an identity nor a redirect.
    """
    admin_user = await get_current_admin(request)
    if isinstance(admin_user, RedirectResponse):
        # 303 rather than 307: this guard protects a POST route, and a 307 tells the
        # client to replay the POST against the login page, which only accepts GET.
        # 303 makes the follow-up request a GET.
        raise HTTPException(
            status_code=status.HTTP_303_SEE_OTHER,
            detail="Redirecting to login.",
            headers={"Location": app.url_path_for("admin_login_page")},
        )
    if not admin_user:
        logger.warning(f"Admin: Strict auth verification failed for {request.url.path}.")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return admin_user

# --- Admin Routes ---
@admin_router.get("/login", response_class=HTMLResponse, name="admin_login_page")
async def admin_login_page(request: Request):
    """Render the admin sign-in form (``admin_login.html``)."""
    logger.info("Admin: Login page accessed.")
    utc_now = datetime.datetime.now(datetime.timezone.utc)
    context = {
        "request": request,
        "url_for": app.url_path_for,
        "current_year": utc_now.year,  # current year in UTC, passed to the template
    }
    return templates.TemplateResponse("admin_login.html", context)

@admin_router.post("/auth", name="admin_authenticate")
async def admin_authenticate(request: Request, email: str = Form(...), password: str = Form(...)):
    """Check the submitted admin credentials and start a session on success.

    Args:
        request: Incoming request (needed to render the template on failure).
        email: Value of the ``email`` form field.
        password: Value of the ``password`` form field.

    Returns:
        On success, a 303 redirect to the dashboard that sets the admin session cookie
        (HTTP-only, SameSite=Lax, 30-minute lifetime). On failure, the login template
        re-rendered with an error message and status 401.
    """
    import secrets  # function-local: only this check needs it

    # Constant-time comparisons, and both are always evaluated, so response timing does
    # not reveal which field (or how much of it) was correct. Values are encoded to
    # bytes because compare_digest() rejects non-ASCII str arguments.
    email_ok = secrets.compare_digest(email.encode("utf-8"), ADMIN_EMAIL.encode("utf-8"))
    password_ok = secrets.compare_digest(password.encode("utf-8"), ADMIN_PASSWORD.encode("utf-8"))

    if email_ok and password_ok:
        logger.info(f"Admin: Successful login for email {email}.")
        response = RedirectResponse(url=app.url_path_for("admin_dashboard"), status_code=status.HTTP_303_SEE_OTHER)
        response.set_cookie(
            key=ADMIN_SESSION_COOKIE_NAME, value=ADMIN_SESSION_SECRET_VALUE,
            httponly=True, samesite="lax", max_age=30 * 60
        )
        return response

    logger.warning(f"Admin: Failed login attempt for email {email}.")
    error_msg = "Invalid email or password."
    return templates.TemplateResponse(
        "admin_login.html",
        {
            "request": request,
            "error": error_msg,
            "url_for": app.url_path_for,
            # Same context key the login page route supplies, so the template renders
            # identically after a failed attempt.
            "current_year": datetime.datetime.now(datetime.timezone.utc).year,
        },
        status_code=status.HTTP_401_UNAUTHORIZED
    )

@admin_router.get("/logout", name="admin_logout")
async def admin_logout(request: Request):
    """Clear the admin session cookie and send the browser back to the login page."""
    logger.info("Admin: Logout initiated.")
    login_url = app.url_path_for("admin_login_page")
    redirect = RedirectResponse(url=login_url, status_code=status.HTTP_303_SEE_OTHER)
    # Cookie attributes mirror the ones used when the cookie was set at login.
    redirect.delete_cookie(ADMIN_SESSION_COOKIE_NAME, httponly=True, samesite="lax")
    return redirect

@admin_router.get("/dashboard", response_class=HTMLResponse, name="admin_dashboard")
async def admin_dashboard(request: Request, admin_user: dict = Depends(get_current_admin)):
    """Render the admin dashboard with the number of uploaded files.

    ``get_current_admin`` may hand back a redirect instead of an identity; in that
    case (or when it yields nothing) the caller is sent to the login page.
    """
    if isinstance(admin_user, RedirectResponse):
        return admin_user
    if not admin_user:
        return RedirectResponse(
            url=app.url_path_for("admin_login_page"),
            status_code=status.HTTP_307_TEMPORARY_REDIRECT,
        )

    logger.info(f"Admin: Dashboard accessed by {admin_user.get('email')}.")
    try:
        # Count regular files only; sub-directories are ignored.
        num_files = sum(
            1 for entry in os.listdir(FILES_DIR) if (FILES_DIR / entry).is_file()
        )
    except Exception as e:
        logger.error(f"Admin: Error counting files for dashboard: {e}")
        num_files = 0  # fall back to zero so the page still renders

    context = {
        "request": request,
        "num_files": num_files,
        "admin_user": admin_user,
        "url_for": app.url_path_for,
    }
    return templates.TemplateResponse("admin_dashboard.html", context)

@admin_router.get("/files", response_class=HTMLResponse, name="admin_files_page")
async def admin_files_page(request: Request, admin_user: dict = Depends(get_current_admin)):
    """Render the admin file listing, sorted by file name.

    Unauthenticated callers are redirected to the login page. Entries for which
    ``get_file_info`` returns nothing are left out of the listing.
    """
    if isinstance(admin_user, RedirectResponse):
        return admin_user
    if not admin_user:
        return RedirectResponse(
            url=app.url_path_for("admin_login_page"),
            status_code=status.HTTP_307_TEMPORARY_REDIRECT,
        )

    logger.info(f"Admin: Files page accessed by {admin_user.get('email')}.")
    listing = []
    try:
        for entry_name in sorted(os.listdir(FILES_DIR)):
            details = get_file_info(FILES_DIR / entry_name)
            if not details:
                continue
            listing.append(details)
    except Exception as e:
        # Whatever was collected before the failure is still shown.
        logger.error(f"Admin: Error listing files: {e}")

    context = {
        "request": request,
        "files": listing,
        "admin_user": admin_user,
        "url_for": app.url_path_for,
    }
    return templates.TemplateResponse("admin_files.html", context)

@admin_router.post("/files/delete/{filename}", name="admin_delete_file")
async def admin_delete_file(request: Request, filename: str, admin_user: dict = Depends(verify_admin_auth)):
    """Delete one uploaded file on behalf of an authenticated admin.

    Returns:
        A 303 redirect to the admin file listing with a success message in the
        query string.

    Raises:
        HTTPException: 400 when the resolved path escapes the upload directory,
            404 when no such file exists, 500 when the deletion itself fails.
    """
    actor = admin_user.get('email')
    logger.info(f"Admin: Attempting to delete file '{filename}' by {actor}.")

    target = FILES_DIR / sanitize_filename(filename)

    # Second line of defence after sanitisation: the resolved path must stay inside
    # the upload directory.
    resolved_target = target.resolve()
    if not resolved_target.is_relative_to(FILES_DIR.resolve()):
        logger.error(f"Admin: Invalid file path for deletion (traversal attempt) '{filename}'.")
        raise HTTPException(status_code=400, detail="Invalid file path for deletion.")

    if not target.is_file():
        logger.warning(f"Admin: File not found for deletion '{filename}'.")
        raise HTTPException(status_code=404, detail="File not found for deletion.")

    try:
        target.unlink()
        logger.info(f"Admin: File '{filename}' deleted successfully by {actor}.")
        listing_url = app.url_path_for("admin_files_page") + "?message=File+deleted+successfully"
        return RedirectResponse(url=listing_url, status_code=status.HTTP_303_SEE_OTHER)
    except Exception as e:
        logger.error(f"Admin: Could not delete file '{filename}': {str(e)}")
        raise HTTPException(status_code=500, detail=f"Could not delete file: {str(e)}")

# /admin/logs route and admin_logs_page function removed

# --- Main Application Helper Functions (Original) ---
def sanitize_filename(filename: str) -> str:
    """Reduce a client-supplied file name to a safe single path component.

    Any directory part is dropped and surrounding whitespace is stripped.

    Args:
        filename: File name as received from the client (upload field or URL path).

    Returns:
        The bare file name, or ``""`` when nothing usable remains. Callers treat the
        empty string as an invalid name.
    """
    name = os.path.basename(filename).strip()
    # "." and ".." are directory references, not file names: joined onto the upload
    # directory they point at the directory itself or its parent. A NUL byte makes
    # later filesystem calls raise ValueError. Reject all three up front.
    if name in (".", "..") or "\x00" in name:
        return ""
    return name

# --- Main Application Routes (Original functionality with added logging) ---
@app.get("/", response_class=HTMLResponse, name="read_root")
async def read_root(request: Request):
    """Serve the public landing page (``index.html``)."""
    visitor = request.client.host
    logger.info(f"Root page accessed by {visitor}")
    page_context = {"request": request, "url_for": app.url_path_for}
    return templates.TemplateResponse("index.html", page_context)

@app.post("/upload/", name="upload_file")
async def upload_file(request: Request, file: UploadFile = File(...)):
    """Save an uploaded file into the upload directory and report where to fetch it.

    The file is streamed to disk in 1 MiB chunks. If writing fails, any file left at
    the destination path is removed before the error is reported.

    Returns:
        A JSON response with ``message``, ``filename`` and ``download_url``.

    Raises:
        HTTPException: 400 for a missing or unusable file name or a path that escapes
            the upload directory; 500 when the file cannot be written.
    """
    client_host = request.client.host
    original_filename = file.filename
    logger.info(f"Upload attempt for '{original_filename}' from {client_host}")

    if not original_filename:
        logger.warning(f"Upload failed: No filename provided by {client_host}")
        raise HTTPException(status_code=400, detail="No filename provided.")

    filename = sanitize_filename(original_filename)
    if not filename:
        logger.warning(f"Upload failed: Invalid filename '{original_filename}' after sanitization by {client_host}")
        raise HTTPException(status_code=400, detail="Invalid filename after sanitization.")

    destination = FILES_DIR / filename

    # The resolved destination must stay inside the upload directory.
    if not destination.resolve().is_relative_to(FILES_DIR.resolve()):
        logger.error(f"Upload security violation: Attempted directory traversal for '{filename}' by {client_host}")
        raise HTTPException(status_code=400, detail="Invalid file path (attempted directory traversal).")

    chunk_size = 1024 * 1024
    try:
        async with aiofiles.open(destination, "wb") as out_file:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                await out_file.write(chunk)
        file_size = destination.stat().st_size
        logger.info(f"File '{filename}' uploaded successfully by {client_host}. Size: {file_size} bytes.")
    except Exception as e:
        logger.error(f"Could not save file '{filename}' from {client_host}: {str(e)}")
        # Best-effort cleanup so a half-written file is not left behind.
        if destination.exists():
            try:
                destination.unlink()
                logger.info(f"Cleaned up partially uploaded file '{filename}'.")
            except Exception as unlink_e:
                logger.error(f"Error cleaning up partially uploaded file '{filename}': {unlink_e}")
        raise HTTPException(status_code=500, detail=f"Could not save file: {str(e)}")

    payload = {
        "message": "File uploaded successfully",
        "filename": filename,
        "download_url": app.url_path_for("download_file", filename=filename),
    }
    return JSONResponse(content=payload)

@app.get("/download/{filename}", name="download_file")
async def download_file(request: Request, filename: str):
    """Send a previously uploaded file to the client as an attachment.

    Raises:
        HTTPException: 400 when the name is empty after sanitisation; 404 when the
            file does not exist or its resolved path is outside the upload directory.
    """
    clean_filename = sanitize_filename(filename)
    client_host = request.client.host
    logger.info(f"Download attempt for '{clean_filename}' by {client_host}")

    if not clean_filename:
        logger.warning(f"Download failed: Invalid filename '{filename}' by {client_host}")
        raise HTTPException(status_code=400, detail="Invalid filename.")

    target = FILES_DIR / clean_filename

    # One 404 for both "outside the upload directory" and "no such file", so the
    # response does not reveal which check failed.
    inside_upload_dir = target.resolve().is_relative_to(FILES_DIR.resolve())
    if not (inside_upload_dir and target.is_file()):
        logger.warning(f"Download failed: File not found or access denied for '{clean_filename}' by {client_host}")
        raise HTTPException(status_code=404, detail="File not found or access denied.")

    logger.info(f"File '{clean_filename}' downloaded by {client_host}")
    return FileResponse(path=target, filename=clean_filename, media_type='application/octet-stream')

# Register the admin routes on the application. This happens after all admin handlers
# have been declared on `admin_router` above.
app.include_router(admin_router)
logger.info("Admin panel router included. File-based logging is disabled.")

# Local development entry point; only runs when this file is executed directly.
if __name__ == "__main__":
    import uvicorn
    logger.info("Starting Uvicorn server for local development...")
    # "app:app" is an import string, so this assumes the module is importable as `app`
    # (i.e. the file is named app.py) -- TODO confirm against the actual file name.
    # NOTE(review): host 0.0.0.0 listens on all network interfaces and reload=True
    # enables auto-reload; both are development settings.
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)