# main.py # Orquestador principal de FastAPI: Endpoints, WebSockets y ciclo de vida de la aplicación. # VERSIÓN 2.0: ARQUITECTURA SUPERIOR import os import asyncio from typing import List, Optional from datetime import datetime from fastapi import FastAPI, WebSocket, Depends, HTTPException, status, Request, WebSocketDisconnect from fastapi.responses import HTMLResponse, JSONResponse from fastapi.security import OAuth2PasswordRequestForm from fastapi.staticfiles import StaticFiles from sqlalchemy.orm import Session from pydantic import BaseModel, EmailStr import auth import core import database import ui from database import User, Memory # --- Modelos de Datos Pydantic (Abstracción y Auto-Validación) --- class UserCreate(BaseModel): email: EmailStr password: str class Token(BaseModel): access_token: str token_type: str class InitiationAnswer(BaseModel): question: str answer: str class MemoryResponse(BaseModel): content: str timestamp: datetime class Config: orm_mode = True # --- Inicialización de la Aplicación --- app = FastAPI( title="Samuel v2.0 - El Confidente Digital Argentino", description="Una arquitectura cognitiva y de sistemas de nivel superior.", version="2.0.0" ) if not os.path.exists("static"): os.makedirs("static") app.mount("/static", StaticFiles(directory="static"), name="static") # --- Ciclo de Vida de la Aplicación y Chequeos de Salud --- @app.on_event("startup") def on_startup(): """Tareas a ejecutar al iniciar la aplicación con validaciones críticas.""" print("Iniciando Samuel v2.1 (Hardened)...") # Principio Fail-Fast: si faltan secretos, la aplicación no debe iniciar. if not os.getenv("JWT_SECRET_KEY"): raise RuntimeError("FATAL: JWT_SECRET_KEY no configurada.") if not os.getenv("GEMINI_API_KEY"): raise RuntimeError("FATAL: GEMINI_API_KEY no configurada.") database.create_db_and_tables() print("Base de datos verificada y lista.") core.load_tts_model() print("Samuel v2.1 está listo para conversar.") @app.get("/health", status_code=status.HTTP_200_OK) def health_check(): """Endpoint para el Health Check del orquestador de contenedores.""" return {"status": "ok"} # --- Endpoints de Autenticación y Registro (Actualizados con Pydantic) --- # --- Endpoints de Autenticación y Registro (Actualizados con Pydantic) --- @app.post("/register", status_code=status.HTTP_201_CREATED) def register_user(user: UserCreate, db: Session = Depends(database.get_db)): db_user = db.query(User).filter(User.email == user.email).first() if db_user: raise HTTPException(status_code=400, detail="El email ya está registrado.") hashed_password = auth.get_password_hash(user.password) new_user = User(email=user.email, hashed_password=hashed_password) db.add(new_user) db.commit() return {"message": f"Usuario {user.email} creado."} @app.post("/token", response_class=JSONResponse) def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(database.get_db)): user = db.query(User).filter(User.email == form_data.username).first() if not user or not auth.verify_password(form_data.password, user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Email o contraseña incorrectos", ) access_token = auth.create_access_token(data={"sub": user.email}) response = JSONResponse(content={"message": "Autenticación exitosa"}) response.set_cookie(key="access_token", value=f"Bearer {access_token}", httponly=True, samesite='lax') return response @app.post("/logout") def logout(): response = JSONResponse(content={"message": "Sesión cerrada"}) response.delete_cookie("access_token") return response # --- Endpoints de la Interfaz de Usuario (HTML) --- @app.get("/", response_class=HTMLResponse) async def read_root(request: Request, db: Session = Depends(database.get_db)): token = request.cookies.get("access_token") if not token: return ui.get_login_page() try: user = await auth.get_current_user(token.replace("Bearer ", ""), db) if not user.has_completed_initiation: return ui.get_initiation_page() return ui.get_main_app_page() except HTTPException: # Token inválido, expira la cookie y muestra el login response = HTMLResponse(content=ui.get_login_page()) response.delete_cookie("access_token") return response # --- API para el Ritual de Iniciación y Memorias (Actualizados con Pydantic) --- @app.post("/initiation/answer", status_code=status.HTTP_200_OK) async def save_initiation_answer(answer_data: InitiationAnswer, current_user: User = Depends(auth.get_current_user), db: Session = Depends(database.get_db)): memoria_fundacional = f"Memoria Fundacional. Pregunta: '{answer_data.question}' Respuesta: '{answer_data.answer}'" new_memory = Memory(content=memoria_fundacional, owner=current_user) db.add(new_memory) user_memories_count = db.query(Memory).filter(Memory.user_id == current_user.id).count() if user_memories_count >= 5: current_user.has_completed_initiation = True db.commit() return {"message": "Respuesta guardada."} @app.get("/api/memories", response_model=List[MemoryResponse]) async def get_user_memories(current_user: User = Depends(auth.get_current_user), db: Session = Depends(database.get_db)): return db.query(Memory).filter(Memory.user_id == current_user.id).order_by(Memory.timestamp.desc()).all() # --- WebSocket con Hiper-Paralelismo --- @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket, db: Session = Depends(database.get_db)): await websocket.accept() user = None try: token = websocket.cookies.get("access_token") if not token: await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Token no encontrado") return user = await auth.get_current_user(token.replace("Bearer ", ""), db) if not user: await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Usuario inválido") return history = [] # El historial se construye sobre la marcha while True: data = await websocket.receive_json() user_message = data['content'] history.append({"role": "user", "parts": [user_message]}) samuel_response_text = core.get_samuel_response(history) history.append({"role": "model", "parts": [samuel_response_text]}) # 1. Enviar texto INMEDIATAMENTE await websocket.send_json({"type": "text", "content": samuel_response_text}) # 2. Crear tarea en paralelo para generar y enviar audio SIN BLOQUEAR async def generate_and_send_audio(text: str): loop = asyncio.get_running_loop() # Ejecutar la función síncrona de TTS en un hilo separado audio_base64 = await loop.run_in_executor(None, core.generate_audio_base64, text) if audio_base64: await websocket.send_json({"type": "audio", "content": audio_base64}) asyncio.create_task(generate_and_send_audio(samuel_response_text)) # 3. Procesar y guardar memoria en la base de datos if samuel_response_text.startswith("MEMORIA_GENERADA:"): mem_content = samuel_response_text.replace("MEMORIA_GENERADA:", "").strip() new_memory = Memory(content=mem_content, owner=user) db.add(new_memory) db.commit() except WebSocketDisconnect: email = user.email if user else "Cliente desconocido" print(f"{email} desconectado.") except Exception as e: print(f"Error fatal en WebSocket: {e}") await websocket.close(code=status.WS_1011_INTERNAL_ERROR)