PITCHFIGHT_AI / core /json_utils.py
Aspectgg's picture
Prepare PitchFight AI completion
8fab536
Raw
History Blame Contribute Delete
11.8 kB
"""JSON parsing utilities with safe fallbacks."""
from __future__ import annotations
import json
import logging
import re
from typing import Any
logger = logging.getLogger(__name__)
def strip_markdown_fences(text: str) -> str:
    """Return ``text`` without its markdown code-fence wrapper.

    When a complete fenced block (optionally tagged ``json``, in any case)
    occurs anywhere in the text, only the content of the first such block is
    returned. Otherwise a dangling opening fence on the first line and/or a
    bare closing fence on the last line are dropped. The result is always
    whitespace-trimmed; falsy input yields ``""``.
    """
    body = text.strip() if text else ""
    if not body:
        return ""

    complete = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", body, re.IGNORECASE)
    if complete is not None:
        return complete.group(1).strip()

    # No closed fence pair: peel off a stray opener and/or closer line.
    rows = body.splitlines()
    if rows and rows[0].strip().startswith("```"):
        del rows[0]
    if rows and rows[-1].strip() == "```":
        rows.pop()
    return "\n".join(rows).strip()
def _balanced_blocks(text: str, opener: str, closer: str) -> list[str]:
"""Return all balanced opener/closer blocks found in text."""
blocks: list[str] = []
for start in range(len(text)):
if text[start] != opener:
continue
depth = 0
for index in range(start, len(text)):
char = text[index]
if char == opener:
depth += 1
elif char == closer:
depth -= 1
if depth == 0:
blocks.append(text[start : index + 1])
break
return blocks
def extract_largest_json_object(text: str) -> str | None:
    """Pull the biggest ``{...}`` block out of mixed model output.

    Markdown fences are stripped first, then every balanced brace block is
    tried from longest to shortest; the first one that decodes to a JSON
    object is returned verbatim. If none decodes, the longest balanced block
    is returned anyway so the caller may attempt its own handling. Returns
    ``None`` for empty input or when no balanced block exists.
    """
    if not text:
        return None
    blocks = _balanced_blocks(strip_markdown_fences(text), "{", "}")
    if not blocks:
        return None

    # Longest first; the sort is stable, so ties keep their order of appearance.
    by_size = sorted(blocks, key=len, reverse=True)
    for candidate in by_size:
        try:
            value = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(value, dict):
            return candidate

    # Nothing decoded: hand back the longest balanced block regardless.
    return by_size[0]
def extract_json_block(text: str) -> str | None:
    """Legacy alias for :func:`extract_largest_json_object`.

    Returns ``None`` for falsy input; otherwise delegates unchanged.
    """
    return extract_largest_json_object(text) if text else None
def sanitize_for_log(text: str, limit: int = 200) -> str:
    """Build a compact single-line preview of ``text`` for debug logs.

    Markdown fences are removed, each run of whitespace is collapsed to one
    space, and the result is sliced to ``[:limit]``. Truncation is the only
    protection applied here; nothing is redacted.
    """
    unfenced = strip_markdown_fences(text or "")
    single_line = re.sub(r"\s+", " ", unfenced).strip()
    return single_line[:limit]
def safe_json_parse(text: str, default: Any = None) -> Any:
    """Decode JSON from raw model text without ever raising on bad input.

    The fence-stripped text is decoded directly first. If that fails, the
    largest ``{...}`` block is extracted and decoded instead. ``default`` is
    returned when the text is empty or both attempts fail; a ``default`` of
    ``None`` is replaced by a fresh empty dict.
    """
    fallback = {} if default is None else default
    if not text:
        return fallback

    payload = strip_markdown_fences(text)
    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        # Direct decode failed; try the largest embedded object instead.
        candidate = extract_largest_json_object(payload)

    if not candidate:
        return fallback
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return fallback
def ends_abruptly(text: str) -> bool:
    """Heuristically decide whether ``text`` was cut off mid-sentence.

    Empty text counts as cut off. Text ending in ``.``, ``!`` or ``?`` does
    not. Otherwise anything shorter than 50 characters is treated as cut off,
    and text of 50-79 characters is treated as cut off only when its last
    word has at most two characters.
    """
    trimmed = (text or "").strip()
    if not trimmed:
        return True
    if trimmed.endswith((".", "!", "?")):
        return False

    length = len(trimmed)
    if length < 50:
        return True

    # ``trimmed`` is non-empty here, so there is always a final word.
    tail_word = trimmed.split()[-1]
    return length < 80 and len(tail_word) <= 2
