# NOTE: removed Hugging Face Spaces UI residue ("Spaces: / Running / Running")
# that was captured along with the source — it is not part of the program.
import asyncio
import os
import traceback
from typing import Dict, List, Optional, Union

import httpx
from fastapi import Body, FastAPI, File, Form, Header, UploadFile
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from core.mvi_ai_full import MVI_AI
from generation.responder import Responder
from reasoning.mvi_instruction_engine import build_system_prompt
# -------- APP INIT --------
app = FastAPI(title="MVI-AI Multimodal Engine")

# -------- SAFE MVI_AI STARTUP --------
# Model construction can fail (missing weights, OOM, bad config). Keep the
# API process alive and let handlers answer 503 instead of crashing on import.
try:
    mvi_ai = MVI_AI()
except Exception as e:
    print(f"[STARTUP WARNING] MVI_AI failed to initialize: {e}")
    mvi_ai = None

# -------- RESPONDER --------
responder = Responder()

# -------- SESSION MEMORY --------
# In-process, per-session rolling history of user text inputs.
# NOTE(review): lost on restart and not shared across workers — confirm that
# is acceptable for this deployment.
SESSION_MEMORY: Dict[str, List[str]] = {}
MAX_HISTORY = 10  # keep only the most recent N texts per session
def get_session_id(x_session_id: Optional[str] = Header(None)) -> str:
    """Resolve the session id from the X-Session-Id header.

    Falls back to "default_session" when the header is missing or empty.
    """
    if x_session_id:
        return x_session_id
    return "default_session"
# -------- DATA MODEL --------
class UserInput(BaseModel):
    """JSON request body: a single optional free-text field."""

    text: Optional[str] = None
# -------- SYSTEM PROMPT BUILDER --------
def prepare_system_prompt(user_input: str, runtime_prompt: Optional[str] = None) -> str:
    """Combine the static safety preamble with the caller-supplied prompt.

    A non-empty ``runtime_prompt`` takes precedence over ``user_input``.
    """
    external_prompt = """
You are MVI-AI.
Maintain safety, clarity, and ethical behavior.
"""
    chosen_prompt = runtime_prompt or user_input
    return build_system_prompt(external_prompt=external_prompt, user_prompt=chosen_prompt)
# -------- ROOT & HEALTH --------
# NOTE(review): route decorator presumed lost in extraction — without it this
# handler is never registered. Confirm the intended path.
@app.get("/")
async def root():
    """Landing endpoint; points callers at /health."""
    return {"message": "MVI-AI Engine is running. Use /health to check status."}
# NOTE(review): route decorator presumed lost in extraction; the keep-alive
# loop and the root message both reference /health, so the path is grounded.
@app.get("/health")
async def health():
    """Liveness/readiness probe; reports whether the model loaded."""
    return {"status": "ok", "mvi_ai_ready": bool(mvi_ai)}
# -------- SELF-PING / KEEP-ALIVE --------
# Hugging Face Spaces containers idle out; the service pings its own health
# endpoint periodically to stay warm.
SELF_URL = "https://musombi-mvi-ai-engine.hf.space/health"
PING_INTERVAL = 5 * 60  # every 5 minutes
async def keep_alive():
    """Background loop: ping SELF_URL every PING_INTERVAL seconds, forever.

    Errors are logged and swallowed so a transient network failure never
    kills the loop.
    """
    async with httpx.AsyncClient() as client:
        while True:
            try:
                resp = await client.get(SELF_URL)
                status = resp.status_code
                if status == 200:
                    print("[KEEP-ALIVE] Ping successful")
                else:
                    print(f"[KEEP-ALIVE] Ping failed: {status}")
            except Exception as e:
                print(f"[KEEP-ALIVE] Ping error: {e}")
            await asyncio.sleep(PING_INTERVAL)
# NOTE(review): startup decorator presumed lost in extraction — without it
# this hook never runs and the keep-alive task is never scheduled.
@app.on_event("startup")
async def startup_event():
    """Schedule the keep-alive background task when the app starts."""
    asyncio.create_task(keep_alive())
# -------- CORE HANDLER --------
def _detect_modality(audio_file, image_file, video_file) -> str:
    """Return the dominant input modality; precedence audio > image > video > text."""
    if audio_file:
        return "audio"
    if image_file:
        return "image"
    if video_file:
        return "video"
    return "text"


def _remember(session_id: str, text: str) -> None:
    """Record *text* (if any) in the session's history, capped at MAX_HISTORY."""
    history = SESSION_MEMORY.get(session_id, [])
    if text:
        history.append(text)
    SESSION_MEMORY[session_id] = history[-MAX_HISTORY:]


