Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Request, UploadFile, File, Form, HTTPException | |
| from fastapi.responses import HTMLResponse, FileResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| import os | |
| import subprocess | |
| import datetime | |
| import uuid | |
| from pathlib import Path | |
| import logging | |
| import shutil | |
| # Setup logging | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
| logger = logging.getLogger("file-test-app") | |
| app = FastAPI(title="File System Test App") | |
| # Setup templates and static files | |
| templates = Jinja2Templates(directory="templates") | |
| # Create files directory if it doesn't exist | |
| os.makedirs("templates", exist_ok=True) | |
| os.makedirs("files", exist_ok=True) | |
| os.makedirs("static", exist_ok=True) | |
| # Mount files directory as static | |
| app.mount("/files", StaticFiles(directory="files"), name="files") | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| async def home(request: Request): | |
| """Home page""" | |
| # Log system information | |
| try: | |
| whoami = subprocess.run(["whoami"], capture_output=True, text=True).stdout.strip() | |
| file_perms = subprocess.run(["ls", "-la", "/app/files"], capture_output=True, text=True).stdout | |
| disk_info = subprocess.run(["df", "-h"], capture_output=True, text=True).stdout | |
| logger.info(f"Running as user: {whoami}") | |
| logger.info(f"Files directory permissions:\n{file_perms}") | |
| logger.info(f"Disk space:\n{disk_info}") | |
| except Exception as e: | |
| logger.error(f"Error getting system info: {str(e)}") | |
| return templates.TemplateResponse("index.html", {"request": request}) | |
| async def save_file(content: str = Form(...), filename: str = Form(...)): | |
| """Save content to a file""" | |
| try: | |
| # Create a safe filename | |
| safe_filename = filename.replace(" ", "_") | |
| if not safe_filename.endswith(".txt"): | |
| safe_filename += ".txt" | |
| file_path = os.path.join("files", safe_filename) | |
| # Write the file | |
| with open(file_path, "w") as f: | |
| f.write(content) | |
| # Get file size | |
| file_size = os.path.getsize(file_path) | |
| return JSONResponse({ | |
| "success": True, | |
| "message": "File saved successfully", | |
| "file_path": file_path, | |
| "file_size": file_size, | |
| "url": f"/files/{safe_filename}" | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error saving file: {str(e)}") | |
| return JSONResponse({ | |
| "success": False, | |
| "message": f"Error saving file: {str(e)}" | |
| }, status_code=500) | |
| async def upload_file(file: UploadFile = File(...)): | |
| """Upload a file""" | |
| try: | |
| # Create a safe filename | |
| safe_filename = file.filename.replace(" ", "_") | |
| file_path = os.path.join("files", safe_filename) | |
| # Save the file | |
| with open(file_path, "wb") as f: | |
| shutil.copyfileobj(file.file, f) | |
| # Get file size | |
| file_size = os.path.getsize(file_path) | |
| return JSONResponse({ | |
| "success": True, | |
| "message": "File uploaded successfully", | |
| "file_path": file_path, | |
| "file_size": file_size, | |
| "url": f"/files/{safe_filename}" | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error uploading file: {str(e)}") | |
| return JSONResponse({ | |
| "success": False, | |
| "message": f"Error uploading file: {str(e)}" | |
| }, status_code=500) | |
| async def list_files(): | |
| """List all files in the files directory""" | |
| try: | |
| files_dir = Path("files") | |
| files = [] | |
| for f in files_dir.glob("*"): | |
| if f.is_file(): | |
| files.append({ | |
| "name": f.name, | |
| "path": str(f), | |
| "size": os.path.getsize(f), | |
| "size_readable": f"{os.path.getsize(f) / 1024:.2f} KB", | |
| "modified": datetime.datetime.fromtimestamp(os.path.getmtime(f)).strftime("%Y-%m-%d %H:%M:%S"), | |
| "url": f"/files/{f.name}" | |
| }) | |
| return JSONResponse({ | |
| "success": True, | |
| "files": files | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error listing files: {str(e)}") | |
| return JSONResponse({ | |
| "success": False, | |
| "message": f"Error listing files: {str(e)}" | |
| }, status_code=500) | |
| async def delete_file(filename: str): | |
| """Delete a file""" | |
| try: | |
| file_path = os.path.join("files", filename) | |
| if not os.path.exists(file_path): | |
| return JSONResponse({ | |
| "success": False, | |
| "message": "File not found" | |
| }, status_code=404) | |
| os.remove(file_path) | |
| return JSONResponse({ | |
| "success": True, | |
| "message": "File deleted successfully" | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error deleting file: {str(e)}") | |
| return JSONResponse({ | |
| "success": False, | |
| "message": f"Error deleting file: {str(e)}" | |
| }, status_code=500) | |
| async def system_info(): | |
| """Get system information""" | |
| try: | |
| whoami = subprocess.run(["whoami"], capture_output=True, text=True).stdout.strip() | |
| file_perms = subprocess.run(["ls", "-la", "/app/files"], capture_output=True, text=True).stdout | |
| disk_info = subprocess.run(["df", "-h"], capture_output=True, text=True).stdout | |
| env_vars = {k: v for k, v in os.environ.items()} | |
| # Try to create a large test file | |
| test_file_path = os.path.join("files", "large_test_file.txt") | |
| try: | |
| with open(test_file_path, "w") as f: | |
| # Write a 10MB file with repeated text | |
| text_block = "This is a test line to check file system write capabilities.\n" * 100 | |
| for _ in range(1000): # Write 10,000 lines | |
| f.write(text_block) | |
| large_file_size = os.path.getsize(test_file_path) | |
| large_file_status = f"Created large test file: {large_file_size / (1024*1024):.2f} MB" | |
| except Exception as e: | |
| large_file_status = f"Failed to create large test file: {str(e)}" | |
| return JSONResponse({ | |
| "success": True, | |
| "whoami": whoami, | |
| "file_permissions": file_perms, | |
| "disk_info": disk_info, | |
| "large_file_test": large_file_status, | |
| "env_vars": env_vars, | |
| "python_version": subprocess.run(["python", "--version"], capture_output=True, text=True).stdout.strip(), | |
| "pwd": subprocess.run(["pwd"], capture_output=True, text=True).stdout.strip(), | |
| "ls_app": subprocess.run(["ls", "-la", "/app"], capture_output=True, text=True).stdout | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error getting system info: {str(e)}") | |
| return JSONResponse({ | |
| "success": False, | |
| "message": f"Error getting system info: {str(e)}" | |
| }, status_code=500) | |
| async def test_binary_file(): | |
| """Create a test binary file""" | |
| try: | |
| file_path = os.path.join("files", "binary_test.bin") | |
| # Create binary data | |
| binary_data = bytearray(os.urandom(1024 * 1024)) # 1MB of random data | |
| # Write binary file | |
| with open(file_path, "wb") as f: | |
| f.write(binary_data) | |
| file_size = os.path.getsize(file_path) | |
| return JSONResponse({ | |
| "success": True, | |
| "message": "Binary file created successfully", | |
| "file_path": file_path, | |
| "file_size": file_size, | |
| "url": f"/files/binary_test.bin" | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error creating binary file: {str(e)}") | |
| return JSONResponse({ | |
| "success": False, | |
| "message": f"Error creating binary file: {str(e)}" | |
| }, status_code=500) | |
| # Testing command to simulate pg_dump | |
| async def test_command_output(): | |
| """Test running a command and saving its output""" | |
| try: | |
| output_file = os.path.join("files", "command_output.txt") | |
| # Run a command and redirect output to a file | |
| cmd = ["ls", "-la", "/"] | |
| # Method 1: Direct redirection | |
| with open(output_file, "w") as f: | |
| subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True) | |
| # Method 2: Capture then write | |
| output_file2 = os.path.join("files", "command_output2.txt") | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| with open(output_file2, "w") as f: | |
| f.write(result.stdout) | |
| return JSONResponse({ | |
| "success": True, | |
| "message": "Command output saved", | |
| "file1_path": output_file, | |
| "file1_size": os.path.getsize(output_file), | |
| "file1_url": f"/files/command_output.txt", | |
| "file2_path": output_file2, | |
| "file2_size": os.path.getsize(output_file2), | |
| "file2_url": f"/files/command_output2.txt", | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error running command: {str(e)}") | |
| return JSONResponse({ | |
| "success": False, | |
| "message": f"Error running command: {str(e)}" | |
| }, status_code=500) |