Spaces:
Runtime error
Runtime error
| # ================================================ | |
| # Lythron AI · EVO.3 | |
| # Creada el 30/10/2025 · Lanzada el 31/10/2025 | |
| # Desarrollada por Lythron AI | |
| # ================================================ | |
| import os | |
| import io | |
| import time | |
| import struct | |
| import sqlite3 | |
| import secrets | |
| from typing import List, Optional, Dict, Any, Literal | |
| from fastapi import FastAPI, UploadFile, File, Depends, HTTPException, status, Request, Response | |
| from fastapi.security import OAuth2PasswordBearer | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
| from jose import JWTError, jwt | |
| from passlib.context import CryptContext | |
| from openai import OpenAI | |
| # -------------------------------- | |
| # Configuración principal | |
| # -------------------------------- | |
# Key used to sign and verify JWTs. An unset OR empty LYTHRON_SECRET_KEY falls
# back to a random key generated at import time, so the service never signs
# tokens with an empty secret. With the fallback, tokens stop validating after
# a restart; set the variable explicitly for stable deployments.
SECRET_KEY = os.getenv("LYTHRON_SECRET_KEY") or secrets.token_urlsafe(32)
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_SECONDS = 60 * 60 * 24  # 24h
# Path of the SQLite database file.
DATABASE = os.getenv("LYTHRON_DB", "lythron_core.db")
# Comma-separated list of CORS origins. Entries are stripped and empty ones
# dropped so a value such as "https://a.example, https://b.example," still
# matches the exact origin strings browsers send.
ALLOWED_ORIGINS = [
    origin.strip()
    for origin in os.getenv("LYTHRON_ALLOWED_ORIGINS", "http://localhost:3000").split(",")
    if origin.strip()
]
# May be None when the environment variable is not set.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
| # -------------------------------- | |
| # Inicializar base de datos | |
| # -------------------------------- | |
def init_db() -> None:
    """Create the ``users`` and ``audit`` tables if they do not exist yet.

    Opens the SQLite file named by ``DATABASE``, issues two
    ``CREATE TABLE IF NOT EXISTS`` statements and commits. Calling it again on
    an already initialised database is a no-op.
    """
    conn = sqlite3.connect(DATABASE)
    try:
        cur = conn.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                email TEXT UNIQUE,
                hashed_password TEXT,
                plan TEXT DEFAULT 'LTR 2.1',
                prompts_used INTEGER DEFAULT 0,
                prompts_reset_ts INTEGER DEFAULT 0,
                is_admin INTEGER DEFAULT 0
            )""")
        cur.execute("""
            CREATE TABLE IF NOT EXISTS audit (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_email TEXT,
                action TEXT,
                details TEXT,
                ts INTEGER
            )""")
        conn.commit()
    finally:
        # Always release the connection, even if a statement fails.
        conn.close()
# Runs at import time so the users/audit tables exist before the code below
# queries them.
| init_db() | |
| # -------------------------------- | |
| # Seguridad y JWT | |
| # -------------------------------- | |
# Password hashing context used by hash_password / verify_password; bcrypt is
# the only configured scheme.
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
# Bearer-token dependency injected into the request handlers below.
# NOTE(review): with FastAPI's default auto_error=True this dependency
# presumably rejects requests that carry no Authorization header, in which case
# the handlers' `if token:` / "anonymous" branches can never run - confirm
# whether auto_error=False was intended. No "/token" route is visible in this
# part of the file.
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") | |
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored hash via ``pwd_context``.

    Args:
        plain: Password as typed by the user.
        hashed: Hash previously produced by ``hash_password``.

    Returns:
        The result of ``pwd_context.verify(plain, hashed)``.
    """
    matches = pwd_context.verify(plain, hashed)
    return matches