def normalize_parsed_root(parsed: Any) -> dict[str, Any] | None:
    """Reduce a decoded JSON value to a single object, if one is available.

    A dict is returned as-is (even when empty). For a list, the first element
    that is a non-empty dict is returned. Anything else, including a list with
    no such element, yields ``None``.
    """
    if isinstance(parsed, dict):
        return parsed
    if not isinstance(parsed, list):
        return None
    return next(
        (entry for entry in parsed if isinstance(entry, dict) and entry),
        None,
    )
def extract_partial_string_fields(text: str, keys: list[str]) -> dict[str, str]:
    """Recover ``"key": "value"`` string pairs by regex from damaged JSON.

    For each requested key the first occurrence followed by a complete
    (closed) string literal is taken. The literal is decoded as a JSON string
    when possible; otherwise escaped quotes are unescaped and the raw text is
    trimmed. Keys without a match are omitted from the result.
    """
    if not text:
        return {}

    haystack = strip_markdown_fences(text)
    recovered: dict[str, str] = {}
    for key in keys:
        hit = re.search(
            rf'"{re.escape(key)}"\s*:\s*"((?:[^"\\]|\\.)*)"',
            haystack,
            re.DOTALL,
        )
        if hit is None:
            continue
        raw_value = hit.group(1)
        try:
            recovered[key] = json.loads(f'"{raw_value}"')
        except json.JSONDecodeError:
            # E.g. a raw newline inside the literal: keep the text as found.
            recovered[key] = raw_value.replace('\\"', '"').strip()
    return recovered
def extract_partial_string_list(text: str, key: str, min_items: int = 1) -> list[str]:
    """Recover the string items of the JSON array stored under ``key``.

    The array body is taken from the first ``"key": [`` up to the next ``]``;
    if no such span exists an empty list is returned. Every complete string
    literal inside it is decoded (falling back to a simple unescape), empty
    items are dropped, and at most ``max(min_items, 8)`` items are returned.
    ``min_items`` therefore only raises that cap above 8; it does not enforce
    a minimum count.
    """
    if not text:
        return []

    haystack = strip_markdown_fences(text)
    array = re.search(rf'"{re.escape(key)}"\s*:\s*\[([\s\S]*?)\]', haystack)
    if array is None:
        return []

    def _decode(raw: str) -> str:
        """Decode one string literal body, tolerating invalid JSON escapes."""
        try:
            return json.loads(f'"{raw}"')
        except json.JSONDecodeError:
            return raw.replace('\\"', '"').strip()

    decoded = [
        _decode(literal.group(1))
        for literal in re.finditer(r'"((?:[^"\\]|\\.)*)"', array.group(1))
    ]
    cap = max(min_items, 8)
    return [value for value in decoded if value][:cap]
def parse_json_object(
    text: str,
    reasoning_fallback: str | None = None,
    string_fields: list[str] | None = None,
) -> dict[str, Any]:
    """Turn model output into a dict, trying progressively looser strategies.

    Order of attempts:
      1. ``parse_model_json`` (optionally using ``reasoning_fallback``),
         unwrapped via ``normalize_parsed_root``;
      2. regex recovery of the ``string_fields`` keys from ``text``;
      3. ``safe_json_parse`` on ``text``, unwrapped the same way.

    The first attempt producing a non-empty dict wins; otherwise ``{}``.
    """
    value, _repaired = parse_model_json(text, reasoning_fallback=reasoning_fallback)
    structured = normalize_parsed_root(value)
    if structured:
        return structured

    salvaged = extract_partial_string_fields(text, string_fields or [])
    if salvaged:
        return salvaged

    return normalize_parsed_root(safe_json_parse(text)) or {}
