# ---------------------------------------------------------------------------
# MVI-AI multimodal FastAPI engine.
#
# Provides app setup, guarded model startup, per-session text history, a
# keep-alive self-ping loop, and the shared request handler (`handle_ask`)
# used by the /ask and /predict endpoints.
# ---------------------------------------------------------------------------
import asyncio
import os
import traceback
from typing import Dict, List, Optional, Union

import httpx
from fastapi import Body, FastAPI, File, Form, Header, UploadFile
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from core.mvi_ai_full import MVI_AI
from generation.responder import Responder
from reasoning.mvi_instruction_engine import build_system_prompt

# -------- APP INIT --------
app = FastAPI(title="MVI-AI Multimodal Engine")

# -------- SAFE MVI_AI STARTUP --------
# Model load can fail (missing weights, OOM, ...). Keep the API process up
# and let handle_ask() answer 503 instead of crashing at import time.
try:
    mvi_ai = MVI_AI()
except Exception as e:
    print(f"[STARTUP WARNING] MVI_AI failed to initialize: {e}")
    mvi_ai = None

# -------- RESPONDER --------
responder = Responder()

# -------- SESSION MEMORY --------
# Rolling window of the last MAX_HISTORY user texts, keyed by session id.
SESSION_MEMORY: Dict[str, List[str]] = {}
MAX_HISTORY = 10


def get_session_id(x_session_id: Optional[str] = Header(None)) -> str:
    """Resolve the session id from the X-Session-Id header, with a fallback."""
    return x_session_id or "default_session"


# -------- DATA MODEL --------
class UserInput(BaseModel):
    # Optional free-form user text (JSON body variant of the endpoints).
    text: Optional[str] = None


# -------- SYSTEM PROMPT BUILDER --------
def prepare_system_prompt(user_input: str, runtime_prompt: Optional[str] = None) -> str:
    """Combine the static safety prompt with the caller-supplied runtime prompt.

    Falls back to the raw user input when no runtime prompt is provided.
    """
    external_prompt = """
    You are MVI-AI. Maintain safety, clarity, and ethical behavior.
    """
    return build_system_prompt(
        external_prompt=external_prompt,
        user_prompt=runtime_prompt or user_input,
    )


# -------- ROOT & HEALTH --------
@app.get("/")
async def root():
    """Liveness message for the service root."""
    return {"message": "MVI-AI Engine is running. Use /health to check status."}


@app.get("/health")
async def health():
    """Health probe; also reports whether the model loaded successfully."""
    return {"status": "ok", "mvi_ai_ready": bool(mvi_ai)}


# -------- SELF-PING / KEEP-ALIVE --------
SELF_URL = "https://musombi-mvi-ai-engine.hf.space/health"
PING_INTERVAL = 5 * 60  # every 5 minutes


async def keep_alive():
    """Ping our own /health endpoint forever so the host doesn't idle us out.

    Errors are logged and swallowed on purpose: the loop must never die.
    """
    # Explicit timeout so a stalled request cannot delay the loop indefinitely.
    async with httpx.AsyncClient(timeout=30.0) as client:
        while True:
            try:
                response = await client.get(SELF_URL)
                if response.status_code == 200:
                    print("[KEEP-ALIVE] Ping successful")
                else:
                    print(f"[KEEP-ALIVE] Ping failed: {response.status_code}")
            except Exception as e:
                print(f"[KEEP-ALIVE] Ping error: {e}")
            await asyncio.sleep(PING_INTERVAL)


@app.on_event("startup")
async def startup_event():
    # Fire-and-forget background task; keep_alive() never returns.
    asyncio.create_task(keep_alive())


