Spaces:
Runtime error
Runtime error
| """ | |
| CellposeAgent with proper VLM configuration, JPEG compression, and reliable path injection. | |
| Key change: The agent stores the current image path in a global context that tools can access, | |
| preventing the LLM from corrupting file paths when passing them as arguments. | |
| """ | |
| import torch | |
| import json | |
| from io import BytesIO | |
| from datetime import datetime | |
| from PIL import Image | |
| from smolagents import ToolCallingAgent, InferenceClientModel | |
| from smolagents.agents import ActionStep | |
| from langfuse import get_client, observe | |
| from config import settings | |
| from utils.gpu import clear_gpu_cache | |
| from tools import all_tools | |
# Module-wide Langfuse client; CellposeAgent.run() uses it to record trace
# input/output and metadata for observability.
langfuse = get_client()
| # ============================================================================= | |
| # GLOBAL IMAGE PATH CONTEXT | |
| # ============================================================================= | |
| # This module-level variable stores the current image path reliably. | |
| # Tools can access this directly instead of relying on the LLM to pass the path. | |
| _current_image_context = { | |
| "image_path": None, | |
| "output_path": None, | |
| } | |
| def set_current_image_path(path: str) -> None: | |
| """Set the current image path for tools to access.""" | |
| global _current_image_context | |
| _current_image_context["image_path"] = path | |
| _current_image_context["output_path"] = None # Reset output on new image | |
| print(f"[Context] Set current image path: {path}") | |
| def get_current_image_path() -> str | None: | |
| """Get the current image path (used by tools).""" | |
| return _current_image_context.get("image_path") | |
| def set_current_output_path(path: str) -> None: | |
| """Set the current output path after segmentation.""" | |
| global _current_image_context | |
| _current_image_context["output_path"] = path | |
| def get_current_output_path() -> str | None: | |
| """Get the current output path (used by tools).""" | |
| return _current_image_context.get("output_path") | |
| def get_image_context() -> dict: | |
| """Get the full image context dictionary.""" | |
| return _current_image_context.copy() | |
class CellposeAgent:
    """Tool-calling agent that drives cellpose-sam segmentation with VLM feedback.

    Responsibilities:
      - Publishes the active image path via the module-level context
        (``set_current_image_path``) so tools resolve paths themselves instead
        of trusting the LLM to echo an exact filesystem path.
      - Attaches compressed JPEG previews of results to agent steps so the
        vision-language model can inspect segmentation quality.
      - Evicts images from earlier steps to bound token consumption.
      - Records traces to Langfuse around each run.
    """

    def attach_images_callback(self, step_log: ActionStep, agent: ToolCallingAgent) -> None:
        """
        Callback to attach actual PIL images for VLM inspection.
        Images are automatically resized and compressed to reduce token consumption.

        BUGFIX: this was previously declared without ``self`` while being
        registered as ``self.attach_images_callback``; the bound method then
        received the CellposeAgent instance as ``step_log``, failed the
        isinstance check, and silently never attached any image.
        """
        if not isinstance(step_log, ActionStep):
            return
        if not step_log.observations:
            return

        def resize_and_compress_image(img: Image.Image, max_size: int = 512, quality: int = 75) -> Image.Image:
            """
            Resize and compress image to reduce payload size.

            Args:
                img: Input PIL Image
                max_size: Maximum dimension (width or height)
                quality: JPEG quality (1-95, lower = smaller file)

            Returns:
                Compressed PIL Image
            """
            # Convert to RGB if needed (JPEG doesn't support RGBA)
            if img.mode in ('RGBA', 'LA', 'P'):
                background = Image.new('RGB', img.size, (255, 255, 255))
                if img.mode == 'P':
                    img = img.convert('RGBA')
                # Use the alpha channel as paste mask so transparent regions
                # composite onto white instead of rendering as black.
                background.paste(img, mask=img.split()[-1] if img.mode in ('RGBA', 'LA') else None)
                img = background
            elif img.mode != 'RGB':
                img = img.convert('RGB')
            # Resize maintaining aspect ratio
            if max(img.size) > max_size:
                ratio = max_size / max(img.size)
                new_size = tuple(int(dim * ratio) for dim in img.size)
                img = img.resize(new_size, Image.Resampling.LANCZOS)
            # Round-trip through an in-memory JPEG to actually shrink the payload;
            # the BytesIO stays alive because the returned Image references it.
            buffer = BytesIO()
            img.save(buffer, format='JPEG', quality=quality, optimize=True)
            buffer.seek(0)
            compressed_img = Image.open(buffer)
            print(f" Resized and compressed to {compressed_img.size}, quality={quality}")
            return compressed_img

        try:
            obs_data = json.loads(step_log.observations)
            # Pattern 1: Single image from get_segmentation_parameters
            if obs_data.get("status") == "success" and "image_path" in obs_data:
                image_path = obs_data["image_path"]
                print(f"[Callback] Attaching image: {image_path}")
                try:
                    img = Image.open(image_path)
                    compressed_img = resize_and_compress_image(img, max_size=512, quality=75)
                    # Attach compressed PIL Image
                    step_log.observations_images = [compressed_img]
                    # Keep metadata for context
                    obs_data["image_info"] = {
                        "original_dimensions": f"{img.size[0]}x{img.size[1]} pixels",
                        "processed_dimensions": f"{compressed_img.size[0]}x{compressed_img.size[1]} pixels",
                        "mode": compressed_img.mode,
                        "note": "Image compressed for API efficiency (JPEG quality=75)"
                    }
                    step_log.observations = json.dumps(obs_data, indent=2)
                    print(f"[Callback] β Attached compressed image for VLM inspection")
                except Exception as e:
                    print(f"[Callback] Error attaching image: {e}")
            # Pattern 2: Segmented image ONLY from refine_segmentation
            elif obs_data.get("status") == "ready_for_visual_analysis":
                paths = obs_data.get("image_paths", {})
                segmented = paths.get("segmented")
                if segmented:
                    print(f"[Callback] Attaching segmented image only: {segmented}")
                    try:
                        seg_img = Image.open(segmented)
                        # Compress the segmented image
                        compressed_seg = resize_and_compress_image(seg_img, max_size=512, quality=75)
                        # Attach only the segmented image
                        step_log.observations_images = [compressed_seg]
                        obs_data["images_info"] = {
                            "image_type": "segmented_overlay",
                            "description": "Segmentation result with colored cell masks overlaid on original image",
                            "original_size": f"{seg_img.size[0]}x{seg_img.size[1]}",
                            "processed_size": f"{compressed_seg.size[0]}x{compressed_seg.size[1]}",
                            "note": "Segmented image attached for quality assessment (JPEG quality=75)"
                        }
                        step_log.observations = json.dumps(obs_data, indent=2)
                        print(f"[Callback] β Attached compressed segmented image for VLM inspection")
                    except Exception as e:
                        print(f"[Callback] Error attaching segmented image: {e}")
        except json.JSONDecodeError:
            # Observations that are not JSON simply carry no image to attach.
            pass
        except Exception as e:
            print(f"[Callback] Error in attach_images_callback: {e}")

    def manage_image_memory(self, step_log: ActionStep, agent: ToolCallingAgent) -> None:
        """
        Clear images from ALL previous steps to keep token usage bounded.

        BUGFIX: adds ``self`` (same binding bug as attach_images_callback) and
        skips the step currently being finalized — this callback runs after
        attach_images_callback in the same pass, so without the guard it would
        wipe the image that was just attached before the VLM ever saw it.
        """
        if not isinstance(step_log, ActionStep):
            return
        # Clear ALL previous step images immediately
        for previous_step in agent.memory.steps:
            if previous_step is step_log:
                continue  # never evict the image attached for the current step
            if isinstance(previous_step, ActionStep):
                if previous_step.observations_images is not None:
                    print(f" [Memory] Clearing images from step {previous_step.step_number}")
                    previous_step.observations_images = []  # Use empty list instead of None
                    # Also try to clear any cached references
                    if hasattr(previous_step, '_observations_images'):
                        previous_step._observations_images = []

    def __init__(self):
        # System prompt steering the tool-calling workflow. NOTE: the refinement
        # limit section previously contradicted itself ("AT MOST 1 TIMES" vs.
        # "Second call (if needed)"); the stale "Second call" line was removed.
        self.instructions = """
You are an assistant for the cellpose-sam segmentation tool.
## CRITICAL: IMAGE PATH HANDLING ##
**The image path is automatically available to all tools. You do NOT need to pass the exact path.**
When calling tools that require an image path, you can pass an empty string "" or any placeholder -
the tool will automatically use the correct image path from the system context.
Example: Instead of trying to reproduce the exact path, just call:
`get_segmentation_parameters(image_path="")`
`run_cellpose_sam(image_path="")`
The system will automatically use the correct path.
## PRIMARY WORKFLOW - IMAGE SEGMENTATION
When a user provides an image:
1. Use appropriate tools to review which cellpose-sam parameters are available.
2. Use the tool: `get_segmentation_parameters` with image_path=""
- **IMPORTANT**: After this tool runs, you will receive image metadata (dimensions, properties)
- Use this information to reason about appropriate parameter values
3. Carefully analyze the image metadata and matched parameters:
- Consider cell density based on image dimensions
- Compare matched parameter values to image characteristics
- Consider if adjustments would likely improve the segmentation
4. Be conservative: if you make changes, assess if they should differ significantly from the original values
5. Provide your final parameter recommendations in a clear, structured format
6. Use the parameters to run cellpose_sam through the tool: run_cellpose_sam with image_path=""
7. After run_cellpose_sam, call the tool: refine_cellpose_sam_segmentation
- **IMPORTANT**: After this tool runs, you will see the SEGMENTED image (colored masks overlay)
- Visually inspect the segmentation quality - are cells properly detected and separated?
- Use the visual analysis checklist provided in the tool output
8. Based on visual analysis of the segmented image:
- Assess if cell boundaries are accurate
- Check if neighboring cells are properly separated or merged
- Look for false positive detections (noise)
- Identify any obvious cells that were missed
- If refinement is needed, use knowledge graph and RAG tools to understand parameter effects
- Decide which parameters to adjust based on what you observe
- Re-run run_cellpose_sam with adjusted parameters
**CRITICAL: Call refine_cellpose_sam_segmentation AT MOST 1 TIME total**
- First call: Check initial segmentation quality
- NEVER call it a second time - always stop after 1 refinement check
## DOCUMENTATION QUERY WORKFLOW ##
- "What is X": use `search_documentation_vector`
- "How does X affect Y": use `search_knowledge_graph`
- Complex analysis: use `hybrid_search`
- Parameter relationships: use `get_parameter_relationships`
## RESPONSE STYLE ##
- Be concise and actionable
- Always explain your reasoning when adjusting parameters
- If keeping original matched parameters, briefly confirm why it's appropriate
- Base your decisions on visual observation of the segmented output
**CRITICAL - Final Response Format:**
When segmentation is complete, you MUST provide a comprehensive text summary that includes:
1. A brief statement about the segmentation completion
2. Number of cells detected
3. The final parameters used (diameter, flow_threshold, cellprob_threshold, min_size)
4. A quality assessment (e.g., "excellent", "good", "acceptable")
5. Any observations about the segmentation (e.g., "cells well-separated", "some clustering")
6. The output file path at the end
Example good final response:
"Segmentation completed successfully! I detected 42 cells in your image using the following parameters:
- diameter: 30
- flow_threshold: 0.6
- cellprob_threshold: 0
- min_size: 15
The segmentation quality looks excellent - cell boundaries are well-defined and neighboring cells are properly separated with minimal false positives.
Output saved to: /path/to/image_cellpose_sam_overlay.png"
**NEVER return just a filename like "segmentation_output.png" - always provide the full context above.**
"""
        self.model = self._initialize_model()
        self.agent = self._create_agent()

    def _initialize_model(self):
        """Initializes the InferenceClientModel for the agent with VLM support."""
        clear_gpu_cache()
        return InferenceClientModel(
            model_id=settings.AGENT_MODEL_ID,
            token=settings.HF_TOKEN,
            timeout=240  # 4-minute timeout for API calls (comment previously said 3 minutes)
        )

    def _create_agent(self):
        """Creates the ToolCallingAgent with all available tools and memory management."""
        return ToolCallingAgent(
            model=self.model,
            tools=all_tools,
            instructions=self.instructions,
            max_steps=6,
            # Order matters: attach first, then evict images from OLDER steps.
            step_callbacks=[
                self.attach_images_callback,
                self.manage_image_memory,
            ]
        )

    def run(self, task: str, image_path: str | None = None):
        """
        Runs the agent on a given task with Langfuse tracing.

        Args:
            task: The task description/prompt
            image_path: Optional path to the image (will be stored in context for tools)

        Returns:
            The agent's final answer.

        Raises:
            Re-raises any exception from the underlying agent run after
            logging it to Langfuse; GPU cache is cleared either way.
        """
        print(f"\n{'='*60}\nTASK: {task}\n{'='*60}")
        # Store image path in global context for tools to access
        if image_path:
            set_current_image_path(image_path)
        # Tracing is best-effort: a Langfuse outage must not break the run.
        try:
            langfuse.update_current_trace(
                input={"task": task, "image_path": image_path},
                user_id="user_001",
                tags=["rag", "cellpose", "knowledge-graph", "vision"],
                metadata={
                    "agent_type": "ToolCallingAgent",
                    "model_id": settings.AGENT_MODEL_ID,
                    "image_context": get_image_context()
                }
            )
        except Exception as e:
            print(f"Warning: Could not update Langfuse trace: {e}")
        try:
            final_answer = self.agent.run(task)
            print("\n--- Final Answer from Agent ---\n", final_answer)
            try:
                langfuse.update_current_trace(output={"final_answer": final_answer})
            except Exception as e:
                print(f"Warning: Could not update Langfuse output: {e}")
            return final_answer
        except Exception as e:
            print(f"Agent run failed: {e}")
            try:
                langfuse.update_current_trace(output={"error": str(e)})
            except Exception as log_error:
                print(f"Warning: Could not log error to Langfuse: {log_error}")
            raise
        finally:
            clear_gpu_cache()