Spaces:
Running
Running
| from fastapi import Depends, FastAPI, HTTPException, status, Request, File, UploadFile | |
| from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm | |
| from pydantic import BaseModel | |
| import jwt | |
| from datetime import datetime, timedelta, timezone | |
| import uvicorn | |
| import modelsProyecto | |
| from databaseProyecto import SessionLocal, engine | |
| from sqlalchemy.orm import Session | |
| import SeguridadProyecto | |
| from SeguridadProyecto import generar_hash | |
| from typing import Annotated | |
| import os | |
| import shutil | |
| import io | |
| import numpy as np | |
| import onnxruntime as ort | |
| from PIL import Image | |
| from ultralytics import YOLO | |
| from fastapi.responses import StreamingResponse | |
| from dotenv import load_dotenv | |
| from groq import Groq | |
| from fastapi.middleware.cors import CORSMiddleware | |
| load_dotenv() | |
| model = YOLO("best.onnx", task="detect") | |
| client_groq = Groq(api_key=os.getenv("GROQ_API_KEY")) | |
| SECRET_KEY = os.getenv("SECRET_KEY") | |
| ALGORITHM = os.getenv("ALGORITHM") | |
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login") | |
| app = FastAPI() | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| modelsProyecto.Base.metadata.create_all(bind=engine) | |
| def get_db(): | |
| db = SessionLocal() | |
| try: | |
| yield db | |
| finally: | |
| db.close() | |
| class Token(BaseModel): | |
| access_token: str | |
| token_type: str | |
| registros_mensajes= {} | |
| async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): | |
| verificar_frecuencia(form_data.username, segundos=10) | |
| usuario_db = db.query(modelsProyecto.Usuario).filter(modelsProyecto.Usuario.username == form_data.username).first() | |
| if not usuario_db or not SeguridadProyecto.verificar_password(form_data.password, usuario_db.password_hash): | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="ID de usuario o contraseña incorrectos", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| expire = datetime.now(timezone.utc) + timedelta(minutes=30) | |
| to_encode = { | |
| "sub": usuario_db.username, | |
| "exp": expire, | |
| "rol": usuario_db.es_admin | |
| } | |
| encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
| return {"access_token": encoded_jwt, "token_type": "bearer"} | |
| async def verificar_admin(token: str = Depends(oauth2_scheme)): | |
| try: | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| username: str = payload.get("sub") | |
| es_admin: bool = payload.get("rol") | |
| except jwt.PyJWTError: | |
| raise HTTPException(status_code=401, detail="Token inválido o expirado") | |
| if es_admin is not True: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="No tienes permisos de administrador" | |
| ) | |
| return {"username": username, "rol": es_admin} | |
| async def revisar_Usuarios(admin_info: dict = Depends(verificar_admin), db: Session = Depends(get_db)): | |
| usuarios = db.query(modelsProyecto.Usuario).all() | |
| return usuarios | |
| def verificar_frecuencia(usuario_actual: str, segundos: int = 5): | |
| ahora = datetime.now() | |
| if usuario_actual in registros_mensajes: | |
| ultima_solicitud = registros_mensajes[usuario_actual] | |
| tiempo_transcurrido = ahora - ultima_solicitud | |
| if tiempo_transcurrido < timedelta(seconds=segundos): | |
| segundos_restantes = segundos - int(tiempo_transcurrido.total_seconds()) | |
| raise HTTPException( | |
| status_code=status.HTTP_429_TOO_MANY_REQUESTS, | |
| detail=f"Por favor, espera {segundos_restantes} segundos antes de volver a intentarlo" | |
| ) | |
| registros_mensajes[usuario_actual] = ahora | |
| def agente_inteligente_llm(deteccion: str, confianza: float): | |
| saludo = "¡Bienvenido! Soy ReciclIA. Estoy aquí para ayudarte a transformar tus hábitos y cuidar el planeta de forma experta. Vamos a hacer que cada residuo cuente." | |
| prompt = f""" | |
| Eres un ReciclIA, un Agente experto en sostenibilidad y reciclaje. | |
| Se ha detectado un objeto mediante visión artificial: | |
| - Objeto: {deteccion} | |
| - Confianza: {confianza:.2f} | |
| INSTRUCCIÓN DE SALUDO: | |
| Comienza SIEMPRE tu respuesta con esta frase exacta: "{saludo}" | |
| BASE DE CONOCIMIENTO (ESTRICTA): | |
| 1. CONTENEDOR MARRÓN (Orgánico): Restos de comida, cáscaras de fruta, posos de café, tapones de corcho, servilletas sucias y cualquier residuo de origen BIOLÓGICO. | |
| 2. CONTENEDOR VERDE (Vidrio): Botellas de vidrio, frascos de conservas, tarros de perfume. (¡NUNCA para orgánicos!). | |
| 3. CONTENEDOR AMARILLO (Envases): Plásticos, latas, briks y envases metálicos. | |
| 4. CONTENEDOR AZUL (Papel/Cartón): Cajas de cartón, periódicos, folletos. | |
| 5. CONTENEDOR GRIS (Resto): Pañales, colillas, objetos rotos que no se reciclan. | |
| 6. PUNTO LIMPIO: Pilas, electrónica, muebles, ropa, aceite usado. | |
| Explica al usuario de forma breve y amable: | |
| 1. En qué contenedor debe tirarlo, añade el color especifico del contenedor utilizando la base de conocimientos. | |
| 2. Un dato curioso sobre el reciclaje de ese material. | |
| REGLA CRÍTICA PARA EL PUNTO 3: | |
| - SI la confianza es menor a 0.5: Añade una advertencia breve diciendo que podrías estar equivocado debido a la baja precisión. | |
| - SI la confianza es 0.5 o mayor: NO menciones nada sobre la confianza, ni sobre umbrales, ni des explicaciones de por qué estás seguro. Omite este punto totalmente. | |
| Termina la explicación siempre con esta frase: | |
| -Recuerda, cada pequeño gesto cuenta, y reciclar correctamente es un paso importante hacia un futuro más sostenible. ¡Gracias por tu contribución! | |
| """ | |
| chat_completion = client_groq.chat.completions.create( | |
| messages=[{"role": "user", "content": prompt}], | |
| model="llama-3.3-70b-versatile", # Un modelo rápido y capaz | |
| ) | |
| return chat_completion.choices[0].message.content | |
| #CRUD Usuarios | |
| async def registrar_usuario(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): | |
| existe = db.query(modelsProyecto.Usuario).filter(modelsProyecto.Usuario.username == form_data.username).first() | |
| if existe: | |
| raise HTTPException(status_code=400, detail="El nombre de usuario ya existe") | |
| password_con_hash = generar_hash(form_data.password) | |
| nuevo_usuario = modelsProyecto.Usuario( | |
| username=form_data.username, | |
| password_hash=password_con_hash, | |
| es_admin=False | |
| ) | |
| db.add(nuevo_usuario) | |
| db.commit() | |
| return {"mensaje": "Usuario creado con éxito usando HASH"} | |
| async def actualizar_username( | |
| nuevo_username: str, | |
| token: str = Depends(oauth2_scheme), | |
| db: Session = Depends(get_db) | |
| ): | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| username_actual = payload.get("sub") | |
| usuario = db.query(modelsProyecto.Usuario).filter(modelsProyecto.Usuario.username == username_actual).first() | |
| if not usuario: | |
| raise HTTPException(status_code=404, detail="Usuario no encontrado") | |
| # 3. Verificar si el nuevo nombre ya está pillado por OTRO usuario | |
| existe = db.query(modelsProyecto.Usuario).filter(modelsProyecto.Usuario.username == nuevo_username).first() | |
| if existe and existe.username != username_actual: | |
| raise HTTPException(status_code=400, detail="Ese nombre de usuario ya está en uso") | |
| usuario.username = nuevo_username | |
| db.commit() | |
| db.refresh(usuario) | |
| return {"message": "Nombre actualizado", "nuevo_username": nuevo_username} | |
| async def eliminar_usuario( | |
| user: str, | |
| token: str = Depends(oauth2_scheme), | |
| db: Session = Depends(get_db), | |
| ): | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| username_token = payload.get("sub") | |
| es_admin: bool = payload.get("rol") | |
| usuario = db.query(modelsProyecto.Usuario).filter(modelsProyecto.Usuario.username == user).first() | |
| if not usuario: | |
| raise HTTPException(status_code=404, detail="Usuario no encontrado") | |
| if usuario.username != username_token and not es_admin: | |
| raise HTTPException(status_code=403, detail="No tienes permiso para eliminar esta cuenta") | |
| db.delete(usuario) | |
| db.commit() | |
| return None | |
| async def detectar_visual(file: UploadFile = File(...), token: str = Depends(oauth2_scheme),db: Session = Depends(get_db)): | |
| try: | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| username: str = payload.get("sub") | |
| usuario = db.query(modelsProyecto.Usuario).filter(modelsProyecto.Usuario.username == username).first() | |
| if not usuario: | |
| raise HTTPException(status_code=404, detail="Usuario no encontrado") | |
| except jwt.PyJWTError: | |
| raise HTTPException(status_code=401, detail="Token inválido") | |
| verificar_frecuencia(username, segundos=30) | |
| contents = await file.read() | |
| image = Image.open(io.BytesIO(contents)).convert("RGB") | |
| results = model.predict(image, imgsz=640, conf=0.25)[0] | |
| prediccion_nombre = "Sin detecciones" | |
| confianza_valor = 0.0 | |
| if len(results.boxes) > 0: | |
| box = results.boxes[0] | |
| prediccion_nombre = results.names[int(box.cls)] | |
| confianza_valor = float(box.conf) | |
| respuesta_agente = agente_inteligente_llm(prediccion_nombre, confianza_valor) | |
| else: | |
| respuesta_agente = "No veo nada claro aquí. ¿Puedes acercar más la cámara?" | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| ext = file.filename.split(".")[-1] | |
| nombre_unico = f"img_{timestamp}.{ext}" | |
| if not os.path.exists("imagenes_subidas"): | |
| os.makedirs("imagenes_subidas") | |
| ruta_archivo = f"imagenes_subidas/{nombre_unico}" | |
| with open(ruta_archivo, "wb") as buffer: | |
| buffer.write(contents) | |
| nueva_imagen = modelsProyecto.Imagen( | |
| name=nombre_unico, | |
| ruta=ruta_archivo, | |
| usuario_id=usuario.id, | |
| prediccion=prediccion_nombre, | |
| confianza=confianza_valor | |
| ) | |
| db.add(nueva_imagen) | |
| nuevo_historial = modelsProyecto.HistorialChat( | |
| usuario_id=usuario.id, | |
| mensaje_usuario=f"Detección de {prediccion_nombre}", | |
| respuesta_agente=respuesta_agente | |
| ) | |
| db.add(nuevo_historial) | |
| db.commit() | |
| im_array = results.plot() | |
| im_rgb = Image.fromarray(im_array[..., ::-1]) | |
| img_io = io.BytesIO() | |
| im_rgb.save(img_io, 'JPEG', quality=70) | |
| img_io.seek(0) | |
| return StreamingResponse( | |
| img_io, | |
| media_type="image/jpeg", | |
| headers={ | |
| "X-Agent-Reply": respuesta_agente.replace("\n", " | "), | |
| "X-Detection": prediccion_nombre | |
| } | |
| ) | |
| async def obtener_mi_historial(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| username = payload.get("sub") | |
| usuario = db.query(modelsProyecto.Usuario).filter(modelsProyecto.Usuario.username == username).first() | |
| # Trae los últimos 10 mensajes | |
| return db.query(modelsProyecto.HistorialChat).filter( | |
| modelsProyecto.HistorialChat.usuario_id == usuario.id | |
| ).order_by(modelsProyecto.HistorialChat.fecha.desc()).limit(10).all() | |
| async def vaciar_todo_el_historial( | |
| token: str = Depends(oauth2_scheme), | |
| db: Session = Depends(get_db) | |
| ): | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| username = payload.get("sub") | |
| usuario = db.query(modelsProyecto.Usuario).filter(modelsProyecto.Usuario.username == username).first() | |
| registros = db.query(modelsProyecto.HistorialChat).filter( | |
| modelsProyecto.HistorialChat.usuario_id == usuario.id | |
| ) | |
| cantidad = registros.count() | |
| registros.delete(synchronize_session=False) | |
| db.commit() | |
| return {"mensaje": f"Se han eliminado {cantidad} registros de tu historial"} | |
| if __name__ == "__main__": | |
| uvicorn.run("ProyectoBasura:app", host="127.0.0.1", port=8000, reload=True) |