# Memory Keeper — Modal application configuration.
#
# Declares the Modal app, the persistent volumes, the hosted MiniCPM API
# settings, the Hugging Face model IDs, and one container image per workload.

import base64
import json
import os
from pathlib import Path

import modal

app = modal.App("memory-keeper")

# ── Volumes ───────────────────────────────────────────────────────────────────
# Persona JSON files written by build_persona and read by the chat/list paths.
volume = modal.Volume.from_name("memory-keeper-personas", create_if_missing=True)
# Mounted at the Hugging Face cache directory by the GPU functions.
hf_cache_vol = modal.Volume.from_name("memory-keeper-hf-cache", create_if_missing=True)

# ── MiniCPM Hosted API ────────────────────────────────────────────────────────
MINICPM_LLM_URL = "http://35.203.155.71:8001/v1"  # MiniCPM4.1-8B
MINICPM_VISION_URL = "http://35.203.155.71:8003/v1"  # MiniCPM-V-4.6
# SECURITY: a credential committed to source control should be treated as
# leaked. The MINICPM_AUTH environment variable now takes precedence so the key
# can be rotated without a code change; the literal remains only as a
# backward-compatible fallback.
# TODO(review): rotate this key, supply it to the functions as an environment
# variable (e.g. via a Modal secret), and delete the fallback literal.
MINICPM_AUTH = os.environ.get(
    "MINICPM_AUTH",
    "Bearer sk-minicpm-V8bcD-YTAMxECagaKOnbwTCN69IlN2LhSezGOgq2Ues",
)
MINICPM_LLM_MODEL = "MiniCPM4.1-8B"
MINICPM_VIS_MODEL = "MiniCPM-V-4.6"

# ── Model IDs (Modal-hosted) ──────────────────────────────────────────────────
ASR_MODEL = "CohereLabs/cohere-transcribe-03-2026"
OCR_MODEL = "nvidia/NVIDIA-Nemotron-Parse-v1.2"
TTS_MODEL = "openbmb/VoxCPM2"
MULTILINGUAL_LM_FIRE = "CohereLabs/tiny-aya-fire"  # South Asian (Bengali, Hindi, Urdu)
MULTILINGUAL_LM_WATER = "CohereLabs/tiny-aya-water"  # Asia Pacific

# Seconds per minute; function timeouts are written as `n * MINUTES`.
MINUTES = 60

# ── Images ────────────────────────────────────────────────────────────────────
base_image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install("openai", "requests", "fastapi[standard]", "huggingface_hub")
)

asr_image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install(
        "torch",
        "transformers>=4.50.0",
        "torchaudio",
        "huggingface_hub",
        "soundfile",
        "fastapi[standard]",
    )
)

ocr_image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install(
        "torch",
        "torchvision",
        "transformers>=4.50.0",
        "Pillow",
        "huggingface_hub",
        "fastapi[standard]",
    )
)

tts_image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install(
        "torch",
        "soundfile",
        "huggingface_hub",
        "fastapi[standard]",
        "voxcpm",
    )
)

multilingual_image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install(
        "torch",
        "transformers>=4.50.0",
        "huggingface_hub",
        "fastapi[standard]",
    )
)
# ── Helper: MiniCPM client ────────────────────────────────────────────────────
def get_llm_client():
    """Return an OpenAI-compatible client pointed at the MiniCPM text endpoint.

    The ``openai`` package is imported lazily so that importing this module
    does not require it.
    """
    from openai import OpenAI

    return OpenAI(
        base_url=MINICPM_LLM_URL,
        # The client adds its own "Bearer " scheme, so pass the bare key.
        api_key=MINICPM_AUTH.replace("Bearer ", ""),
    )


def get_vision_client():
    """Return an OpenAI-compatible client pointed at the MiniCPM vision endpoint."""
    from openai import OpenAI

    return OpenAI(
        base_url=MINICPM_VISION_URL,
        # The client adds its own "Bearer " scheme, so pass the bare key.
        api_key=MINICPM_AUTH.replace("Bearer ", ""),
    )


