SamiaHaque's picture
Adding files for initial deepforest-agent implementation
4f24301
raw
history blame
9.13 kB
import json
import re
from typing import Dict, List, Any, Optional
def parse_image_quality_for_deepforest(response: str) -> str:
"""
Parse IMAGE_QUALITY_FOR_DEEPFOREST from response.
Args:
response: Model response text
Returns:
"Yes" or "No"
"""
quality_match = re.search(r'(?:\*\*)?IMAGE_QUALITY_FOR_DEEPFOREST[:\*\s]+\[?(YES|NO|Yes|No|yes|no)\]?', response, re.IGNORECASE)
if quality_match:
quality_value = quality_match.group(1).upper()
return "Yes" if quality_value == "YES" else "No"
return "No"
def parse_deepforest_objects_present(response: str) -> List[str]:
"""
Parse DEEPFOREST_OBJECTS_PRESENT from response.
Args:
response: Model response text
Returns:
List of objects present
"""
objects_match = re.search(r'(?:\*\*)?DEEPFOREST_OBJECTS_PRESENT[:\*\s]+(\[.*?\])', response, re.DOTALL)
if objects_match:
try:
objects_str = objects_match.group(1)
objects_str = re.sub(r'[`\'"]', '"', objects_str)
objects_list = json.loads(objects_str)
allowed_objects = ["bird", "tree", "livestock"]
validated_objects = [obj for obj in objects_list if obj in allowed_objects]
return validated_objects
except json.JSONDecodeError:
objects_str = objects_match.group(1)
manual_objects = re.findall(r'"(bird|tree|livestock)"', objects_str)
return list(set(manual_objects))
return []
def parse_additional_objects_json(response: str) -> List[Dict[str, Any]]:
"""
Parse ADDITIONAL_OBJECTS_JSON from response.
Args:
response: Model response text
Returns:
List of additional objects with coordinates
"""
additional_match = re.search(r'(?:\*\*)?ADDITIONAL_OBJECTS_JSON[:\*\s]+(.*?)(?=\n(?:\*\*)?(?:VISUAL_ANALYSIS|IMAGE_QUALITY|DEEPFOREST_OBJECTS)|$)', response, re.DOTALL)
if additional_match:
try:
additional_str = additional_match.group(1).strip()
if additional_str.startswith('```json'):
additional_str = additional_str[7:]
if additional_str.startswith('```'):
additional_str = additional_str[3:]
if additional_str.endswith('```'):
additional_str = additional_str[:-3]
additional_str = additional_str.strip()
if additional_str.startswith('[') and additional_str.endswith(']'):
additional_objects = json.loads(additional_str)
if isinstance(additional_objects, list):
return additional_objects
else:
additional_objects = []
for line in additional_str.split('\n'):
line = line.strip().rstrip(',')
if line and line.startswith('{') and line.endswith('}'):
try:
obj = json.loads(line)
additional_objects.append(obj)
except json.JSONDecodeError:
continue
return additional_objects
except Exception as e:
print(f"Error parsing additional objects JSON: {e}")
return []
def parse_visual_analysis(response: str) -> str:
"""
Parse VISUAL_ANALYSIS from response.
Args:
response: Model response text
Returns:
Visual analysis text
"""
analysis_match = re.search(r'(?:\*\*)?VISUAL_ANALYSIS[:\*\s]+(.*?)(?=\n(?:\*\*)?(?:IMAGE_QUALITY|DEEPFOREST_OBJECTS|ADDITIONAL_OBJECTS)|$)', response, re.IGNORECASE | re.DOTALL)
if analysis_match:
return analysis_match.group(1).strip()
else:
fallback_match = re.search(r'(?:\*\*)?VISUAL_ANALYSIS[:\*\s]+(.*)', response, re.IGNORECASE | re.DOTALL)
if fallback_match:
return fallback_match.group(1).strip()
return response
def parse_deepforest_agent_response_with_reasoning(response: str) -> Dict[str, Any]:
"""
Parse DeepForest detector agent response with reasoning.
Args:
response: Model response text
Returns:
Dictionary with reasoning and tool calls
"""
from deepforest_agent.tools.tool_handler import extract_all_tool_calls
try:
tool_calls = extract_all_tool_calls(response)
if not tool_calls:
return {"error": "No valid tool calls found in response"}
reasoning_text = ""
first_json_match = re.search(r'\{[^}]*"name"[^}]*"arguments"[^}]*\}', response)
if first_json_match:
reasoning_text = response[:first_json_match.start()].strip()
reasoning_text = re.sub(r'^(REASONING:|Reasoning:|Analysis:|\*\*REASONING:\*\*)', '', reasoning_text).strip()
if not reasoning_text:
reasoning_text = "Tool calls generated based on analysis"
return {
"reasoning": reasoning_text,
"tool_calls": tool_calls
}
except Exception as e:
return {"error": f"Unexpected error parsing response: {str(e)}"}
def parse_memory_agent_response(response: str) -> Dict[str, Any]:
"""
Parse memory agent structured response format with new TOOL_CACHE_ID field.
Args:
response: Model response text
Returns:
Dictionary with answer_present, direct_answer, tool_cache_id, and relevant_context
"""
try:
# Parse ANSWER_PRESENT
answer_present_match = re.search(r'(?:\*\*)?ANSWER_PRESENT:(?:\*\*)?\s*\[?(YES|NO)\]?', response, re.IGNORECASE)
answer_present = False
if answer_present_match:
answer_present = answer_present_match.group(1).upper() == "YES"
# Parse TOOL_CACHE_ID
tool_cache_id_match = re.search(r'(?:\*\*)?TOOL_CACHE_ID:(?:\*\*)?\s*(.*?)(?=\n(?:\*\*)?(?:RELEVANT_CONTEXT|$))', response, re.IGNORECASE | re.DOTALL)
tool_cache_id = None
if tool_cache_id_match:
tool_cache_id_text = tool_cache_id_match.group(1).strip()
# Extract all cache IDs using multiple patterns
cache_ids = []
# Pattern 1: IDs within brackets [id1, id2, ...]
bracket_pattern = r'\[([^\[\]]*)\]'
bracket_matches = re.findall(bracket_pattern, tool_cache_id_text)
for bracket_content in bracket_matches:
if bracket_content.strip(): # Skip empty brackets
# Extract hex IDs from bracket content
hex_ids = re.findall(r'([a-fA-F0-9]{8,})', bracket_content)
cache_ids.extend(hex_ids)
# Pattern 2: Direct hex IDs (not in brackets)
# Remove bracketed content first, then find remaining hex IDs
text_without_brackets = re.sub(r'\[[^\[\]]*\]', '', tool_cache_id_text)
direct_hex_ids = re.findall(r'([a-fA-F0-9]{8,})', text_without_brackets)
cache_ids.extend(direct_hex_ids)
# Pattern 3: Standalone hex IDs on separate lines (check the whole response)
standalone_pattern = r'^([a-fA-F0-9]{8,})$'
standalone_matches = re.findall(standalone_pattern, response, re.MULTILINE)
cache_ids.extend(standalone_matches)
# Remove duplicates while preserving order
seen = set()
unique_cache_ids = []
for cache_id in cache_ids:
if cache_id not in seen:
seen.add(cache_id)
unique_cache_ids.append(cache_id)
if unique_cache_ids:
tool_cache_id = ", ".join(unique_cache_ids) if len(unique_cache_ids) > 1 else unique_cache_ids[0]
elif tool_cache_id_text and tool_cache_id_text.lower() not in ["", "empty", "none", "no tool cache id"]:
tool_cache_id = tool_cache_id_text
# Parse RELEVANT_CONTEXT
context_match = re.search(
r'(?:\*\*)?RELEVANT_CONTEXT:(?:\*\*)?\s*(.*?)(?=\n\*\*[A-Z_]+:|\Z)',
response,
re.IGNORECASE | re.DOTALL
)
relevant_context = ""
if context_match:
relevant_context = context_match.group(1).strip()
elif not answer_present:
relevant_context = response
return {
"answer_present": answer_present,
"direct_answer": "YES" if answer_present else "NO",
"tool_cache_id": tool_cache_id,
"relevant_context": relevant_context,
"raw_response": response
}
except Exception as e:
print(f"Error parsing memory response: {e}")
return {
"answer_present": False,
"direct_answer": "NO",
"tool_cache_id": None,
"relevant_context": response,
"raw_response": response
}