Spaces:
Running
Running
| """ | |
| Agent inference using Modal GPU endpoint, HuggingFace Inference API, or mock mode. | |
| No llama.cpp dependency. Inference is handled by: | |
| - "modal" -> remote Modal GPU endpoint (if MODAL_INFERENCE_URL set) | |
| - "hf" -> HuggingFace Inference API (if HF_API_URL + HF_TOKEN set) | |
| - "mock" -> deterministic test mode (MOCK_LLM=1 or fallback) | |
| All features have deterministic fallbacks so the app works without any LLM. | |
| """ | |
| import json | |
| import os | |
| import re | |
| from typing import Dict, List | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
# Tradable asset keys and NPC persona names shared by the parsers and prompts below.
ASSETS = [
    "cash",
    "fd",
    "gov_bonds",
    "nifty_50",
    "nifty_it",
    "real_estate",
    "crypto",
    "gold",
]
PERSONAS = ["whale", "retail", "permabull"]

# Backend endpoints come from the environment; trailing slashes are dropped
# so paths can be appended with a single "/".
MODAL_URL = os.environ.get("MODAL_INFERENCE_URL", "").rstrip("/")
USE_MODAL = bool(MODAL_URL)
HF_API_URL = os.environ.get("HF_API_URL", "").rstrip("/")
HF_TOKEN = os.environ.get("HF_TOKEN", "")
# The HF backend needs both the URL and a token.
USE_HF = bool(HF_API_URL) and bool(HF_TOKEN)

_llm_status = "uninitialized"
_llm_error = ""

# Backend priority: explicit mock flag, then Modal, then HF, else mock fallback.
if os.environ.get("MOCK_LLM") == "1":
    _llm_status, _llm_error = "mock", "MOCK_LLM=1 (test mode)"
elif USE_MODAL:
    _llm_status, _llm_error = "modal", ""
elif USE_HF:
    _llm_status, _llm_error = "hf", ""
else:
    _llm_status, _llm_error = (
        "mock",
        "No inference backend configured (set MODAL_INFERENCE_URL or HF_API_URL+HF_TOKEN, or MOCK_LLM=1)",
    )
def llm_status() -> str:
    """Return the backend label chosen at import time ("mock", "modal" or "hf")."""
    current = _llm_status
    return current
def llm_error() -> str:
    """Return the note recorded when the backend was selected.

    The string is empty for the "modal" and "hf" backends and explains why
    mock mode is active otherwise.
    """
    note = _llm_error
    return note
def start_background_load() -> None:
    """Do nothing; there is no local model to load.

    NOTE(review): presumably kept so callers written for a local-model
    backend keep working -- confirm against callers.
    """
    return None