# ─────────────────────────────────────────────────────────────────────────────
# 1. ASR — Cohere Transcribe 2B
# ─────────────────────────────────────────────────────────────────────────────
@app.function(
    gpu="T4",
    image=asr_image,
    timeout=5 * MINUTES,
    volumes={"/root/.cache/huggingface": hf_cache_vol},
)
def transcribe_audio(audio_bytes: bytes, filename: str = "audio.wav") -> str:
    """Transcribe an audio clip with the ASR model.

    Args:
        audio_bytes: Raw contents of the audio file.
        filename: Original file name; only its extension is used, as the
            suffix of the temporary file handed to the pipeline.

    Returns:
        The ``"text"`` field of the pipeline result when the result is a dict,
        otherwise ``str(result)``.
    """
    import tempfile

    import torch
    from transformers import pipeline

    # Build the pipeline before touching the filesystem so a model-load failure
    # leaves no temporary file behind.
    pipe = pipeline(
        "automatic-speech-recognition",
        model=ASR_MODEL,
        trust_remote_code=True,
        device="cuda",
        torch_dtype=torch.float16,
    )

    with tempfile.NamedTemporaryFile(suffix=Path(filename).suffix, delete=False) as f:
        f.write(audio_bytes)
        tmp_path = f.name

    try:
        result = pipe(tmp_path, return_timestamps=False)
    finally:
        # Always remove the temp file, including when inference raises.
        os.unlink(tmp_path)

    return result["text"] if isinstance(result, dict) else str(result)
# ─────────────────────────────────────────────────────────────────────────────
# 2. OCR — Nemotron Parse v1.2
# ─────────────────────────────────────────────────────────────────────────────
@app.function(
    gpu="T4",
    image=ocr_image,
    timeout=5 * MINUTES,
    volumes={"/root/.cache/huggingface": hf_cache_vol},
)
def ocr_document(image_bytes: bytes) -> str:
    """Extract the text of a document image with the OCR model.

    Args:
        image_bytes: Raw contents of the image file (any format Pillow can open).

    Returns:
        The decoded model output for the single input image, with the prompt
        tokens and special tokens removed.
    """
    import io

    import torch
    from PIL import Image
    from transformers import AutoModelForImageTextToText, AutoProcessor

    processor = AutoProcessor.from_pretrained(OCR_MODEL, trust_remote_code=True)
    model = AutoModelForImageTextToText.from_pretrained(
        OCR_MODEL,
        trust_remote_code=True,
        torch_dtype=torch.float16,
        device_map="auto",
    )
    model.eval()

    # Decode straight from memory: no temporary file to create or clean up.
    image = Image.open(io.BytesIO(image_bytes)).convert("RGB")

    messages = [{"role": "user", "content": [
        # Attach the decoded image to the message. Previously the entry was a
        # bare {"type": "image"}, so the loaded image never reached the
        # processor and the model was prompted without any picture.
        # NOTE(review): assumes this model's remote-code processor follows the
        # standard Transformers multimodal chat-template contract — confirm.
        {"type": "image", "image": image},
        {"type": "text", "text": "Extract all text from this document. Preserve structure and formatting."},
    ]}]

    inputs = processor.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_dict=True,
        return_tensors="pt",
    ).to(model.device)

    generated_ids = model.generate(**inputs, max_new_tokens=2048)
    # Drop the prompt tokens so only newly generated text is decoded.
    trimmed = [out[len(inp):] for inp, out in zip(inputs.input_ids, generated_ids)]
    return processor.batch_decode(trimmed, skip_special_tokens=True)[0]
# ─────────────────────────────────────────────────────────────────────────────
# 3. TTS — VoxCPM2
# ─────────────────────────────────────────────────────────────────────────────
@app.function(
    gpu="T4",
    image=tts_image,
    timeout=5 * MINUTES,
    volumes={"/root/.cache/huggingface": hf_cache_vol},
)
def text_to_speech(text: str, voice_description: str = None) -> bytes:
    """Synthesize speech for ``text`` and return it as WAV bytes.

    Args:
        text: The text to speak.
        voice_description: Optional description of the voice. When given it is
            prepended to the text in parentheses before generation.

    Returns:
        The generated audio encoded as a WAV file.
    """
    import io

    import soundfile as sf
    from voxcpm import VoxCPM

    model = VoxCPM.from_pretrained(TTS_MODEL, load_denoiser=False)

    if voice_description:
        text = f"({voice_description}){text}"

    wav = model.generate(text=text, cfg_value=2.0, inference_timesteps=10)

    buf = io.BytesIO()
    sf.write(buf, wav, model.tts_model.sample_rate, format="WAV")
    return buf.getvalue()


