memory-bridge / modal_app.py
Sheikh Mohammad Rakib
feat: initialize modal_app.py for memory bridge application framework
4bf2b01
Raw
History Blame Contribute Delete
23.1 kB
import modal
import json
import os
import base64
from pathlib import Path
app = modal.App("memory-keeper")

# ── Volumes ───────────────────────────────────────────────────────────────────
# Persona JSON profiles (mounted at /personas by the persona/chat/list functions).
volume = modal.Volume.from_name("memory-keeper-personas", create_if_missing=True)
# Hugging Face download cache (mounted at /root/.cache/huggingface by the model functions).
hf_cache_vol = modal.Volume.from_name("memory-keeper-hf-cache", create_if_missing=True)

# ── MiniCPM Hosted API ────────────────────────────────────────────────────────
# Every setting below may be overridden with an environment variable of the
# same name; the literals are kept as defaults so existing deployments behave
# exactly as before.
#
# SECURITY NOTE(review): the default MINICPM_AUTH is a credential committed to
# source control and the endpoints are plain HTTP. Rotate the key and supply it
# through the environment (e.g. a modal.Secret attached to the functions that
# call the API), then delete the literal default.
MINICPM_LLM_URL = os.environ.get("MINICPM_LLM_URL", "http://35.203.155.71:8001/v1")  # MiniCPM4.1-8B
MINICPM_VISION_URL = os.environ.get("MINICPM_VISION_URL", "http://35.203.155.71:8003/v1")  # MiniCPM-V-4.6
MINICPM_AUTH = os.environ.get(
    "MINICPM_AUTH",
    "Bearer sk-minicpm-V8bcD-YTAMxECagaKOnbwTCN69IlN2LhSezGOgq2Ues",
)
MINICPM_LLM_MODEL = "MiniCPM4.1-8B"
MINICPM_VIS_MODEL = "MiniCPM-V-4.6"

# ── Model IDs (Modal-hosted) ──────────────────────────────────────────────────
ASR_MODEL = "CohereLabs/cohere-transcribe-03-2026"
OCR_MODEL = "nvidia/NVIDIA-Nemotron-Parse-v1.2"
TTS_MODEL = "openbmb/VoxCPM2"
MULTILINGUAL_LM_FIRE = "CohereLabs/tiny-aya-fire"  # South Asian (Bengali, Hindi, Urdu)
MULTILINGUAL_LM_WATER = "CohereLabs/tiny-aya-water"  # Asia Pacific

# Seconds per minute; used to express function timeouts as `n * MINUTES`.
MINUTES = 60
# ── Images ────────────────────────────────────────────────────────────────────
def _slim_image(*packages):
    """Return a Debian-slim / Python 3.11 image with ``packages`` pip-installed."""
    return modal.Image.debian_slim(python_version="3.11").pip_install(*packages)


# Hosted-API callers and lightweight web endpoints.
base_image = _slim_image("openai", "requests", "fastapi[standard]", "huggingface_hub")

# Speech recognition (transformers pipeline).
asr_image = _slim_image(
    "torch", "transformers>=4.50.0", "torchaudio",
    "huggingface_hub", "soundfile", "fastapi[standard]",
)

# Document OCR (image-text-to-text model).
ocr_image = _slim_image(
    "torch", "torchvision", "transformers>=4.50.0",
    "Pillow", "huggingface_hub", "fastapi[standard]",
)

# Speech synthesis (voxcpm).
tts_image = _slim_image(
    "torch", "soundfile", "huggingface_hub", "fastapi[standard]",
    "voxcpm",
)

# Causal language models used for multilingual chat.
multilingual_image = _slim_image(
    "torch", "transformers>=4.50.0",
    "huggingface_hub", "fastapi[standard]",
)
# ── Helper: MiniCPM client ────────────────────────────────────────────────────
def get_llm_client():
    """Return an OpenAI-compatible client for the hosted MiniCPM text model.

    The client's API key is ``MINICPM_AUTH`` with the ``"Bearer "`` prefix removed.
    """
    from openai import OpenAI

    bare_key = MINICPM_AUTH.replace("Bearer ", "")
    return OpenAI(base_url=MINICPM_LLM_URL, api_key=bare_key)
