|
|
""" |
|
|
Code execution tool for the forensic agent. |
|
|
|
|
|
This allows the agent to execute Python code dynamically, similar to ChatGPT's code interpreter. |
|
|
Useful for custom image analysis, zooming, cropping, statistical analysis, etc. |
|
|
""" |
|
|
|
|
|
import io |
|
|
import sys |
|
|
import os |
|
|
import traceback |
|
|
from typing import Dict, Any, Optional |
|
|
from pathlib import Path |
|
|
import base64 |
|
|
import json |
|
|
|
|
|
try: |
|
|
from PIL import Image |
|
|
import numpy as np |
|
|
except ImportError: |
|
|
Image = None |
|
|
np = None |
|
|
|
|
|
|
|
|
def execute_python_code(code: str, image_path: Optional[str] = None, context: Optional[Dict[str, Any]] = None) -> str:
    """
    Execute Python code with a reduced set of builtins and image helpers.

    The snippet runs via ``exec`` in a fresh namespace that exposes a small
    whitelist of builtins plus ``np``, ``Image``, ``Path``, ``base64`` and
    ``json``. When ``image_path`` is given, the namespace also receives
    ``image_path`` / ``current_image_path`` and, if Pillow is available, the
    opened image as ``image`` / ``img`` and (if NumPy is available) its array
    form as ``image_array`` / ``img_array``. A failure to load the image is
    reported to the snippet through ``image_load_error`` instead of raising.

    NOTE: the trimmed ``__builtins__`` is a convenience, not a security
    boundary. Because ``__import__`` is not whitelisted, ``import``
    statements inside the snippet fail, but the exposed modules still give
    file-system access. Do not treat this as safe for untrusted input.

    Args:
        code: Python code to execute.
        image_path: Optional path to the current image being analyzed.
        context: Optional dictionary of variables to make available to the
            code. Entries override the default namespace names.

    Returns:
        Captured stdout, then captured stderr (prefixed with ``STDERR:``),
        then the value of a ``result`` variable if the snippet left a
        non-``None`` one in the namespace. If nothing was produced, a fixed
        success message is returned. If execution raises, the formatted
        traceback is returned prefixed with ``Error executing code:``.
    """
    # Swap the process-wide streams so everything the snippet prints is
    # captured; both are restored in the ``finally`` clause below.
    old_stdout = sys.stdout
    sys.stdout = captured_output = io.StringIO()

    old_stderr = sys.stderr
    sys.stderr = captured_error = io.StringIO()

    # Keep our own handle on the opened image so it can be closed even if
    # the snippet rebinds or deletes ``image`` / ``img``.
    opened_image = None

    try:
        namespace = {
            '__builtins__': {
                'abs': abs, 'all': all, 'any': any, 'bool': bool, 'dict': dict,
                'enumerate': enumerate, 'float': float, 'int': int, 'len': len,
                'list': list, 'max': max, 'min': min, 'print': print, 'range': range,
                'round': round, 'set': set, 'sorted': sorted, 'str': str, 'sum': sum,
                'tuple': tuple, 'type': type, 'zip': zip,
            },
            'np': np,
            'Image': Image,
            'Path': Path,
            'base64': base64,
            'json': json,
        }

        if image_path:
            namespace['image_path'] = image_path
            namespace['current_image_path'] = image_path

            # ``Image`` / ``np`` are None when the optional imports failed.
            if Image:
                try:
                    opened_image = Image.open(image_path)
                    namespace['image'] = opened_image
                    namespace['img'] = opened_image

                    if np:
                        img_array = np.array(opened_image)
                        namespace['image_array'] = img_array
                        namespace['img_array'] = img_array
                except Exception as e:
                    # Best effort: let the snippet see why the image is missing.
                    namespace['image_load_error'] = str(e)

        if context:
            namespace.update(context)

        # SECURITY: executes arbitrary caller-supplied code in-process.
        exec(code, namespace)

        # By convention the snippet may publish a value via ``result``.
        result_value = namespace.get('result', None)

        stdout_output = captured_output.getvalue()
        stderr_output = captured_error.getvalue()

        output_parts = []
        if stdout_output:
            output_parts.append(stdout_output)
        if stderr_output:
            output_parts.append(f"STDERR:\n{stderr_output}")
        if result_value is not None:
            output_parts.append(f"\nReturn value: {result_value}")

        return '\n'.join(output_parts) if output_parts else "Code executed successfully (no output)"

    except Exception:
        error_traceback = traceback.format_exc()
        return f"Error executing code:\n{error_traceback}"

    finally:
        sys.stdout = old_stdout
        sys.stderr = old_stderr

        # Release the underlying file handle; Image.open() is lazy and may
        # keep the file open (multi-frame formats, or when never loaded).
        if opened_image is not None:
            try:
                opened_image.close()
            except Exception:
                # Cleanup is best effort and must not mask the real result.
                pass
|
|
|
|
|
|
|
|
def run_code_interpreter(input_str: str) -> str:
    """
    LangChain tool wrapper for code execution.

    Input format: JSON string with 'code' and optionally 'image_path'
    (and 'context').
    Example: '{"code": "print(image.size)", "image_path": "path/to/image.jpg"}'
    Or simple string: just the code. In that case an ``image_path = "..."``
    assignment inside the code is used as the image path.

    If no image path is known after parsing, the code is scanned for an
    ``image_path = "..."`` assignment or a quoted file name with a common
    image extension; the first candidate that exists on disk is used.

    Args:
        input_str: JSON object string or raw Python code.

    Returns:
        The output of ``execute_python_code``, a usage message when no
        (string) code was supplied, or an ``Error in code interpreter``
        message with traceback if anything unexpected is raised here.
    """
    # Function-scope import: ``re`` is only needed by this wrapper.
    import re

    assignment_pattern = r'image_path\s*=\s*["\']([^"\']+)["\']'

    try:
        try:
            params = json.loads(input_str)
            code = params.get('code', '')
            image_path = params.get('image_path')
            context = params.get('context', {})
        except (json.JSONDecodeError, AttributeError):
            # Not JSON (or JSON that is not an object): the whole input is code.
            code = input_str
            image_path = None
            context = {}

            # Trust an explicit assignment in the code, without requiring the
            # file to exist; a bad path is reported later as a load error.
            path_match = re.search(assignment_pattern, code)
            if path_match:
                image_path = path_match.group(1)

        # A JSON payload may carry a null / non-string 'code'; treat that the
        # same as missing code instead of failing on ``.strip()``.
        if not isinstance(code, str) or not code.strip():
            return (
                "Error: No code provided. Provide Python code to execute.\n"
                "Format: {\"code\": \"your_python_code_here\", \"image_path\": \"path/to/image.jpg\"}\n"
                "Or: just the Python code as a string (image_path should be in agent context)."
            )

        if not image_path:
            path_patterns = [
                assignment_pattern,
                r'["\']([^"\']+\.(jpg|jpeg|png|gif|bmp))["\']',
            ]
            for pattern in path_patterns:
                match = re.search(pattern, code, re.IGNORECASE)
                if match:
                    potential_path = match.group(1)
                    # Only adopt guessed paths that actually exist.
                    if os.path.exists(potential_path):
                        image_path = potential_path
                        break

        return execute_python_code(code, image_path=image_path, context=context)

    except Exception as e:
        return f"Error in code interpreter: {str(e)}\n{traceback.format_exc()}"
|
|
|
|
|
|