# IseaFire / app.py
# SorovotPelo's picture
# Update app.py
# e715199 verified
from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Form
from fastapi.responses import HTMLResponse, FileResponse
from starlette.responses import Response
from pydantic import BaseModel
from typing import List
from passlib.context import CryptContext
import os
import uvicorn
import shutil
import requests
# Application object that the route decorators below attach to.
app = FastAPI()
# Password-hashing configuration (bcrypt scheme via passlib's CryptContext)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Local directory for uploaded files and the identifier used in the Hugging Face backup URL
UPLOAD_DIRECTORY = "/mnt/data/uploads"
BACKUP_SPACE = "SorovotPelo/IseaUploadsArchive"
HF_TOKEN = os.getenv("HF_API_TOKEN") # Read the token from the environment; None when the variable is unset
# Create the uploads directory if it does not exist (runs at import time)
os.makedirs(UPLOAD_DIRECTORY, exist_ok=True)
# Simulated database: plain in-process dicts, so contents are lost on restart.
# users_db maps username -> {"username": ..., "password": <hash>} (filled by /register).
users_db = {}
# files_db maps username -> list of uploaded filenames (filled by /register and /upload).
files_db = {}
# Pydantic model with two required string fields describing a user.
# NOTE(review): no route visible in this file references this model; the
# endpoints read `username`/`password` from form fields instead.
class User(BaseModel):
    # Account name.
    username: str
    # Password value (string).
    password: str
