| """ |
| Robust utilities for the research pipeline |
| - JSON parsing with multiple fallback layers |
| - Retry with exponential backoff |
| - Model fallback chain |
| - Content cleaning |
| """ |
|
|
| import json |
| import re |
| import asyncio |
| import time |
| from typing import Optional, Any |
|
|
|
|
def _loads_or_none(candidate: str) -> Any:
    """Decode *candidate* as JSON, returning None when it cannot be parsed.

    A JSON ``null`` also yields None; callers only accept dicts and lists,
    so the two cases do not need to be told apart.
    """
    try:
        return json.loads(candidate)
    except (ValueError, RecursionError):
        # json.JSONDecodeError subclasses ValueError; RecursionError covers
        # pathologically nested input. Either way the candidate is unusable
        # and the next fallback layer should get its turn.
        return None


def _first_balanced_object(text: str) -> Optional[dict]:
    """Return the first top-level ``{...}`` span of *text* that decodes to a dict.

    Braces are counted textually (braces inside JSON strings are counted
    too), so this is a best-effort scan rather than a real tokenizer.
    """
    depth = 0
    start = -1
    for index, char in enumerate(text):
        if char == "{":
            if depth == 0:
                start = index
            depth += 1
        elif char == "}":
            if depth == 0:
                # Stray closing brace: ignore it instead of letting the
                # depth go negative, which would hide every later object.
                continue
            depth -= 1
            if depth == 0 and start >= 0:
                parsed = _loads_or_none(text[start:index + 1])
                if isinstance(parsed, dict):
                    return parsed
                start = -1
    return None


def robust_json_parse(text: str) -> Optional[dict]:
    """Parse a JSON object out of free-form text using successive fallbacks.

    Layers, tried in order:

    1. The whole (stripped) text as a JSON object.
    2. The content of a leading Markdown code fence.
    3. The span from the first ``{`` to the last ``}``.
    4. The span from the first ``[`` to the last ``]``; a list is wrapped
       as ``{"plan": <list>}``.
    5. The first balanced top-level ``{...}`` span that decodes.
    6. A synthetic single-section plan built from the raw text.

    Args:
        text: Raw text that may contain JSON.

    Returns:
        None when *text* is empty or whitespace only; otherwise always a
        dict (the last layer guarantees a result).
    """
    if not text or not text.strip():
        return None

    text = text.strip()

    # Layer 1: the text is already a JSON object.
    parsed = _loads_or_none(text)
    if isinstance(parsed, dict):
        return parsed

    # Layer 2: strip a Markdown code fence. The opening line (fence plus
    # optional language tag) is dropped; when the fence and the payload
    # share a single line only the three backticks are removed.
    if text.startswith("```"):
        first_line, newline, rest = text.partition("\n")
        body = rest if newline else first_line[3:]
        text = body.rsplit("```", 1)[0].strip()
        parsed = _loads_or_none(text)
        if isinstance(parsed, dict):
            return parsed
        if isinstance(parsed, list):
            # Same wrapping as the bracket layer below, so the declared
            # return type holds for fenced lists as well.
            return {"plan": parsed}

    # Layer 3: outermost brace span.
    start = text.find("{")
    end = text.rfind("}") + 1
    if 0 <= start < end:
        parsed = _loads_or_none(text[start:end])
        if isinstance(parsed, dict):
            return parsed

    # Layer 4: outermost bracket span, wrapped as a plan.
    start = text.find("[")
    end = text.rfind("]") + 1
    if 0 <= start < end:
        parsed = _loads_or_none(text[start:end])
        if isinstance(parsed, list):
            return {"plan": parsed}

    # Layer 5: first balanced object embedded in surrounding prose.
    embedded = _first_balanced_object(text)
    if embedded is not None:
        return embedded

    # Layer 6: nothing decodable; wrap the raw text as a one-section plan.
    return {"plan": [{"section": "Research Report", "content": text[:5000]}], "summary": text[:500]}
