Spaces:
Build error
Build error
| import re | |
| import json | |
def parse_expected_output_fields(expected_output_text):
    """
    Break a "Key: description" listing into (key, description) tuples.

    The text is stripped and processed line by line. A line without a
    colon is ignored; otherwise the line is cut at its first colon and
    both halves are whitespace-stripped, so any further colons stay in
    the description.
    """
    pairs = []
    for raw_line in expected_output_text.strip().splitlines():
        name, colon, rest = raw_line.partition(':')
        if colon:
            # An empty separator means the line had no ':' at all.
            pairs.append((name.strip(), rest.strip()))
    return pairs
def extract_fields_from_expected_output(expected_output_text):
    """
    List the field names found in expected_output_text, in order.

    Delegates the parsing to parse_expected_output_fields and keeps only
    the first element (the key) of every tuple it returns.
    """
    return [
        field[0]
        for field in parse_expected_output_fields(expected_output_text)
    ]
| def split_json_string(text): | |
| """ | |
| Best of both worlds: | |
| - Splits text into 'thought' and 'JSON' parts | |
| - Scans for all possible { positions | |
| - Cleans unescaped newlines inside quotes | |
| - Strips junk between </think> and JSON if JSON exists | |
| - Preserves full text after </think> if no JSON | |
| """ | |
| # Step 1: Split at </think> if exists | |
| if '</think>' in text: | |
| thought_part, possible_json_part = text.split('</think>', 1) | |
| thought_part = thought_part.strip() | |
| possible_json_part = possible_json_part.strip() | |
| else: | |
| thought_part = None | |
| possible_json_part = text.strip() | |
| # Step 2: Find all { positions | |
| brace_positions = [m.start() for m in re.finditer(r'{', possible_json_part)] | |
| # Clean function: fix newlines inside quoted strings | |
| def clean_json_formatting(text): | |
| def fix_inside_quotes(match): | |
| content = match.group(1) | |
| fixed = content.replace('\n', '\\n').replace('\r', '\\n') | |
| return f'"{fixed}"' | |
| return re.sub(r'"(.*?)"', fix_inside_quotes, text, flags=re.DOTALL) | |
| for pos in brace_positions: | |
| candidate = possible_json_part[pos:].strip() | |
| # Pre-clean | |
| candidate = clean_json_formatting(candidate) | |
| # Fix double braces if necessary | |
| if candidate.startswith("{{") and "}}" in candidate: | |
| candidate = candidate.replace("{{", "{", 1).replace("}}", "}", 1) | |
| # Must start with {" or {' | |
| if not re.match(r'^\{\s*["\']', candidate): | |
| continue # not real JSON, skip | |
| try: | |
| json.loads(candidate) | |
| # β Successful parse | |
| return thought_part, candidate | |
| except json.JSONDecodeError: | |
| continue # try next | |
| # π No valid JSON found β return thought and full original remainder (no chopping) | |
| return thought_part, possible_json_part | |
# A Markdown code fence, optionally tagged "json" (any case), wrapping a
# brace-delimited payload; whitespace around the payload is tolerated.
_FENCED_JSON_RE = re.compile(r'```(?:json)?\s*({[\s\S]+?})\s*```', re.IGNORECASE)


def extract_and_parse_json(result_text):
    """
    Extract and parse JSON from raw text.

    The whole text is tried first. If that fails, every code fence
    (```json ... ``` or an untagged ``` ... ```) holding a {...} payload
    is tried in order. Before each attempt, newlines not preceded by a
    backslash are replaced with a space, because raw line breaks inside
    strings make json.loads reject the text.

    Args:
        result_text (str): The raw text output containing JSON data.

    Returns:
        The parsed JSON value (normally a dict) if successful, None
        otherwise. A message is printed when parsing fails.
    """
    if not result_text:
        print("π¨ No result text data received.")
        return None

    def clean_json_formatting(text):
        # Replace unescaped newlines with a space.
        return re.sub(r'(?<!\\)\n', ' ', text)

    # Attempt 1: the text as a whole is JSON.
    try:
        return json.loads(clean_json_formatting(result_text))
    except json.JSONDecodeError:
        print("Unable to parse cleaned direct JSON.")

    # Attempt 2: JSON wrapped in a code fence. The payload gets the same
    # newline clean-up as the direct attempt so both paths accept the same
    # inputs.
    for fenced in _FENCED_JSON_RE.finditer(result_text):
        try:
            return json.loads(clean_json_formatting(fenced.group(1)))
        except json.JSONDecodeError:
            continue  # this fence is not valid JSON; try the next one

    print("π¨ No valid JSON found.")
    return None
def generate_json_expected_output(expected_output_text):
    """
    Build the prompt instruction describing the required JSON answer format.

    Each (key, description) pair from parse_expected_output_fields becomes
    one line of a JSON-looking template: the key is lowercased and quoted,
    the description is inserted verbatim. The lines are wrapped in braces
    and prefixed with a fixed instruction sentence.
    """
    entries = [
        f' "{name.lower()}": {detail},'
        for name, detail in parse_expected_output_fields(expected_output_text)
    ]
    # Every entry ends with a comma; trimming commas from the end of the
    # joined text only affects the final entry.
    body = "\n".join(entries).rstrip(',')

    header = (
        "You must return your answer strictly in the following JSON format. "
        "Do not include any markdown, commentary, or extra text. The response must be valid JSON:\n\n"
    )
    return header + "{\n" + f"{body}\n" + "}"