def get_vision_client():
    """Return an OpenAI-compatible client for the hosted MiniCPM vision model.

    The client's API key is ``MINICPM_AUTH`` with the ``"Bearer "`` prefix removed.
    """
    from openai import OpenAI

    bare_key = MINICPM_AUTH.replace("Bearer ", "")
    return OpenAI(base_url=MINICPM_VISION_URL, api_key=bare_key)
# ─────────────────────────────────────────────────────────────────────────────
# 1. ASR — Cohere Transcribe 2B
# ─────────────────────────────────────────────────────────────────────────────
@app.function(gpu="T4", image=asr_image, timeout=5 * MINUTES,
              volumes={"/root/.cache/huggingface": hf_cache_vol})
def transcribe_audio(audio_bytes: bytes, filename: str = "audio.wav") -> str:
    """Transcribe an audio clip with the ASR model.

    Args:
        audio_bytes: Raw bytes of the audio file.
        filename: Original file name; only its extension is used, as the
            suffix of the temporary file handed to the pipeline.

    Returns:
        The recognised text (the pipeline result's ``"text"`` entry when the
        result is a dict, otherwise the result converted to ``str``).
    """
    import tempfile

    import torch
    from transformers import pipeline

    pipe = pipeline(
        "automatic-speech-recognition",
        model=ASR_MODEL,
        trust_remote_code=True,
        device="cuda",
        torch_dtype=torch.float16,
    )

    # A missing/None filename falls back to the default extension.
    suffix = Path(filename or "audio.wav").suffix
    # delete=False: the pipeline reopens the file by path after this handle closes.
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as f:
        f.write(audio_bytes)
        tmp_path = f.name
    try:
        # NOTE(review): transformers documents that file-path inputs are decoded
        # with ffmpeg — confirm asr_image provides the ffmpeg binary.
        result = pipe(tmp_path, return_timestamps=False)
    finally:
        # Remove the temp file even when transcription fails.
        os.unlink(tmp_path)
    return result["text"] if isinstance(result, dict) else str(result)
# ─────────────────────────────────────────────────────────────────────────────
# 2. OCR — Nemotron Parse v1.2
# ─────────────────────────────────────────────────────────────────────────────
@app.function(gpu="T4", image=ocr_image, timeout=5 * MINUTES,
              volumes={"/root/.cache/huggingface": hf_cache_vol})
def ocr_document(image_bytes: bytes) -> str:
    """Extract the text of a document image with the OCR model.

    Args:
        image_bytes: Raw bytes of an image file in any format Pillow can open.

    Returns:
        The decoded model output for the image, with the prompt tokens removed.
    """
    import io

    import torch
    from PIL import Image
    from transformers import AutoModelForImageTextToText, AutoProcessor

    # Decode in memory and before loading the model: no temp file to clean up,
    # and a corrupt upload fails before the expensive model load.
    image = Image.open(io.BytesIO(image_bytes)).convert("RGB")

    processor = AutoProcessor.from_pretrained(OCR_MODEL, trust_remote_code=True)
    model = AutoModelForImageTextToText.from_pretrained(
        OCR_MODEL, trust_remote_code=True,
        torch_dtype=torch.float16, device_map="auto",
    )
    model.eval()

    # The image must be attached to the message; a bare {"type": "image"}
    # placeholder leaves the processor with no pixels to encode.
    # NOTE(review): assumes this trust_remote_code processor accepts in-message
    # PIL images like the stock transformers processors do — confirm.
    messages = [{"role": "user", "content": [
        {"type": "image", "image": image},
        {"type": "text", "text": "Extract all text from this document. Preserve structure and formatting."},
    ]}]
    inputs = processor.apply_chat_template(
        messages, tokenize=True, add_generation_prompt=True,
        return_dict=True, return_tensors="pt",
    ).to(model.device)
    generated_ids = model.generate(**inputs, max_new_tokens=2048)
    # Drop the echoed prompt tokens so only newly generated text is decoded.
    trimmed = [out[len(inp):] for inp, out in zip(inputs.input_ids, generated_ids)]
    return processor.batch_decode(trimmed, skip_special_tokens=True)[0]
