from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Form
from fastapi.responses import HTMLResponse, FileResponse
from starlette.responses import Response
from pydantic import BaseModel
from typing import List
from passlib.context import CryptContext
import os
import uvicorn
import shutil
import requests

app = FastAPI()

# Password-hashing configuration.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Local upload directory and the Hugging Face Space used for backups.
UPLOAD_DIRECTORY = "/mnt/data/uploads"
BACKUP_SPACE = "SorovotPelo/IseaUploadsArchive"
HF_TOKEN = os.getenv("HF_API_TOKEN")  # Token is read from the environment.

# Upper bound (seconds) on the backup request so a stalled remote cannot hang
# the upload handler indefinitely.
BACKUP_TIMEOUT_SECONDS = 30

# Create the upload directory if it does not exist yet.
os.makedirs(UPLOAD_DIRECTORY, exist_ok=True)

# Simulated in-memory database.
# users_db maps username -> {"username": ..., "password": <hash>}.
# files_db maps username -> list of uploaded file names.
users_db = {}
files_db = {}


class User(BaseModel):
    """Username/password pair."""

    username: str
    password: str


def get_password_hash(password):
    """Return the hash of *password* produced by ``pwd_context``."""
    return pwd_context.hash(password)


def verify_password(plain_password, hashed_password):
    """Return whether *plain_password* matches *hashed_password*."""
    return pwd_context.verify(plain_password, hashed_password)


def authenticate_user(username: str, password: str):
    """Return the stored user record if the credentials match, else ``False``."""
    user = users_db.get(username)
    if user and verify_password(password, user["password"]):
        return user
    return False


def _resolve_user_path(username: str, filename: str) -> str:
    """Return the absolute path of *filename* inside *username*'s upload folder.

    Both values come from the client, so the resolved path is required to be a
    direct child of ``UPLOAD_DIRECTORY/<username>``, and that folder a direct
    child of ``UPLOAD_DIRECTORY``. This rejects empty names, ``.``/``..``
    components, absolute paths, and embedded path separators.

    Raises:
        HTTPException: 400 if the path would fall outside the user's folder.
    """
    base_dir = os.path.realpath(UPLOAD_DIRECTORY)
    user_dir = os.path.realpath(os.path.join(base_dir, username))
    target = os.path.realpath(os.path.join(base_dir, username, filename))
    if os.path.dirname(user_dir) != base_dir or os.path.dirname(target) != user_dir:
        raise HTTPException(status_code=400, detail="Nombre de archivo inválido")
    return target


@app.post("/register")
async def register(username: str = Form(...), password: str = Form(...)):
    """Register a new user and initialise an empty file list for them.

    Raises:
        HTTPException: 400 if the username is already registered.
    """
    if username in users_db:
        raise HTTPException(status_code=400, detail="Usuario ya existe")
    hashed_password = get_password_hash(password)
    users_db[username] = {"username": username, "password": hashed_password}
    files_db[username] = []
    return {"message": "Usuario registrado con éxito"}


@app.post("/login")
async def login(username: str = Form(...), password: str = Form(...)):
    """Check the supplied credentials and return a welcome message.

    Raises:
        HTTPException: 400 if the username or password is wrong.
    """
    user = authenticate_user(username, password)
    if not user:
        raise HTTPException(status_code=400, detail="Usuario o contraseña incorrectos")
    return {"message": f"Bienvenido {username}"}


@app.post("/upload")
async def upload_file(username: str = Form(...), file: UploadFile = File(...)):
    """Save an uploaded file under the user's folder and back it up remotely.

    Returns a dict with a success message, the stored file name, and the
    download URL.

    Raises:
        HTTPException: 404 if the user is not registered, 400 if the file
            name is unusable, 500 if the backup request fails.
    """
    user_files = files_db.get(username)
    # A registered user with no uploads yet has an empty (falsy) list, so the
    # existence check must compare against None rather than use truthiness.
    if user_files is None:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")

    # Drop any directory part the client sent along with the file name.
    safe_name = os.path.basename(file.filename or "")
    file_path = _resolve_user_path(username, safe_name)
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "wb") as destination:
        shutil.copyfileobj(file.file, destination)

    # Backup to Hugging Face Spaces.
    # NOTE(review): confirm this endpoint and the expected 200 status against
    # the Hugging Face Hub API.
    try:
        with open(file_path, "rb") as saved_file:
            response = requests.post(
                f"https://huggingface.co/api/spaces/{BACKUP_SPACE}/uploads",
                headers={"Authorization": f"Bearer {HF_TOKEN}"},
                files={"file": (safe_name, saved_file)},
                timeout=BACKUP_TIMEOUT_SECONDS,
            )
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=500, detail="Error al respaldar archivo en Hugging Face"
        ) from exc
    if response.status_code != 200:
        raise HTTPException(
            status_code=500, detail="Error al respaldar archivo en Hugging Face"
        )

    # Re-uploading an existing name overwrites the file; list it only once.
    if safe_name not in user_files:
        user_files.append(safe_name)
    return {
        "message": "Archivo subido con éxito",
        "filename": safe_name,
        "url": f"/download/{username}/{safe_name}",
    }


@app.get("/download/{username}/{filename}")
async def download_file(username: str, filename: str):
    """Return a previously uploaded file belonging to *username*.

    Raises:
        HTTPException: 404 if the file is not listed for the user or is
            missing on disk, 400 if the resolved path leaves the user's folder.
    """
    user_files = files_db.get(username)
    if not user_files or filename not in user_files:
        raise HTTPException(status_code=404, detail="Archivo no encontrado")
    file_path = _resolve_user_path(username, filename)
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="Archivo no encontrado")
    return FileResponse(file_path)


@app.get("/", response_class=HTMLResponse)
async def home():
    """Return the contents of ``index.html`` from the working directory."""
    with open("index.html", "r", encoding="utf-8") as f:
        return f.read()


@app.get("/styles.css", response_class=FileResponse)
async def get_styles():
    """Return ``styles.css`` from the working directory.

    Raises:
        HTTPException: 404 if the stylesheet does not exist.
    """
    file_path = "styles.css"
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="CSS no encontrado")
    return FileResponse(file_path)


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)