|
|
|
|
def clean_agent_content(text: str) -> str:
    """Strip reasoning tags, conversational lead-ins and repeated lines.

    Steps, in order:

    1. Remove every ``<think>...</think>`` span (may cover several lines).
    2. Drop leading whitespace, then walk the lead-in list once in its
       listed order; each phrase the text starts with at that moment is cut
       off, together with any colons and whitespace right after it. Only
       the phrase itself is removed, not the rest of its sentence.
    3. Drop every non-blank line that already appeared earlier in the text
       (compared case-insensitively after trimming). Blank lines are kept.

    Args:
        text: Raw model output.

    Returns:
        The cleaned text with surrounding whitespace removed; ``""`` for
        empty or None input.
    """
    if not text:
        return ""

    text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)

    # A removed <think> block usually leaves a newline behind; without this
    # the startswith() checks below would never see the lead-in phrase.
    text = text.lstrip()

    prefixes = (
        "Here is", "Here's", "Below is", "The following", "I'll",
        "Let me", "Sure", "Okay", "Alright", "Certainly",
        "Claro", "Aquí está", "A continuación", "Voy a",
    )
    for prefix in prefixes:
        if text.startswith(prefix):
            text = text[len(prefix):].lstrip(":").lstrip()

    unique_lines = []
    seen = set()
    for line in text.split("\n"):
        normalized = line.strip().lower()
        if normalized and normalized in seen:
            continue
        seen.add(normalized)
        unique_lines.append(line)

    return "\n".join(unique_lines).strip()
|
|
|
|
def strip_latex(text: str) -> str:
    """Remove LaTeX commands and math spans from text.

    Applied in order: double-backslash commands with one braced argument,
    bare ``\\name`` commands, display math (``$$...$$``), inline math
    (``$...$``). The result has surrounding whitespace removed.

    Args:
        text: Text that may contain LaTeX markup.

    Returns:
        The text with the matched markup deleted.
    """
    # NOTE(review): this pattern requires TWO literal backslashes before the
    # command name (e.g. text that still carries escaped backslashes). A
    # single-backslash command such as \textbf{x} only loses "\textbf" via
    # the next rule and keeps "{x}". Confirm that this is intended.
    text = re.sub(r'\\\\[a-zA-Z]+\{[^}]*\}', '', text)
    text = re.sub(r'\\[a-zA-Z]+', '', text)
    # Display math must be removed before inline math; otherwise the inline
    # rule consumes the inner "$...$" of "$$...$$" and leaves a stray "$$".
    text = re.sub(r'\$\$.*?\$\$', '', text, flags=re.DOTALL)
    text = re.sub(r'\$[^$]+\$', '', text)
    return text.strip()
|
|
|
|
# Single-pass translation table: every LaTeX special character maps to its
# escaped form. Doing it in one pass means backslashes introduced by an
# escape are never themselves re-escaped.
_LATEX_ESCAPES = str.maketrans({
    '\\': '\\textbackslash{}',
    '&': '\\&',
    '%': '\\%',
    '$': '\\$',
    '#': '\\#',
    '_': '\\_',
    '{': '\\{',
    '}': '\\}',
    '~': '\\textasciitilde{}',
    '^': '\\textasciicircum{}',
})


def sanitize_latex(text: str) -> str:
    """Escape LaTeX special characters so plain text can be typeset as-is.

    Escapes ``& % $ # _ { }`` with a leading backslash and replaces
    ``\\``, ``~`` and ``^`` with their text-mode commands.

    Args:
        text: Plain text destined for a LaTeX document.

    Returns:
        The escaped text.
    """
    return text.translate(_LATEX_ESCAPES)
|
|
|
|
def normalize_boolean(val: Any) -> Any:
    """Convert boolean-like strings to ``bool``; pass everything else through.

    Matching is case-insensitive and ignores surrounding whitespace.
    ``"true"``, ``"yes"``, ``"1"``, ``"on"`` become True; ``"false"``,
    ``"no"``, ``"0"``, ``"off"`` and the empty string become False.

    Args:
        val: Any value.

    Returns:
        True or False for a recognised string; otherwise *val* itself,
        unmodified.
    """
    if isinstance(val, str):
        # Compare on a local copy so an unrecognised string is returned
        # exactly as it was passed in, not stripped and lower-cased.
        token = val.strip().lower()
        if token in ("true", "yes", "1", "on"):
            return True
        if token in ("false", "no", "0", "off", ""):
            return False
    return val
|
|
|
|
# Default English and Spanish stop words, stored as a set for O(1) lookups.
_DEFAULT_STOP_WORDS = frozenset({
    "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
    "of", "with", "by", "from", "as", "is", "was", "are", "were", "be",
    "el", "la", "los", "las", "un", "una", "y", "o", "pero", "en", "de",
    "del", "con", "por", "para", "como", "que", "se", "su", "al",
})