def parse_model_json(
    text: str,
    reasoning_fallback: str | None = None,
) -> tuple[Any, bool]:
    """Decode model JSON output, falling back to extraction when needed.

    Returns ``(parsed_value, repair_needed)``.

    Strategy, in order:
      1. Decode the fence-stripped ``text`` directly. A dict is returned with
         ``repair_needed=False``; a list is returned with ``True`` (a list
         holding exactly one dict is unwrapped to that dict).
      2. Extract and decode the largest ``{...}`` block of ``text``. The flag
         is ``True`` only if step 1 raised a decode error; it stays ``False``
         when step 1 decoded to a scalar.
      3. Do the same on ``reasoning_fallback``; the flag is ``True``.

    If everything fails the result is ``({}, True)``; if both inputs are
    empty it is ``({}, False)``.
    """
    empty: dict[str, Any] = {}
    if not (text or reasoning_fallback):
        return empty, False

    def _decode_largest_block(source: str) -> Any:
        """Return the decoded largest ``{...}`` block of ``source``, else None."""
        candidate = extract_largest_json_object(source)
        if not candidate:
            return None
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            return None

    body = strip_markdown_fences(text or "")
    direct_parse_failed = False
    if body:
        try:
            value = json.loads(body)
        except json.JSONDecodeError:
            direct_parse_failed = True
        else:
            if isinstance(value, dict):
                return value, False
            if isinstance(value, list):
                sole_dict = len(value) == 1 and isinstance(value[0], dict)
                return (value[0] if sole_dict else value), True

        value = _decode_largest_block(body)
        if isinstance(value, (dict, list)):
            return value, direct_parse_failed

    if reasoning_fallback:
        reasoning = strip_markdown_fences(reasoning_fallback)
        value = _decode_largest_block(reasoning)
        if isinstance(value, (dict, list)):
            logger.info(
                "json_utils: parsed JSON from reasoning_content fallback (len=%d)",
                len(reasoning),
            )
            return value, True

    return empty, True
def fallback_scorecard() -> dict[str, Any]:
    """Build the placeholder scorecard used when model JSON cannot be parsed.

    A new dict (with new nested containers) is created on every call, so
    callers may mutate the result freely.
    """
    card: dict[str, Any] = dict(
        overall=0,
        scores={},
        best_answer="No scorecard could be generated.",
        weakest_answer="",
        improved_answer="",
        improved_pitch="",
        top_3_questions=[],
    )
    return card
# Score dimensions that parse_scorecard_json always emits under "scores";
# each one is normalised through _validate_dim, so a dimension missing from
# the model output still appears with a zero score.
# NOTE: this is a plain set, so its iteration order is not guaranteed to be
# the same from one interpreter run to the next.
_REQUIRED_SCORECARD_DIMS = {
    "clarity",
    "problem_understanding",
    "market_awareness",
    "differentiation",
    "business_model",
    "objection_handling",
}
def _coerce_score(value: Any) -> int:
"""Clamp a raw score value to integer 0–100."""
try:
return max(0, min(100, int(float(value))))
except (TypeError, ValueError):
return 0
def _score_label(score: int) -> str:
"""Map an integer score 0–100 to a human-readable label.
Phase 5C bands (claim-based calibration):
0–30: Not addressed
31–50: Developing
51–70: Solid
71–85: Strong
86–100: Excellent
"""
if score <= 30:
return "Not addressed"
if score <= 50:
return "Developing"
if score <= 70:
return "Solid"
if score <= 85:
return "Strong"
return "Excellent"
def _validate_dim(raw: Any) -> dict[str, Any]:
    """Normalise one raw score dimension.

    Returns a dict with exactly the keys ``score``, ``label``, ``reason``,
    ``quote`` and ``signals_used``. A non-dict input produces a zero-score
    placeholder. Missing or JSON-``null`` text fields are treated as empty
    rather than being stringified to ``"None"``; ``null`` entries inside
    ``signals_used`` are skipped for the same reason. At most eight signals
    are kept.
    """
    if not isinstance(raw, dict):
        return {
            "score": 0,
            "label": _score_label(0),
            "reason": "No data.",
            "quote": "",
            "signals_used": [],
        }

    def _text(key: str) -> str:
        """Return ``raw[key]`` as trimmed text; missing or null becomes ''."""
        value = raw.get(key)
        return "" if value is None else str(value).strip()

    score = _coerce_score(raw.get("score", 0))
    raw_signals = raw.get("signals_used", [])
    signals = (
        [str(s).strip() for s in raw_signals if s is not None and str(s).strip()]
        if isinstance(raw_signals, list)
        else []
    )
    return {
        "score": score,
        "label": _score_label(score),
        "reason": _text("reason") or "No reasoning provided.",
        "quote": _text("quote"),
        "signals_used": signals[:8],
    }