def strip_reasoning_narration(text: str) -> str:
    """Drop leading self-talk where the model restates its instructions.

    Some models emit their reasoning as plain text before the answer, e.g.
    'User wants a single sentence... Output only the sentence. Hold cash.'

    The text is split into paragraphs (blank-line separated, falling back to
    single lines). When the FIRST block looks like reasoning, the last block
    that does not look like reasoning is returned (or the very last block if
    all of them do). When the first block is not reasoning the text is
    returned unchanged. Text that is a single block is delegated to
    ``_strip_reasoning_sentences`` for sentence-level filtering.

    Args:
        text: Raw model output.

    Returns:
        The answer portion, or ``text`` unchanged when no leading reasoning
        is detected. Falsy input is returned as-is.
    """
    if not text:
        return text

    # Reasoning markers: phrases the model uses when talking to itself.
    # They are matched against the lowercased start of each block.
    reasoning_markers = [
        r'^user\s+(wants|says|asks|is\s|needs|has|gave|provided)',
        r'^the\s+user\s',
        r'^(i\s+)?need\s+to\s',
        r'^(let|let\'s)\s+(me\s+|us\s+)?(think|analyze|consider|check|review|break|figure|process|reason)',
        r'^(we|i)\s+(need|should|must|have\s+to|want)\s',
        r'^we\s+need\s+(to\s+)?output\s+(one|a)\s+sentence',
        r'^output\s+only\s',
        r'^(this|it)\s+(is|seems|appears|looks)\s+(like|to\s+be)',
        r'^(okay|ok|so|alright|well|now|right|hmm|hmmm)[\s,]+',
        r'^the\s+(task|prompt|instruction|request|question)\s',
        r'^(based|given)\s+(on|the)\s',
        r'^respond\s+(with|to|as)\s',
        r'^reply\s+(with|to|as)\s',
        r'^(my|the)\s+(response|reply|answer|output)\s+(should|must|needs|will|is)\s',
        r'^starting\s+portfolio',
        r'^portfolio[\s:]+',
        r'^\d+%\s+cash',
        r'^(total|pnl|sharpe|drawdown)[\s:]+',
        r'^that\'?s\s+\d+\s+sentenc',
        r'^in\s+(ai|the)\s+(insight|chat|advisory)',
        r'^need\s+(to\s+)?be\s+under\s',
        r'^so\s+reply',
        r'^keep\s+in\s+character',
        r'^i\s+(am|will|would|can)\s+(now\s+)?(give|provide|output|share|generate)',
        r'^(here\s+is|here\'s)\s+(the|my|a|an)\s+(insight|response|answer|sentence)',
    ]

    # Split into paragraphs (double-newline preferred, single newline as fallback).
    paras = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()]
    if len(paras) <= 1:
        lines = [ln.strip() for ln in text.split('\n') if ln.strip()]
        if len(lines) <= 1:
            # Single block: try sentence-level extraction instead.
            return _strip_reasoning_sentences(text, reasoning_markers)
        paras = lines
    # From here on there are always at least two blocks.

    def looks_like_reasoning(block: str) -> bool:
        lowered = block.lower()
        return any(re.search(pattern, lowered) for pattern in reasoning_markers)

    flags = [looks_like_reasoning(p) for p in paras]

    # Only strip when the output STARTS with narration; otherwise leave it alone.
    if not flags[0]:
        return text

    # The answer usually comes last, so scan from the end.
    for para, is_reasoning in zip(reversed(paras), reversed(flags)):
        if not is_reasoning:
            return para
    return paras[-1]
def _strip_reasoning_sentences(text: str, reasoning_markers: list) -> str:
    """Filter reasoning sentences out of a single-paragraph string.

    The text is split after ``.``, ``!`` or ``?``; if that yields one piece,
    a looser split after ``.``, ``,`` or ``;`` before a capital letter is
    tried. Pieces whose lowercased start matches any regex in
    ``reasoning_markers`` are dropped and the rest are re-joined with spaces.

    Returns ``text`` unchanged when it cannot be split, and the last piece
    when every piece looks like reasoning.
    """
    pieces = re.split(r'(?<=[.!?])\s+', text)
    if len(pieces) < 2:
        # Run-on model output: fall back to clause boundaries.
        pieces = re.split(r'(?<=[.,;])\s+(?=[A-Z])', text)
    if len(pieces) < 2:
        return text

    def is_narration(piece: str) -> bool:
        lowered = piece.lower().strip()
        return any(re.search(marker, lowered) for marker in reasoning_markers)

    kept = [piece for piece in pieces if not is_narration(piece)]
    if not kept:
        # Everything looked like reasoning; the model often ends with the answer.
        return pieces[-1].strip()
    return ' '.join(kept).strip()