# ─────────────────────────────────────────────────────────────────────────────
# 3. TTS — VoxCPM2
# ─────────────────────────────────────────────────────────────────────────────
@app.function(gpu="T4", image=tts_image, timeout=5 * MINUTES,
              volumes={"/root/.cache/huggingface": hf_cache_vol})
def text_to_speech(text: str, voice_description: str = None) -> bytes:
    """Synthesise ``text`` as speech and return it as WAV bytes.

    Args:
        text: The text to speak.
        voice_description: Optional description of the voice; when given it is
            prepended to the text in parentheses before generation.

    Returns:
        The generated audio encoded as a WAV file.
    """
    import io

    import soundfile as sf
    from voxcpm import VoxCPM

    tts = VoxCPM.from_pretrained(TTS_MODEL, load_denoiser=False)
    spoken = f"({voice_description}){text}" if voice_description else text
    waveform = tts.generate(text=spoken, cfg_value=2.0, inference_timesteps=10)

    wav_buffer = io.BytesIO()
    sf.write(wav_buffer, waveform, tts.tts_model.sample_rate, format="WAV")
    return wav_buffer.getvalue()
# ─────────────────────────────────────────────────────────────────────────────
# 4. Multilingual — Tiny Aya Fire (Bengali/South Asian fallback)
# ─────────────────────────────────────────────────────────────────────────────
@app.function(gpu="T4", image=multilingual_image, timeout=5 * MINUTES,
              volumes={"/root/.cache/huggingface": hf_cache_vol})
def chat_multilingual(system_prompt: str, history: list, user_message: str, model_id: str = None) -> str:
    """Generate a chat reply with a locally loaded multilingual model.

    Args:
        system_prompt: Content of the system message.
        history: Earlier turns as dicts with ``"role"`` and ``"content"``;
            only the last eight are sent to the model.
        user_message: The new user message.
        model_id: Model to load; ``MULTILINGUAL_LM_FIRE`` when ``None``.

    Returns:
        The decoded text generated after the prompt.
    """
    import torch
    from transformers import AutoTokenizer, AutoModelForCausalLM

    chosen_model = MULTILINGUAL_LM_FIRE if model_id is None else model_id
    tok = AutoTokenizer.from_pretrained(chosen_model)
    lm = AutoModelForCausalLM.from_pretrained(
        chosen_model, torch_dtype=torch.float16, device_map="auto",
    )
    lm.eval()

    recent_turns = [
        {"role": turn["role"], "content": turn["content"]} for turn in history[-8:]
    ]
    conversation = [
        {"role": "system", "content": system_prompt},
        *recent_turns,
        {"role": "user", "content": user_message},
    ]

    prompt_ids = tok.apply_chat_template(
        conversation, tokenize=True, add_generation_prompt=True, return_tensors="pt",
    ).to(lm.device)
    generated = lm.generate(
        prompt_ids, max_new_tokens=512, do_sample=True,
        temperature=0.8, top_p=0.95,
    )
    # Keep only the tokens produced after the prompt.
    prompt_len = prompt_ids.shape[-1]
    reply_ids = generated[0][prompt_len:]
    return tok.decode(reply_ids, skip_special_tokens=True)
# ─────────────────────────────────────────────────────────────────────────────
# 5. Photo Description — MiniCPM-V 4.6 (hosted API)
# ─────────────────────────────────────────────────────────────────────────────
@app.function(image=base_image, timeout=5 * MINUTES)
def describe_photo(image_bytes: bytes, prompt: str = None) -> str:
    """Describe a photo using the hosted vision model.

    Args:
        image_bytes: Raw image bytes; sent as a base64 ``image/jpeg`` data URL.
        prompt: Instruction for the model; a built-in person-description
            prompt is used when ``None``.

    Returns:
        The model's reply with surrounding whitespace stripped.
    """
    import base64

    default_prompt = (
        "Describe this person in detail: their appearance, expression, what they are doing, "
        "the setting, and any emotional tone you sense. "
        "This will help reconstruct their memory and personality."
    )
    question = default_prompt if prompt is None else prompt
    encoded = base64.b64encode(image_bytes).decode()
    user_turn = {"role": "user", "content": [
        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded}"}},
        {"type": "text", "text": question},
    ]}

    completion = get_vision_client().chat.completions.create(
        model=MINICPM_VIS_MODEL,
        messages=[user_turn],
        max_tokens=512,
        temperature=0.3,
    )
    return completion.choices[0].message.content.strip()