def parse_scorecard_json(raw_text: str) -> dict[str, Any] | None:
    """Parse and validate Nemotron scorecard JSON.

    Parsing is delegated to ``safe_json_parse``, which decodes the
    fence-stripped text directly and, failing that, decodes the largest
    ``{...}`` block it can extract.

    Returns a validated dict with all required keys, or ``None`` if parsing
    fails completely (or the root is not a non-empty object) so the caller
    can fall back to mock_scorecard.

    Normalisation rules:
      * ``scores`` always contains every required dimension, in alphabetical
        order so the output does not depend on set iteration order.
      * ``overall`` uses the explicit field when present and non-null,
        otherwise the rounded mean of the dimension scores.
      * Text fields that are missing or JSON ``null`` take their default
        instead of becoming the string ``"None"``.
      * ``top_3_questions`` is cut or padded to exactly three entries.

    Voice mode note:
        This function is input-source agnostic — it receives only the text
        output from the model and does not need to change for voice mode.
    """
    parsed = safe_json_parse(raw_text)
    if not parsed or not isinstance(parsed, dict):
        return None

    # Validate and normalise the scores dict.
    raw_scores = parsed.get("scores", {})
    if not isinstance(raw_scores, dict):
        raw_scores = {}
    # Sorted: a set's iteration order can differ between interpreter runs.
    dims = sorted(_REQUIRED_SCORECARD_DIMS)
    scores: dict[str, Any] = {dim: _validate_dim(raw_scores.get(dim)) for dim in dims}

    # overall: prefer the explicit field, else average the dimension scores.
    explicit_overall = parsed.get("overall")
    if explicit_overall is not None:
        overall = _coerce_score(explicit_overall)
    else:
        dim_scores = [scores[dim]["score"] for dim in dims]
        overall = round(sum(dim_scores) / len(dim_scores)) if dim_scores else 0

    def _str(key: str, default: str = "") -> str:
        """Return ``parsed[key]`` as trimmed text, or ``default`` if absent/blank."""
        value = parsed.get(key)
        if value is None:
            return default
        return str(value).strip() or default

    def _list_of_str(key: str) -> list[str]:
        """Return the non-blank, non-null items of ``parsed[key]`` as strings."""
        val = parsed.get(key, [])
        if not isinstance(val, list):
            return []
        return [str(v).strip() for v in val if v is not None and str(v).strip()]

    top_3 = _list_of_str("top_3_questions")[:3]
    # Pad to 3 if the model returned fewer.
    while len(top_3) < 3:
        top_3.append("What concrete evidence do you have to support this claim?")

    return {
        "overall": overall,
        "overall_label": _score_label(overall),
        "scores": scores,
        "best_answer": _str("best_answer", "Not identified."),
        "weakest_answer": _str("weakest_answer", "Not identified."),
        "why_weak": _str("why_weak", ""),
        "improved_answer": _str("improved_answer", ""),
        "improved_pitch": _str("improved_pitch", ""),
        "top_3_questions": top_3,
    }