def _strip_prompt_echo(text: str, prompt: str = "", system: str = "") -> str:
    """Remove an echoed system and/or user prompt from the start of ``text``.

    Some backends return prompt + generated text. Each candidate (system
    first, then prompt) is located by its first 60 characters,
    case-insensitively. It is removed when it sits at the very start of the
    text, or within the first 20 characters behind nothing but a role label
    such as ``system`` or ``Assistant:``. Delimiters following the echo are
    consumed too.

    Both candidates are processed in order, so an echo of
    "system + prompt + answer" is reduced to the answer.

    Args:
        text: Raw model output.
        prompt: User prompt that was sent.
        system: System prompt that was sent.

    Returns:
        ``text`` with leading echoes removed; unchanged if none is found.
    """
    if not text:
        return text

    # Compared after strip() + lower(), so entries carry no whitespace.
    role_labels = ("", "system", "system:", "user", "user:", "assistant", "assistant:")
    delimiters = " \n\r\t:,-."

    for raw in (system, prompt):
        cand = raw.strip().rstrip('.') if raw else ""
        if not cand:
            continue
        idx = text.lower().find(cand.lower()[:60])
        if idx < 0 or idx >= 20:
            continue
        if idx > 0 and text[:idx].strip().lower() not in role_labels:
            continue
        # Found the echo at the start; cut right after the full candidate.
        end = idx + len(cand)
        # Also consume trailing whitespace/newlines/delimiters.
        while end < len(text) and text[end] in delimiters:
            end += 1
        text = text[end:].strip()
    return text
def clean_text(text: str, prompt: str = "", system: str = "") -> str:
    """Aggressively strip model cruft: think blocks, AI prefixes, markdown, noise.

    Steps, in order:
      1. Remove an echoed prompt/system text (only when either is given).
      2. Remove ``<think>...</think>`` blocks. An unclosed ``<think>`` drops
         everything after it. An orphan ``</think>`` (no opening tag left)
         keeps only the text after the last one.
      3. Remove leading reasoning narration.
      4. Strip one leading assistant-style prefix ("assistant:", "here is", ...).
      5. Remove simple markdown markers and collapse 3+ newlines to two.
      6. If the result is a JSON object, unwrap the first non-empty string
         found under a known key.

    Args:
        text: Raw model output.
        prompt: User prompt sent to the model, used for echo removal.
        system: System prompt sent to the model, used for echo removal.

    Returns:
        The cleaned text, or ``""`` for empty/whitespace-only input.
    """
    if not text or not text.strip():
        return ""
    text = text.strip()

    # Strip echoed prompt (model repeating the instruction back).
    if prompt or system:
        text = _strip_prompt_echo(text, prompt, system)

    # Case-insensitive regex search keeps offsets valid for ``text`` itself;
    # offsets taken from text.lower() can drift when lowercasing changes length.
    open_tag = re.compile(r'<think', re.IGNORECASE)
    close_tag = re.compile(r'</think', re.IGNORECASE)

    # Strip all <think>...</think> blocks (including malformed tags).
    while True:
        opened = open_tag.search(text)
        if opened is None:
            break
        start = opened.start()
        gt = text.find(">", start)
        body_start = gt + 1 if gt != -1 else start + 7
        closed = close_tag.search(text, body_start)
        if closed is None:
            # Unclosed block: everything after the tag is reasoning.
            text = text[:start].strip()
            break
        close_gt = text.find(">", closed.start())
        resume = close_gt + 1 if close_gt != -1 else closed.start() + 8
        text = (text[:start] + text[resume:]).strip()

    # Orphan closing tags remain when the opening tag was never emitted or
    # when blocks were nested. What precedes the last one is reasoning.
    orphans = list(close_tag.finditer(text))
    if orphans:
        close_gt = text.find(">", orphans[-1].start())
        resume = close_gt + 1 if close_gt != -1 else len(text)
        tail = text[resume:].strip()
        # If nothing follows the tag, keep what came before the first one.
        text = tail if tail else text[:orphans[0].start()].strip()

    # Strip reasoning narration (model talking to itself).
    text = strip_reasoning_narration(text)

    # Remove one common AI preamble at the start of the text.
    prefixes_to_strip = [
        "assistant:", "ai:", "bot:", "response:", "reply:",
        "here is", "here's", "okay",
    ]
    for prefix in prefixes_to_strip:
        low = text.lower().strip()
        if low.startswith(prefix):
            after = text[len(prefix):].strip()
            if after.startswith((':', ',', '-')):
                after = after[1:].strip()
            # Keep the prefix when removing it would leave almost nothing.
            if len(after) > len(prefix):
                text = after
            break

    # Remove markdown formatting.
    text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
    text = re.sub(r'\*(.+?)\*', r'\1', text)
    text = re.sub(r'`(.+?)`', r'\1', text)
    text = re.sub(r'^[#\-\*>]+\s*', '', text, flags=re.MULTILINE)

    # Collapse multiple newlines into max 2.
    text = re.sub(r'\n{3,}', '\n\n', text)

    # Strip JSON wrapper if present.
    try:
        if text.startswith('{') and text.endswith('}'):
            data = json.loads(text)
            for key in ('insight', 'reply', 'text', 'content', 'response', 'message', 'output'):
                if key in data and isinstance(data[key], str) and data[key].strip():
                    text = data[key]
                    break
    except (json.JSONDecodeError, TypeError):
        # Not valid JSON after all; keep the text as-is.
        pass

    return text.strip()