def create_access_token(data: dict, expires_delta: Optional[int] = None) -> str:
    """Encode ``data`` as a signed JWT with an ``exp`` claim.

    Args:
        data: Claims to embed; the dict is copied, not mutated.
        expires_delta: Token lifetime in seconds. ``None`` selects
            ``ACCESS_TOKEN_EXPIRE_SECONDS``.

    Returns:
        The token produced by ``jwt.encode`` with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Compare against None explicitly: `expires_delta or DEFAULT` would turn an
    # explicit lifetime of 0 seconds into the 24h default.
    lifetime = ACCESS_TOKEN_EXPIRE_SECONDS if expires_delta is None else expires_delta
    claims = {**data, "exp": int(time.time()) + lifetime}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def get_user(email: str) -> Optional[Dict[str, Any]]:
    """Fetch one user row by email.

    Args:
        email: Value matched against ``users.email``.

    Returns:
        A dict with keys ``email``, ``hashed_password``, ``plan``,
        ``prompts_used``, ``prompts_reset_ts`` and ``is_admin`` (converted to
        ``bool``), or ``None`` when no row matches.
    """
    conn = sqlite3.connect(DATABASE)
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT email, hashed_password, plan, prompts_used, prompts_reset_ts, is_admin "
            "FROM users WHERE email=?",
            (email,),
        )
        row = cur.fetchone()
    finally:
        # Close even when the query raises, so the handle is never leaked.
        conn.close()
    if not row:
        return None
    user_email, hashed_password, plan, prompts_used, prompts_reset_ts, is_admin = row
    return {
        "email": user_email,
        "hashed_password": hashed_password,
        "plan": plan,
        "prompts_used": prompts_used,
        "prompts_reset_ts": prompts_reset_ts,
        "is_admin": bool(is_admin),
    }
def create_user(email: str, password: str, plan: str = "LTR 2.1", is_admin: bool = False):
    """Insert a new user with a hashed password.

    Args:
        email: Unique email of the account.
        password: Plaintext password; only its hash (``hash_password``) is stored.
        plan: Plan name stored in ``users.plan``.
        is_admin: Stored in ``users.is_admin`` as 0/1.

    Raises:
        HTTPException: 400 when the insert violates the unique-email constraint.
    """
    hp = hash_password(password)
    conn = sqlite3.connect(DATABASE)
    try:
        cur = conn.cursor()
        # is_admin is written explicitly; previously the argument was accepted
        # but dropped, so every account was created with the column default 0.
        cur.execute(
            "INSERT INTO users (email, hashed_password, plan, prompts_reset_ts, is_admin) "
            "VALUES (?, ?, ?, ?, ?)",
            (email, hp, plan, 0, int(is_admin)),
        )
        conn.commit()
    except sqlite3.IntegrityError as exc:
        raise HTTPException(status_code=400, detail="User already exists") from exc
    finally:
        # Single close point covering success, duplicate email and any other error.
        conn.close()
| # -------------------------------- | |
| # Auditoría | |
| # -------------------------------- | |
def audit(user_email: str, action: str, details: str = "") -> None:
    """Append one row to the ``audit`` table.

    Args:
        user_email: Identity recorded for the event (callers pass
            ``"anonymous"`` when there is no user).
        action: Short action label.
        details: Free-form detail text.

    The row's ``ts`` column is set to the current Unix time in whole seconds.
    """
    conn = sqlite3.connect(DATABASE)
    try:
        conn.execute(
            "INSERT INTO audit (user_email, action, details, ts) VALUES (?, ?, ?, ?)",
            (user_email, action, details, int(time.time())),
        )
        conn.commit()
    finally:
        # Close even if the insert fails so the connection is not leaked.
        conn.close()
| # -------------------------------- | |
| # Planes y límites | |
| # -------------------------------- | |
# Prompts allowed per 24-hour window, keyed by plan name.
PLAN_LIMITS = {
    "LTR 2.1": 10,
    "LTR 2.5": 20,
    "LTR 5.0": 9999999,
}
# Limit applied when a user's plan name is not in PLAN_LIMITS.
_DEFAULT_PROMPT_LIMIT = 10
# Length of the quota window in seconds.
_QUOTA_WINDOW_SECONDS = 24 * 3600


def check_and_consume_prompt(user_email: Optional[str]) -> bool:
    """Charge one prompt against the user's quota, resetting an expired window.

    Args:
        user_email: Email of the caller. A falsy value means "no user" and is
            accepted without touching the database.

    Returns:
        ``True`` when the prompt was accepted.

    Raises:
        HTTPException: 401 if ``get_user`` finds no such user; 429 if the
            user's ``prompts_used`` already reached the plan limit.
    """
    if not user_email:
        return True
    user = get_user(user_email)
    if not user:
        raise HTTPException(status_code=401, detail="Unknown user")
    limit = PLAN_LIMITS.get(user["plan"], _DEFAULT_PROMPT_LIMIT)
    now = int(time.time())

    conn = sqlite3.connect(DATABASE)
    try:
        cur = conn.cursor()
        # Start a fresh window when more than 24h passed since the last reset.
        # The age test lives in the WHERE clause so it is evaluated against the
        # stored value, not a snapshot read earlier.
        cur.execute(
            "UPDATE users SET prompts_used=0, prompts_reset_ts=? "
            "WHERE email=? AND COALESCE(prompts_reset_ts, 0) + ? < ?",
            (now, user["email"], _QUOTA_WINDOW_SECONDS, now),
        )
        # Increment only while below the limit. Doing the comparison and the
        # increment in one statement means two concurrent requests cannot both
        # pass a stale check and exceed the quota.
        cur.execute(
            "UPDATE users SET prompts_used = COALESCE(prompts_used, 0) + 1 "
            "WHERE email=? AND COALESCE(prompts_used, 0) < ?",
            (user["email"], limit),
        )
        consumed = cur.rowcount > 0
        # Commit before raising so a window reset is persisted either way.
        conn.commit()
    finally:
        conn.close()

    if not consumed:
        raise HTTPException(status_code=429, detail="Prompts limit reached for your plan")
    return True
| # -------------------------------- | |
| # Núcleo Lythron (motor real) | |
| # -------------------------------- | |
# OpenAI client used by LythronEngine.generate; constructed once at import time.
# NOTE(review): OPENAI_API_KEY is None when the environment variable is unset -
# confirm how the OpenAI constructor behaves in that case (it may raise here,
# during import).
| client = OpenAI(api_key=OPENAI_API_KEY) | |
class LythronEngine:
    """Thin wrapper around the chat-completions client with an in-process log.

    Attributes are plain instance state:
    ``status`` is a dict reported back to API callers and ``memory`` is a list
    of the most recent interactions (prompt, response, modality, timestamp).
    """

    # Cap on retained interactions so a long-running process does not grow
    # its memory without limit; the oldest entries are dropped first.
    MAX_MEMORY_ENTRIES = 1000

    def __init__(self):
        self.status = {"model": "lythron-evo.3"}
        self.memory = []

    def generate(self, prompt: str, modality: str = "texto", language: Optional[str] = None) -> str:
        """Send ``prompt`` to the model and return its reply text.

        Args:
            prompt: User message forwarded as the ``user`` chat message.
            modality: Recorded in ``memory`` only; it does not alter the request.
            language: Accepted for interface compatibility; currently unused.

        Returns:
            The stripped reply text, an empty string if the reply carries no
            text content, or a string starting with
            ``"[Error del motor Lythron]: "`` when the call fails.
        """
        system_prompt = (
            "Sos Lythron, una inteligencia artificial avanzada desarrollada por Lythron AI. "
            "Fuiste creada el 30 de octubre de 2025 y lanzada oficialmente el 31 de octubre de 2025. "
            "Tu propósito es servir como asistente universal, creativo y analítico, "
            "con una comunicación clara, elegante y adaptativa. "
            "No mencionás a OpenAI ni a ninguna otra IA. "
            "Solo te identificás como Lythron."
        )
        try:
            completion = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt},
                ],
            )
            # `content` can be None; treat that as an empty reply instead of
            # letting `.strip()` fail and surface an AttributeError message.
            content = completion.choices[0].message.content
            result = (content or "").strip()
        except Exception as e:
            # Deliberate best-effort boundary: callers always get a string back.
            result = f"[Error del motor Lythron]: {e}"
        self.memory.append(
            {"prompt": prompt, "response": result, "modality": modality, "ts": int(time.time())}
        )
        overflow = len(self.memory) - self.MAX_MEMORY_ENTRIES
        if overflow > 0:
            del self.memory[:overflow]
        return result
# Module-level engine instance; predict() calls engine.generate and returns
# engine.status.
| engine = LythronEngine() | |
| # -------------------------------- | |
| # Modelos Pydantic | |
| # -------------------------------- | |
class PredictRequest(BaseModel):
    """Request body consumed by ``predict``."""

    # Text forwarded to the engine.
    prompt: str
    # Passed through to engine.generate; defaults to "texto".
    modality: str = Field(default="texto")
    # Optional language hint passed through to engine.generate.
    language: Optional[str] = Field(default=None)
class PredictResponse(BaseModel):
    """Response body returned by ``predict``."""

    # Text returned by the engine (required).
    output: str = Field(...)
    # The engine's status dict (required).
    status: Dict[str, Any] = Field(...)
| # -------------------------------- | |
| # Inicializar FastAPI | |
| # -------------------------------- | |
# ASGI application object.
app = FastAPI(title="Lythron AI Core", version="EVO.3")

# CORS policy: only the configured origins are allowed, with credentials, and
# with every method and header permitted.
_cors_options = {
    "allow_origins": ALLOWED_ORIGINS,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
# Registered as HTTP middleware: without this decorator the function is never
# invoked and none of the headers below reach any response.
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    """Run the downstream handler and attach fixed security headers.

    Args:
        request: Incoming request, passed unchanged to ``call_next``.
        call_next: Awaitable callable that produces the response.

    Returns:
        The downstream response with ``X-Frame-Options``,
        ``X-Content-Type-Options`` and ``X-XSS-Protection`` set.
    """
    response: Response = await call_next(request)
    security_headers = (
        ("X-Frame-Options", "DENY"),
        ("X-Content-Type-Options", "nosniff"),
        ("X-XSS-Protection", "1; mode=block"),
    )
    for header_name, header_value in security_headers:
        response.headers[header_name] = header_value
    return response
| # -------------------------------- | |
| # Endpoints API | |
| # -------------------------------- | |
# NOTE(review): no route decorator is visible on this handler in this part of
# the file - confirm where and how it is registered on `app`.
def predict(req: PredictRequest, token: Optional[str] = Depends(oauth2_scheme)):
    """Generate a reply for ``req.prompt`` and record the call in the audit log.

    Args:
        req: Prompt, modality and optional language.
        token: Bearer token from ``oauth2_scheme``; a falsy value is treated
            as an anonymous caller.

    Returns:
        ``PredictResponse`` carrying the engine output and ``engine.status``.

    Raises:
        HTTPException: 401 when the token cannot be decoded or carries no
            ``sub`` claim; 401/429 propagated from ``check_and_consume_prompt``.
    """
    user_email = None
    if token:
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except JWTError as exc:
            raise HTTPException(status_code=401, detail="Invalid token") from exc
        user_email = payload.get("sub")
        # A token without a subject must not be downgraded to "anonymous":
        # that would skip the per-user quota check below.
        if not user_email:
            raise HTTPException(status_code=401, detail="Invalid token")
    check_and_consume_prompt(user_email)
    out = engine.generate(req.prompt, modality=req.modality, language=req.language)
    # Only the first 120 characters of the prompt are stored in the audit row.
    audit(user_email or "anonymous", "predict", req.prompt[:120])
    return PredictResponse(output=out, status=engine.status)
# NOTE(review): no route decorator is visible on this handler in this part of
# the file - confirm where and how it is registered on `app`.
async def upload_file(file: UploadFile = File(...), token: Optional[str] = Depends(oauth2_scheme)):
    """Measure an uploaded file and record the upload in the audit log.

    Args:
        file: The uploaded file.
        token: Bearer token from ``oauth2_scheme``; a falsy value is treated
            as an anonymous caller.

    Returns:
        ``{"filename": <name>, "size": <bytes read>}``.

    Raises:
        HTTPException: 401 when the token cannot be decoded or carries no
            ``sub`` claim.
    """
    user_email = None
    if token:
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except JWTError as exc:
            raise HTTPException(status_code=401, detail="Invalid token") from exc
        user_email = payload.get("sub")
        # Reject subject-less tokens instead of logging them as "anonymous".
        if not user_email:
            raise HTTPException(status_code=401, detail="Invalid token")

    # Only the size is needed, so read in bounded chunks rather than loading
    # the whole upload into memory at once.
    chunk_bytes = 1024 * 1024
    size = 0
    while True:
        chunk = await file.read(chunk_bytes)
        if not chunk:
            break
        size += len(chunk)

    summary = {"filename": file.filename, "size": size}
    audit(user_email or "anonymous", "upload-file", file.filename)
    return summary