def get_password_hash(password):
    """Return the hash of ``password`` as produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password`` via ``pwd_context``.

    Returns the result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def authenticate_user(username: str, password: str):
    """Look up ``username`` in ``users_db`` and validate ``password``.

    Returns:
        The stored user record (a dict) when the user exists and the password
        matches its stored hash; ``False`` otherwise.
    """
    record = users_db.get(username)
    # Unknown user (or an empty record): nothing to verify against.
    if not record:
        return False
    if not verify_password(password, record["password"]):
        return False
    return record
@app.post("/register")
async def register(username: str = Form(...), password: str = Form(...)):
    """Create a new account from the ``username`` and ``password`` form fields.

    Stores the hashed password in ``users_db`` and initialises an empty file
    list for the user in ``files_db``.

    Raises:
        HTTPException: 400 when ``username`` is already present in ``users_db``.
    """
    already_registered = username in users_db
    if already_registered:
        raise HTTPException(status_code=400, detail="Usuario ya existe")

    # Only the hash is stored, never the submitted password itself.
    account = {"username": username, "password": get_password_hash(password)}
    users_db[username] = account
    files_db[username] = []
    return {"message": "Usuario registrado con éxito"}
@app.post("/login")
async def login(username: str = Form(...), password: str = Form(...)):
    """Validate the submitted credentials and return a welcome message.

    Raises:
        HTTPException: 400 when ``authenticate_user`` rejects the credentials.
    """
    if authenticate_user(username, password):
        return {"message": f"Bienvenido {username}"}
    raise HTTPException(status_code=400, detail="Usuario o contraseña incorrectos")
@app.post("/upload")
async def upload_file(username: str = Form(...), file: UploadFile = File(...)):
    """Save an uploaded file for ``username`` and send a copy to the backup endpoint.

    The file is written under ``UPLOAD_DIRECTORY/<username>/`` and then POSTed
    to the Hugging Face URL built from ``BACKUP_SPACE``. The filename is
    recorded in ``files_db`` only after the backup request succeeds.

    Args:
        username: Form field naming the owner; must be a key of ``files_db``.
        file: The uploaded file.

    Returns:
        A dict with a success message, the stored filename and its download URL.

    Raises:
        HTTPException: 404 when the user is unknown, 400 when the filename or
            resulting path is not acceptable, 500 when the backup fails.

    NOTE(review): this endpoint does not check credentials, so any caller can
    upload on behalf of any registered username.
    """
    user_files = files_db.get(username)
    # Compare against None: a newly registered user has an empty list, which
    # is falsy, and must still be allowed to upload a first file.
    if user_files is None:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")

    # The client controls the filename; keep only its last path component so
    # values such as "../../etc/passwd" cannot escape the user's directory.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Nombre de archivo no válido")

    # Defence in depth: the username is also part of the path, so confirm the
    # resolved destination is still inside the upload root.
    upload_root = os.path.realpath(UPLOAD_DIRECTORY)
    file_path = os.path.realpath(os.path.join(upload_root, username, safe_name))
    if not file_path.startswith(upload_root + os.sep):
        raise HTTPException(status_code=400, detail="Ruta de archivo no válida")

    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "wb") as destination:
        shutil.copyfileobj(file.file, destination)

    # Backup to Hugging Face. The read handle is closed by the context manager
    # (the original leaked it), and a timeout prevents the request from
    # hanging forever.
    # NOTE(review): requests.post is a blocking call inside an async handler;
    # consider running it in a thread pool. The 30 s timeout is a judgment
    # call — tune as needed.
    try:
        with open(file_path, "rb") as stored:
            response = requests.post(
                f"https://huggingface.co/api/spaces/{BACKUP_SPACE}/uploads",
                headers={"Authorization": f"Bearer {HF_TOKEN}"},
                files={"file": (safe_name, stored)},
                timeout=30,
            )
    except requests.RequestException as exc:
        # Network-level failures are reported the same way as a bad status.
        raise HTTPException(
            status_code=500, detail="Error al respaldar archivo en Hugging Face"
        ) from exc
    if response.status_code != 200:
        raise HTTPException(status_code=500, detail="Error al respaldar archivo en Hugging Face")

    # Re-uploading an existing name overwrites the file; avoid duplicate entries.
    if safe_name not in user_files:
        user_files.append(safe_name)
    return {
        "message": "Archivo subido con éxito",
        "filename": safe_name,
        "url": f"/download/{username}/{safe_name}",
    }
@app.get("/download/{username}/{filename}")
async def download_file(username: str, filename: str):
    """Return a previously uploaded file belonging to ``username``.

    The file must be listed for the user in ``files_db`` and must exist on
    disk under ``UPLOAD_DIRECTORY/<username>/``.

    Raises:
        HTTPException: 404 when the file is not listed for the user or is
            missing from disk.
    """
    owned = files_db.get(username)
    is_listed = bool(owned) and filename in owned
    target = os.path.join(UPLOAD_DIRECTORY, username, filename)
    # Short-circuit keeps the disk check from running for unlisted files.
    if not is_listed or not os.path.exists(target):
        raise HTTPException(status_code=404, detail="Archivo no encontrado")
    return FileResponse(target)
@app.get("/", response_class=HTMLResponse)
async def home():
    """Serve the landing page by returning the contents of ``index.html``.

    The path is relative, so it is resolved against the process's current
    working directory.

    Returns:
        The HTML document as a string.

    Raises:
        HTTPException: 404 when ``index.html`` does not exist, matching how
            the stylesheet route reports a missing file.
    """
    try:
        # Decode explicitly as UTF-8 instead of relying on the platform's
        # locale-dependent default, so non-ASCII text is read consistently.
        with open("index.html", "r", encoding="utf-8") as page:
            return page.read()
    except FileNotFoundError as exc:
        raise HTTPException(status_code=404, detail="Página no encontrada") from exc
@app.get("/styles.css", response_class=FileResponse)
async def get_styles():
    """Serve ``styles.css`` from the current working directory.

    Raises:
        HTTPException: 404 when the stylesheet does not exist.
    """
    stylesheet = "styles.css"
    if os.path.exists(stylesheet):
        return FileResponse(stylesheet)
    raise HTTPException(status_code=404, detail="CSS no encontrado")
if __name__ == "__main__":
    # Start the uvicorn server only when this file is executed as a script;
    # it listens on all interfaces, port 7860.
    uvicorn.run(app, port=7860, host="0.0.0.0")