# ─────────────────────────────────────────────────────────────────────────────
# 4. Multilingual — Tiny Aya Fire (Bengali/South Asian fallback)
# ─────────────────────────────────────────────────────────────────────────────
@app.function(
    gpu="T4",
    image=multilingual_image,
    timeout=5 * MINUTES,
    volumes={"/root/.cache/huggingface": hf_cache_vol},
)
def chat_multilingual(system_prompt: str, history: list, user_message: str, model_id: str = None) -> str:
    """Generate one chat reply with a locally loaded multilingual model.

    Args:
        system_prompt: Content of the system message.
        history: Prior messages as dicts with ``"role"`` and ``"content"``
            keys; only the last 8 are used. ``None`` is treated as empty.
        user_message: The new user turn.
        model_id: Hugging Face model ID; defaults to ``MULTILINGUAL_LM_FIRE``.

    Returns:
        The decoded text of the newly generated tokens.
    """
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer

    if model_id is None:
        model_id = MULTILINGUAL_LM_FIRE

    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        torch_dtype=torch.float16,
        device_map="auto",
    )
    model.eval()

    messages = [{"role": "system", "content": system_prompt}]
    # `history or []` keeps a missing/None history from crashing the slice.
    messages.extend(
        {"role": msg["role"], "content": msg["content"]}
        for msg in (history or [])[-8:]
    )
    messages.append({"role": "user", "content": user_message})

    # return_dict=True also yields the attention mask, which is passed to
    # generate() explicitly instead of leaving it to be inferred.
    inputs = tokenizer.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_dict=True,
        return_tensors="pt",
    ).to(model.device)
    input_ids = inputs["input_ids"]

    gen_tokens = model.generate(
        input_ids,
        attention_mask=inputs["attention_mask"],
        max_new_tokens=512,
        do_sample=True,
        temperature=0.8,
        top_p=0.95,
    )

    # Keep only the tokens generated after the prompt.
    output = gen_tokens[0][input_ids.shape[-1]:]
    return tokenizer.decode(output, skip_special_tokens=True)
# ─────────────────────────────────────────────────────────────────────────────
# 5. Photo Description — MiniCPM-V 4.6 (hosted API)
# ─────────────────────────────────────────────────────────────────────────────
@app.function(image=base_image, timeout=5 * MINUTES)
def describe_photo(image_bytes: bytes, prompt: str = None) -> str:
    """Describe a photo using the hosted vision model.

    Args:
        image_bytes: Raw image contents; sent as a base64 ``image/jpeg`` data URL.
        prompt: Instruction for the model. When ``None`` a default prompt asking
            for a detailed description of the person is used.

    Returns:
        The model's reply with surrounding whitespace removed (empty string if
        the reply has no content).
    """
    import base64

    client = get_vision_client()
    img_b64 = base64.b64encode(image_bytes).decode()

    if prompt is None:
        prompt = (
            "Describe this person in detail: their appearance, expression, what they are doing, "
            "the setting, and any emotional tone you sense. "
            "This will help reconstruct their memory and personality."
        )

    response = client.chat.completions.create(
        model=MINICPM_VIS_MODEL,
        messages=[{"role": "user", "content": [
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}},
            {"type": "text", "text": prompt},
        ]}],
        max_tokens=512,
        temperature=0.3,
    )
    # `content` may be None; guard so .strip() cannot raise AttributeError.
    return (response.choices[0].message.content or "").strip()