def sanitize_for_display(text: str, max_chars: int = 500) -> str:
    """Final polish before showing text to the player: full clean + truncate.

    Runs ``clean_text`` (without prompt/system), removes leftover think tags,
    strips known ``field:`` prefixes, collapses blank-line runs, capitalises
    the first character and truncates to ``max_chars``, dropping the last
    (possibly partial) word.

    Returns ``""`` when nothing is left after cleaning.
    """
    cleaned = clean_text(text)
    if not cleaned or not cleaned.strip():
        return ""

    # Remove any remaining <think> fragments (case insensitive).
    cleaned = re.sub(r'</?think[^>]*>', '', cleaned.strip(), flags=re.IGNORECASE)

    # Strip field-name prefixes from structured output (insight:, roast:, etc.).
    field_names = (
        'insight', 'roast', 'lesson', 'suggestion', 'reply', 'response',
        'agent', 'action', 'reason', 'sentiment', 'headline', 'output',
        'text', 'content',
    )
    for name in field_names:
        label = name + ':'
        if cleaned.lower().startswith(label):
            cleaned = cleaned[len(label):].strip()

    # Collapse runs of blank lines.
    cleaned = re.sub(r'\n\s*\n\s*\n', '\n\n', cleaned)

    # Ensure it starts with a capital letter.
    if cleaned and cleaned[0].islower():
        cleaned = cleaned[0].upper() + cleaned[1:]

    # Truncate to max chars at a word boundary.
    if len(cleaned) > max_chars:
        cleaned = cleaned[:max_chars].rsplit(' ', 1)[0]

    return cleaned.strip()
def generate(prompt: str, system: str = "", max_tokens: int = 256, temperature: float = 0.7) -> str:
    """Route a generation request to the configured backend.

    Mock mode wins when the module status is "mock"; otherwise Modal is
    preferred over HF. Returns ``""`` when no backend is enabled.
    """
    if _llm_status == "mock":
        return mock_generate(prompt, system)

    # Ordered by priority: the first enabled backend handles the call.
    backends = (
        (USE_MODAL, _modal_generate),
        (USE_HF, _hf_generate),
    )
    for enabled, backend in backends:
        if enabled:
            return backend(prompt, system, max_tokens, temperature)
    return ""
