File size: 13,432 Bytes
4f24301
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
from typing import Dict, List, Any, Optional
from PIL import Image
import json
import re
import time
import torch
import gc

from deepforest_agent.models.qwen_vl_3b_instruct import QwenVL3BModelManager
from deepforest_agent.utils.image_utils import encode_pil_image_to_base64_url, determine_patch_size, get_image_dimensions_fast
from deepforest_agent.utils.state_manager import session_state_manager
from deepforest_agent.conf.config import Config
from deepforest_agent.utils.parsing_utils import (
    parse_image_quality_for_deepforest,
    parse_deepforest_objects_present,
    parse_visual_analysis,
    parse_additional_objects_json
)
from deepforest_agent.prompts.prompt_templates import create_full_image_quality_analysis_prompt, create_individual_tile_analysis_prompt
from deepforest_agent.utils.logging_utils import multi_agent_logger
from deepforest_agent.utils.tile_manager import tile_image_for_analysis


class VisualAnalysisAgent:
    """
    Visual analysis agent that inspects the session image with a Qwen VL model.

    The full image is analyzed first. If that attempt raises (for example on
    out-of-memory) the image is split into tiles and every tile is analyzed on
    its own. Both paths return dictionaries that share the same core keys:
    ``image_quality_for_deepforest``, ``deepforest_objects_present``,
    ``additional_objects``, ``visual_analysis``, ``status`` and
    ``analysis_type``.
    """

    # Objects reported as present by the fallback paths (tiled / resolution based).
    _FALLBACK_DEEPFOREST_OBJECTS = ("tree", "bird", "livestock")

    def __init__(self):
        """Initialize the Visual Analysis Agent from the project configuration."""
        self.agent_config = Config.AGENT_CONFIGS["visual_analysis"]
        self.model_manager = QwenVL3BModelManager(Config.AGENT_MODELS["visual_analysis"])

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _error_result(message: str, analysis_type: str = "error") -> Dict[str, Any]:
        """
        Build the unified result dictionary for a failed analysis.

        Args:
            message: Text stored under ``visual_analysis``
            analysis_type: Value stored under ``analysis_type``

        Returns:
            Dict with ``status`` set to ``"error"`` and fresh, empty object lists
        """
        return {
            "image_quality_for_deepforest": "No",
            "deepforest_objects_present": [],
            "additional_objects": [],
            "visual_analysis": message,
            "status": "error",
            "analysis_type": analysis_type
        }

    @staticmethod
    def _fallback_patch_size(max_dim: int) -> int:
        """
        Pick a tile size from the largest image dimension.

        Used only when no image file path is stored for the session.

        Args:
            max_dim: Largest of the image's width and height, in pixels

        Returns:
            1000 below 5000 px, 1500 from 5000 to 7500 px, 2000 above that
        """
        if max_dim < 5000:
            return 1000
        return 1500 if max_dim <= 7500 else 2000

    @staticmethod
    def _describe_tile_location(tile_coords: Dict[str, Any]) -> str:
        """
        Format a tile window as ``x:<start>-<end>, y:<start>-<end>``.

        Args:
            tile_coords: Mapping with optional ``x``, ``y``, ``width`` and
                ``height`` entries; missing entries count as 0

        Returns:
            Human readable pixel range of the tile
        """
        x = tile_coords.get('x', 0)
        y = tile_coords.get('y', 0)
        width = tile_coords.get('width', 0)
        height = tile_coords.get('height', 0)
        return f"x:{x}-{x + width}, y:{y}-{y + height}"

    @staticmethod
    def _release_memory() -> None:
        """Run the garbage collector and, when CUDA is available, empty its cache."""
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------
    def analyze_full_image(self, user_message: str, session_id: str) -> Dict[str, Any]:
        """
        Analyze full image with automatic fallback to tiling on failure.

        Args:
            user_message: User's query
            session_id: Session identifier

        Returns:
            Dict with unified structure for both full and tiled analysis.
            ``status`` is ``"error"`` when the session or its image is missing.
        """
        if not session_state_manager.session_exists(session_id):
            return self._error_result(f"Session {session_id} not found.")

        image = session_state_manager.get(session_id, "current_image")
        if image is None:
            return self._error_result(f"No image available in session {session_id}.")

        failure_reason = "Full image analysis failed"

        # Try full image analysis first
        try:
            print(f"Session {session_id} - Attempting full image analysis")
            started = time.perf_counter()
            result = self._analyze_single_image(image, user_message, session_id, is_full_image=True)
            elapsed = time.perf_counter() - started

            if result["status"] == "success":
                multi_agent_logger.log_agent_execution(
                    session_id=session_id,
                    agent_name="visual_analysis",
                    agent_input=f"Full image analysis for: {user_message}",
                    agent_output=result["visual_analysis"],
                    execution_time=elapsed
                )
                return result

        except Exception as e:
            # Only record the reason here. The fallback runs after this handler
            # exits so the exception (and the frames its traceback references)
            # no longer keeps the failed attempt's memory alive.
            print(f"Session {session_id} - Full image analysis failed (likely OOM): {e}")
            failure_reason = str(e)

        # Free whatever the failed attempt left behind before tiling starts.
        self._release_memory()
        return self._analyze_with_tiling(user_message, session_id, failure_reason)

    # ------------------------------------------------------------------
    # Single image / tile analysis
    # ------------------------------------------------------------------
    def _analyze_single_image(self, image: Image.Image, user_message: str, session_id: str,
                             is_full_image: bool = True, tile_location: str = "") -> Dict[str, Any]:
        """
        Analyze a single image (full image or tile) with unified structure.

        Args:
            image: PIL Image to analyze
            user_message: User's query
            session_id: Session identifier (not read by this method; kept so
                the signature stays stable for callers)
            is_full_image: Whether this is full image or tile
            tile_location: Location description for tiles

        Returns:
            Unified analysis result with ``status`` ``"success"`` plus the
            model's unparsed text under ``raw_response``. Exceptions raised by
            the model or the parsers are not caught here.
        """
        # NOTE(review): tiles are analyzed with the full-image prompt as well;
        # create_individual_tile_analysis_prompt is imported by this module but
        # not used in this class - confirm that this is intended.
        system_prompt = create_full_image_quality_analysis_prompt(user_message)
        image_base64_url = encode_pil_image_to_base64_url(image)

        messages = [
            {"role": "system", "content": [{"type": "text", "text": system_prompt}]},
            {
                "role": "user",
                "content": [
                    {"type": "image", "image": image_base64_url},
                    {"type": "text", "text": user_message}
                ]
            }
        ]

        response = self.model_manager.generate_response(
            messages=messages,
            max_new_tokens=self.agent_config["max_new_tokens"],
            temperature=self.agent_config["temperature"]
        )

        # Parse structured response
        image_quality = parse_image_quality_for_deepforest(response)
        deepforest_objects = parse_deepforest_objects_present(response)
        additional_objects = parse_additional_objects_json(response)
        raw_visual_analysis = parse_visual_analysis(response)

        # Format visual analysis with consistent prefix
        if is_full_image:
            width, height = image.size
            visual_analysis = f"Full image analysis of image ({width}x{height}) is done. Here's the analysis: {raw_visual_analysis}"
            analysis_type = "full_image"
        else:
            visual_analysis = f"The visual analysis of tiled image on ({tile_location}) this location is done. Here's the analysis: {raw_visual_analysis}"
            analysis_type = "tiled_image"

        return {
            "image_quality_for_deepforest": image_quality,
            "deepforest_objects_present": deepforest_objects,
            "additional_objects": additional_objects,
            "visual_analysis": visual_analysis,
            "status": "success",
            "analysis_type": analysis_type,
            "raw_response": response
        }

    # ------------------------------------------------------------------
    # Tiled fallback
    # ------------------------------------------------------------------
    def _analyze_with_tiling(self, user_message: str, session_id: str, error_msg: str) -> Dict[str, Any]:
        """
        Analyze image using tiling approach when full image fails.

        Tiles that raise are skipped. If at least one tile succeeds the
        per-tile results are stored in the session under
        ``tile_analysis_results`` and the patch size under
        ``tiled_patch_size``. If no tile succeeds, the session's
        ``resolution_result`` decides between a resolution-based answer and a
        complete failure.

        Args:
            user_message: User's query
            session_id: Session identifier
            error_msg: Original error message

        Returns:
            Combined analysis from tiled approach with same structure as full image
        """
        print(f"Session {session_id} - Falling back to tiled analysis due to: {error_msg}")

        image = session_state_manager.get(session_id, "current_image")
        image_file_path = session_state_manager.get(session_id, "image_file_path")

        # Same explicit None check as analyze_full_image.
        if image is None:
            return self._error_result("No image available for tiled analysis.")

        # Determine appropriate patch size
        if image_file_path:
            patch_size = determine_patch_size(image_file_path, image.size)
        else:
            patch_size = self._fallback_patch_size(max(image.size))

        print(f"Session {session_id} - Using patch size {patch_size} for tiled analysis")

        try:
            tiles, tile_metadata = tile_image_for_analysis(
                image=image,
                patch_size=patch_size,
                patch_overlap=Config.DEEPFOREST_DEFAULTS["patch_overlap"],
                image_file_path=image_file_path
            )

            print(f"Session {session_id} - Created {len(tiles)} tiles for analysis")

            # Analyze all tiles and combine results
            all_visual_analyses = []
            all_additional_objects = []
            tile_results = []

            for i, (tile, metadata) in enumerate(zip(tiles, tile_metadata)):
                try:
                    tile_coords = metadata.get("window_coords", {})
                    location_desc = self._describe_tile_location(tile_coords)

                    # Analyze individual tile
                    started = time.perf_counter()
                    tile_result = self._analyze_single_image(
                        image=tile,
                        user_message=user_message,
                        session_id=session_id,
                        is_full_image=False,
                        tile_location=location_desc
                    )
                    elapsed = time.perf_counter() - started

                    if tile_result["status"] == "success":
                        all_visual_analyses.append(tile_result["visual_analysis"])
                        all_additional_objects.extend(tile_result["additional_objects"])

                        # Store tile result for potential reuse
                        tile_results.append({
                            "tile_id": i,
                            "location": location_desc,
                            "coordinates": tile_coords,
                            "visual_analysis": tile_result["visual_analysis"],
                            "additional_objects": tile_result["additional_objects"]
                        })

                    # Log individual tile analysis
                    multi_agent_logger.log_agent_execution(
                        session_id=session_id,
                        agent_name=f"visual_tile_{i}",
                        agent_input=f"Tile {i+1} analysis: {user_message}",
                        agent_output=tile_result["visual_analysis"],
                        execution_time=elapsed
                    )

                    print(f"Session {session_id} - Analyzed tile {i+1}/{len(tiles)}")

                except Exception as tile_error:
                    # A failing tile is skipped; the remaining tiles still run.
                    print(f"Session {session_id} - Tile {i} analysis failed: {tile_error}")
                finally:
                    # Memory cleanup after every tile, including failed ones:
                    # a tile that ran out of memory is exactly the case where
                    # the next tile needs the memory back.
                    del tile
                    self._release_memory()

            if all_visual_analyses:
                # Store tile results for potential reuse
                session_state_manager.set(session_id, "tile_analysis_results", tile_results)
                session_state_manager.set(session_id, "tiled_patch_size", patch_size)

                # Combine all tile analyses
                combined_visual_analysis = " ".join(all_visual_analyses)

                # NOTE(review): quality and DeepForest objects are fixed values
                # here; the values parsed per tile are not aggregated - confirm
                # that this is intended.
                return {
                    "image_quality_for_deepforest": "Yes",
                    "deepforest_objects_present": list(self._FALLBACK_DEEPFOREST_OBJECTS),
                    "additional_objects": all_additional_objects,
                    "visual_analysis": combined_visual_analysis,
                    "status": "tiled_success",
                    "analysis_type": "tiled_combined",
                    "tile_count": len(tiles),
                    "successful_tiles": len(all_visual_analyses),
                    "patch_size_used": patch_size
                }

        except Exception as tiling_error:
            print(f"Session {session_id} - Tiled analysis also failed: {tiling_error}")

        # Final fallback - resolution-based assessment
        width, height = image.size
        resolution_result = session_state_manager.get(session_id, "resolution_result")
        if resolution_result and resolution_result.get("is_suitable"):
            return {
                "image_quality_for_deepforest": "Yes",
                "deepforest_objects_present": list(self._FALLBACK_DEEPFOREST_OBJECTS),
                "additional_objects": [],
                "visual_analysis": f"Full image analysis of image ({width}x{height}) is done. Here's the analysis: Large image analyzed using resolution-based assessment. Original error: {error_msg}",
                "status": "resolution_fallback",
                "analysis_type": "resolution_based"
            }

        # Complete failure
        return self._error_result(
            f"Full image analysis of image ({width}x{height}) failed. Analysis could not be completed due to: {error_msg}",
            analysis_type="failed"
        )

    def get_tile_analysis_results(self, session_id: str) -> List[Dict[str, Any]]:
        """
        Get stored tile analysis results for reuse.

        Args:
            session_id: Session identifier

        Returns:
            The value stored under ``tile_analysis_results`` for the session,
            with an empty list passed to the state manager as the default
        """
        return session_state_manager.get(session_id, "tile_analysis_results", [])