Samuel_4.0 / main.py
Lukeetah's picture
Upload 13 files
5b0bb4b verified
# main.py
# Orquestador principal de FastAPI: Endpoints, WebSockets y ciclo de vida de la aplicación.
# VERSIÓN 2.0: ARQUITECTURA SUPERIOR
import os
import asyncio
from typing import List, Optional
from datetime import datetime
from fastapi import FastAPI, WebSocket, Depends, HTTPException, status, Request, WebSocketDisconnect
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.staticfiles import StaticFiles
from sqlalchemy.orm import Session
from pydantic import BaseModel, EmailStr
import auth
import core
import database
import ui
from database import User, Memory
# --- Modelos de Datos Pydantic (Abstracción y Auto-Validación) ---
class UserCreate(BaseModel):
email: EmailStr
password: str
class Token(BaseModel):
access_token: str
token_type: str
class InitiationAnswer(BaseModel):
question: str
answer: str
class MemoryResponse(BaseModel):
content: str
timestamp: datetime
class Config:
orm_mode = True
# --- Inicialización de la Aplicación ---
app = FastAPI(
title="Samuel v2.0 - El Confidente Digital Argentino",
description="Una arquitectura cognitiva y de sistemas de nivel superior.",
version="2.0.0"
)
if not os.path.exists("static"):
os.makedirs("static")
app.mount("/static", StaticFiles(directory="static"), name="static")
# --- Ciclo de Vida de la Aplicación y Chequeos de Salud ---
@app.on_event("startup")
def on_startup():
"""Tareas a ejecutar al iniciar la aplicación con validaciones críticas."""
print("Iniciando Samuel v2.1 (Hardened)...")
# Principio Fail-Fast: si faltan secretos, la aplicación no debe iniciar.
if not os.getenv("JWT_SECRET_KEY"):
raise RuntimeError("FATAL: JWT_SECRET_KEY no configurada.")
if not os.getenv("GEMINI_API_KEY"):
raise RuntimeError("FATAL: GEMINI_API_KEY no configurada.")
database.create_db_and_tables()
print("Base de datos verificada y lista.")
core.load_tts_model()
print("Samuel v2.1 está listo para conversar.")
@app.get("/health", status_code=status.HTTP_200_OK)
def health_check():
"""Endpoint para el Health Check del orquestador de contenedores."""
return {"status": "ok"}
# --- Endpoints de Autenticación y Registro (Actualizados con Pydantic) ---
# --- Endpoints de Autenticación y Registro (Actualizados con Pydantic) ---
@app.post("/register", status_code=status.HTTP_201_CREATED)
def register_user(user: UserCreate, db: Session = Depends(database.get_db)):
db_user = db.query(User).filter(User.email == user.email).first()
if db_user:
raise HTTPException(status_code=400, detail="El email ya está registrado.")
hashed_password = auth.get_password_hash(user.password)
new_user = User(email=user.email, hashed_password=hashed_password)
db.add(new_user)
db.commit()
return {"message": f"Usuario {user.email} creado."}
@app.post("/token", response_class=JSONResponse)
def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(database.get_db)):
user = db.query(User).filter(User.email == form_data.username).first()
if not user or not auth.verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Email o contraseña incorrectos",
)
access_token = auth.create_access_token(data={"sub": user.email})
response = JSONResponse(content={"message": "Autenticación exitosa"})
response.set_cookie(key="access_token", value=f"Bearer {access_token}", httponly=True, samesite='lax')
return response
@app.post("/logout")
def logout():
response = JSONResponse(content={"message": "Sesión cerrada"})
response.delete_cookie("access_token")
return response
# --- Endpoints de la Interfaz de Usuario (HTML) ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request, db: Session = Depends(database.get_db)):
token = request.cookies.get("access_token")
if not token:
return ui.get_login_page()
try:
user = await auth.get_current_user(token.replace("Bearer ", ""), db)
if not user.has_completed_initiation:
return ui.get_initiation_page()
return ui.get_main_app_page()
except HTTPException:
# Token inválido, expira la cookie y muestra el login
response = HTMLResponse(content=ui.get_login_page())
response.delete_cookie("access_token")
return response
# --- API para el Ritual de Iniciación y Memorias (Actualizados con Pydantic) ---
@app.post("/initiation/answer", status_code=status.HTTP_200_OK)
async def save_initiation_answer(answer_data: InitiationAnswer, current_user: User = Depends(auth.get_current_user), db: Session = Depends(database.get_db)):
memoria_fundacional = f"Memoria Fundacional. Pregunta: '{answer_data.question}' Respuesta: '{answer_data.answer}'"
new_memory = Memory(content=memoria_fundacional, owner=current_user)
db.add(new_memory)
user_memories_count = db.query(Memory).filter(Memory.user_id == current_user.id).count()
if user_memories_count >= 5:
current_user.has_completed_initiation = True
db.commit()
return {"message": "Respuesta guardada."}
@app.get("/api/memories", response_model=List[MemoryResponse])
async def get_user_memories(current_user: User = Depends(auth.get_current_user), db: Session = Depends(database.get_db)):
return db.query(Memory).filter(Memory.user_id == current_user.id).order_by(Memory.timestamp.desc()).all()
# --- WebSocket con Hiper-Paralelismo ---
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, db: Session = Depends(database.get_db)):
await websocket.accept()
user = None
try:
token = websocket.cookies.get("access_token")
if not token:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Token no encontrado")
return
user = await auth.get_current_user(token.replace("Bearer ", ""), db)
if not user:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Usuario inválido")
return
history = [] # El historial se construye sobre la marcha
while True:
data = await websocket.receive_json()
user_message = data['content']
history.append({"role": "user", "parts": [user_message]})
samuel_response_text = core.get_samuel_response(history)
history.append({"role": "model", "parts": [samuel_response_text]})
# 1. Enviar texto INMEDIATAMENTE
await websocket.send_json({"type": "text", "content": samuel_response_text})
# 2. Crear tarea en paralelo para generar y enviar audio SIN BLOQUEAR
async def generate_and_send_audio(text: str):
loop = asyncio.get_running_loop()
# Ejecutar la función síncrona de TTS en un hilo separado
audio_base64 = await loop.run_in_executor(None, core.generate_audio_base64, text)
if audio_base64:
await websocket.send_json({"type": "audio", "content": audio_base64})
asyncio.create_task(generate_and_send_audio(samuel_response_text))
# 3. Procesar y guardar memoria en la base de datos
if samuel_response_text.startswith("MEMORIA_GENERADA:"):
mem_content = samuel_response_text.replace("MEMORIA_GENERADA:", "").strip()
new_memory = Memory(content=mem_content, owner=user)
db.add(new_memory)
db.commit()
except WebSocketDisconnect:
email = user.email if user else "Cliente desconocido"
print(f"{email} desconectado.")
except Exception as e:
print(f"Error fatal en WebSocket: {e}")
await websocket.close(code=status.WS_1011_INTERNAL_ERROR)