| """JSON parsing utilities with safe fallbacks.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import re |
| from typing import Any |
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
def strip_markdown_fences(text: str) -> str:
    """Return the payload of a markdown code fence, or the trimmed text.

    If the text contains a complete fenced span (an opening triple backtick,
    optionally tagged ``json`` in any letter case, and a closing triple
    backtick), the stripped content of the first such span is returned.
    Otherwise a fence marker on the first line and/or a bare closing fence on
    the last line is dropped, which covers output whose fence was never
    closed. Falsy input yields an empty string.
    """
    if not text:
        return ""

    body = text.strip()
    match = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", body, re.IGNORECASE)
    if match is not None:
        return match.group(1).strip()

    # No complete fence pair: peel off a dangling opener and/or closer line.
    rows = body.splitlines()
    if rows and rows[0].strip().startswith("```"):
        del rows[0]
    if rows and rows[-1].strip() == "```":
        rows.pop()
    return "\n".join(rows).strip()
|
|
|
|
| def _balanced_blocks(text: str, opener: str, closer: str) -> list[str]: |
| """Return all balanced opener/closer blocks found in text.""" |
| blocks: list[str] = [] |
| for start in range(len(text)): |
| if text[start] != opener: |
| continue |
| depth = 0 |
| for index in range(start, len(text)): |
| char = text[index] |
| if char == opener: |
| depth += 1 |
| elif char == closer: |
| depth -= 1 |
| if depth == 0: |
| blocks.append(text[start : index + 1]) |
| break |
| return blocks |
|
|
|
|
def extract_largest_json_object(text: str) -> str | None:
    """Pick the longest brace-balanced span that parses as a JSON object.

    Markdown fences are stripped first, then every balanced ``{...}`` span is
    tried from longest to shortest and the first one that ``json.loads``
    decodes to a dict is returned as source text.

    Returns:
        The chosen span, or ``None`` when the input is falsy or contains no
        balanced span. If no span parses, the longest span is still returned,
        so callers must be prepared for it to be invalid JSON.
    """
    if not text:
        return None

    spans = _balanced_blocks(strip_markdown_fences(text), "{", "}")
    if not spans:
        return None

    # Stable sort: among equally long spans the earliest one stays first.
    ranked = sorted(spans, key=len, reverse=True)
    for span in ranked:
        try:
            value = json.loads(span)
        except json.JSONDecodeError:
            continue
        if isinstance(value, dict):
            return span

    # Nothing decoded; hand back the longest span as a last resort.
    return ranked[0]
|
|
|
|
def extract_json_block(text: str) -> str | None:
    """Legacy alias that delegates to ``extract_largest_json_object``.

    Returns ``None`` for falsy input; otherwise whatever
    ``extract_largest_json_object`` returns for ``text``.
    """
    return extract_largest_json_object(text) if text else None
|
|
|
|
def sanitize_for_log(text: str, limit: int = 200) -> str:
    """Build a short single-line preview of ``text`` for debug logging.

    Markdown fences are removed, every run of whitespace is collapsed to one
    space, and the result is cut to ``limit`` characters. No redaction is
    performed here: the preview is only shortened and flattened.
    """
    unfenced = strip_markdown_fences(text or "")
    single_line = re.sub(r"\s+", " ", unfenced).strip()
    return single_line[:limit]
|
|
|
|
def safe_json_parse(text: str, default: Any = None) -> Any:
    """Decode JSON from raw text without raising on malformed input.

    The fence-stripped text is decoded directly first. If that fails, the
    largest extractable ``{...}`` block is decoded instead.

    Args:
        text: Raw text, possibly wrapped in markdown fences or prose.
        default: Value returned when nothing can be decoded. ``None`` is
            replaced by a fresh empty dict.

    Returns:
        The decoded value (any JSON type on the direct path), or the default.
    """
    fallback = {} if default is None else default
    if not text:
        return fallback

    stripped = strip_markdown_fences(text)
    try:
        return json.loads(stripped)
    except json.JSONDecodeError:
        pass  # fall through to block extraction

    candidate = extract_largest_json_object(stripped)
    if candidate:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            pass
    return fallback
|
|
|
|
def ends_abruptly(text: str) -> bool:
    """Heuristically decide whether ``text`` was cut off before it finished.

    Rules, applied to the whitespace-trimmed text in this order:
    empty text counts as cut off; text ending in ``.``, ``!`` or ``?`` does
    not; anything else shorter than 50 characters does; longer text only
    counts when it is under 80 characters and its last word has at most two
    characters.
    """
    trimmed = (text or "").strip()
    if not trimmed:
        return True
    if trimmed.endswith((".", "!", "?")):
        return False

    size = len(trimmed)
    if size < 50:
        return True

    words = trimmed.split()
    tail = words[-1] if words else ""
    return size < 80 and len(tail) <= 2
|
|
|
|
def normalize_parsed_root(parsed: Any) -> dict[str, Any] | None:
    """Reduce a decoded JSON root to a single dict, if one is available.

    A dict root is returned unchanged (even when empty). For a list root the
    first element that is a non-empty dict is returned. Any other root, or a
    list without such an element, gives ``None``.
    """
    if isinstance(parsed, dict):
        return parsed
    if not isinstance(parsed, list):
        return None
    return next(
        (entry for entry in parsed if isinstance(entry, dict) and entry),
        None,
    )
|
|
|
|
def extract_partial_string_fields(text: str, keys: list[str]) -> dict[str, str]:
    """Recover ``"key": "value"`` string pairs by regex from broken JSON.

    For each requested key the first occurrence followed by a complete,
    closed double-quoted string is used. The value is decoded as a JSON
    string; if that fails, escaped quotes are unescaped and the text is
    trimmed instead.

    Args:
        text: Raw model output, possibly fenced and not valid JSON as a whole.
        keys: Field names to look for.

    Returns:
        A dict containing only the keys that were found.
    """
    if not text:
        return {}

    haystack = strip_markdown_fences(text)
    recovered: dict[str, str] = {}
    for name in keys:
        hit = re.search(
            rf'"{re.escape(name)}"\s*:\s*"((?:[^"\\]|\\.)*)"',
            haystack,
            re.DOTALL,
        )
        if hit is None:
            continue
        raw_value = hit.group(1)
        try:
            recovered[name] = json.loads(f'"{raw_value}"')
        except json.JSONDecodeError:
            # e.g. a raw control character inside the literal
            recovered[name] = raw_value.replace('\\"', '"').strip()
    return recovered
|
|
|
|
def extract_partial_string_list(text: str, key: str, min_items: int = 1) -> list[str]:
    """Recover the string items of a ``"key": [...]`` array by regex.

    The array body is taken from the opening bracket after ``key`` up to the
    first closing bracket, so an array with no closing bracket is not
    matched. Each double-quoted item is decoded as a JSON string, falling
    back to unescaping quotes and trimming when decoding fails.

    Args:
        text: Raw model output, possibly fenced and not valid JSON as a whole.
        key: Name of the array field.
        min_items: Only raises the result cap above its floor of 8; it does
            not enforce a minimum number of items.

    Returns:
        The non-empty items, at most ``max(min_items, 8)`` of them; an empty
        list when the input is falsy or the field is not found.
    """
    if not text:
        return []

    haystack = strip_markdown_fences(text)
    array_match = re.search(rf'"{re.escape(key)}"\s*:\s*\[([\s\S]*?)\]', haystack)
    if array_match is None:
        return []

    values: list[str] = []
    for quoted in re.finditer(r'"((?:[^"\\]|\\.)*)"', array_match.group(1)):
        inner = quoted.group(1)
        try:
            decoded = json.loads(f'"{inner}"')
        except json.JSONDecodeError:
            decoded = inner.replace('\\"', '"').strip()
        values.append(decoded)

    cap = max(min_items, 8)
    return [value for value in values if value][:cap]
|
|
|
|
def parse_json_object(
    text: str,
    reasoning_fallback: str | None = None,
    string_fields: list[str] | None = None,
) -> dict[str, Any]:
    """Turn model output into a dict, trying progressively looser strategies.

    Order of attempts:
      1. ``parse_model_json`` (with the optional reasoning fallback), with
         the result reduced to a dict by ``normalize_parsed_root``.
      2. Regex recovery of the given ``string_fields`` from ``text``.
      3. ``safe_json_parse`` on ``text``, again reduced to a dict.

    Returns:
        The first non-empty dict produced, or an empty dict if every
        strategy comes up empty.
    """
    value, _repaired = parse_model_json(text, reasoning_fallback=reasoning_fallback)
    candidate = normalize_parsed_root(value)
    if candidate:
        return candidate

    salvaged = extract_partial_string_fields(text, string_fields or [])
    if salvaged:
        return salvaged

    return normalize_parsed_root(safe_json_parse(text)) or {}
|
|
|
|
def parse_model_json(
    text: str,
    reasoning_fallback: str | None = None,
) -> tuple[Any, bool]:
    """Parse model JSON output with extraction fallbacks.

    Strategies, in order:
      1. Decode the fence-stripped ``text`` directly.
      2. Decode the largest ``{...}`` block extracted from ``text``.
      3. Decode the largest ``{...}`` block extracted from
         ``reasoning_fallback``, when one is given.

    Args:
        text: Primary model output; may be empty or ``None``.
        reasoning_fallback: Optional secondary text to search if ``text``
            yields nothing.

    Returns:
        ``(parsed_value, repair_needed)``. ``parsed_value`` is a dict or a
        list, or an empty dict when nothing could be parsed.
        ``repair_needed`` is ``False`` only when the direct parse produced a
        dict, or when both inputs were empty. It is ``True`` for a list root
        (a single-dict list is unwrapped), for every value obtained by
        extraction or from the reasoning fallback, and for total failure.
    """
    default: dict[str, Any] = {}
    if not text and not reasoning_fallback:
        return default, False

    content = strip_markdown_fences(text or "")

    if content:
        try:
            parsed = json.loads(content)
        except json.JSONDecodeError:
            pass  # fall through to block extraction
        else:
            if isinstance(parsed, dict):
                return parsed, False
            if isinstance(parsed, list):
                if len(parsed) == 1 and isinstance(parsed[0], dict):
                    return parsed[0], True
                return parsed, True
            # A scalar root (string, number, bool, null) is not usable as-is;
            # fall through and look for an embedded object instead.

    block = extract_largest_json_object(content)
    if block:
        try:
            parsed = json.loads(block)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(parsed, (dict, list)):
                # Reaching extraction always means the direct parse did not
                # give a usable root, so the result counts as repaired.
                return parsed, True

    if reasoning_fallback:
        fb = strip_markdown_fences(reasoning_fallback)
        block = extract_largest_json_object(fb)
        if block:
            try:
                parsed = json.loads(block)
            except json.JSONDecodeError:
                pass
            else:
                if isinstance(parsed, (dict, list)):
                    logger.info(
                        "json_utils: parsed JSON from reasoning_content fallback (len=%d)",
                        len(fb),
                    )
                    return parsed, True

    return default, True
|
|
|
|
def fallback_scorecard() -> dict[str, Any]:
    """Build the placeholder scorecard used when model JSON cannot be parsed.

    A fresh dict is created on every call. It holds a zero overall score, an
    empty ``scores`` mapping, a fixed ``best_answer`` message, empty strings
    for the remaining text fields and an empty question list. It does not
    include the ``overall_label`` or ``why_weak`` keys that
    ``parse_scorecard_json`` adds to its result.
    """
    card: dict[str, Any] = {"overall": 0, "scores": {}}
    card["best_answer"] = "No scorecard could be generated."
    for blank_field in ("weakest_answer", "improved_answer", "improved_pitch"):
        card[blank_field] = ""
    card["top_3_questions"] = []
    return card
|
|
|
|
# Score dimensions that parse_scorecard_json fills in for every scorecard.
# A dimension absent from the model output is passed to _validate_dim as None.
_REQUIRED_SCORECARD_DIMS = {
    "clarity",
    "problem_understanding",
    "market_awareness",
    "differentiation",
    "business_model",
    "objection_handling",
}
|
|
|
|
| def _coerce_score(value: Any) -> int: |
| """Clamp a raw score value to integer 0–100.""" |
| try: |
| return max(0, min(100, int(float(value)))) |
| except (TypeError, ValueError): |
| return 0 |
|
|
|
|
| def _score_label(score: int) -> str: |
| """Map an integer score 0–100 to a human-readable label. |
| |
| Phase 5C bands (claim-based calibration): |
| 0–30: Not addressed |
| 31–50: Developing |
| 51–70: Solid |
| 71–85: Strong |
| 86–100: Excellent |
| """ |
| if score <= 30: |
| return "Not addressed" |
| if score <= 50: |
| return "Developing" |
| if score <= 70: |
| return "Solid" |
| if score <= 85: |
| return "Strong" |
| return "Excellent" |
|
|
|
|
def _validate_dim(raw: Any) -> dict[str, Any]:
    """Normalise a raw score dimension into a fixed-shape dict.

    Args:
        raw: The dimension as decoded from model JSON. Anything other than a
            dict is treated as missing.

    Returns:
        A dict with the keys ``score`` (int 0-100), ``label`` (band label for
        that score), ``reason`` (never empty), ``quote`` and ``signals_used``
        (at most 8 non-empty strings). JSON ``null`` values are treated as
        missing rather than being turned into the text ``"None"``.
    """
    if not isinstance(raw, dict):
        return {
            "score": 0,
            "label": _score_label(0),
            "reason": "No data.",
            "quote": "",
            "signals_used": [],
        }

    def _text(key: str) -> str:
        # str(None) would yield the literal "None", so map null to "".
        value = raw.get(key)
        return "" if value is None else str(value).strip()

    score = _coerce_score(raw.get("score", 0))
    raw_signals = raw.get("signals_used", [])
    signals = (
        [str(s).strip() for s in raw_signals if s is not None and str(s).strip()]
        if isinstance(raw_signals, list)
        else []
    )
    return {
        "score": score,
        "label": _score_label(score),
        "reason": _text("reason") or "No reasoning provided.",
        "quote": _text("quote"),
        "signals_used": signals[:8],
    }
|
|
|
|
def parse_scorecard_json(raw_text: str) -> dict[str, Any] | None:
    """Parse and validate scorecard JSON produced by the model.

    Parsing is delegated to ``safe_json_parse``, which decodes the
    fence-stripped text directly and otherwise falls back to the largest
    extractable ``{...}`` block.

    Args:
        raw_text: Text output of the model.

    Returns:
        ``None`` when the text does not yield a non-empty JSON object, so the
        caller can substitute a fallback scorecard. Otherwise a dict with:

        - ``overall``: the model's value clamped to 0-100, or the rounded
          mean of the dimension scores when it is absent or null.
        - ``overall_label``: band label for ``overall``.
        - ``scores``: one validated entry per required dimension, in sorted
          dimension order so the result is deterministic across runs.
        - ``best_answer``, ``weakest_answer``, ``why_weak``,
          ``improved_answer``, ``improved_pitch``: trimmed strings; JSON
          ``null`` and blank values fall back to the field's default.
        - ``top_3_questions``: exactly three strings, padded with a generic
          question when the model supplied fewer.

    Voice mode note:
        This function only receives the model's text output, so it is
        independent of how the input was captured.
    """
    parsed = safe_json_parse(raw_text)
    if not parsed or not isinstance(parsed, dict):
        return None

    raw_scores = parsed.get("scores", {})
    if not isinstance(raw_scores, dict):
        raw_scores = {}

    # Iterate in sorted order: plain set iteration order varies between
    # processes because of string hash randomisation.
    scores: dict[str, Any] = {
        dim: _validate_dim(raw_scores.get(dim))
        for dim in sorted(_REQUIRED_SCORECARD_DIMS)
    }

    if parsed.get("overall") is not None:
        overall = _coerce_score(parsed["overall"])
    else:
        dim_scores = [entry["score"] for entry in scores.values()]
        overall = round(sum(dim_scores) / len(dim_scores)) if dim_scores else 0

    def _str(key: str, default: str = "") -> str:
        # str(None) would yield the literal "None", so null means "missing".
        value = parsed.get(key)
        if value is None:
            return default
        return str(value).strip() or default

    def _list_of_str(key: str) -> list[str]:
        val = parsed.get(key, [])
        if not isinstance(val, list):
            return []
        return [str(v).strip() for v in val if v is not None and str(v).strip()]

    top_3 = _list_of_str("top_3_questions")[:3]
    while len(top_3) < 3:
        top_3.append("What concrete evidence do you have to support this claim?")

    return {
        "overall": overall,
        "overall_label": _score_label(overall),
        "scores": scores,
        "best_answer": _str("best_answer", "Not identified."),
        "weakest_answer": _str("weakest_answer", "Not identified."),
        "why_weak": _str("why_weak", ""),
        "improved_answer": _str("improved_answer", ""),
        "improved_pitch": _str("improved_pitch", ""),
        "top_3_questions": top_3,
    }
|
|