Spaces:
Sleeping
Sleeping
| """ | |
| Robust JSON parsing utilities for LLM responses | |
| """ | |
| import json | |
| import re | |
| from typing import Any, Dict, List, Optional | |
def extract_json_from_text(text: str) -> Optional[str]:
    """
    Extract a JSON document from text, unwrapping markdown code fences

    Candidates are tried in order and the first one that parses is returned:
      1. the whole text (stripped);
      2. the text with a leading ``` / ```json fence (label matched
         case-insensitively) and an optional trailing ``` removed;
      3. the contents of every fenced block found anywhere in the text, so a
         reply such as "Here you go:" followed by a fenced block is handled.

    Args:
        text: Text that may contain JSON in markdown code blocks or plain JSON
    Returns:
        Extracted JSON string (always parseable by json.loads) or None if not found
    """
    if not text:
        return None

    def _is_json(candidate: str) -> bool:
        # Validation only; the parsed value is discarded.
        try:
            json.loads(candidate)
        except json.JSONDecodeError:
            return False
        return True

    stripped = text.strip()
    candidates = [stripped]

    # Fence wrapping the whole text: drop the opener, the optional "json"
    # label and, when present, the closing fence.
    if stripped.startswith('```'):
        inner = stripped[3:]
        if inner[:4].lower() == 'json':
            inner = inner[4:]
        if inner.endswith('```'):
            inner = inner[:-3]
        candidates.append(inner.strip())

    # Fenced blocks embedded in surrounding prose.
    for fence in re.finditer(r'```(?:json)?(.*?)```', stripped, re.DOTALL | re.IGNORECASE):
        candidates.append(fence.group(1).strip())

    for candidate in candidates:
        if _is_json(candidate):
            return candidate
    return None
def parse_json_response(text: str, fallback: Any = None) -> Any:
    """
    Parse JSON from LLM response with robust error handling

    The text is first passed to extract_json_from_text. When that finds
    nothing, the raw text (or the contents of its first code fence) is run
    through fix_json_common_issues and parsed again, so slightly malformed
    JSON such as a trailing comma is still recovered.

    Args:
        text: LLM response text
        fallback: Fallback value if parsing fails
    Returns:
        Parsed JSON object or fallback
    """
    if not text:
        return fallback

    json_str = extract_json_from_text(text)
    if json_str is not None:
        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            # Fall through and try to repair the extracted string.
            pass
    else:
        # extract_json_from_text only returns strings that already parse, so
        # malformed JSON arrives here as None. Recover the raw candidate
        # ourselves: the body of the first code fence (closed or not), or
        # else the whole stripped text.
        json_str = text.strip()
        fence = re.search(r'```(?:json)?(.*?)(?:```|\Z)', json_str, re.DOTALL | re.IGNORECASE)
        if fence:
            json_str = fence.group(1).strip()

    # Try to fix common JSON issues
    repaired = fix_json_common_issues(json_str)
    try:
        return json.loads(repaired)
    except json.JSONDecodeError:
        return fallback
def fix_json_common_issues(json_str: str) -> str:
    """
    Fix common JSON formatting issues

    Repairs applied, in this order:
      1. remove // line comments and /* ... */ block comments;
      2. turn single-quoted keys ('key':) into double-quoted keys;
      3. remove trailing commas before } or ].

    Comments are removed first so that a comma followed by a comment is still
    recognised as a trailing comma. Every step skips over double-quoted
    string literals, so values such as "http://example.com" or "a, }" are
    left untouched.

    Args:
        json_str: JSON string that may have issues
    Returns:
        Fixed JSON string (best effort; it may still be invalid JSON)
    """
    # A double-quoted JSON string, including escaped characters. Listing it as
    # the first alternative makes each substitution consume whole strings
    # before it can see comment markers or commas inside them.
    string_literal = r'"(?:\\.|[^"\\])*"'

    # Remove comments (basic)
    json_str = re.sub(
        string_literal + r'|//[^\n]*|/\*.*?\*/',
        lambda m: m.group(0) if m.group(0).startswith('"') else '',
        json_str,
        flags=re.DOTALL,
    )
    # Fix single quotes to double quotes (basic, keys only)
    json_str = re.sub(
        string_literal + r"|'(\w+)':",
        lambda m: m.group(0) if m.group(1) is None else '"' + m.group(1) + '":',
        json_str,
        flags=re.DOTALL,
    )
    # Remove trailing commas
    json_str = re.sub(
        string_literal + r'|,\s*([}\]])',
        lambda m: m.group(0) if m.group(1) is None else m.group(1),
        json_str,
        flags=re.DOTALL,
    )
    return json_str