# ─────────────────────────────────────────────────────────────────────────────
# 6. Persona Builder — MiniCPM4.1-8B (hosted API)
# ─────────────────────────────────────────────────────────────────────────────
def _extract_json_object(raw: str) -> str:
    """Return the JSON-object portion of a model reply.

    Removes ``<think>...</think>`` sections, prefers the first fenced code
    block whose content starts with ``{``, then keeps the span from the first
    ``{`` to the last ``}``. When no braces are found the cleaned text is
    returned unchanged so the caller's JSON parser reports the error.
    """
    import re

    cleaned = re.sub(r'<think>.*?</think>', '', raw, flags=re.DOTALL).strip()
    if "```" in cleaned:
        for part in cleaned.split("```"):
            if part.startswith("json"):
                part = part[4:]
            part = part.strip()
            if part.startswith("{"):
                cleaned = part
                break
    json_match = re.search(r'[{].*[}]', cleaned, re.DOTALL)
    return json_match.group(0) if json_match else cleaned


@app.function(image=base_image, timeout=20 * MINUTES, volumes={"/personas": volume})
def build_persona(
    persona_id: str, name: str, relationship: str,
    texts: list, photo_captions: list, voice_transcripts: list,
) -> dict:
    """Build a persona profile with the hosted LLM and store it on the volume.

    Args:
        persona_id: Identifier used as the file name (``/personas/<id>.json``).
            It must be a single path component.
        name: The person's name.
        relationship: Their relationship to the user.
        texts: Letters / diary entries.
        photo_captions: Descriptions of photos.
        voice_transcripts: Transcribed recordings.

    Returns:
        The persona profile parsed from the model's JSON reply.

    Raises:
        ValueError: If ``persona_id`` is empty or not a plain file name, or if
            the model reply is not a JSON object (``json.JSONDecodeError`` is a
            ``ValueError`` subclass).
    """
    # persona_id arrives from the web endpoint; refuse anything that could
    # escape /personas. Checked first so a bad id costs no LLM call.
    file_stem = str(persona_id)
    if not file_stem or file_stem in {".", ".."} or Path(file_stem).name != file_stem:
        raise ValueError(f"Invalid persona_id: {persona_id!r}")

    client = get_llm_client()
    all_content = []
    if texts:
        all_content.append("=== LETTERS / DIARY ENTRIES ===\n" + "\n---\n".join(texts))
    if photo_captions:
        all_content.append("=== PHOTO DESCRIPTIONS ===\n" + "\n---\n".join(photo_captions))
    if voice_transcripts:
        all_content.append("=== VOICE TRANSCRIPTS ===\n" + "\n---\n".join(voice_transcripts))
    combined = "\n\n".join(all_content)
    prompt = f"""You are helping preserve the memory of {name} ({relationship}).
Below is everything we have from them:
{combined}
Extract a rich persona profile. Return ONLY valid JSON, no markdown, no backticks:
{{
"name": "{name}",
"relationship": "{relationship}",
"personality_traits": ["5-8 traits"],
"speech_style": "how they speak, tone, vocabulary, sentence length",
"common_phrases": ["phrases or expressions they often used"],
"key_memories": ["10-15 specific memories or life events"],
"values": ["what they cared about most"],
"language": "primary language (English/Bengali/Hindi/mixed)",
"emotional_tone": "overall emotional quality",
"topics_they_loved": ["subjects they talked about often"],
"advice_they_gave": ["wisdom or advice they shared"],
"voice_description": "describe their voice: age, gender, tone, accent e.g. elderly Bengali man, warm gentle voice",
"system_prompt": "A 300-word system prompt starting with: You are {name}..."
}}"""
    response = client.chat.completions.create(
        model=MINICPM_LLM_MODEL,
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
        max_tokens=2048,
    )
    # The API may return no content; treat that as an empty reply so the
    # failure surfaces as a JSON error rather than an AttributeError.
    raw = (response.choices[0].message.content or "").strip()
    print(f"RAW RESPONSE (first 500 chars): {raw[:500]}")
    raw = _extract_json_object(raw)
    print(f"FINAL RAW FOR PARSING (first 300 chars): {raw[:300]}")

    persona = json.loads(raw)
    if not isinstance(persona, dict):
        raise ValueError("Model reply was valid JSON but not a JSON object.")

    os.makedirs("/personas", exist_ok=True)
    with open(f"/personas/{file_stem}.json", "w", encoding="utf-8") as f:
        json.dump(persona, f, ensure_ascii=False, indent=2)
    # Persist the write so other containers can see the new persona.
    volume.commit()
    return persona
