GradioDemo / shared /utils /json_parser.py
eigentom
Initial Update
90c099b
"""
Robust JSON parsing utilities for LLM responses
"""
import json
import re
from typing import Any, Dict, List, Optional
def _is_valid_json(candidate: str) -> bool:
    """Return True when *candidate* parses as a complete JSON document."""
    try:
        json.loads(candidate)
    except json.JSONDecodeError:
        return False
    return True


def _fence_body_variants(body: str) -> List[str]:
    """Return a fenced block's body plus versions without a language tag."""
    body = body.strip()
    variants = [body]
    # "```json{...}": the tag may be glued directly to the payload.
    if body[:4].lower() == 'json':
        variants.append(body[4:].strip())
    # "```javascript\n{...}": any single-word tag alone on the first line.
    tag_line = re.match(r'[A-Za-z][\w+#.-]*[ \t]*\r?\n', body)
    if tag_line:
        variants.append(body[tag_line.end():].strip())
    return variants


def extract_json_from_text(text: str) -> Optional[str]:
    """
    Extract a JSON document from text, unwrapping markdown code fences.

    The text is tried as plain JSON first. Otherwise every fenced block
    (``` ... ```), with or without a language tag and with or without a
    closing fence, is tried in order of appearance; the fence may be
    surrounded by other prose.

    Args:
        text: Text that may contain JSON in markdown code blocks or plain JSON

    Returns:
        The first candidate string that ``json.loads`` accepts, or None if
        the text is empty or no candidate parses
    """
    if not text:
        return None

    stripped = text.strip()
    if _is_valid_json(stripped):
        return stripped

    fenced_bodies = []
    if stripped.startswith('```'):
        # The whole reply is one fenced block: pair the opening fence with
        # the LAST closing fence so backticks inside the payload survive.
        inner = stripped[3:]
        if inner.endswith('```'):
            inner = inner[:-3]
        fenced_bodies.append(inner)
    # Fenced blocks anywhere in the text; an unclosed fence runs to the end.
    fenced_bodies.extend(
        match.group(1)
        for match in re.finditer(r'```(.*?)(?:```|\Z)', stripped, re.DOTALL)
    )

    for body in fenced_bodies:
        for variant in _fence_body_variants(body):
            if _is_valid_json(variant):
                return variant
    return None
def _strip_code_fence(text: str) -> str:
    """Return *text* without a surrounding markdown code fence, if it has one."""
    body = text.strip()
    if not body.startswith('```'):
        return body
    body = body[3:]
    if body.endswith('```'):
        body = body[:-3]
    body = body.strip()
    # Drop the optional "json" language tag that follows the opening fence.
    if body[:4].lower() == 'json':
        body = body[4:].strip()
    return body


def parse_json_response(text: str, fallback: Any = None) -> Any:
    """
    Parse JSON from LLM response with robust error handling

    The text is first passed to ``extract_json_from_text``. When that finds
    nothing parseable, the fence-stripped text is run through
    ``fix_json_common_issues`` and parsed once more, so slightly malformed
    JSON (for example with a trailing comma) is still recovered.

    Args:
        text: LLM response text
        fallback: Fallback value if parsing fails

    Returns:
        Parsed JSON object or fallback
    """
    if not text:
        return fallback

    json_str = extract_json_from_text(text)
    if json_str is not None:
        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            # Fall through to the repair attempt below.
            pass
    else:
        # Nothing parsed as-is; repair the raw payload instead of giving up,
        # otherwise the repair step could never run.
        json_str = _strip_code_fence(text)

    try:
        return json.loads(fix_json_common_issues(json_str))
    except json.JSONDecodeError:
        return fallback