# ─────────────────────────────────────────────────────────────────────────────
# 6. Persona Builder — MiniCPM4.1-8B (hosted API)
# ─────────────────────────────────────────────────────────────────────────────
@app.function(image=base_image, timeout=20 * MINUTES, volumes={"/personas": volume})
def build_persona(
    persona_id: str,
    name: str,
    relationship: str,
    texts: list,
    photo_captions: list,
    voice_transcripts: list,
) -> dict:
    """Build a persona profile with the hosted LLM and persist it as JSON.

    The supplied material is concatenated into one prompt, the model is asked
    for a JSON profile, the JSON object is extracted from the reply, and the
    result is written to ``/personas/<persona_id>.json`` and committed to the
    personas volume.

    Args:
        persona_id: File-name stem for the stored persona. Must be a plain
            name: no path separators, not ``.`` or ``..``.
        name: The person's name, embedded in the prompt.
        relationship: Their relationship to the user, embedded in the prompt.
        texts: Letters / diary entries (list of strings).
        photo_captions: Photo descriptions (list of strings).
        voice_transcripts: Voice transcripts (list of strings).

    Returns:
        The parsed persona object.

    Raises:
        ValueError: If ``persona_id`` is not a safe file name, or (as
            ``json.JSONDecodeError``) if the model reply is not valid JSON.
    """
    import re

    # persona_id arrives from the web endpoint and becomes part of a file path;
    # refuse anything that could escape /personas. Checked before the (slow)
    # model call so bad requests fail fast.
    persona_key = str(persona_id)
    if not persona_key or persona_key in (".", "..") or Path(persona_key).name != persona_key:
        raise ValueError(f"Invalid persona_id: {persona_id!r}")

    client = get_llm_client()

    all_content = []
    if texts:
        all_content.append("=== LETTERS / DIARY ENTRIES ===\n" + "\n---\n".join(texts))
    if photo_captions:
        all_content.append("=== PHOTO DESCRIPTIONS ===\n" + "\n---\n".join(photo_captions))
    if voice_transcripts:
        all_content.append("=== VOICE TRANSCRIPTS ===\n" + "\n---\n".join(voice_transcripts))

    combined = "\n\n".join(all_content)

    prompt = f"""You are helping preserve the memory of {name} ({relationship}).
Below is everything we have from them:

{combined}

Extract a rich persona profile. Return ONLY valid JSON, no markdown, no backticks:
{{
  "name": "{name}",
  "relationship": "{relationship}",
  "personality_traits": ["5-8 traits"],
  "speech_style": "how they speak, tone, vocabulary, sentence length",
  "common_phrases": ["phrases or expressions they often used"],
  "key_memories": ["10-15 specific memories or life events"],
  "values": ["what they cared about most"],
  "language": "primary language (English/Bengali/Hindi/mixed)",
  "emotional_tone": "overall emotional quality",
  "topics_they_loved": ["subjects they talked about often"],
  "advice_they_gave": ["wisdom or advice they shared"],
  "voice_description": "describe their voice: age, gender, tone, accent e.g. elderly Bengali man, warm gentle voice",
  "system_prompt": "A 300-word system prompt starting with: You are {name}..."
}}"""

    response = client.chat.completions.create(
        model=MINICPM_LLM_MODEL,
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
        max_tokens=2048,
    )

    # `content` may be None; guard so .strip() cannot raise AttributeError.
    raw = (response.choices[0].message.content or "").strip()
    print(f"RAW RESPONSE (first 500 chars): {raw[:500]}")

    # Strip <think>...</think> reasoning blocks. The previous pattern had lost
    # its tag text (r'.*?'), which only matches empty strings and removed nothing.
    raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
    print(f"AFTER THINK STRIP (first 500 chars): {raw[:500]}")

    # Strip markdown code blocks: take the first fenced segment that looks like
    # a JSON object, tolerating a leading "json" language tag.
    if "```" in raw:
        for part in raw.split("```"):
            if part.startswith("json"):
                part = part[4:]
            part = part.strip()
            if part.startswith("{"):
                raw = part
                break

    # Extract JSON object: from the first "{" to the last "}".
    json_match = re.search(r"[{].*[}]", raw, re.DOTALL)
    if json_match:
        raw = json_match.group(0)

    print(f"FINAL RAW FOR PARSING (first 300 chars): {raw[:300]}")
    persona = json.loads(raw)

    os.makedirs("/personas", exist_ok=True)
    with open(f"/personas/{persona_key}.json", "w", encoding="utf-8") as f:
        json.dump(persona, f, ensure_ascii=False, indent=2)

    # Persist the write to the Modal volume.
    volume.commit()
    return persona
