Spaces:
No application file
No application file
| import json | |
| import re | |
| from typing import Dict, List, Any, Optional | |
| def parse_image_quality_for_deepforest(response: str) -> str: | |
| """ | |
| Parse IMAGE_QUALITY_FOR_DEEPFOREST from response. | |
| Args: | |
| response: Model response text | |
| Returns: | |
| "Yes" or "No" | |
| """ | |
| quality_match = re.search(r'(?:\*\*)?IMAGE_QUALITY_FOR_DEEPFOREST[:\*\s]+\[?(YES|NO|Yes|No|yes|no)\]?', response, re.IGNORECASE) | |
| if quality_match: | |
| quality_value = quality_match.group(1).upper() | |
| return "Yes" if quality_value == "YES" else "No" | |
| return "No" | |
| def parse_deepforest_objects_present(response: str) -> List[str]: | |
| """ | |
| Parse DEEPFOREST_OBJECTS_PRESENT from response. | |
| Args: | |
| response: Model response text | |
| Returns: | |
| List of objects present | |
| """ | |
| objects_match = re.search(r'(?:\*\*)?DEEPFOREST_OBJECTS_PRESENT[:\*\s]+(\[.*?\])', response, re.DOTALL) | |
| if objects_match: | |
| try: | |
| objects_str = objects_match.group(1) | |
| objects_str = re.sub(r'[`\'"]', '"', objects_str) | |
| objects_list = json.loads(objects_str) | |
| allowed_objects = ["bird", "tree", "livestock"] | |
| validated_objects = [obj for obj in objects_list if obj in allowed_objects] | |
| return validated_objects | |
| except json.JSONDecodeError: | |
| objects_str = objects_match.group(1) | |
| manual_objects = re.findall(r'"(bird|tree|livestock)"', objects_str) | |
| return list(set(manual_objects)) | |
| return [] | |
| def parse_additional_objects_json(response: str) -> List[Dict[str, Any]]: | |
| """ | |
| Parse ADDITIONAL_OBJECTS_JSON from response. | |
| Args: | |
| response: Model response text | |
| Returns: | |
| List of additional objects with coordinates | |
| """ | |
| additional_match = re.search(r'(?:\*\*)?ADDITIONAL_OBJECTS_JSON[:\*\s]+(.*?)(?=\n(?:\*\*)?(?:VISUAL_ANALYSIS|IMAGE_QUALITY|DEEPFOREST_OBJECTS)|$)', response, re.DOTALL) | |
| if additional_match: | |
| try: | |
| additional_str = additional_match.group(1).strip() | |
| if additional_str.startswith('```json'): | |
| additional_str = additional_str[7:] | |
| if additional_str.startswith('```'): | |
| additional_str = additional_str[3:] | |
| if additional_str.endswith('```'): | |
| additional_str = additional_str[:-3] | |
| additional_str = additional_str.strip() | |
| if additional_str.startswith('[') and additional_str.endswith(']'): | |
| additional_objects = json.loads(additional_str) | |
| if isinstance(additional_objects, list): | |
| return additional_objects | |
| else: | |
| additional_objects = [] | |
| for line in additional_str.split('\n'): | |
| line = line.strip().rstrip(',') | |
| if line and line.startswith('{') and line.endswith('}'): | |
| try: | |
| obj = json.loads(line) | |
| additional_objects.append(obj) | |
| except json.JSONDecodeError: | |
| continue | |
| return additional_objects | |
| except Exception as e: | |
| print(f"Error parsing additional objects JSON: {e}") | |
| return [] | |
| def parse_visual_analysis(response: str) -> str: | |
| """ | |
| Parse VISUAL_ANALYSIS from response. | |
| Args: | |
| response: Model response text | |
| Returns: | |
| Visual analysis text | |
| """ | |
| analysis_match = re.search(r'(?:\*\*)?VISUAL_ANALYSIS[:\*\s]+(.*?)(?=\n(?:\*\*)?(?:IMAGE_QUALITY|DEEPFOREST_OBJECTS|ADDITIONAL_OBJECTS)|$)', response, re.IGNORECASE | re.DOTALL) | |
| if analysis_match: | |
| return analysis_match.group(1).strip() | |
| else: | |
| fallback_match = re.search(r'(?:\*\*)?VISUAL_ANALYSIS[:\*\s]+(.*)', response, re.IGNORECASE | re.DOTALL) | |
| if fallback_match: | |
| return fallback_match.group(1).strip() | |
| return response | |
| def parse_deepforest_agent_response_with_reasoning(response: str) -> Dict[str, Any]: | |
| """ | |
| Parse DeepForest detector agent response with reasoning. | |
| Args: | |
| response: Model response text | |
| Returns: | |
| Dictionary with reasoning and tool calls | |
| """ | |
| from deepforest_agent.tools.tool_handler import extract_all_tool_calls | |
| try: | |
| tool_calls = extract_all_tool_calls(response) | |
| if not tool_calls: | |
| return {"error": "No valid tool calls found in response"} | |
| reasoning_text = "" | |
| first_json_match = re.search(r'\{[^}]*"name"[^}]*"arguments"[^}]*\}', response) | |
| if first_json_match: | |
| reasoning_text = response[:first_json_match.start()].strip() | |
| reasoning_text = re.sub(r'^(REASONING:|Reasoning:|Analysis:|\*\*REASONING:\*\*)', '', reasoning_text).strip() | |
| if not reasoning_text: | |
| reasoning_text = "Tool calls generated based on analysis" | |
| return { | |
| "reasoning": reasoning_text, | |
| "tool_calls": tool_calls | |
| } | |
| except Exception as e: | |
| return {"error": f"Unexpected error parsing response: {str(e)}"} | |
| def parse_memory_agent_response(response: str) -> Dict[str, Any]: | |
| """ | |
| Parse memory agent structured response format with new TOOL_CACHE_ID field. | |
| Args: | |
| response: Model response text | |
| Returns: | |
| Dictionary with answer_present, direct_answer, tool_cache_id, and relevant_context | |
| """ | |
| try: | |
| # Parse ANSWER_PRESENT | |
| answer_present_match = re.search(r'(?:\*\*)?ANSWER_PRESENT:(?:\*\*)?\s*\[?(YES|NO)\]?', response, re.IGNORECASE) | |
| answer_present = False | |
| if answer_present_match: | |
| answer_present = answer_present_match.group(1).upper() == "YES" | |
| # Parse TOOL_CACHE_ID | |
| tool_cache_id_match = re.search(r'(?:\*\*)?TOOL_CACHE_ID:(?:\*\*)?\s*(.*?)(?=\n(?:\*\*)?(?:RELEVANT_CONTEXT|$))', response, re.IGNORECASE | re.DOTALL) | |
| tool_cache_id = None | |
| if tool_cache_id_match: | |
| tool_cache_id_text = tool_cache_id_match.group(1).strip() | |
| # Extract all cache IDs using multiple patterns | |
| cache_ids = [] | |
| # Pattern 1: IDs within brackets [id1, id2, ...] | |
| bracket_pattern = r'\[([^\[\]]*)\]' | |
| bracket_matches = re.findall(bracket_pattern, tool_cache_id_text) | |
| for bracket_content in bracket_matches: | |
| if bracket_content.strip(): # Skip empty brackets | |
| # Extract hex IDs from bracket content | |
| hex_ids = re.findall(r'([a-fA-F0-9]{8,})', bracket_content) | |
| cache_ids.extend(hex_ids) | |
| # Pattern 2: Direct hex IDs (not in brackets) | |
| # Remove bracketed content first, then find remaining hex IDs | |
| text_without_brackets = re.sub(r'\[[^\[\]]*\]', '', tool_cache_id_text) | |
| direct_hex_ids = re.findall(r'([a-fA-F0-9]{8,})', text_without_brackets) | |
| cache_ids.extend(direct_hex_ids) | |
| # Pattern 3: Standalone hex IDs on separate lines (check the whole response) | |
| standalone_pattern = r'^([a-fA-F0-9]{8,})$' | |
| standalone_matches = re.findall(standalone_pattern, response, re.MULTILINE) | |
| cache_ids.extend(standalone_matches) | |
| # Remove duplicates while preserving order | |
| seen = set() | |
| unique_cache_ids = [] | |
| for cache_id in cache_ids: | |
| if cache_id not in seen: | |
| seen.add(cache_id) | |
| unique_cache_ids.append(cache_id) | |
| if unique_cache_ids: | |
| tool_cache_id = ", ".join(unique_cache_ids) if len(unique_cache_ids) > 1 else unique_cache_ids[0] | |
| elif tool_cache_id_text and tool_cache_id_text.lower() not in ["", "empty", "none", "no tool cache id"]: | |
| tool_cache_id = tool_cache_id_text | |
| # Parse RELEVANT_CONTEXT | |
| context_match = re.search( | |
| r'(?:\*\*)?RELEVANT_CONTEXT:(?:\*\*)?\s*(.*?)(?=\n\*\*[A-Z_]+:|\Z)', | |
| response, | |
| re.IGNORECASE | re.DOTALL | |
| ) | |
| relevant_context = "" | |
| if context_match: | |
| relevant_context = context_match.group(1).strip() | |
| elif not answer_present: | |
| relevant_context = response | |
| return { | |
| "answer_present": answer_present, | |
| "direct_answer": "YES" if answer_present else "NO", | |
| "tool_cache_id": tool_cache_id, | |
| "relevant_context": relevant_context, | |
| "raw_response": response | |
| } | |
| except Exception as e: | |
| print(f"Error parsing memory response: {e}") | |
| return { | |
| "answer_present": False, | |
| "direct_answer": "NO", | |
| "tool_cache_id": None, | |
| "relevant_context": response, | |
| "raw_response": response | |
| } |