mvi-ai-engine / api /main.py
Musombi's picture
Update api/main.py
3cb013f verified
import os
import os
import asyncio
import httpx
import asyncio
import traceback
from typing import Optional, List, Dict, Union
from fastapi import FastAPI, UploadFile, File, Form, Body, Header
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from core.mvi_ai_full import MVI_AI
from reasoning.mvi_instruction_engine import build_system_prompt
from generation.responder import Responder
# -------- APP INIT --------
app = FastAPI(title="MVI-AI Multimodal Engine")

# -------- SAFE MVI_AI STARTUP --------
# Constructing the engine may raise. The failure is reported on stdout and
# `mvi_ai` is left as None so the application itself can still start.
mvi_ai = None
try:
    mvi_ai = MVI_AI()
except Exception as startup_error:
    print(f"[STARTUP WARNING] MVI_AI failed to initialize: {startup_error}")
# -------- RESPONDER --------
responder = Responder()

# -------- SESSION MEMORY --------
# Upper bound on the number of text messages retained per session.
MAX_HISTORY = 10
# In-process store mapping a session id to that session's text messages.
SESSION_MEMORY: Dict[str, List[str]] = {}
def get_session_id(x_session_id: Optional[str] = Header(None)) -> str:
    """Return the given session id, or "default_session" when it is None or empty."""
    if x_session_id:
        return x_session_id
    return "default_session"
# -------- DATA MODEL --------
class UserInput(BaseModel):
    # Request model with a single optional field; `text` defaults to None
    # when the client omits it.
    text: Optional[str] = None
# -------- SYSTEM PROMPT BUILDER --------
def prepare_system_prompt(user_input: str, runtime_prompt: Optional[str] = None) -> str:
    """Build the system prompt for one request.

    Args:
        user_input: The user's text; used as the user prompt when no
            runtime prompt is supplied.
        runtime_prompt: Optional per-request prompt. When truthy it is used
            as the user prompt instead of ``user_input``.

    Returns:
        Whatever ``build_system_prompt`` returns for the fixed MVI-AI base
        instructions combined with the selected user prompt.
    """
    # Same text as a triple-quoted block: leading newline, two lines, trailing newline.
    base_instructions = (
        "\n"
        "You are MVI-AI.\n"
        "Maintain safety, clarity, and ethical behavior.\n"
    )
    chosen_prompt = runtime_prompt if runtime_prompt else user_input
    return build_system_prompt(
        external_prompt=base_instructions,
        user_prompt=chosen_prompt,
    )
# -------- ROOT & HEALTH --------
@app.get("/")
async def root():
    # Landing route: returns a fixed message that points callers at /health.
    banner = "MVI-AI Engine is running. Use /health to check status."
    return {"message": banner}
@app.get("/health")
async def health():
    # Status probe. "mvi_ai_ready" is the truthiness of the module-level
    # `mvi_ai`, which is None when the engine failed to initialize.
    engine_ready = bool(mvi_ai)
    return {"status": "ok", "mvi_ai_ready": engine_ready}
# -------- SELF-PING / KEEP-ALIVE --------
# Delay between two keep-alive pings, in seconds (5 minutes).
PING_INTERVAL = 5 * 60
# URL requested by the keep-alive loop.
SELF_URL = "https://musombi-mvi-ai-engine.hf.space/health"
async def keep_alive():
    """Request SELF_URL in an endless loop, sleeping PING_INTERVAL seconds between attempts.

    Each attempt prints its outcome. Any ``Exception`` raised while pinging
    is printed and the loop continues. One HTTP client is reused for all
    requests.
    """
    async with httpx.AsyncClient() as http:
        while True:
            try:
                reply = await http.get(SELF_URL)
                succeeded = reply.status_code == 200
                print(
                    "[KEEP-ALIVE] Ping successful"
                    if succeeded
                    else f"[KEEP-ALIVE] Ping failed: {reply.status_code}"
                )
            except Exception as ping_error:
                print(f"[KEEP-ALIVE] Ping error: {ping_error}")
            # Sleep after every attempt, whether it succeeded or failed.
            await asyncio.sleep(PING_INTERVAL)
# Strong references to fire-and-forget background tasks. The asyncio event
# loop only keeps weak references to tasks, so a task whose result is
# discarded may be garbage-collected before it finishes.
_BACKGROUND_TASKS = set()