# -------- CORE HANDLER --------
async def handle_ask(
    text: Optional[str] = None,
    input_data: Optional[UserInput] = None,
    audio_file: Optional[UploadFile] = None,
    image_file: Optional[UploadFile] = None,
    video_file: Optional[UploadFile] = None,
    system_prompt: Optional[str] = None,
    session_id: str = "default_session",
):
    """Shared implementation behind /ask and /predict.

    Validates input, records session history, runs the model, then the
    responder. Returns a success dict, or a JSONResponse with an error
    status (400 bad input, 503 model unavailable, 500 internal failure).
    """
    # MVI_AI not available (failed at startup).
    if mvi_ai is None:
        return JSONResponse(
            status_code=503,
            content={"error": "MVI_AI not available. Check logs for model load errors."},
        )

    # Determine actual text input: positional text wins over the JSON body.
    actual_text = text or (input_data.text if input_data else "")
    if not actual_text and not any([audio_file, image_file, video_file]):
        return JSONResponse(status_code=400, content={"error": "No input provided"})

    # -----------------------
    # Session memory handling
    # -----------------------
    session_history = SESSION_MEMORY.get(session_id, [])
    if actual_text:
        session_history.append(actual_text)
    SESSION_MEMORY[session_id] = session_history[-MAX_HISTORY:]

    # -----------------------
    # Detect modality (first matching file wins; text is the default)
    # -----------------------
    modality = "text"
    if audio_file:
        modality = "audio"
    elif image_file:
        modality = "image"
    elif video_file:
        modality = "video"

    # Build final system prompt
    final_prompt = prepare_system_prompt(actual_text, runtime_prompt=system_prompt)

    # -----------------------
    # Run MVI_AI safely
    # -----------------------
    try:
        mvi_response = await mvi_ai.ask(
            text=actual_text,
            audio=audio_file,
            image=image_file,
            video=video_file,
            system_prompt=final_prompt,
        )
    except Exception:
        traceback.print_exc()
        return JSONResponse(
            status_code=500, content={"error": "MVI_AI internal error (see logs)"}
        )

    # -----------------------
    # Generate humanized output safely
    # -----------------------
    try:
        # `or {}` guards the case where the "emotion" key exists but is None
        # (a dict.get default only applies when the key is absent entirely).
        emotion = mvi_response.get("emotion") or {}
        response_data = responder.generate(
            plan={
                "emotion": emotion.get("label", "neutral"),
                "reasoning_depth": "high",
            }
        )

        # Save code if generated
        code_file_info = None
        generated_code = mvi_response.get("generated_code")
        if generated_code:
            lang, filename = responder.detect_language(generated_code)
            if lang:
                path = responder.save_code_file(filename, generated_code)
                code_file_info = {"language": lang, "file_path": path}

        return {
            "success": True,
            "modality": modality,
            "response": response_data,
            "raw_mvi_ai": mvi_response,
            "code_file": code_file_info,
        }
    except Exception:
        traceback.print_exc()
        return JSONResponse(
            status_code=500, content={"error": "Responder failed to generate response"}
        )
# -------- /ASK ENDPOINT --------
@app.post("/ask")
async def ask(
    input_data: Optional[UserInput] = Body(None),
    message: Optional[str] = Body(None),
    text: Optional[str] = Form(None),
    system_prompt: Optional[str] = Form(None),
    audio_file: Optional[UploadFile] = File(None),
    image_file: Optional[UploadFile] = File(None),
    video_file: Optional[UploadFile] = File(None),
    x_session_id: Optional[str] = Header(None),
):
    """Primary entry point; accepts JSON body, form fields, and file uploads.

    NOTE(review): text precedence here is input_data > message > text, while
    /predict uses message > text > input_data — kept as-is so existing
    callers are unaffected.
    """
    final_text = (input_data.text if input_data else None) or message or text
    session_id = get_session_id(x_session_id)
    return await handle_ask(
        text=final_text,
        input_data=input_data,
        audio_file=audio_file,
        image_file=image_file,
        video_file=video_file,
        system_prompt=system_prompt,
        session_id=session_id,
    )


# -------- /PREDICT ENDPOINT --------
@app.post("/predict")
async def predict(
    input_data: Optional[UserInput] = Body(None),
    message: Optional[str] = Body(None),
    text: Optional[str] = Form(None),
    system_prompt: Optional[str] = Form(None),
    audio_file: Optional[UploadFile] = File(None),
    image_file: Optional[UploadFile] = File(None),
    video_file: Optional[UploadFile] = File(None),
    x_session_id: Optional[str] = Header(None),
):
    """Alias of /ask with different text-field precedence (message first)."""
    final_text = message or text or (input_data.text if input_data else None)
    session_id = get_session_id(x_session_id)
    return await handle_ask(
        text=final_text,
        input_data=input_data,
        audio_file=audio_file,
        image_file=image_file,
        video_file=video_file,
        system_prompt=system_prompt,
        session_id=session_id,
    )


# NOTE(review): removed the duplicate `@app.get("/health")` handler
# (`health_check`). GET /health is already served by `health()` earlier in
# the file; FastAPI matches the first-registered route, so the duplicate was
# dead code — and it also omitted the `mvi_ai_ready` field.