def clean_stop_words(text: str, stop_words: Optional[list] = None) -> str:
    """Remove stop words from whitespace-separated text.

    Tokens are produced by ``str.split()`` and compared case-insensitively;
    punctuation attached to a token is part of the token, so ``"the,"`` is
    not removed.

    Args:
        text: Input text.
        stop_words: Words to remove. None selects the built-in English and
            Spanish list; an empty list removes nothing.

    Returns:
        The remaining tokens joined by single spaces.
    """
    if stop_words is None:
        blocked = _DEFAULT_STOP_WORDS
    else:
        # Lower-case caller-supplied words so they match the lower-cased
        # tokens below regardless of how the caller capitalised them.
        blocked = {word.lower() for word in stop_words}
    return " ".join(token for token in text.split() if token.lower() not in blocked)
|
|
|
|
async def with_retry(func, retries: int = 2, delay: float = 1.0, backoff: float = 2.0):
    """Await ``func()`` and retry on failure with exponential backoff.

    Args:
        func: Zero-argument callable returning an awaitable.
        retries: Number of retries after the first attempt. Values below
            zero are treated as zero, so ``func`` is always tried once.
        delay: Seconds to sleep before the first retry.
        backoff: Multiplier applied to the sleep for each further retry
            (sleep before retry k is ``delay * backoff ** k``, k from 0).

    Returns:
        Whatever ``func()`` returns on the first successful attempt.

    Raises:
        Exception: The error raised by the final attempt, unchanged.
    """
    # Clamp so a negative `retries` cannot skip the loop entirely (which
    # previously ended in `raise None`, i.e. a TypeError).
    attempts = max(retries, 0) + 1
    for attempt in range(attempts):
        try:
            return await func()
        except Exception:
            if attempt == attempts - 1:
                # Out of attempts: surface the original error as-is.
                raise
            await asyncio.sleep(delay * (backoff ** attempt))
|
|
|
|
def extract_research_plan(text: str) -> dict:
    """Extract a research plan dict from a model response.

    The text is parsed with ``robust_json_parse``. A dict that already has
    a ``"plan"`` key is returned unchanged. Otherwise the first alias key
    (``sections``, ``structure``, ``outline``, ``document``, ``research``,
    ``chapters``, ``content``) holding a list is promoted to ``"plan"``,
    carrying over ``"summary"`` when present. If neither applies, the raw
    text is wrapped as a single "Research Report" section.

    Args:
        text: Raw model response; None is treated as an empty string.

    Returns:
        A dict with at least a ``"plan"`` key.
    """
    text = text or ""
    parsed = robust_json_parse(text)

    # Only dicts are inspected: a non-dict parse result would make the
    # `in` test either raise (numbers) or do substring matching (strings).
    if isinstance(parsed, dict):
        if "plan" in parsed:
            return parsed

        for key in ("sections", "structure", "outline", "document", "research", "chapters", "content"):
            val = parsed.get(key)
            if isinstance(val, list):
                return {"plan": val, "summary": parsed.get("summary", "")}

    return {
        "plan": [{"section": "Research Report", "content": text[:5000]}],
        "summary": text[:500],
    }
|
|
|
|
def is_plan_weak(plan: dict, *, min_sections: int = 2, min_name_length: int = 5) -> bool:
    """Report whether a plan is too thin to use and should be regenerated.

    A plan is weak when any of the following holds:

    - *plan* is not a dict, or ``plan["plan"]`` is missing or not a list;
    - it has fewer than *min_sections* entries;
    - any entry is not a dict;
    - any entry's ``"section"`` is missing, not a string, or shorter than
      *min_name_length* characters once surrounding whitespace is removed.

    Args:
        plan: Plan dict with a ``"plan"`` list of section dicts.
        min_sections: Minimum number of entries required.
        min_name_length: Minimum length of each section name.

    Returns:
        True if the plan is weak, False otherwise.
    """
    if not isinstance(plan, dict):
        return True

    items = plan.get("plan", [])
    if not isinstance(items, list) or len(items) < min_sections:
        return True

    for item in items:
        if not isinstance(item, dict):
            return True
        section = item.get("section", "")
        # Non-string names (numbers, None, nested objects) count as weak
        # instead of crashing len(); whitespace does not count as a name.
        if not isinstance(section, str) or len(section.strip()) < min_name_length:
            return True
    return False
|
|