Spaces:
Running
Running
| from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Form | |
| from fastapi.responses import HTMLResponse, FileResponse | |
| from starlette.responses import Response | |
| from pydantic import BaseModel | |
| from typing import List | |
| from passlib.context import CryptContext | |
| import os | |
| import uvicorn | |
| import shutil | |
| import requests | |
# FastAPI application object (also passed to uvicorn by the entry point).
app = FastAPI()

# Password-hashing configuration: bcrypt is the only enabled scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Local directory where uploads are written, and the Hugging Face Space
# that receives a backup copy of each upload.
UPLOAD_DIRECTORY = "/mnt/data/uploads"
BACKUP_SPACE = "SorovotPelo/IseaUploadsArchive"
# API token read from the environment; None when HF_API_TOKEN is not set.
HF_TOKEN = os.environ.get("HF_API_TOKEN")

# Create the upload directory if it does not exist yet.
os.makedirs(UPLOAD_DIRECTORY, exist_ok=True)

# Simulated in-memory database:
#   users_db: username -> {"username": ..., "password": <hashed password>}
#   files_db: username -> list of uploaded file names
users_db = dict()
files_db = dict()
class User(BaseModel):
    """Model holding a user's ``username`` and ``password`` as strings."""

    username: str
    password: str
def get_password_hash(password):
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password`` via ``pwd_context``.

    Returns whatever ``pwd_context.verify`` returns for the pair.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
def authenticate_user(username: str, password: str):
    """Look up ``username`` in ``users_db`` and validate ``password``.

    Returns the stored user record when the user exists and the password
    verifies against the stored hash; otherwise returns ``False``.
    """
    record = users_db.get(username)
    if not record:
        # Unknown user.
        return False
    if not verify_password(password, record["password"]):
        # Known user, wrong password.
        return False
    return record
async def register(username: str = Form(...), password: str = Form(...)):
    """Register a new user and create an empty file list for them.

    Args:
        username: Form field with the desired user name.
        password: Form field with the password; only its hash is stored.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 400 when the username is not usable as a single
            path component, or when the username is already registered.
    """
    # The username is later joined into filesystem paths under
    # UPLOAD_DIRECTORY, so reject values that are empty, refer to the
    # current/parent directory, or contain a path separator.
    if username in ("", ".", "..") or os.path.basename(username) != username:
        raise HTTPException(status_code=400, detail="Nombre de usuario inválido")
    if username in users_db:
        raise HTTPException(status_code=400, detail="Usuario ya existe")
    hashed_password = get_password_hash(password)
    users_db[username] = {"username": username, "password": hashed_password}
    files_db[username] = []
    return {"message": "Usuario registrado con éxito"}
async def login(username: str = Form(...), password: str = Form(...)):
    """Validate the submitted credentials and greet the user.

    Returns a dict with a welcome ``message`` on success; raises
    ``HTTPException`` (400) when ``authenticate_user`` rejects the pair.
    """
    if authenticate_user(username, password):
        return {"message": f"Bienvenido {username}"}
    raise HTTPException(status_code=400, detail="Usuario o contraseña incorrectos")
async def upload_file(username: str = Form(...), file: UploadFile = File(...)):
    """Save an uploaded file for ``username`` and back it up remotely.

    The file is written to ``UPLOAD_DIRECTORY/<username>/<name>``, then
    POSTed to the backup Space, and finally recorded in ``files_db``.

    Args:
        username: Form field naming the (already registered) owner.
        file: The uploaded file.

    Returns:
        A dict with a ``message``, the stored ``filename`` and its
        download ``url``.

    Raises:
        HTTPException: 404 when the user has no entry in ``files_db``;
            400 when the file name is empty or not a plain name;
            500 when the backup request fails or does not return 200.
    """
    user_files = files_db.get(username)
    # Compare against None: a freshly registered user has an empty list,
    # which is falsy, and must still be able to upload a first file.
    if user_files is None:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")

    # Keep only the last path component so a client-supplied name such as
    # "../../x" cannot escape the user's upload directory.
    filename = os.path.basename(file.filename or "")
    if filename in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Nombre de archivo inválido")

    file_path = os.path.join(UPLOAD_DIRECTORY, username, filename)
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "wb") as f:
        shutil.copyfileobj(file.file, f)

    # Backup to the Hugging Face Space. The stored copy is opened in a
    # ``with`` block so the handle is closed even if the request fails,
    # and a timeout keeps a stalled backup from hanging the request.
    try:
        with open(file_path, "rb") as stored:
            response = requests.post(
                f"https://huggingface.co/api/spaces/{BACKUP_SPACE}/uploads",
                headers={"Authorization": f"Bearer {HF_TOKEN}"},
                files={"file": (filename, stored)},
                timeout=30,
            )
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=500,
            detail="Error al respaldar archivo en Hugging Face",
        ) from exc
    if response.status_code != 200:
        raise HTTPException(status_code=500, detail="Error al respaldar archivo en Hugging Face")

    # Re-uploading the same name overwrites the file on disk; list it once.
    if filename not in user_files:
        user_files.append(filename)
    return {
        "message": "Archivo subido con éxito",
        "filename": filename,
        "url": f"/download/{username}/{filename}",
    }
async def download_file(username: str, filename: str):
    """Return a stored file belonging to ``username``.

    Raises ``HTTPException`` (404) when the user has no recorded files,
    when ``filename`` is not among them, or when the file is missing on
    disk; otherwise returns a ``FileResponse`` for the stored path.
    """
    owned = files_db.get(username)
    is_listed = bool(owned) and filename in owned
    target = os.path.join(UPLOAD_DIRECTORY, username, filename)
    # The disk check only runs for files that are listed for this user.
    if not is_listed or not os.path.exists(target):
        raise HTTPException(status_code=404, detail="Archivo no encontrado")
    return FileResponse(target)
async def home():
    """Return the text of ``index.html`` from the current working directory.

    Returns:
        The full file contents as a string.

    Raises:
        FileNotFoundError: when ``index.html`` does not exist.
    """
    # Decode explicitly as UTF-8 rather than relying on the platform's
    # default encoding (assumes index.html is saved as UTF-8 - confirm).
    with open("index.html", "r", encoding="utf-8") as f:
        return f.read()
async def get_styles():
    """Serve ``styles.css`` from the current working directory.

    Returns a ``FileResponse`` for the stylesheet, or raises
    ``HTTPException`` (404) when the file does not exist.
    """
    stylesheet = "styles.css"
    if os.path.exists(stylesheet):
        return FileResponse(stylesheet)
    raise HTTPException(status_code=404, detail="CSS no encontrado")
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)