Spaces:
No application file
No application file
| from typing import Dict, List, Any, Optional | |
| from PIL import Image | |
| import json | |
| import re | |
| import time | |
| import torch | |
| import gc | |
| from deepforest_agent.models.qwen_vl_3b_instruct import QwenVL3BModelManager | |
| from deepforest_agent.utils.image_utils import encode_pil_image_to_base64_url, determine_patch_size, get_image_dimensions_fast | |
| from deepforest_agent.utils.state_manager import session_state_manager | |
| from deepforest_agent.conf.config import Config | |
| from deepforest_agent.utils.parsing_utils import ( | |
| parse_image_quality_for_deepforest, | |
| parse_deepforest_objects_present, | |
| parse_visual_analysis, | |
| parse_additional_objects_json | |
| ) | |
| from deepforest_agent.prompts.prompt_templates import create_full_image_quality_analysis_prompt, create_individual_tile_analysis_prompt | |
| from deepforest_agent.utils.logging_utils import multi_agent_logger | |
| from deepforest_agent.utils.tile_manager import tile_image_for_analysis | |
class VisualAnalysisAgent:
    """
    Visual analysis agent responsible for analyzing images with unified full/tiled approach.

    The agent first sends the whole image to the vision-language model. When
    that attempt raises (typically because the image does not fit in memory)
    the image is split into tiles which are analyzed one by one, and the
    per-tile descriptions are merged into a result with the same structure as
    the full-image result.

    Uses Qwen VL model for multimodal understanding.
    """

    def __init__(self):
        """Initialize the Visual Analysis Agent."""
        self.agent_config = Config.AGENT_CONFIGS["visual_analysis"]
        self.model_manager = QwenVL3BModelManager(Config.AGENT_MODELS["visual_analysis"])

    @staticmethod
    def _error_result(message: str, analysis_type: str = "error") -> Dict[str, Any]:
        """Build the unified result dict used for every error outcome."""
        return {
            "image_quality_for_deepforest": "No",
            "deepforest_objects_present": [],
            "additional_objects": [],
            "visual_analysis": message,
            "status": "error",
            "analysis_type": analysis_type,
        }

    @staticmethod
    def _release_memory() -> None:
        """Run the garbage collector and, if CUDA is available, empty its cache."""
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    @staticmethod
    def _fallback_patch_size(image_size: tuple) -> int:
        """Pick a patch size from the largest image dimension when no file path is known."""
        max_dim = max(image_size)
        if max_dim < 5000:
            return 1000
        return 1500 if max_dim <= 7500 else 2000

    @staticmethod
    def _format_tile_location(tile_coords: Dict[str, Any]) -> str:
        """Render a tile window as ``x:<start>-<end>, y:<start>-<end>`` (missing keys count as 0)."""
        x = tile_coords.get("x", 0)
        y = tile_coords.get("y", 0)
        width = tile_coords.get("width", 0)
        height = tile_coords.get("height", 0)
        return f"x:{x}-{x + width}, y:{y}-{y + height}"

    def analyze_full_image(self, user_message: str, session_id: str) -> Dict[str, Any]:
        """
        Analyze full image with automatic fallback to tiling on OOM.

        Args:
            user_message: User's query
            session_id: Session identifier

        Returns:
            Dict with unified structure for both full and tiled analysis
        """
        if not session_state_manager.session_exists(session_id):
            return self._error_result(f"Session {session_id} not found.")

        image = session_state_manager.get(session_id, "current_image")
        if image is None:
            return self._error_result(f"No image available in session {session_id}.")

        # Try full image analysis first
        result: Optional[Dict[str, Any]] = None
        full_image_error: Optional[str] = None
        elapsed = 0.0
        started = time.perf_counter()
        try:
            print(f"Session {session_id} - Attempting full image analysis")
            result = self._analyze_single_image(image, user_message, session_id, is_full_image=True)
            elapsed = time.perf_counter() - started
        except Exception as e:
            print(f"Session {session_id} - Full image analysis failed (likely OOM): {e}")
            # Only remember the message here. The exception's traceback keeps
            # the failed call's frames (and whatever they allocated) alive, so
            # the recovery itself must run after this clause has ended.
            full_image_error = str(e)

        if full_image_error is not None:
            self._release_memory()
            return self._analyze_with_tiling(user_message, session_id, full_image_error)

        if result["status"] == "success":
            # Logging is best-effort: a logger failure must not discard a
            # successful analysis and trigger a full tiled re-run.
            try:
                multi_agent_logger.log_agent_execution(
                    session_id=session_id,
                    agent_name="visual_analysis",
                    agent_input=f"Full image analysis for: {user_message}",
                    agent_output=result["visual_analysis"],
                    execution_time=elapsed
                )
            except Exception as log_error:
                print(f"Session {session_id} - Could not log full image analysis: {log_error}")
            return result

        return self._analyze_with_tiling(user_message, session_id, "Full image analysis failed")

    def _analyze_single_image(self, image: "Image.Image", user_message: str, session_id: str,
                              is_full_image: bool = True, tile_location: str = "") -> Dict[str, Any]:
        """
        Analyze a single image (full image or tile) with unified structure.

        Args:
            image: PIL Image to analyze
            user_message: User's query
            session_id: Session identifier (not used by this method; kept for
                a uniform signature)
            is_full_image: Whether this is full image or tile
            tile_location: Location description for tiles

        Returns:
            Unified analysis result with status "success"; errors from the
            model or the parsers propagate as exceptions.
        """
        # NOTE(review): tiles are analyzed with the full-image prompt as well;
        # create_individual_tile_analysis_prompt is imported by this file but
        # not used here - confirm this is intended.
        system_prompt = create_full_image_quality_analysis_prompt(user_message)
        image_base64_url = encode_pil_image_to_base64_url(image)
        messages = [
            {"role": "system", "content": [{"type": "text", "text": system_prompt}]},
            {
                "role": "user",
                "content": [
                    {"type": "image", "image": image_base64_url},
                    {"type": "text", "text": user_message}
                ]
            }
        ]
        response = self.model_manager.generate_response(
            messages=messages,
            max_new_tokens=self.agent_config["max_new_tokens"],
            temperature=self.agent_config["temperature"]
        )

        # Parse structured response
        image_quality = parse_image_quality_for_deepforest(response)
        deepforest_objects = parse_deepforest_objects_present(response)
        additional_objects = parse_additional_objects_json(response)
        raw_visual_analysis = parse_visual_analysis(response)

        # Format visual analysis with consistent prefix
        if is_full_image:
            width, height = image.size
            visual_analysis = f"Full image analysis of image ({width}x{height}) is done. Here's the analysis: {raw_visual_analysis}"
            analysis_type = "full_image"
        else:
            visual_analysis = f"The visual analysis of tiled image on ({tile_location}) this location is done. Here's the analysis: {raw_visual_analysis}"
            analysis_type = "tiled_image"

        return {
            "image_quality_for_deepforest": image_quality,
            "deepforest_objects_present": deepforest_objects,
            "additional_objects": additional_objects,
            "visual_analysis": visual_analysis,
            "status": "success",
            "analysis_type": analysis_type,
            "raw_response": response
        }

    def _analyze_with_tiling(self, user_message: str, session_id: str, error_msg: str) -> Dict[str, Any]:
        """
        Analyze image using tiling approach when full image fails.

        Tiles that fail are skipped. If no tile succeeds (or tiling itself
        raises), the stored "resolution_result" of the session decides between
        a resolution-based positive answer and a final error result.

        Args:
            user_message: User's query
            session_id: Session identifier
            error_msg: Original error message

        Returns:
            Combined analysis from tiled approach with same structure as full image
        """
        print(f"Session {session_id} - Falling back to tiled analysis due to: {error_msg}")
        image = session_state_manager.get(session_id, "current_image")
        image_file_path = session_state_manager.get(session_id, "image_file_path")
        if image is None:
            return self._error_result("No image available for tiled analysis.")

        # Determine appropriate patch size
        if image_file_path:
            patch_size = determine_patch_size(image_file_path, image.size)
        else:
            patch_size = self._fallback_patch_size(image.size)
        print(f"Session {session_id} - Using patch size {patch_size} for tiled analysis")

        try:
            tiles, tile_metadata = tile_image_for_analysis(
                image=image,
                patch_size=patch_size,
                patch_overlap=Config.DEEPFOREST_DEFAULTS["patch_overlap"],
                image_file_path=image_file_path
            )
            print(f"Session {session_id} - Created {len(tiles)} tiles for analysis")

            # Analyze all tiles and combine results
            all_visual_analyses = []
            all_additional_objects = []
            tile_results = []
            for i, (tile, metadata) in enumerate(zip(tiles, tile_metadata)):
                try:
                    tile_coords = metadata.get("window_coords", {})
                    location_desc = self._format_tile_location(tile_coords)

                    # Analyze individual tile
                    tile_started = time.perf_counter()
                    tile_result = self._analyze_single_image(
                        image=tile,
                        user_message=user_message,
                        session_id=session_id,
                        is_full_image=False,
                        tile_location=location_desc
                    )
                    tile_elapsed = time.perf_counter() - tile_started

                    if tile_result["status"] == "success":
                        all_visual_analyses.append(tile_result["visual_analysis"])
                        all_additional_objects.extend(tile_result["additional_objects"])
                        # Store tile result for potential reuse
                        tile_results.append({
                            "tile_id": i,
                            "location": location_desc,
                            "coordinates": tile_coords,
                            "visual_analysis": tile_result["visual_analysis"],
                            "additional_objects": tile_result["additional_objects"]
                        })
                        # Log individual tile analysis
                        multi_agent_logger.log_agent_execution(
                            session_id=session_id,
                            agent_name=f"visual_tile_{i}",
                            agent_input=f"Tile {i+1} analysis: {user_message}",
                            agent_output=tile_result["visual_analysis"],
                            execution_time=tile_elapsed
                        )
                    print(f"Session {session_id} - Analyzed tile {i+1}/{len(tiles)}")
                except Exception as tile_error:
                    print(f"Session {session_id} - Tile {i} analysis failed: {tile_error}")

                # Memory cleanup. Runs after the except clause so it also
                # happens for failed tiles, once the exception is released.
                del tile
                self._release_memory()

            if all_visual_analyses:
                # Store tile results for potential reuse
                session_state_manager.set(session_id, "tile_analysis_results", tile_results)
                session_state_manager.set(session_id, "tiled_patch_size", patch_size)

                # Combine all tile analyses
                combined_visual_analysis = " ".join(all_visual_analyses)
                # NOTE(review): quality and object classes are fixed values
                # here; the per-tile parsed values are not aggregated.
                return {
                    "image_quality_for_deepforest": "Yes",
                    "deepforest_objects_present": ["tree", "bird", "livestock"],
                    "additional_objects": all_additional_objects,
                    "visual_analysis": combined_visual_analysis,
                    "status": "tiled_success",
                    "analysis_type": "tiled_combined",
                    "tile_count": len(tiles),
                    "successful_tiles": len(all_visual_analyses),
                    "patch_size_used": patch_size
                }
        except Exception as tiling_error:
            print(f"Session {session_id} - Tiled analysis also failed: {tiling_error}")

        # Final fallback - resolution-based assessment
        width, height = image.size
        resolution_result = session_state_manager.get(session_id, "resolution_result")
        if resolution_result and resolution_result.get("is_suitable"):
            return {
                "image_quality_for_deepforest": "Yes",
                "deepforest_objects_present": ["tree", "bird", "livestock"],
                "additional_objects": [],
                "visual_analysis": f"Full image analysis of image ({width}x{height}) is done. Here's the analysis: Large image analyzed using resolution-based assessment. Original error: {error_msg}",
                "status": "resolution_fallback",
                "analysis_type": "resolution_based"
            }

        # Complete failure
        return self._error_result(
            f"Full image analysis of image ({width}x{height}) failed. Analysis could not be completed due to: {error_msg}",
            analysis_type="failed"
        )

    def get_tile_analysis_results(self, session_id: str) -> List[Dict[str, Any]]:
        """
        Get stored tile analysis results for reuse.

        Args:
            session_id: Session identifier

        Returns:
            List of tile analysis results or empty list
        """
        # NOTE(review): this is the only call that passes a third (default)
        # argument to session_state_manager.get - confirm it is supported.
        return session_state_manager.get(session_id, "tile_analysis_results", [])