def parse_keywords_json(response: str, max_keywords: int = 5) -> List[str]:
    """
    Parse keywords from JSON response

    Expected format:
        {"keywords": ["keyword1", "keyword2", ...]}
    or
        ["keyword1", "keyword2", ...]

    Only string entries are kept (surrounding whitespace removed, blank
    entries dropped) so the result always honours the List[str] contract.

    Args:
        response: LLM response text
        max_keywords: Maximum number of keywords to return (default 5);
            None returns all of them
    Returns:
        List of keywords, or empty list if parsing fails
    """
    if not response:
        return []

    parsed = parse_json_response(response, fallback=None)

    # Handle dict format: {"keywords": [...]}
    if isinstance(parsed, dict):
        parsed = parsed.get("keywords")

    # Handle list format: ["keyword1", "keyword2", ...]
    if not isinstance(parsed, list):
        return []

    keywords = [
        item.strip()
        for item in parsed
        if isinstance(item, str) and item.strip()
    ]
    return keywords[:max_keywords]
def parse_summary_json(response: str) -> str:
    """
    Parse summary from JSON response

    Expected format:
        {"summary": "summary text"}
    or
        {"text": "summary text", "summary": "summary text"}

    The keys "summary", "text", "content" and "description" are tried in
    that order; the first one holding a non-empty, non-null value wins.

    Args:
        response: LLM response text
    Returns:
        Summary text; the stripped raw response when no summary field is
        found, or an empty string for a None/blank response
    """
    # Guard first: a None response would otherwise crash on .strip() below.
    if response is None or not response.strip():
        return ""

    parsed = parse_json_response(response, fallback=None)

    if isinstance(parsed, dict):
        # Try different possible keys
        for key in ("summary", "text", "content", "description"):
            value = parsed.get(key)
            if value is None:
                # JSON null must not be stringified into the text "None".
                continue
            summary = str(value).strip()
            if summary:
                return summary

    # Fallback to text parsing
    return response.strip()
def parse_review_json(response: str, review_format: str = "detailed") -> Dict[str, Any]:
    """
    Parse review from JSON or markdown response

    Expected formats:
        - JSON: {"summary": "...", "soundness": 5, ...}
        - Markdown: ## Summary\\n\\n...\\n## Soundness\\n\\n...

    Args:
        response: LLM response text (JSON or markdown)
        review_format: Review format type (detailed, summary, structured).
            Accepted for interface compatibility; this function does not
            read it.
    Returns:
        Review dictionary with parsed fields. It always contains a "review"
        key; for non-JSON input that value is the stripped raw response.
    """
    # Guard first: a None response cannot be searched or stripped below.
    if response is None or not response.strip():
        return {"review": ""}

    raw_review = response.strip()

    # First try to parse as JSON
    parsed = parse_json_response(response, fallback=None)
    if isinstance(parsed, dict):
        # JSON format - ensure it has required fields
        parsed.setdefault("review", raw_review)
        return parsed

    # If not JSON, try to parse as markdown
    if "##" in response:
        markdown_parsed = parse_review_markdown(response)
        if len(markdown_parsed) > 1:  # More than just "review" field
            return markdown_parsed

    # Fallback to text parsing
    return {"review": raw_review}