def _modal_generate(prompt: str, system: str, max_tokens: int = 256, temperature: float = 0.7) -> str:
    """POST a chat request to ``{MODAL_URL}/chat`` and return the cleaned reply.

    Makes up to two attempts. Reads the reply from
    ``choices[0].message.content`` and passes it through ``clean_text``.
    Returns ``""`` if httpx is missing or both attempts fail or come back empty.
    """
    import time
    try:
        import httpx
    except ImportError:
        print("httpx not installed. Install it: pip install httpx")
        return ""

    system_part = [{"role": "system", "content": system}] if system else []
    messages = system_part + [{"role": "user", "content": prompt}]

    for attempt in (0, 1):
        try:
            resp = httpx.post(
                f"{MODAL_URL}/chat",
                json={"messages": messages, "max_tokens": max_tokens, "temperature": temperature},
                timeout=180.0,
            )
            resp.raise_for_status()
            content = resp.json()["choices"][0]["message"]["content"]
            if isinstance(content, str) and content.strip():
                return clean_text(content, prompt=prompt, system=system)
        except Exception as e:
            print(f"Modal inference attempt {attempt + 1} failed: {e}")
            # NOTE(review): source indentation was lost; the back-off is
            # assumed to apply only after a failed first attempt -- confirm.
            if attempt == 0:
                time.sleep(2)

    print("Warning: Modal inference returned empty content after retries.")
    return ""
def _hf_generate(prompt: str, system: str, max_tokens: int = 256, temperature: float = 0.7) -> str:
    """POST the chat messages to ``HF_API_URL`` and return the cleaned reply.

    Accepts three response shapes: a list whose first item has
    ``generated_text``, a dict with ``generated_text``, or a dict with a
    ``choices`` array. Returns ``""`` if httpx is missing, the request fails,
    or no non-empty string is found.
    """
    try:
        import httpx
    except ImportError:
        print("httpx not installed. Install it: pip install httpx")
        return ""

    system_part = [{"role": "system", "content": system}] if system else []
    messages = system_part + [{"role": "user", "content": prompt}]

    def usable(value) -> bool:
        return isinstance(value, str) and bool(value.strip())

    try:
        resp = httpx.post(
            HF_API_URL,
            json={
                "inputs": messages,
                "parameters": {"max_new_tokens": max_tokens, "temperature": temperature},
            },
            headers={"Authorization": f"Bearer {HF_TOKEN}"},
            timeout=120.0,
        )
        resp.raise_for_status()
        data = resp.json()

        # Handle various HF response formats.
        if isinstance(data, list):
            if data and "generated_text" in data[0]:
                content = data[0]["generated_text"]
                if usable(content):
                    return clean_text(content, prompt=prompt, system=system)
        elif isinstance(data, dict):
            if "generated_text" in data:
                content = data["generated_text"]
                if usable(content):
                    return clean_text(content, prompt=prompt, system=system)
            # Chat-format response (choices array).
            if "choices" in data:
                content = data["choices"][0].get("message", {}).get("content", "")
                if usable(content):
                    return clean_text(content, prompt=prompt, system=system)
    except Exception as e:
        print(f"HF inference failed: {e}")
    return ""
def mock_generate(prompt: str, system: str = "") -> str:
    """Return a canned, deterministic response chosen by keyword.

    Matching is case-insensitive and checked in this order: agent decision,
    roast, insight, headline. Anything else returns ``""``.

    An agent decision is requested when the prompt mentions "agent", or when
    the system text carries the ``agent:`` format line. The second case is
    needed because a prompt may name only the persona while the format spec
    lives in the system text. When the prompt itself mentions "agent", the
    persona is looked up in the prompt only; otherwise in prompt and system.

    Args:
        prompt: User prompt.
        system: System prompt.

    Returns:
        A canned response in the format the matching parser expects.
    """
    p = prompt.lower()
    s = system.lower()

    agent_in_prompt = "agent" in p
    if agent_in_prompt or "agent:" in s:
        haystack = p if agent_in_prompt else f"{p}\n{s}"
        if "whale" in haystack:
            return "agent: whale\naction: buy gov_bonds 0.10\nreason: safety first\nsentiment: cautious"
        if "retail" in haystack:
            return "agent: retail\naction: sell nifty_it 0.10\nreason: panic selling\nsentiment: panic"
        return "agent: permabull\naction: buy crypto 0.10\nreason: buy the dip\nsentiment: bullish"

    if "roast" in p or "sharpe_ratio" in p:
        return "roast: diversify more\nsharpe_ratio: 0.5\nlesson: Sharpe ratio measures risk-adjusted return\nsuggestion: add bonds"
    if "insight" in p or "commentary" in p or "commentator" in s:
        return "insight: Markets are reacting to the headline. Watch for follow-through."
    if "headline" in p:
        return "headline: RBI holds rates steady\nimpact: cash:0 fd:0 gov_bonds:0 nifty_50:0 nifty_it:0 real_estate:0 crypto:0 gold:0\nduration: 1"
    return ""