# ─────────────────────────────────────────────────────────────────────────────
# 7. Chat — MiniCPM4.1-8B (hosted API)
# ─────────────────────────────────────────────────────────────────────────────
BENGALI_CHARS = set("অআইঈউঊঋএঐওঔকখগঘঙচছজঝঞটঠডঢণতথদধনপফবভমযরলশষসহড়ঢ়য়ংঃ")
# Inclusive code-point ranges: CJK Unified Ideographs, Hiragana + Katakana,
# Hangul Syllables, Thai. (Latin-script languages such as Vietnamese are not covered.)
ASIA_PACIFIC_RANGES = [(0x4E00, 0x9FFF), (0x3040, 0x30FF), (0xAC00, 0xD7AF), (0x0E00, 0x0E7F)]


def is_asia_pacific(text):
    """Return True if any character of ``text`` lies in ``ASIA_PACIFIC_RANGES``."""

    def _in_listed_range(code_point):
        for low, high in ASIA_PACIFIC_RANGES:
            if low <= code_point <= high:
                return True
        return False

    return any(_in_listed_range(ord(char)) for char in text)
@app.function(image=base_image, timeout=5 * MINUTES, volumes={"/personas": volume})
def chat_with_persona(
    persona_id: str, history: list, user_message: str, language: str = "auto",
) -> dict:
    """Answer ``user_message`` in the voice of a stored persona.

    Args:
        persona_id: Identifier of a persona stored as ``/personas/<id>.json``.
        history: Earlier turns as dicts with ``"role"`` and ``"content"``;
            only the last ten are sent to the model. ``None`` means no history.
        user_message: The new user message.
        language: Requested reply language, or ``"auto"`` to follow the
            script detected in ``user_message``.

    Returns:
        ``{"text": <reply>, "voice_description": <persona voice description>}``.

    Raises:
        FileNotFoundError: If no persona with that id exists (ids that are not
            a plain file name can never exist and are reported the same way).
    """
    import re

    # persona_id arrives from the web endpoint; it must stay inside /personas.
    file_stem = str(persona_id)
    if not file_stem or file_stem in {".", ".."} or Path(file_stem).name != file_stem:
        raise FileNotFoundError(f"Persona '{persona_id}' not found.")

    # A warm container keeps the volume state it last loaded; reload so
    # personas committed by other containers since then are visible.
    volume.reload()

    persona_path = f"/personas/{file_stem}.json"
    if not os.path.exists(persona_path):
        raise FileNotFoundError(f"Persona '{persona_id}' not found.")
    with open(persona_path, "r", encoding="utf-8") as f:
        persona = json.load(f)

    # Profiles are model-generated, so tolerate a missing name or prompt.
    persona_name = persona.get("name") or file_stem
    system_prompt = persona.get("system_prompt") or f"You are {persona_name}."
    voice_desc = persona.get("voice_description", "warm elderly voice")
    full_system = f"""{system_prompt}
IMPORTANT:
- You ARE {persona_name}. Never break character.
- Use their speech style, phrases, and memories naturally.
- Be warm and personal, not like an AI.
- Detect the user's language and respond in the same language.
- Keep responses 2-4 sentences unless sharing a story."""

    is_bengali = any(c in BENGALI_CHARS for c in user_message)
    is_ap = is_asia_pacific(user_message)
    # All languages through MiniCPM4.1-8B (handles Bengali/Hindi/Chinese natively)
    # Tiny Aya Fire/Water kept for dedicated multilingual endpoint (future use)
    client = get_llm_client()

    # Add language instruction to system prompt
    if is_bengali or language == "Bengali":
        full_system = full_system + "\n\nIMPORTANT: The user is writing in Bengali. You MUST respond in Bengali only."
    elif is_ap or language in ("Chinese", "Japanese", "Korean", "Thai"):
        if language and language != "auto":
            full_system = full_system + "\n\nIMPORTANT: Respond in " + str(language) + " only."
        else:
            # Script detected but no explicit language chosen: never tell the
            # model to "Respond in auto only."
            full_system = full_system + "\n\nIMPORTANT: Respond only in the language of the user's latest message."

    messages = [{"role": "system", "content": full_system}]
    for msg in (history or [])[-10:]:
        messages.append({"role": msg["role"], "content": msg["content"]})
    messages.append({"role": "user", "content": user_message})

    response = client.chat.completions.create(
        model=MINICPM_LLM_MODEL,
        messages=messages,
        temperature=0.8,
        max_tokens=1024,
        stop=None,
    )
    # The API may return no content; treat that as an empty reply.
    response_text = (response.choices[0].message.content or "").strip()
    response_text = re.sub(r'<think>.*?</think>', '', response_text, flags=re.DOTALL).strip()

    # Remove any truncated incomplete sentence at end, but only when the last
    # complete sentence ends past the midpoint so short replies stay intact.
    terminators = '.!?।'
    if response_text and response_text[-1] not in terminators:
        last_punct = max(response_text.rfind(mark) for mark in terminators)
        if last_punct > len(response_text) // 2:
            response_text = response_text[:last_punct + 1]
    return {"text": response_text, "voice_description": voice_desc}