async def handle_ask(
    text: Optional[str] = None,
    input_data: Optional[UserInput] = None,
    audio_file: Optional[UploadFile] = None,
    image_file: Optional[UploadFile] = None,
    video_file: Optional[UploadFile] = None,
    system_prompt: Optional[str] = None,
    session_id: str = "default_session",
):
    """Shared implementation behind /ask and /predict.

    Validates input, records session history, builds the system prompt, runs
    MVI_AI, then post-processes through the responder. Returns a success dict
    or a JSONResponse error (503 model unavailable, 400 no input, 500 internal).
    """
    # MVI_AI failed to load at startup — report rather than crash.
    if mvi_ai is None:
        return JSONResponse(
            status_code=503,
            content={"error": "MVI_AI not available. Check logs for model load errors."},
        )

    # Explicit `text` wins over the JSON body's text field.
    actual_text = text or (input_data.text if input_data else "")
    if not actual_text and not any([audio_file, image_file, video_file]):
        return JSONResponse(status_code=400, content={"error": "No input provided"})

    _remember(session_id, actual_text)

    # Currently informational only — `modality` is not forwarded to mvi_ai.ask.
    modality = _detect_modality(audio_file, image_file, video_file)

    final_prompt = prepare_system_prompt(actual_text, runtime_prompt=system_prompt)

    try:
        mvi_response = await mvi_ai.ask(
            text=actual_text,
            audio=audio_file,
            image=image_file,
            video=video_file,
            system_prompt=final_prompt,
        )
    except Exception:
        traceback.print_exc()
        return JSONResponse(status_code=500, content={"error": "MVI_AI internal error (see logs)"})

    try:
        # Default to a neutral emotion when the model omits one.
        emotion = mvi_response.get("emotion", {"label": "neutral"}).get("label", "neutral")
        response_data = responder.generate(
            plan={"emotion": emotion, "reasoning_depth": "high"}
        )

        # Persist any generated code to disk and report its location.
        code_file_info = None
        generated_code = mvi_response.get("generated_code")
        if generated_code:
            lang, filename = responder.detect_language(generated_code)
            if lang:
                path = responder.save_code_file(filename, generated_code)
                code_file_info = {"language": lang, "file_path": path}

        return {
            "success": True,
            "response": response_data,
            "raw_mvi_ai": mvi_response,
            "code_file": code_file_info,
        }
    except Exception:
        traceback.print_exc()
        return JSONResponse(status_code=500, content={"error": "Responder failed to generate response"})
# -------- /ASK ENDPOINT --------
# NOTE(review): route decorator presumed lost in extraction (the section
# comment names the /ask path) — confirm method and path.
@app.post("/ask")
async def ask(
    input_data: Optional[UserInput] = Body(None),
    message: Optional[str] = Body(None),
    text: Optional[str] = Form(None),
    system_prompt: Optional[str] = Form(None),
    audio_file: Optional[UploadFile] = File(None),
    image_file: Optional[UploadFile] = File(None),
    video_file: Optional[UploadFile] = File(None),
    x_session_id: Optional[str] = Header(None),
):
    """Primary multimodal endpoint; accepts a JSON body or multipart form."""
    # Text precedence: JSON body text > "message" > form field.
    final_text = (input_data.text if input_data else None) or message or text
    session_id = get_session_id(x_session_id)
    return await handle_ask(
        text=final_text,
        input_data=input_data,
        audio_file=audio_file,
        image_file=image_file,
        video_file=video_file,
        system_prompt=system_prompt,
        session_id=session_id,
    )
# -------- /PREDICT ENDPOINT --------
# NOTE(review): route decorator presumed lost in extraction (the section
# comment names the /predict path) — confirm method and path.
@app.post("/predict")
async def predict(
    input_data: Optional[UserInput] = Body(None),
    message: Optional[str] = Body(None),
    text: Optional[str] = Form(None),
    system_prompt: Optional[str] = Form(None),
    audio_file: Optional[UploadFile] = File(None),
    image_file: Optional[UploadFile] = File(None),
    video_file: Optional[UploadFile] = File(None),
    x_session_id: Optional[str] = Header(None),
):
    """Alias of /ask with a different text-field precedence."""
    # NOTE(review): precedence here ("message" > form field > JSON body text)
    # is the reverse of /ask — confirm the asymmetry is intentional.
    final_text = message or text or (input_data.text if input_data else None)
    session_id = get_session_id(x_session_id)
    return await handle_ask(
        text=final_text,
        input_data=input_data,
        audio_file=audio_file,
        image_file=image_file,
        video_file=video_file,
        system_prompt=system_prompt,
        session_id=session_id,
    )
async def health_check():
    """Minimal health probe (no model status).

    NOTE(review): duplicates /health and has no route decorator in the
    visible source — possibly dead code; confirm before removing.
    """
    return {"status": "ok"}