def parse_agent_response(response: str, persona: str) -> Dict:
    """Parse an ``agent/action/reason/sentiment`` reply into a decision dict.

    Expected lines (case-insensitive):
        agent: <name>
        action: <buy|sell|hold> <asset> <amount>
        reason: <text>
        sentiment: <word>

    ``<amount>`` may be a plain number ("0.10") or a percentage ("10%"); a
    percentage is converted to a fraction (0.10). Action and asset are
    lowercased, like agent and sentiment.

    Args:
        response: Raw model output; it is run through ``clean_text`` first.
        persona: Fallback agent name when the reply does not name one.

    Returns:
        ``{"agent", "actions": [{"asset", "action", "amount_pct", "reason"}],
        "sentiment"}``. A zero-amount cash "hold" is returned when no action
        line is found or when parsing raises.
    """
    response = clean_text(response)
    try:
        m_agent = re.search(r"agent:\s*(\w+)", response, re.IGNORECASE)
        agent = (m_agent.group(1).lower() if m_agent else persona) or persona
        m_action = re.search(r"action:\s*(buy|sell|hold)\s+(\w+)\s+([\d.%]+)", response, re.IGNORECASE)
        m_reason = re.search(r"reason:\s*(.+)", response, re.IGNORECASE)
        m_sent = re.search(r"sentiment:\s*(\w+)", response, re.IGNORECASE)

        if not m_action:
            return {"agent": agent, "actions": [{"asset": "cash", "action": "hold", "amount_pct": 0.0, "reason": "no action"}], "sentiment": "neutral"}

        # The pattern admits '%', which float() rejects; convert it explicitly.
        raw_amount = m_action.group(3)
        if raw_amount.endswith('%'):
            amount = float(raw_amount.rstrip('%')) / 100.0
        else:
            amount = float(raw_amount)

        return {
            "agent": agent,
            "actions": [{
                "asset": m_action.group(2).lower(),
                "action": m_action.group(1).lower(),
                "amount_pct": amount,
                "reason": (m_reason.group(1).strip() if m_reason else ""),
            }],
            "sentiment": (m_sent.group(1).lower() if m_sent else "neutral"),
        }
    except Exception as e:
        # Best-effort parser: any failure degrades to a harmless hold.
        return {"agent": persona, "actions": [{"asset": "cash", "action": "hold", "amount_pct": 0.0, "reason": f"parse error: {e}"}], "sentiment": "neutral"}