# ─────────────────────────────────────────────────────────────────────────────
# 7. Chat — MiniCPM4.1-8B (hosted API)
# ─────────────────────────────────────────────────────────────────────────────
BENGALI_CHARS = set("অআইঈউঊঋএঐওঔকখগঘঙচছজঝঞটঠডঢণতথদধনপফবভমযরলশষসহড়ঢ়য়ংঃ")

# Asia Pacific code-point ranges: CJK Unified Ideographs, Hiragana + Katakana,
# Hangul Syllables, Thai.
ASIA_PACIFIC_RANGES = [(0x4E00, 0x9FFF), (0x3040, 0x30FF), (0xAC00, 0xD7AF), (0x0E00, 0x0E7F)]


def is_asia_pacific(text):
    """Return True if any character of ``text`` falls in ``ASIA_PACIFIC_RANGES``."""
    return any(
        lo <= ord(ch) <= hi
        for ch in text
        for lo, hi in ASIA_PACIFIC_RANGES
    )


@app.function(image=base_image, timeout=5 * MINUTES, volumes={"/personas": volume})
def chat_with_persona(
    persona_id: str,
    history: list,
    user_message: str,
    language: str = "auto",
) -> dict:
    """Answer ``user_message`` in the voice of a stored persona.

    Args:
        persona_id: Stem of the persona file under ``/personas``.
        history: Prior messages as dicts with ``"role"`` and ``"content"``
            keys; only the last 10 are sent. ``None`` is treated as empty.
        user_message: The new user turn.
        language: Explicit reply language, or ``"auto"`` to rely on script
            detection and the model's own language matching.

    Returns:
        ``{"text": <reply>, "voice_description": <persona voice description>}``.

    Raises:
        FileNotFoundError: If no persona file exists for ``persona_id`` (an
            unsafe ``persona_id`` is reported the same way).
    """
    import re

    # Reject IDs that could escape /personas; reported as "not found" so the
    # exception type callers see is unchanged.
    persona_key = str(persona_id)
    if not persona_key or persona_key in (".", "..") or Path(persona_key).name != persona_key:
        raise FileNotFoundError(f"Persona '{persona_id}' not found.")

    # Pick up personas committed by other containers since this one mounted
    # the volume; without this a warm container can miss a new persona.
    volume.reload()

    persona_path = f"/personas/{persona_key}.json"
    if not os.path.exists(persona_path):
        raise FileNotFoundError(f"Persona '{persona_id}' not found.")

    with open(persona_path, "r", encoding="utf-8") as f:
        persona = json.load(f)

    # The persona JSON is model-generated, so "name" is not guaranteed; fall
    # back to the ID rather than raising KeyError.
    persona_name = persona.get("name") or persona_key
    system_prompt = persona.get("system_prompt") or f"You are {persona_name}."
    voice_desc = persona.get("voice_description", "warm elderly voice")

    full_system = f"""{system_prompt}

IMPORTANT:
- You ARE {persona_name}. Never break character.
- Use their speech style, phrases, and memories naturally.
- Be warm and personal, not like an AI.
- Detect the user's language and respond in the same language.
- Keep responses 2-4 sentences unless sharing a story."""

    is_bengali = any(c in BENGALI_CHARS for c in user_message)
    is_ap = is_asia_pacific(user_message)

    # All languages through MiniCPM4.1-8B (handles Bengali/Hindi/Chinese natively)
    # Tiny Aya Fire/Water kept for dedicated multilingual endpoint (future use)
    client = get_llm_client()

    # Add language instruction to system prompt
    if is_bengali or language == "Bengali":
        full_system = full_system + "\n\nIMPORTANT: The user is writing in Bengali. You MUST respond in Bengali only."
    elif is_ap or language in ("Chinese", "Japanese", "Korean", "Thai"):
        if language and language != "auto":
            full_system = full_system + "\n\nIMPORTANT: Respond in " + str(language) + " only."
        else:
            # Script detected but no explicit language: previously this
            # produced the meaningless instruction "Respond in auto only."
            full_system = full_system + "\n\nIMPORTANT: Respond only in the same language as the user's message."

    messages = [{"role": "system", "content": full_system}]
    messages.extend(
        {"role": msg["role"], "content": msg["content"]}
        for msg in (history or [])[-10:]
    )
    messages.append({"role": "user", "content": user_message})

    response = client.chat.completions.create(
        model=MINICPM_LLM_MODEL,
        messages=messages,
        temperature=0.8,
        max_tokens=1024,
        stop=None,
    )

    # `content` may be None; guard so .strip() cannot raise AttributeError.
    response_text = (response.choices[0].message.content or "").strip()
    # Strip <think>...</think> reasoning blocks. The previous pattern had lost
    # its tag text (r'.*?'), which only matches empty strings and removed nothing.
    response_text = re.sub(r"<think>.*?</think>", "", response_text, flags=re.DOTALL).strip()

    # Remove any truncated incomplete sentence at end — but only when the last
    # sentence terminator sits in the second half, so short replies are kept.
    if response_text and response_text[-1] not in '.!?।':
        last_punct = max(response_text.rfind(mark) for mark in '.!?।')
        if last_punct > len(response_text) // 2:
            response_text = response_text[:last_punct + 1]

    return {"text": response_text, "voice_description": voice_desc}