# ─────────────────────────────────────────────────────────────────────────────
# 8. Web Endpoints
# ─────────────────────────────────────────────────────────────────────────────
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
def make_app():
    """Create a FastAPI app whose CORS policy allows any origin, method and header."""
    allow_everything = {
        "allow_origins": ["*"],
        "allow_methods": ["*"],
        "allow_headers": ["*"],
    }
    api = FastAPI()
    api.add_middleware(CORSMiddleware, **allow_everything)
    return api
# Health check — tests MiniCPM API reachability
health_web = make_app()


@app.function(image=base_image, timeout=30)
@modal.asgi_app(label="health")
def health_endpoint():
    """Serve ``GET /``, which reports whether the hosted MiniCPM API answers."""

    @health_web.get("/")
    async def handler():
        # A tiny completion request doubles as the reachability probe.
        probe = [{"role": "user", "content": "Hi"}]
        try:
            completion = get_llm_client().chat.completions.create(
                model=MINICPM_LLM_MODEL,
                messages=probe,
                max_tokens=5,
            )
            reply = completion.choices[0].message.content
        except Exception as err:
            return {"status": "error", "minicpm": "unreachable", "error": str(err)}
        return {"status": "ok", "minicpm": "reachable", "reply": reply}

    return health_web
# Build persona
build_web = make_app()


