"""Robust utilities for the research pipeline.

- JSON parsing with multiple fallback layers
- Retry with exponential backoff
- Model fallback chain
- Content cleaning
"""

import json
import re
import asyncio
import time
from typing import Optional, Any


# Sentinel returned by _try_json when parsing fails. ``None`` cannot be used
# because the JSON literal ``null`` legitimately parses to ``None``.
_UNPARSEABLE = object()

# Keys that extract_research_plan accepts as synonyms for "plan".
_PLAN_ALIASES = (
    "sections", "structure", "outline", "document",
    "research", "chapters", "content",
)

# Matches a complete <think>...</think> or <thinking>...</thinking> block.
# NOTE(review): the original file contained two patterns that had both been
# reduced to r'.*?' (a no-op); the second tag name is presumed to be
# "thinking" - confirm against the models actually used.
_THINK_BLOCK_RE = re.compile(r"<(think|thinking)>.*?</\1>", re.DOTALL)

# Conversational lead-ins stripped from the start of agent output.
_MONOLOGUE_PREFIXES = (
    "Here is", "Here's", "Below is", "The following",
    "I'll", "Let me", "Sure", "Okay", "Alright", "Certainly",
    "Claro", "Aquí está", "A continuación", "Voy a",
)

# Single-pass escape table for sanitize_latex.
_LATEX_ESCAPES = str.maketrans({
    "&": "\\&",
    "%": "\\%",
    "$": "\\$",
    "#": "\\#",
    "_": "\\_",
    "{": "\\{",
    "}": "\\}",
})

_TRUE_STRINGS = ("true", "yes", "1", "on")
_FALSE_STRINGS = ("false", "no", "0", "off", "")

# English and Spanish stop words used when the caller supplies none.
_DEFAULT_STOP_WORDS = frozenset([
    "the", "a", "an", "and", "or", "but", "in", "on", "at",
    "to", "for", "of", "with", "by", "from", "as", "is", "was", "are",
    "were", "be", "el", "la", "los", "las", "un", "una", "y", "o", "pero",
    "en", "de", "del", "con", "por", "para", "como", "que", "se", "su", "al",
])


def _try_json(candidate: str) -> Any:
    """Return the decoded JSON value, or ``_UNPARSEABLE`` if decoding fails."""
    try:
        return json.loads(candidate)
    except (ValueError, RecursionError):
        # JSONDecodeError is a ValueError; RecursionError covers absurdly
        # deep nesting. Both simply mean "this layer did not work".
        return _UNPARSEABLE


def _strip_code_fence(text: str) -> str:
    """Drop the opening fence line and everything from the last closing fence."""
    first_line, newline, rest = text.partition("\n")
    # Single-line input (e.g. '```{"a": 1}```') has no fence line to drop,
    # so only the three opening backticks are removed.
    body = rest if newline else first_line[3:]
    return body.rsplit("```", 1)[0].strip()