# ─────────────────────────────────────────────────────────────────────────────
# 8. Web Endpoints
# ─────────────────────────────────────────────────────────────────────────────
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response


def make_app():
    """Create a FastAPI app with CORS open to all origins, methods and headers."""
    web_app = FastAPI()
    web_app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_methods=["*"],
        allow_headers=["*"],
    )
    return web_app


def _required_field(data, key):
    """Return ``data[key]`` or raise HTTP 400 when the body lacks that field."""
    if not isinstance(data, dict) or key not in data:
        raise HTTPException(status_code=400, detail=f"Missing required field: {key}")
    return data[key]


def _decoded_b64_field(data, key):
    """Return the base64-decoded bytes of ``data[key]`` or raise HTTP 400."""
    encoded = _required_field(data, key)
    try:
        return base64.b64decode(encoded)
    except (ValueError, TypeError) as err:
        # binascii.Error (bad padding) is a ValueError subclass.
        raise HTTPException(status_code=400, detail=f"Field '{key}' is not valid base64") from err


# Health check — tests MiniCPM API reachability
health_web = make_app()


@app.function(image=base_image, timeout=30)
@modal.asgi_app(label="health")
def health_endpoint():
    """Serve GET / reporting whether the hosted MiniCPM LLM answers a tiny request."""

    # Plain `def`: the OpenAI client call blocks, so let FastAPI run the
    # handler in its thread pool rather than on the event loop.
    @health_web.get("/")
    def handler():
        try:
            client = get_llm_client()
            r = client.chat.completions.create(
                model=MINICPM_LLM_MODEL,
                messages=[{"role": "user", "content": "Hi"}],
                max_tokens=5,
            )
            return {"status": "ok", "minicpm": "reachable", "reply": r.choices[0].message.content}
        except Exception as e:
            # Deliberate catch-all: any failure is reported as "unreachable".
            return {"status": "error", "minicpm": "unreachable", "error": str(e)}

    return health_web


# Build persona
build_web = make_app()


@app.function(image=base_image, volumes={"/personas": volume}, timeout=20 * MINUTES)
@modal.asgi_app(label="build-persona")
def build_persona_endpoint():
    """Serve POST / which forwards the JSON body to ``build_persona``.

    Responds ``{"success": True, "persona": ...}`` or, on any failure,
    ``{"success": False, "error": ...}``.
    """

    @build_web.post("/")
    async def handler(request: Request):
        try:
            data = await request.json()
            # .remote.aio() awaits the call instead of blocking the event loop.
            persona = await build_persona.remote.aio(
                persona_id=data["persona_id"],
                name=data["name"],
                relationship=data["relationship"],
                texts=data.get("texts", []),
                photo_captions=data.get("photo_captions", []),
                voice_transcripts=data.get("voice_transcripts", []),
            )
            return {"success": True, "persona": persona}
        except Exception as e:
            # Deliberate catch-all: this endpoint reports errors in the body.
            return {"success": False, "error": str(e)}

    return build_web


# Chat
chat_web = make_app()