def fix_json_common_issues(json_str: str) -> str:
    """
    Fix common JSON formatting issues

    Repairs applied, in order:
      1. ``//`` line comments and ``/* */`` block comments are removed.
      2. Single-quoted word keys (``'key':``) are re-quoted with double quotes.
      3. Trailing commas before ``}`` or ``]`` are removed.

    Every step skips over double-quoted string literals, so string contents
    such as ``"http://example.com"`` or ``"a,}"`` are left untouched.
    Comments are stripped first so that a comma followed by a comment is
    still recognised as a trailing comma.

    Args:
        json_str: JSON string that may have issues

    Returns:
        Fixed JSON string (not guaranteed to be valid JSON)
    """
    # A complete double-quoted JSON string, including escaped characters.
    string_literal = r'"(?:\\.|[^"\\])*"'

    def drop_unless_string(match):
        token = match.group(0)
        return token if token.startswith('"') else ''

    def requote_key(match):
        if match.group(1) is None:
            # Matched a double-quoted string literal: keep it verbatim.
            return match.group(0)
        return '"' + match.group(1) + '"' + match.group(2) + ':'

    # Remove comments
    json_str = re.sub(
        string_literal + r'|//[^\n]*|/\*.*?\*/',
        drop_unless_string,
        json_str,
        flags=re.DOTALL,
    )
    # Fix single quotes to double quotes (keys only)
    json_str = re.sub(string_literal + r"|'(\w+)'(\s*):", requote_key, json_str)
    # Remove trailing commas
    json_str = re.sub(
        string_literal + r'|,(?=\s*[}\]])', drop_unless_string, json_str
    )
    return json_str
def parse_keywords_json(response: str, max_keywords: int = 5) -> List[str]:
    """
    Parse keywords from JSON response

    Expected format:
        {"keywords": ["keyword1", "keyword2", ...]}
    or
        ["keyword1", "keyword2", ...]

    Only string entries are kept; they are stripped of surrounding
    whitespace and blank entries are dropped, so the result always matches
    the ``List[str]`` return type.

    Args:
        response: LLM response text
        max_keywords: Maximum number of keywords to return (non-negative;
            defaults to 5, ``None`` for no limit)

    Returns:
        List of keywords, or empty list if parsing fails
    """
    if response is None:
        return []

    parsed = parse_json_response(response, fallback=None)

    # Handle dict format: {"keywords": [...]}
    if isinstance(parsed, dict):
        parsed = parsed.get("keywords")

    # Handle list format: ["keyword1", "keyword2", ...]
    if not isinstance(parsed, list):
        return []

    keywords = [
        item.strip()
        for item in parsed
        if isinstance(item, str) and item.strip()
    ]
    return keywords[:max_keywords]
def parse_summary_json(response: str) -> str:
    """
    Parse summary from JSON response

    Expected format:
        {"summary": "summary text"}
    or
        {"text": "summary text", "summary": "summary text"}

    The keys "summary", "text", "content" and "description" are tried in
    that order; the first one holding a non-null, non-blank value wins.

    Args:
        response: LLM response text

    Returns:
        Summary text. Falls back to the stripped raw response when no
        summary can be extracted, and to "" when response is None.
    """
    if response is None:
        return ""

    parsed = parse_json_response(response, fallback=None)
    if isinstance(parsed, dict):
        # Try different possible keys
        for key in ("summary", "text", "content", "description"):
            value = parsed.get(key)
            if value is None:
                # A JSON null must not be returned as the text "None".
                continue
            summary = str(value).strip()
            if summary:
                return summary

    # Fallback to text parsing
    return response.strip()