@app.function(image=base_image, volumes={"/personas": volume}, timeout=20 * MINUTES)
@modal.asgi_app(label="build-persona")
def build_persona_endpoint():
    """Serve ``POST /``, which builds and stores a persona.

    The JSON body requires ``persona_id``, ``name`` and ``relationship``;
    ``texts``, ``photo_captions`` and ``voice_transcripts`` default to empty
    lists. The response is ``{"success": True, "persona": ...}`` or
    ``{"success": False, "error": ...}``.
    """
    required_fields = ("persona_id", "name", "relationship")

    @build_web.post("/")
    async def handler(request: Request):
        try:
            data = await request.json()
            missing = [field for field in required_fields if field not in data]
            if missing:
                # str(KeyError) would only show the bare key name.
                return {
                    "success": False,
                    "error": "Missing required field(s): " + ", ".join(missing),
                }
            # Await the async variant: a blocking .remote() call here would
            # stall the event loop for the whole (multi-minute) build.
            persona = await build_persona.remote.aio(
                persona_id=data["persona_id"], name=data["name"],
                relationship=data["relationship"], texts=data.get("texts", []),
                photo_captions=data.get("photo_captions", []),
                voice_transcripts=data.get("voice_transcripts", []),
            )
            return {"success": True, "persona": persona}
        except Exception as e:
            return {"success": False, "error": str(e)}

    return build_web
# Chat
chat_web = make_app()


@app.function(image=base_image, volumes={"/personas": volume}, timeout=5 * MINUTES)
@modal.asgi_app(label="chat")
def chat_endpoint():
    """Serve ``POST /``, which chats with a stored persona.

    The JSON body requires ``persona_id`` and ``message``; ``history`` defaults
    to ``[]`` and ``language`` to ``"auto"``. Responds 400 for a malformed body
    and 404 when the persona does not exist.
    """
    from fastapi import HTTPException

    @chat_web.post("/")
    async def handler(request: Request):
        try:
            data = await request.json()
            persona_id = data["persona_id"]
            message = data["message"]
            history = data.get("history", [])
            language = data.get("language", "auto")
        except (KeyError, TypeError, ValueError, AttributeError) as exc:
            raise HTTPException(
                status_code=400,
                detail="Expected a JSON object with 'persona_id' and 'message'.",
            ) from exc

        try:
            # Await the async variant so the event loop is not blocked while
            # the remote function runs.
            return await chat_with_persona.remote.aio(
                persona_id=persona_id, history=history,
                user_message=message, language=language,
            )
        except FileNotFoundError as exc:
            # Raised by chat_with_persona for an unknown persona id.
            raise HTTPException(status_code=404, detail=str(exc)) from exc

    return chat_web
# Transcribe
transcribe_web = make_app()


@app.function(image=asr_image, volumes={"/root/.cache/huggingface": hf_cache_vol}, timeout=5 * MINUTES)
@modal.asgi_app(label="transcribe")
def transcribe_endpoint():
    """Serve ``POST /``, which transcribes base64-encoded audio.

    The JSON body requires ``audio_b64``; ``filename`` defaults to
    ``"audio.wav"``. Responds ``{"transcript": ...}``, or 400 for a malformed
    body or invalid base64.
    """
    from fastapi import HTTPException

    @transcribe_web.post("/")
    async def handler(request: Request):
        try:
            data = await request.json()
            audio_bytes = base64.b64decode(data["audio_b64"])
            filename = data.get("filename", "audio.wav")
        except (KeyError, TypeError, ValueError, AttributeError) as exc:
            # JSON and base64 decoding errors are ValueError subclasses.
            raise HTTPException(
                status_code=400,
                detail="Expected a JSON object with base64 audio in 'audio_b64'.",
            ) from exc

        # Await the async variant so the event loop is not blocked.
        transcript = await transcribe_audio.remote.aio(audio_bytes, filename)
        return {"transcript": transcript}

    return transcribe_web
# Vision
vision_web = make_app()


@app.function(image=base_image, timeout=5 * MINUTES)
@modal.asgi_app(label="describe-photo")
def vision_endpoint():
    """Serve ``POST /``, which describes a base64-encoded photo.

    The JSON body requires ``image_b64``; ``prompt`` is optional. Responds
    ``{"description": ...}``, or 400 for a malformed body or invalid base64.
    """
    from fastapi import HTTPException

    @vision_web.post("/")
    async def handler(request: Request):
        try:
            data = await request.json()
            image_bytes = base64.b64decode(data["image_b64"])
            prompt = data.get("prompt")
        except (KeyError, TypeError, ValueError, AttributeError) as exc:
            # JSON and base64 decoding errors are ValueError subclasses.
            raise HTTPException(
                status_code=400,
                detail="Expected a JSON object with a base64 image in 'image_b64'.",
            ) from exc

        # Await the async variant so the event loop is not blocked.
        description = await describe_photo.remote.aio(image_bytes, prompt)
        return {"description": description}

    return vision_web