@app.function(image=base_image, volumes={"/personas": volume}, timeout=5 * MINUTES)
@modal.asgi_app(label="chat")
def chat_endpoint():
    """Serve POST / which forwards the JSON body to ``chat_with_persona``.

    Responds with that function's result dict. Missing ``persona_id`` /
    ``message`` yields HTTP 400; an unknown persona yields HTTP 404.
    """

    @chat_web.post("/")
    async def handler(request: Request):
        data = await request.json()
        persona_id = _required_field(data, "persona_id")
        message = _required_field(data, "message")
        try:
            result = await chat_with_persona.remote.aio(
                persona_id=persona_id,
                history=data.get("history", []),
                user_message=message,
                language=data.get("language", "auto"),
            )
        except FileNotFoundError as err:
            raise HTTPException(status_code=404, detail=str(err)) from err
        return result

    return chat_web


# Transcribe
transcribe_web = make_app()


@app.function(image=asr_image, volumes={"/root/.cache/huggingface": hf_cache_vol}, timeout=5 * MINUTES)
@modal.asgi_app(label="transcribe")
def transcribe_endpoint():
    """Serve POST / taking ``audio_b64`` (and optional ``filename``); returns ``{"transcript": ...}``."""

    @transcribe_web.post("/")
    async def handler(request: Request):
        data = await request.json()
        audio_bytes = _decoded_b64_field(data, "audio_b64")
        transcript = await transcribe_audio.remote.aio(audio_bytes, data.get("filename", "audio.wav"))
        return {"transcript": transcript}

    return transcribe_web


# Vision
vision_web = make_app()


@app.function(image=base_image, timeout=5 * MINUTES)
@modal.asgi_app(label="describe-photo")
def vision_endpoint():
    """Serve POST / taking ``image_b64`` (and optional ``prompt``); returns ``{"description": ...}``."""

    @vision_web.post("/")
    async def handler(request: Request):
        data = await request.json()
        image_bytes = _decoded_b64_field(data, "image_b64")
        description = await describe_photo.remote.aio(image_bytes, data.get("prompt"))
        return {"description": description}

    return vision_web


# OCR
ocr_web = make_app()


@app.function(image=ocr_image, volumes={"/root/.cache/huggingface": hf_cache_vol}, timeout=5 * MINUTES)
@modal.asgi_app(label="ocr-document")
def ocr_endpoint():
    """Serve POST / taking ``image_b64``; returns ``{"text": ...}``."""

    @ocr_web.post("/")
    async def handler(request: Request):
        data = await request.json()
        image_bytes = _decoded_b64_field(data, "image_b64")
        text = await ocr_document.remote.aio(image_bytes)
        return {"text": text}

    return ocr_web


# TTS
tts_web = make_app()


@app.function(image=tts_image, volumes={"/root/.cache/huggingface": hf_cache_vol}, timeout=5 * MINUTES)
@modal.asgi_app(label="text-to-speech")
def tts_endpoint():
    """Serve POST / taking ``text`` (and optional ``voice_description``); returns WAV audio."""

    @tts_web.post("/")
    async def handler(request: Request):
        data = await request.json()
        text = _required_field(data, "text")
        wav_bytes = await text_to_speech.remote.aio(text, data.get("voice_description"))
        return Response(content=wav_bytes, media_type="audio/wav")

    return tts_web


# List personas
list_web = make_app()


@app.function(image=base_image, volumes={"/personas": volume}, timeout=MINUTES)
@modal.asgi_app(label="list-personas")
def list_personas_endpoint():
    """Serve GET / returning ``{"personas": [{"id", "name", "relationship"}, ...]}``."""

    @list_web.get("/")
    async def handler():
        # Refresh the mount so personas committed by other containers after
        # this one started are included.
        volume.reload()

        personas = []
        if os.path.exists("/personas"):
            for persona_file in Path("/personas").glob("*.json"):
                try:
                    # Persona files are written as UTF-8; read them the same way.
                    with open(persona_file, encoding="utf-8") as fp:
                        data = json.load(fp)
                except (OSError, ValueError) as err:
                    # One unreadable/corrupt file should not break the listing.
                    print(f"Skipping unreadable persona file {persona_file}: {err}")
                    continue
                if not isinstance(data, dict):
                    print(f"Skipping persona file {persona_file}: not a JSON object")
                    continue
                personas.append({
                    "id": persona_file.stem,
                    "name": data.get("name"),
                    "relationship": data.get("relationship"),
                })
        return {"personas": personas}

    return list_web