def parse_news_response(response: str) -> Dict:
    """Parse a ``headline/impact/duration`` reply into a news dict.

    Expected lines (case-insensitive):
        headline: <text>
        impact: <asset>:<float> <asset>:<float> ...
        duration: <int>

    Impact tokens that are not ``key:number`` are skipped one by one, so a
    single bad token no longer discards the whole reply. Every asset in
    ``ASSETS`` that is missing from the reply gets an impact of 0.0.

    Args:
        response: Raw model output; it is run through ``clean_text`` first.

    Returns:
        ``{"headline", "impact", "duration_months"}``. On an unexpected
        error a neutral default with an extra ``"error"`` key is returned.
    """
    response = clean_text(response)
    try:
        m_head = re.search(r"headline:\s*(.+)", response, re.IGNORECASE)
        m_imp = re.search(r"impact:\s*(.+?)(?:\nduration:|$)", response, re.DOTALL | re.IGNORECASE)
        m_dur = re.search(r"duration:\s*(\d+)", response, re.IGNORECASE)

        headline = m_head.group(1).strip() if m_head else "Markets mixed"

        impact = {}
        if m_imp:
            for token in m_imp.group(1).split():
                # partition() never raises, even on tokens like "a:b:c".
                key, sep, value = token.partition(":")
                if not sep:
                    continue
                try:
                    impact[key] = float(value)
                except ValueError:
                    # Not a number (e.g. "duration:" or "a:b:c"); skip it.
                    continue
        for a in ASSETS:
            impact.setdefault(a, 0.0)

        duration = int(m_dur.group(1)) if m_dur else 1
        return {"headline": headline, "impact": impact, "duration_months": duration}
    except Exception as e:
        return {"headline": "Markets mixed", "impact": {a: 0.0 for a in ASSETS}, "duration_months": 1, "error": str(e)}
def decide_agent(persona: str, state: Dict) -> Dict:
    """Ask the backend for one persona's trading decision and parse it.

    Only ``month``, ``year``, ``cash`` and ``total_value`` from ``state`` are
    sent to the model. The reply is parsed by ``parse_agent_response``.
    """
    format_lines = [
        f"You are an NPC trader in an Indian stock-market game. Output the {persona}'s decision in EXACT format:",
        f"agent: {persona}",
        "action: <buy|sell|hold> <asset> <amount_pct>",
        "reason: <short reason>",
        "sentiment: <bullish|bearish|neutral|panic|cautious>",
    ]
    system = "\n".join(format_lines)

    # Keep the prompt small: just the fields listed here, in this order.
    compact = {key: state.get(key) for key in ("month", "year", "cash", "total_value")}
    prompt = f"State: {json.dumps(compact)}. Persona: {persona}. Decide."

    raw = generate(prompt, system=system, max_tokens=150, temperature=0.6)
    return parse_agent_response(raw, persona)
def generate_news(event: Dict) -> Dict:
    """Normalise a scripted event dict into the news payload shape.

    No model call is made. Missing fields get defaults, every asset in
    ``ASSETS`` gets an impact entry (0.0 when absent), and numeric fields are
    coerced to ``float``/``int``.

    The incoming ``event`` (including its ``impact`` dict) is not modified.

    Args:
        event: Event description; all keys are optional.

    Returns:
        Dict with ``headline``, ``regime``, ``impact``, ``duration_months``,
        ``year`` and ``month``.
    """
    # Build a fresh dict: filling defaults on event["impact"] directly would
    # leak the added keys back into the caller's data. ``or {}`` also
    # tolerates an explicit ``impact: None``.
    impact = {k: float(v) for k, v in (event.get("impact") or {}).items()}
    for a in ASSETS:
        impact.setdefault(a, 0.0)

    return {
        "headline": event.get("headline", "Markets trade in tight range"),
        "regime": event.get("regime", "stagnation"),
        "impact": impact,
        "duration_months": int(event.get("duration_months", 1)),
        "year": int(event.get("year", 0)),
        "month": int(event.get("month", 0)),
    }