# OCR
ocr_web = make_app()


@app.function(image=ocr_image, volumes={"/root/.cache/huggingface": hf_cache_vol}, timeout=5 * MINUTES)
@modal.asgi_app(label="ocr-document")
def ocr_endpoint():
    """Serve ``POST /``, which extracts text from a base64-encoded document image.

    The JSON body requires ``image_b64``. Responds ``{"text": ...}``, or 400
    for a malformed body or invalid base64.
    """
    from fastapi import HTTPException

    @ocr_web.post("/")
    async def handler(request: Request):
        try:
            data = await request.json()
            image_bytes = base64.b64decode(data["image_b64"])
        except (KeyError, TypeError, ValueError) as exc:
            # JSON and base64 decoding errors are ValueError subclasses.
            raise HTTPException(
                status_code=400,
                detail="Expected a JSON object with a base64 image in 'image_b64'.",
            ) from exc

        # Await the async variant so the event loop is not blocked.
        text = await ocr_document.remote.aio(image_bytes)
        return {"text": text}

    return ocr_web
# TTS
tts_web = make_app()


@app.function(image=tts_image, volumes={"/root/.cache/huggingface": hf_cache_vol}, timeout=5 * MINUTES)
@modal.asgi_app(label="text-to-speech")
def tts_endpoint():
    """Serve ``POST /``, which synthesises speech.

    The JSON body requires a non-empty string ``text``; ``voice_description``
    is optional. Responds with ``audio/wav`` bytes, or 400 for a malformed body.
    """
    from fastapi import HTTPException

    @tts_web.post("/")
    async def handler(request: Request):
        try:
            data = await request.json()
            text = data["text"]
            voice_description = data.get("voice_description")
        except (KeyError, TypeError, ValueError, AttributeError) as exc:
            raise HTTPException(
                status_code=400,
                detail="Expected a JSON object with the text to speak in 'text'.",
            ) from exc
        if not isinstance(text, str) or not text:
            raise HTTPException(status_code=400, detail="'text' must be a non-empty string.")

        # Await the async variant so the event loop is not blocked.
        wav_bytes = await text_to_speech.remote.aio(text, voice_description)
        return Response(content=wav_bytes, media_type="audio/wav")

    return tts_web
# List personas
list_web = make_app()


@app.function(image=base_image, volumes={"/personas": volume}, timeout=MINUTES)
@modal.asgi_app(label="list-personas")
def list_personas_endpoint():
    """Serve ``GET /``, which lists the personas stored on the volume.

    Responds ``{"personas": [{"id", "name", "relationship"}, ...]}``, sorted by
    file name. Files that cannot be read or parsed are skipped (and logged).
    """

    # A plain ``def`` handler: FastAPI runs it in its threadpool, so the
    # blocking volume reload and file reads do not stall the event loop.
    @list_web.get("/")
    def handler():
        # A warm container keeps the volume state it last loaded; reload so
        # personas committed by other containers since then are listed.
        volume.reload()

        personas = []
        root = Path("/personas")
        if root.is_dir():
            for path in sorted(root.glob("*.json")):
                try:
                    # Persona files are written as UTF-8 (ensure_ascii=False).
                    with path.open(encoding="utf-8") as fp:
                        data = json.load(fp)
                except (OSError, ValueError) as exc:
                    print(f"Skipping unreadable persona file {path.name}: {exc}")
                    continue
                if not isinstance(data, dict):
                    print(f"Skipping persona file {path.name}: not a JSON object")
                    continue
                personas.append({
                    "id": path.stem,
                    "name": data.get("name"),
                    "relationship": data.get("relationship"),
                })
        return {"personas": personas}

    return list_web