def robust_json_parse(text: str) -> Optional[dict]:
    """Parse a JSON object out of model output using successive fallbacks.

    Layers, tried in order:
      1. Parse the whole text directly.
      2. Strip a surrounding markdown code fence and parse.
      3. Parse the span from the first ``{`` to the last ``}``.
      4. Parse the span from the first ``[`` to the last ``]``; a list is
         wrapped as ``{"plan": <list>}``.
      5. Scan for brace-balanced spans and return the first that parses.
      6. Wrap the raw text as a single-section plan.

    Args:
        text: Raw text that may contain JSON.

    Returns:
        ``None`` if ``text`` is empty or whitespace-only, otherwise always a
        dict (the last layer guarantees a result).
    """
    if not text or not text.strip():
        return None
    text = text.strip()

    # Layer 1: direct parse. Non-dict results (e.g. a top-level list) fall
    # through so layer 4 can wrap them.
    result = _try_json(text)
    if isinstance(result, dict):
        return result

    # Layer 2: strip markdown code fences. Later layers and the final
    # fallback operate on the unfenced text.
    if text.startswith("```"):
        text = _strip_code_fence(text)
        result = _try_json(text)
        if isinstance(result, dict):
            return result

    # Layer 3: first "{" to last "}" (the main JSON object).
    start = text.find("{")
    end = text.rfind("}") + 1
    if start >= 0 and end > start:
        result = _try_json(text[start:end])
        if isinstance(result, dict):
            return result

    # Layer 4: first "[" to last "]"; a bare array is treated as the plan.
    start = text.find("[")
    end = text.rfind("]") + 1
    if start >= 0 and end > start:
        result = _try_json(text[start:end])
        if isinstance(result, list):
            return {"plan": result}

    # NOTE: the original "plan aliases" layer re-parsed the full text here,
    # which can never succeed after layers 1-2 have failed, so it was
    # removed. List-valued aliases are handled in extract_research_plan.

    # Layer 5: extract an object from surrounding monologue by tracking brace
    # depth. Braces inside JSON strings are not special-cased; a span that
    # fails to parse is simply skipped.
    depth = 0
    obj_start = -1
    for index, char in enumerate(text):
        if char == "{":
            if depth == 0:
                obj_start = index
            depth += 1
        elif char == "}":
            if depth == 0:
                # Stray closer: ignore it instead of letting depth go
                # negative, which would hide every later object.
                continue
            depth -= 1
            if depth == 0 and obj_start >= 0:
                result = _try_json(text[obj_start:index + 1])
                if isinstance(result, dict):
                    return result
                obj_start = -1

    # Layer 6: last resort - wrap the text as a single section.
    return {
        "plan": [{"section": "Research Report", "content": text[:5000]}],
        "summary": text[:500],
    }


def clean_agent_content(text: str) -> str:
    """Remove think blocks, conversational lead-ins and repeated lines.

    Args:
        text: Raw agent output; falsy input yields ``""``.

    Returns:
        The cleaned text with surrounding whitespace stripped.
    """
    if not text:
        return ""

    # Remove think blocks, then leading whitespace so that a lead-in which
    # followed a think block is still recognised as a prefix.
    text = _THINK_BLOCK_RE.sub("", text).lstrip()

    # Remove monologue prefixes. Each prefix is checked once, in order, so
    # several lead-ins can be peeled off in sequence.
    for prefix in _MONOLOGUE_PREFIXES:
        if text.startswith(prefix):
            text = text[len(prefix):].lstrip(":").lstrip()

    # Remove repeated lines (generation loops). Comparison ignores case and
    # surrounding whitespace; blank lines are always kept.
    unique_lines = []
    seen = set()
    for line in text.split("\n"):
        normalized = line.strip().lower()
        if normalized and normalized in seen:
            continue
        seen.add(normalized)
        unique_lines.append(line)

    return "\n".join(unique_lines).strip()


def strip_latex(text: str) -> str:
    """Remove LaTeX math and commands from text.

    Math is removed before commands so that commands inside math do not
    leave empty ``$$`` delimiters behind, and display math is removed before
    inline math so ``$$...$$`` is not half-consumed by the inline pattern.

    Args:
        text: Text that may contain LaTeX markup.

    Returns:
        The text with math spans, ``\\command{arg}`` and bare ``\\command``
        sequences deleted, stripped of surrounding whitespace.
    """
    text = re.sub(r'\$\$.*?\$\$', '', text, flags=re.DOTALL)  # Display math
    text = re.sub(r'\$[^$]+\$', '', text)  # Inline math
    text = re.sub(r'\\[a-zA-Z]+\{[^}]*\}', '', text)  # \command{arg}
    text = re.sub(r'\\[a-zA-Z]+', '', text)  # \command
    return text.strip()


def sanitize_latex(text: str) -> str:
    """Escape LaTeX special characters so text can be embedded in LaTeX.

    Escapes ``& % $ # _ { }`` by prefixing a backslash.
    NOTE: backslash, ``~`` and ``^`` are left untouched, as in the original.

    Args:
        text: Plain text to embed.

    Returns:
        The escaped text.
    """
    return text.translate(_LATEX_ESCAPES)