def generate_insight(event: Dict, state_snapshot: Dict) -> str:
    """Produce a one-line market insight for the player.

    Returns a fixed "quiet markets" line when ``event`` is falsy. Otherwise
    the backend is asked for one sentence. If that call raises or yields
    nothing, a deterministic fallback is chosen from the player's P&L and
    cash share. The result is capped at 200 characters.
    """
    if not event:
        return "Markets are quiet. Use the time to review your allocation."

    pnl = float(state_snapshot.get("unrealized_pnl", 0.0))
    cash = float(state_snapshot.get("cash", 0.0))
    total = float(state_snapshot.get("total_value", 0.0))
    # Guard against a zero total value.
    cash_pct = (cash / total * 100.0) if total else 0.0
    regime = str(event.get("regime", "stagnation"))
    headline = str(event.get("headline", ""))

    system = (
        "You are a sharp Indian markets commentator. Given a market event "
        "and a player's portfolio snapshot, output ONE sentence (under 140 chars) "
        "of actionable insight. Reply ONLY with the insight text. "
        "No prefixes, no markdown, no thinking tags, no explanations."
    )
    prompt = (
        f"Event: {headline} (regime: {regime}). "
        f"Player P&L ₹{pnl:,.0f}, cash {cash_pct:.0f}%, total ₹{total:,.0f}. "
        f"One actionable sentence."
    )

    try:
        raw = generate(prompt, system=system, max_tokens=100, temperature=0.4).strip()
        text = sanitize_for_display(raw, 200)
    except Exception:
        # Any backend failure falls through to the canned insights below.
        text = ""

    if text:
        return text[:200]

    regime_words = regime.replace('_', ' ')
    if pnl < -50_000:
        text = f"Cut losers in {regime_words} regimes and rotate into defensives."
    elif pnl > 50_000:
        text = f"Book partial profits; {regime_words} trends rarely last."
    elif cash_pct > 60:
        text = "Heavy cash drag. Deploy into bonds or Nifty on dips."
    else:
        text = f"Hold the line through this {regime_words} phase."
    return text[:200]
def chat_reply(user_message: str, state_snapshot: Dict) -> str:
    """Answer a player's chat message in the context of their portfolio.

    Up to eight positions are summarised into the prompt. If the backend
    call raises or yields nothing, a deterministic reply is chosen from
    keywords in the message ("buy"/"should i", "sell") or the sign of the
    P&L. The result is capped at 500 characters.
    """
    pnl = float(state_snapshot.get("unrealized_pnl", 0.0))
    cash = float(state_snapshot.get("cash", 0.0))
    total = float(state_snapshot.get("total_value", 0.0))

    holdings = [
        f"{p['asset']} {p['qty']:.2f} @ ₹{p['price']:.0f}"
        for p in state_snapshot.get("positions", [])[:8]
    ]
    pos_lines = ", ".join(holdings) or "no positions"

    system = (
        "You are Retro Alpha, a sharp Indian markets assistant in a 1990s "
        "stock-trading game. Be concise, witty, and grounded in the player's "
        "actual positions. Output ONLY 2-3 short sentences. "
        "No thinking tags, no markdown, no prefixes, no explanations."
    )
    prompt = (
        f"Portfolio: total ₹{total:,.0f}, cash ₹{cash:,.0f}, "
        f"unrealized P&L ₹{pnl:,.0f}. Positions: {pos_lines}.\n"
        f"Player: {user_message}\nReply in 2-3 short sentences."
    )

    try:
        raw = generate(prompt, system=system, max_tokens=140, temperature=0.5).strip()
        text = sanitize_for_display(raw, 500)
    except Exception:
        # Any backend failure falls through to the canned replies below.
        text = ""

    if text:
        return text[:500]

    asked = user_message.lower()
    if "buy" in asked or "should i" in asked:
        text = f"With cash at ₹{cash:,.0f} and P&L ₹{pnl:,.0f}, I'd wait for a confirmed trend before adding. Check the chart for support levels."
    elif "sell" in asked:
        text = "Selling into strength is a discipline. If your position is >20% of portfolio, trim 10% and rebalance."
    elif pnl < 0:
        text = f"You're down ₹{abs(pnl):,.0f}. Don't add to losers. Rotate into bonds or gold until the regime clarifies."
    else:
        text = f"Up ₹{pnl:,.0f} — not bad. Lock in some gains into FDs so the win isn't just on paper."
    return text[:500]
def all_agents_decide(state: Dict) -> List[Dict]:
    """Collect one decision per persona, in ``PERSONAS`` order."""
    decisions = []
    for persona in PERSONAS:
        decisions.append(decide_agent(persona, state))
    return decisions