deepforest-agent / src /deepforest_agent /agents /visual_analysis_agent.py
SamiaHaque's picture
Adding files for initial deepforest-agent implementation
4f24301
raw
history blame
13.4 kB
from typing import Dict, List, Any, Optional
from PIL import Image
import json
import re
import time
import torch
import gc
from deepforest_agent.models.qwen_vl_3b_instruct import QwenVL3BModelManager
from deepforest_agent.utils.image_utils import encode_pil_image_to_base64_url, determine_patch_size, get_image_dimensions_fast
from deepforest_agent.utils.state_manager import session_state_manager
from deepforest_agent.conf.config import Config
from deepforest_agent.utils.parsing_utils import (
parse_image_quality_for_deepforest,
parse_deepforest_objects_present,
parse_visual_analysis,
parse_additional_objects_json
)
from deepforest_agent.prompts.prompt_templates import create_full_image_quality_analysis_prompt, create_individual_tile_analysis_prompt
from deepforest_agent.utils.logging_utils import multi_agent_logger
from deepforest_agent.utils.tile_manager import tile_image_for_analysis
class VisualAnalysisAgent:
"""
Visual analysis agent responsible for analyzing images with unified full/tiled approach.
Uses Qwen VL model for multimodal understanding.
"""
def __init__(self):
"""Initialize the Visual Analysis Agent."""
self.agent_config = Config.AGENT_CONFIGS["visual_analysis"]
self.model_manager = QwenVL3BModelManager(Config.AGENT_MODELS["visual_analysis"])
def analyze_full_image(self, user_message: str, session_id: str) -> Dict[str, Any]:
"""
Analyze full image with automatic fallback to tiling on OOM.
Args:
user_message: User's query
session_id: Session identifier
Returns:
Dict with unified structure for both full and tiled analysis
"""
if not session_state_manager.session_exists(session_id):
return {
"image_quality_for_deepforest": "No",
"deepforest_objects_present": [],
"additional_objects": [],
"visual_analysis": f"Session {session_id} not found.",
"status": "error",
"analysis_type": "error"
}
image = session_state_manager.get(session_id, "current_image")
if image is None:
return {
"image_quality_for_deepforest": "No",
"deepforest_objects_present": [],
"additional_objects": [],
"visual_analysis": f"No image available in session {session_id}.",
"status": "error",
"analysis_type": "error"
}
# Try full image analysis first
try:
print(f"Session {session_id} - Attempting full image analysis")
result = self._analyze_single_image(image, user_message, session_id, is_full_image=True)
if result["status"] == "success":
multi_agent_logger.log_agent_execution(
session_id=session_id,
agent_name="visual_analysis",
agent_input=f"Full image analysis for: {user_message}",
agent_output=result["visual_analysis"],
execution_time=0.0
)
return result
except Exception as e:
print(f"Session {session_id} - Full image analysis failed (likely OOM): {e}")
return self._analyze_with_tiling(user_message, session_id, str(e))
return self._analyze_with_tiling(user_message, session_id, "Full image analysis failed")
def _analyze_single_image(self, image: Image.Image, user_message: str, session_id: str,
is_full_image: bool = True, tile_location: str = "") -> Dict[str, Any]:
"""
Analyze a single image (full image or tile) with unified structure.
Args:
image: PIL Image to analyze
user_message: User's query
session_id: Session identifier
is_full_image: Whether this is full image or tile
tile_location: Location description for tiles
Returns:
Unified analysis result
"""
system_prompt = create_full_image_quality_analysis_prompt(user_message)
image_base64_url = encode_pil_image_to_base64_url(image)
messages = [
{"role": "system", "content": [{"type": "text", "text": system_prompt}]},
{
"role": "user",
"content": [
{"type": "image", "image": image_base64_url},
{"type": "text", "text": user_message}
]
}
]
response = self.model_manager.generate_response(
messages=messages,
max_new_tokens=self.agent_config["max_new_tokens"],
temperature=self.agent_config["temperature"]
)
# Parse structured response
image_quality = parse_image_quality_for_deepforest(response)
deepforest_objects = parse_deepforest_objects_present(response)
additional_objects = parse_additional_objects_json(response)
raw_visual_analysis = parse_visual_analysis(response)
# Format visual analysis with consistent prefix
if is_full_image:
width, height = image.size
visual_analysis = f"Full image analysis of image ({width}x{height}) is done. Here's the analysis: {raw_visual_analysis}"
analysis_type = "full_image"
else:
visual_analysis = f"The visual analysis of tiled image on ({tile_location}) this location is done. Here's the analysis: {raw_visual_analysis}"
analysis_type = "tiled_image"
return {
"image_quality_for_deepforest": image_quality,
"deepforest_objects_present": deepforest_objects,
"additional_objects": additional_objects,
"visual_analysis": visual_analysis,
"status": "success",
"analysis_type": analysis_type,
"raw_response": response
}
def _analyze_with_tiling(self, user_message: str, session_id: str, error_msg: str) -> Dict[str, Any]:
"""
Analyze image using tiling approach when full image fails.
Args:
user_message: User's query
session_id: Session identifier
error_msg: Original error message
Returns:
Combined analysis from tiled approach with same structure as full image
"""
print(f"Session {session_id} - Falling back to tiled analysis due to: {error_msg}")
image = session_state_manager.get(session_id, "current_image")
image_file_path = session_state_manager.get(session_id, "image_file_path")
if not image:
return {
"image_quality_for_deepforest": "No",
"deepforest_objects_present": [],
"additional_objects": [],
"visual_analysis": "No image available for tiled analysis.",
"status": "error",
"analysis_type": "error"
}
# Determine appropriate patch size
if image_file_path:
patch_size = determine_patch_size(image_file_path, image.size)
else:
max_dim = max(image.size)
if max_dim >= 5000:
patch_size = 1500 if max_dim <= 7500 else 2000
else:
patch_size = 1000
print(f"Session {session_id} - Using patch size {patch_size} for tiled analysis")
try:
tiles, tile_metadata = tile_image_for_analysis(
image=image,
patch_size=patch_size,
patch_overlap=Config.DEEPFOREST_DEFAULTS["patch_overlap"],
image_file_path=image_file_path
)
print(f"Session {session_id} - Created {len(tiles)} tiles for analysis")
# Analyze all tiles and combine results
all_visual_analyses = []
all_additional_objects = []
tile_results = []
for i, (tile, metadata) in enumerate(zip(tiles, tile_metadata)):
try:
tile_coords = metadata.get("window_coords", {})
location_desc = f"x:{tile_coords.get('x', 0)}-{tile_coords.get('x', 0) + tile_coords.get('width', 0)}, y:{tile_coords.get('y', 0)}-{tile_coords.get('y', 0) + tile_coords.get('height', 0)}"
# Analyze individual tile
tile_result = self._analyze_single_image(
image=tile,
user_message=user_message,
session_id=session_id,
is_full_image=False,
tile_location=location_desc
)
if tile_result["status"] == "success":
all_visual_analyses.append(tile_result["visual_analysis"])
all_additional_objects.extend(tile_result["additional_objects"])
# Store tile result for potential reuse
tile_results.append({
"tile_id": i,
"location": location_desc,
"coordinates": tile_coords,
"visual_analysis": tile_result["visual_analysis"],
"additional_objects": tile_result["additional_objects"]
})
# Log individual tile analysis
multi_agent_logger.log_agent_execution(
session_id=session_id,
agent_name=f"visual_tile_{i}",
agent_input=f"Tile {i+1} analysis: {user_message}",
agent_output=tile_result["visual_analysis"],
execution_time=0.0
)
print(f"Session {session_id} - Analyzed tile {i+1}/{len(tiles)}")
# Memory cleanup
del tile
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
except Exception as tile_error:
print(f"Session {session_id} - Tile {i} analysis failed: {tile_error}")
continue
if all_visual_analyses:
# Store tile results for potential reuse
session_state_manager.set(session_id, "tile_analysis_results", tile_results)
session_state_manager.set(session_id, "tiled_patch_size", patch_size)
# Combine all tile analyses
combined_visual_analysis = " ".join(all_visual_analyses)
return {
"image_quality_for_deepforest": "Yes",
"deepforest_objects_present": ["tree", "bird", "livestock"],
"additional_objects": all_additional_objects,
"visual_analysis": combined_visual_analysis,
"status": "tiled_success",
"analysis_type": "tiled_combined",
"tile_count": len(tiles),
"successful_tiles": len(all_visual_analyses),
"patch_size_used": patch_size
}
except Exception as tiling_error:
print(f"Session {session_id} - Tiled analysis also failed: {tiling_error}")
# Final fallback - resolution-based assessment
resolution_result = session_state_manager.get(session_id, "resolution_result")
if resolution_result and resolution_result.get("is_suitable"):
width, height = image.size
return {
"image_quality_for_deepforest": "Yes",
"deepforest_objects_present": ["tree", "bird", "livestock"],
"additional_objects": [],
"visual_analysis": f"Full image analysis of image ({width}x{height}) is done. Here's the analysis: Large image analyzed using resolution-based assessment. Original error: {error_msg}",
"status": "resolution_fallback",
"analysis_type": "resolution_based"
}
# Complete failure
width, height = image.size
return {
"image_quality_for_deepforest": "No",
"deepforest_objects_present": [],
"additional_objects": [],
"visual_analysis": f"Full image analysis of image ({width}x{height}) failed. Analysis could not be completed due to: {error_msg}",
"status": "error",
"analysis_type": "failed"
}
def get_tile_analysis_results(self, session_id: str) -> List[Dict[str, Any]]:
"""
Get stored tile analysis results for reuse.
Args:
session_id: Session identifier
Returns:
List of tile analysis results or empty list
"""
return session_state_manager.get(session_id, "tile_analysis_results", [])