# "## Heading" line followed by its body, which runs until the next line that
# starts with "##" or the end of the text. A blank line after the heading is
# optional.
_REVIEW_SECTION_RE = re.compile(r'##[ \t]*([^\n]+)\n(.*?)(?=^##|\Z)', re.DOTALL | re.MULTILINE)
# Markdown emphasis / code markers that may wrap a score, e.g. "**3 / 5**".
_REVIEW_EMPHASIS_RE = re.compile(r'[`*]')
_REVIEW_LEADING_NUMBER_RE = re.compile(r'(\d+\.?\d*)\s*')
_REVIEW_FRACTION_RE = re.compile(r'(\d+\.?\d*)\s*/\s*\d+')
_REVIEW_LABELLED_SCORE_RE = re.compile(r'(?:score|rating)\s*[:=]\s*(\d+\.?\d*)', re.IGNORECASE)

# (keyword looked up in the lower-cased heading, output key, is numeric score).
# Order matters: the first keyword found in the heading decides the section.
_REVIEW_SECTIONS = (
    ("summary", "summary", False),
    ("soundness", "soundness", True),
    ("presentation", "presentation", True),
    ("contribution", "contribution", True),
    ("strength", "strengths", False),
    ("weakness", "weaknesses", False),
    ("question", "questions", False),
    ("rating", "rating", True),
    ("confidence", "confidence", True),
    ("decision", "decision", False),
)


def _extract_section_score(section_content: str) -> Optional[float]:
    """
    Extract a numeric score from the body of a score section

    Tried in order:
      1. a number at the start of the first line that is not followed by "/"
         (e.g. "3.0", "4.5 - solid");
      2. the numerator of a fraction at the start of the first line
         (e.g. "3 / 5" -> 3);
      3. a number after "score:" / "rating:" (or "=") anywhere in the section.
    Backticks and asterisks are ignored in every step.

    Args:
        section_content: Stripped text of one markdown section
    Returns:
        The score as float, or None if no score is found
    """
    first_line = section_content.split('\n', 1)[0]
    # Strip again after removing markers: "** 3 **" leaves " 3 ".
    first_line = _REVIEW_EMPHASIS_RE.sub('', first_line).strip()

    leading = _REVIEW_LEADING_NUMBER_RE.match(first_line)
    # The pattern consumes trailing whitespace, so the next character tells
    # whether the number is the numerator of a fraction.
    if leading and not first_line.startswith('/', leading.end()):
        return float(leading.group(1))

    fraction = _REVIEW_FRACTION_RE.match(first_line)
    if fraction:
        return float(fraction.group(1))

    # Search the marker-free text so "**Score:** 4" is recognised as well.
    labelled = _REVIEW_LABELLED_SCORE_RE.search(_REVIEW_EMPHASIS_RE.sub('', section_content))
    if labelled:
        return float(labelled.group(1))
    return None


def parse_review_markdown(markdown_text: str) -> Dict[str, Any]:
    """
    Parse review from markdown format with sections like:
        ## Summary
        ...
        ## Soundness
        ...
    etc.

    Headings are matched case-insensitively by keyword. Text sections
    (summary, strengths, weaknesses, questions, decision) are stored as
    strings; score sections (soundness, presentation, contribution, rating,
    confidence) are stored as floats and omitted when no score is found.

    Args:
        markdown_text: Markdown formatted review text
    Returns:
        Review dictionary with parsed fields; "review" always holds the
        stripped input text
    """
    if not markdown_text:
        return {"review": ""}

    review_dict: Dict[str, Any] = {"review": markdown_text.strip()}

    for section in _REVIEW_SECTION_RE.finditer(markdown_text):
        heading = section.group(1).strip().lower()
        content = section.group(2).strip()

        for keyword, field, is_score in _REVIEW_SECTIONS:
            if keyword not in heading:
                continue
            # A heading mentioning both words (e.g. "Confidence in rating")
            # belongs to the confidence section, not the rating section.
            if keyword == "rating" and "confidence" in heading:
                continue
            if is_score:
                score = _extract_section_score(content)
                if score is not None:
                    review_dict[field] = score  # Keep as float
            else:
                review_dict[field] = content
            break

    return review_dict