@app.on_event("startup")
async def startup_event():
    # Start the keep-alive loop in the background when the application starts.
    task = asyncio.create_task(keep_alive())
    # Hold a reference for the task's lifetime; drop it once the task ends.
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
# -------- CORE HANDLER --------
async def handle_ask(
    text: Optional[str] = None,
    input_data: Optional[UserInput] = None,
    audio_file: Optional[UploadFile] = None,
    image_file: Optional[UploadFile] = None,
    video_file: Optional[UploadFile] = None,
    system_prompt: Optional[str] = None,
    session_id: str = "default_session",
):
    """Run one request through MVI_AI and the responder.

    Args:
        text: Text already resolved by the endpoint; takes precedence over
            ``input_data.text``.
        input_data: Optional request model whose ``text`` is the fallback.
        audio_file: Optional upload, forwarded to ``mvi_ai.ask`` as ``audio``.
        image_file: Optional upload, forwarded to ``mvi_ai.ask`` as ``image``.
        video_file: Optional upload, forwarded to ``mvi_ai.ask`` as ``video``.
        system_prompt: Optional per-request prompt handed to
            ``prepare_system_prompt`` as the runtime prompt.
        session_id: Key under which the text is recorded in SESSION_MEMORY.

    Returns:
        On success, a dict with the keys "success", "response",
        "raw_mvi_ai" and "code_file". Otherwise a ``JSONResponse`` with
        status 503 (engine unavailable), 400 (no text and no file) or
        500 (the engine or the responder raised).
    """
    if mvi_ai is None:
        return JSONResponse(
            status_code=503,
            content={"error": "MVI_AI not available. Check logs for model load errors."},
        )

    # Always end up with a str: previously `input_data` with text=None gave
    # None here, which was then passed on to the prompt builder and engine.
    fallback_text = input_data.text if input_data else None
    actual_text = text or fallback_text or ""

    if not actual_text and not any([audio_file, image_file, video_file]):
        return JSONResponse(status_code=400, content={"error": "No input provided"})

    # Record non-empty text in the session history, keeping only the newest
    # MAX_HISTORY entries. The history is stored here but is not passed to
    # the engine by this function.
    if actual_text:
        history = SESSION_MEMORY.get(session_id, [])
        history.append(actual_text)
        SESSION_MEMORY[session_id] = history[-MAX_HISTORY:]

    final_prompt = prepare_system_prompt(actual_text, runtime_prompt=system_prompt)

    # Run the engine; any failure is logged with a traceback and reported as
    # a generic 500 so internals are not leaked to the client.
    try:
        mvi_response = await mvi_ai.ask(
            text=actual_text,
            audio=audio_file,
            image=image_file,
            video=video_file,
            system_prompt=final_prompt,
        )
    except Exception:
        traceback.print_exc()
        return JSONResponse(status_code=500, content={"error": "MVI_AI internal error (see logs)"})

    # Build the responder output and, when the engine returned code, save it.
    try:
        emotion_info = mvi_response.get("emotion", {"label": "neutral"})
        response_data = responder.generate(
            plan={
                "emotion": emotion_info.get("label", "neutral"),
                "reasoning_depth": "high",
            }
        )

        code_file_info = None
        generated_code = mvi_response.get("generated_code")
        if generated_code:
            lang, filename = responder.detect_language(generated_code)
            # The file is only written when a language was detected.
            if lang:
                path = responder.save_code_file(filename, generated_code)
                code_file_info = {"language": lang, "file_path": path}

        return {
            "success": True,
            "response": response_data,
            "raw_mvi_ai": mvi_response,
            "code_file": code_file_info,
        }
    except Exception:
        traceback.print_exc()
        return JSONResponse(status_code=500, content={"error": "Responder failed to generate response"})
# -------- /ASK ENDPOINT --------
@app.post("/ask")
async def ask(
    input_data: Optional[UserInput] = Body(None),
    message: Optional[str] = Body(None),
    text: Optional[str] = Form(None),
    system_prompt: Optional[str] = Form(None),
    audio_file: Optional[UploadFile] = File(None),
    image_file: Optional[UploadFile] = File(None),
    video_file: Optional[UploadFile] = File(None),
    x_session_id: Optional[str] = Header(None)
):
    # Entry point for /ask: picks the text to use, resolves the session id
    # and hands everything to handle_ask.
    # Text precedence: input_data.text, then message, then the form `text`.
    # NOTE(review): this signature mixes JSON Body(...) parameters with
    # Form/File parameters. FastAPI's documentation says one path operation
    # cannot receive form fields and JSON body fields together -- confirm
    # that JSON clients actually reach this handler as intended.
    body_text = input_data.text if input_data else None
    resolved_text = body_text or message or text
    return await handle_ask(
        text=resolved_text,
        input_data=input_data,
        audio_file=audio_file,
        image_file=image_file,
        video_file=video_file,
        system_prompt=system_prompt,
        session_id=get_session_id(x_session_id),
    )
# -------- /PREDICT ENDPOINT --------
@app.post("/predict")
async def predict(
    input_data: Optional[UserInput] = Body(None),
    message: Optional[str] = Body(None),
    text: Optional[str] = Form(None),
    system_prompt: Optional[str] = Form(None),
    audio_file: Optional[UploadFile] = File(None),
    image_file: Optional[UploadFile] = File(None),
    video_file: Optional[UploadFile] = File(None),
    x_session_id: Optional[str] = Header(None)
):
    # Entry point for /predict: same parameters and same delegation to
    # handle_ask as /ask.
    # Text precedence here is message, then the form `text`, then
    # input_data.text. NOTE(review): /ask checks input_data.text first --
    # confirm whether the two endpoints are meant to differ.
    body_text = input_data.text if input_data else None
    resolved_text = message or text or body_text
    return await handle_ask(
        text=resolved_text,
        input_data=input_data,
        audio_file=audio_file,
        image_file=image_file,
        video_file=video_file,
        system_prompt=system_prompt,
        session_id=get_session_id(x_session_id),
    )
@app.get("/health")
async def health_check():
    # NOTE(review): GET /health is already registered earlier in this file
    # by `health`. Routers normally dispatch to the first matching route, so
    # this second registration is presumably never reached -- consider
    # removing one of the two.
    # Delegate to `health` so both handlers return the same payload
    # (including "mvi_ai_ready") rather than two diverging ones.
    return await health()