def normalize_boolean(val: Any) -> Any:
    """Convert boolean-like strings to ``bool``.

    Args:
        val: Any value.

    Returns:
        ``True`` / ``False`` when ``val`` is a string matching (after
        stripping and lower-casing) one of the recognised spellings.
        Other strings are returned stripped and lower-cased; non-string
        values are returned unchanged.
    """
    if isinstance(val, str):
        val = val.strip().lower()
        if val in _TRUE_STRINGS:
            return True
        if val in _FALSE_STRINGS:
            return False
    return val


def clean_stop_words(text: str, stop_words: Optional[list] = None) -> str:
    """Remove stop words from whitespace-separated text.

    Args:
        text: Input text; falsy input yields ``""``.
        stop_words: Words to drop, compared against the lower-cased token.
            ``None`` or an empty list selects the built-in English/Spanish
            defaults.

    Returns:
        The remaining tokens joined by single spaces.
    """
    if not text:
        return ""
    # A set makes each membership test O(1) instead of scanning a list.
    words = frozenset(stop_words) if stop_words else _DEFAULT_STOP_WORDS
    return " ".join(
        token for token in text.split() if token.lower() not in words
    )


async def with_retry(func, retries: int = 2, delay: float = 1.0, backoff: float = 2.0):
    """Await ``func()`` with retries and exponential backoff.

    Args:
        func: Zero-argument callable returning an awaitable.
        retries: Number of additional attempts after the first. Negative
            values are treated as 0 so ``func`` always runs at least once.
        delay: Seconds to sleep before the first retry.
        backoff: Multiplier applied to the delay after each failed attempt.

    Returns:
        Whatever ``func()`` resolves to.

    Raises:
        Exception: The exception raised by the final attempt.
    """
    attempts = max(retries, 0) + 1
    for attempt in range(attempts):
        try:
            return await func()
        except Exception:
            if attempt == attempts - 1:
                # Out of attempts: propagate with the original traceback.
                raise
            await asyncio.sleep(delay * (backoff ** attempt))


def extract_research_plan(text: str) -> dict:
    """Extract a research plan from various response formats.

    Args:
        text: Raw model response; ``None`` is treated as an empty string.

    Returns:
        The parsed dict when it already has a ``"plan"`` key; otherwise
        ``{"plan": <list>, "summary": ...}`` built from the first
        list-valued alias key; otherwise the text wrapped as a single
        "Research Report" section.
    """
    text = text or ""

    # Try JSON first.
    parsed = robust_json_parse(text)
    if isinstance(parsed, dict):
        if "plan" in parsed:
            return parsed
        # Try to find plan-like content under an alias key.
        for key in _PLAN_ALIASES:
            val = parsed.get(key)
            if isinstance(val, list):
                return {"plan": val, "summary": parsed.get("summary", "")}

    # Fallback: wrap text as single section.
    return {
        "plan": [{"section": "Research Report", "content": text[:5000]}],
        "summary": text[:500],
    }


def is_plan_weak(plan: dict) -> bool:
    """Check whether a plan needs a retry.

    Args:
        plan: Dict expected to hold a list of section dicts under ``"plan"``.

    Returns:
        ``True`` when ``plan`` is not a dict, has fewer than two items, or
        any item is not a dict with a string ``"section"`` name of at least
        five non-whitespace-padded characters; ``False`` otherwise.
    """
    if not isinstance(plan, dict):
        return True
    items = plan.get("plan", [])
    if not isinstance(items, list) or len(items) < 2:
        return True
    for item in items:
        if not isinstance(item, dict):
            return True
        section = item.get("section", "")
        # Non-string names (numbers, None) count as weak rather than crashing.
        if not isinstance(section, str) or len(section.strip()) < 5:
            return True
    return False