def parse_review_json(response: str, review_format: str = "detailed") -> Dict[str, Any]:
    """
    Parse review from JSON or markdown response

    Expected formats:
        - JSON: {"summary": "...", "soundness": 5, ...}
        - Markdown: ## Summary\\n\\n...\\n## Soundness\\n\\n...

    Args:
        response: LLM response text (JSON or markdown)
        review_format: Review format type (detailed, summary, structured).
            Accepted for interface compatibility; this function does not
            read it.

    Returns:
        Review dictionary with parsed fields. It always contains a "review"
        key; for markdown and plain-text input that is the stripped raw
        response, and for JSON input it is added only if missing. An empty
        or None response yields {"review": ""}.
    """
    if not response:
        return {"review": ""}

    # First try to parse as JSON
    parsed = parse_json_response(response, fallback=None)
    if isinstance(parsed, dict):
        # JSON format - ensure it has required fields
        parsed.setdefault("review", response.strip())
        return parsed

    # If not JSON, try to parse as markdown
    if "##" in response:
        markdown_parsed = parse_review_markdown(response)
        if len(markdown_parsed) > 1:  # More than just "review" field
            return markdown_parsed

    # Fallback to text parsing
    return {"review": response.strip()}
# (heading keyword, output key, is numeric score), checked in order; the
# first keyword contained in the lower-cased heading wins. "confidence" is
# listed before "rating" so a heading containing both maps to confidence.
_REVIEW_SECTIONS = (
    ("summary", "summary", False),
    ("soundness", "soundness", True),
    ("presentation", "presentation", True),
    ("contribution", "contribution", True),
    ("strength", "strengths", False),
    ("weakness", "weaknesses", False),
    ("question", "questions", False),
    ("confidence", "confidence", True),
    ("rating", "rating", True),
    ("decision", "decision", False),
)


def _extract_score(section_content: str) -> Optional[float]:
    """Return the numeric score found in a review section, or None."""
    # Emphasis/code markers are removed so "**3 / 5**" reads as "3 / 5".
    first_line = re.sub(r'[`*]', '', section_content.split('\n')[0]).strip()

    # A bare number at the start of the first line, e.g. "3.0" or "4.5 good".
    leading = re.match(r'(\d+\.?\d*)\s*', first_line)
    if leading and not first_line.startswith('/', leading.end()):
        return float(leading.group(1))

    # A fraction such as "3 / 5": keep the number before the slash.
    fraction = re.match(r'(\d+\.?\d*)\s*/\s*\d+', first_line)
    if fraction:
        return float(fraction.group(1))

    # A labelled value anywhere in the section, e.g. "Score: 3" or
    # "**Rating:** 4" (markers stripped so bold labels still match).
    labelled = re.search(
        r'(?:score|rating)\s*[:=]\s*(\d+\.?\d*)',
        re.sub(r'[`*]', '', section_content),
        re.IGNORECASE,
    )
    if labelled:
        return float(labelled.group(1))
    return None


def parse_review_markdown(markdown_text: str) -> Dict[str, Any]:
    """
    Parse review from markdown format with sections like:
        ## Summary
        ...
        ## Soundness
        ...
    etc.

    A section is recognised only when its "##" heading is followed by a
    blank line. Headings are matched case-insensitively by keyword.
    Soundness, presentation, contribution, rating and confidence are stored
    as floats and omitted when no number is found; summary, strengths,
    weaknesses, questions and decision are stored as the section text. When
    several sections map to the same key, the last one wins.

    Args:
        markdown_text: Markdown formatted review text

    Returns:
        Review dictionary with parsed fields; always contains "review"
        holding the stripped input text ("" for None or empty input)
    """
    markdown_text = markdown_text or ""
    review_dict = {"review": markdown_text.strip()}

    # Pattern to match markdown sections: ## SectionName\n\ncontent
    section_pattern = r'##\s*([^\n]+)\s*\n\n(.*?)(?=\n##\s*|$)'
    for match in re.finditer(section_pattern, markdown_text, re.DOTALL):
        heading = match.group(1).strip().lower()
        content = match.group(2).strip()

        for keyword, field, is_score in _REVIEW_SECTIONS:
            if keyword not in heading:
                continue
            if is_score:
                score = _extract_score(content)
                if score is not None:
                    review_dict[field] = score  # Keep as float
            else:
                review_dict[field] = content
            break

    return review_dict