#!/usr/bin/env python3
"""
Utility functions for agent operations
"""

import time
from typing import Tuple, Optional
import logging
import os

import httpx

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def push_document_update(
    document_id: str,
    content: str,
    user_id: str,
    endpoint_url: Optional[str] = None,
    timeout: float = 5.0
) -> bool:
    """
    Push document update to the external endpoint.

    This function sends the updated document content to the
    update-document-content endpoint for real-time synchronization
    via Supabase Realtime.

    Args:
        document_id: UUID of the document to update
        content: Full HTML document content
        user_id: User ID for authentication
        endpoint_url: URL of the update endpoint (optional; when omitted it is
            built from the SUPABASE_BASE_URL environment variable)
        timeout: Request timeout in seconds (default: 5.0)

    Returns:
        bool: True if the endpoint answered HTTP 200, False otherwise
        (missing parameters, missing configuration, non-200 response,
        timeout, or any request error).

    Note:
        This function handles errors gracefully and logs them without raising
        exceptions. The document editing workflow should continue regardless
        of push success/failure. The API key is read from the
        CYBERLGL_API_KEY environment variable and sent as "x-api-key".
    """
    if not document_id or not content or not user_id:
        logger.warning("⚠️ Missing required parameters for document update")
        return False

    # Get endpoint URL from environment or parameter
    if not endpoint_url:
        base_url = os.getenv("SUPABASE_BASE_URL")
        if not base_url:
            logger.warning("⚠️ No base URL configured (SUPABASE_BASE_URL not set)")
            return False
        # Strip trailing slashes so a base URL configured as
        # "https://host/" does not produce "https://host//functions/...".
        endpoint_url = f"{base_url.rstrip('/')}/functions/v1/update-document-content"

    # Get API key for authentication
    api_key = os.getenv("CYBERLGL_API_KEY")
    if not api_key:
        logger.warning("⚠️ No API key configured (CYBERLGL_API_KEY not set)")
        return False

    # Prepare payload
    payload = {
        "documentId": document_id,
        "content": content,
        "userId": user_id
    }
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json"
    }

    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            # Report the real encoded size: len(str) counts characters, which
            # under-reports non-ASCII HTML. "surrogatepass" keeps this
            # log-only computation from raising on lone surrogates.
            content_size = len(content.encode("utf-8", "surrogatepass"))
            logger.info(f"📤 Pushing document update for {document_id}...")
            logger.info(f" Content size: {content_size} bytes")

            response = await client.post(
                endpoint_url,
                json=payload,
                headers=headers
            )

            if response.status_code == 200:
                logger.info(f"✅ Document update pushed successfully for {document_id}")
                return True

            logger.warning(
                f"⚠️ Document update failed for {document_id}: "
                f"HTTP {response.status_code} - {response.text}"
            )
            return False
    except httpx.TimeoutException:
        logger.error(f"❌ Document update timeout for {document_id} (after {timeout}s)")
        return False
    except httpx.RequestError as e:
        logger.error(f"❌ Document update request failed for {document_id}: {e}")
        return False
    except Exception as e:
        # Deliberate catch-all: a failed push must never break the caller.
        logger.error(f"❌ Unexpected error pushing document update for {document_id}: {e}")
        return False


def find_tool(tool_name: str, tools: list) -> Tuple[Optional[object], Optional[str]]:
    """
    Smart tool lookup that handles various naming conventions.

    This function handles cases where tool names may or may not have
    underscore prefixes, and ensures consistent tool resolution regardless
    of how the LLM formats the name.

    Lookup order (first match wins):
        1. "_" + normalized name
        2. normalized name (for tools like "search_web")
        3. the name exactly as given

    Normalization splits on "_" and drops empty parts, so leading, trailing
    and repeated underscores are all collapsed.

    Args:
        tool_name: The tool name as provided by the LLM (may have
            underscores or not). A non-string value is treated as not found.
        tools: List of available tool objects; each must expose ``.name``

    Returns:
        Tuple of (tool_function, actual_tool_name)
        The actual_tool_name is the real name from the tool object
        (for comparison/injection)
        Returns (None, None) if no tool is found

    Examples:
        "_find_lawyers"  → finds "_find_lawyers", returns "_find_lawyers"
        "__find_lawyers" → finds "_find_lawyers", returns "_find_lawyers"
        "find_lawyers"   → finds "_find_lawyers", returns "_find_lawyers"
        "search_web"     → finds "search_web", returns "search_web"
    """
    # Guard against None / non-string names coming back from the LLM instead
    # of crashing on .split(); they fall through to the "not found" path.
    if isinstance(tool_name, str):
        # Normalize tool name: drop empty segments so "_find_lawyers",
        # "__find_lawyers" and "find_lawyers" all become "find_lawyers"
        tool_name_normalized = "_".join(part for part in tool_name.split("_") if part)
        tool_name_with_underscore = "_" + tool_name_normalized
        logger.info(f"🔧 Normalized tool name: {tool_name} → {tool_name_with_underscore}")

        # Candidate names in priority order, each with its log prefix.
        candidates = (
            (tool_name_with_underscore, "✅ Found tool"),
            (tool_name_normalized, "🔧 Found tool without underscore"),
            (tool_name, "🔧 Found tool with original name"),
        )
        for candidate, label in candidates:
            tool_func = next((t for t in tools if t.name == candidate), None)
            if tool_func is not None:
                logger.info(f"{label}: {tool_func.name}")
                return tool_func, tool_func.name

    # No tool found
    logger.error(f"❌ Tool function not found for: {tool_name}")
    logger.error(f"🔍 Available tools: {[t.name for t in tools]}")
    return None, None


class PerformanceMonitor:
    """
    Monitor agent performance and timing

    Metrics are kept in a single dict keyed by "<operation>_start"
    (time.time() timestamp) and "<operation>_duration" (seconds).
    """

    def __init__(self):
        self.metrics = {}

    def start_timer(self, operation: str) -> None:
        """
        Start timing an operation
        """
        self.metrics[f"{operation}_start"] = time.time()

    def end_timer(self, operation: str) -> float:
        """
        End timing an operation and return duration

        Returns 0.0 when start_timer() was never called for ``operation``.
        """
        start_time = self.metrics.get(f"{operation}_start")
        # Compare against None explicitly: a missing key is the only
        # "not started" signal, independent of the timestamp's value.
        if start_time is None:
            return 0.0
        duration = time.time() - start_time
        self.metrics[f"{operation}_duration"] = duration
        return duration

    def get_metrics(self) -> dict:
        """
        Get all collected metrics (shallow copy; safe for the caller to mutate)
        """
        return self.metrics.copy()

    def reset(self) -> None:
        """
        Reset all metrics
        """
        self.metrics.clear()