Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Utility functions for agent operations | |
| """ | |
| import time | |
| from typing import Tuple, Optional | |
| import logging | |
| import os | |
| import httpx | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
async def push_document_update(
    document_id: str,
    content: str,
    user_id: str,
    endpoint_url: Optional[str] = None,
    timeout: float = 5.0
) -> bool:
    """
    Push updated document content to the update-document-content endpoint.

    The payload is POSTed as JSON with the API key in the ``x-api-key`` header.
    Every failure path is logged and reported as ``False``; nothing is raised,
    so the calling workflow can continue regardless of the push outcome.

    Args:
        document_id: Identifier of the document to update.
        content: Full document content to send.
        user_id: Identifier of the user, sent as ``userId`` in the payload.
        endpoint_url: Full URL of the update endpoint. When omitted, it is
            built from the ``SUPABASE_BASE_URL`` environment variable.
        timeout: Request timeout in seconds (default: 5.0).

    Returns:
        bool: True if the endpoint answered with a 2xx status, False on
        missing parameters/configuration, a non-2xx status, a timeout or any
        other error.
    """
    # NOTE(review): the symbols at the start of the log messages look like
    # mis-decoded emoji; they are runtime strings and are left as found.
    if not document_id or not content or not user_id:
        logger.warning("β οΈ Missing required parameters for document update")
        return False

    # Resolve the endpoint: an explicit argument wins over the environment.
    if not endpoint_url:
        base_url = os.getenv("SUPABASE_BASE_URL")
        if not base_url:
            logger.warning("β οΈ No base URL configured (SUPABASE_BASE_URL not set)")
            return False
        # Strip trailing slashes so a base URL such as "https://host/" does
        # not produce "https://host//functions/...".
        endpoint_url = f"{base_url.rstrip('/')}/functions/v1/update-document-content"

    # The API key is read from the environment only.
    api_key = os.getenv("CYBERLGL_API_KEY")
    if not api_key:
        logger.warning("β οΈ No API key configured (CYBERLGL_API_KEY not set)")
        return False

    payload = {
        "documentId": document_id,
        "content": content,
        "userId": user_id
    }
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json"
    }

    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            logger.info(f"π€ Pushing document update for {document_id}...")
            # len() counts characters, which can differ from the encoded size.
            logger.info(f" Content size: {len(content)} bytes")
            response = await client.post(
                endpoint_url,
                json=payload,
                headers=headers
            )

        # Any 2xx status is a success (e.g. 201/204), not only 200.
        if 200 <= response.status_code < 300:
            logger.info(f"β Document update pushed successfully for {document_id}")
            return True

        logger.warning(
            f"β οΈ Document update failed for {document_id}: "
            f"HTTP {response.status_code} - {response.text}"
        )
        return False
    except httpx.TimeoutException:
        logger.error(f"β Document update timeout for {document_id} (after {timeout}s)")
        return False
    except httpx.RequestError as e:
        logger.error(f"β Document update request failed for {document_id}: {str(e)}")
        return False
    except Exception as e:
        # Deliberate catch-all: the push is best-effort and must never raise.
        # logger.exception keeps the traceback so unexpected failures can be
        # diagnosed.
        logger.exception(f"β Unexpected error pushing document update for {document_id}: {str(e)}")
        return False
def find_tool(tool_name: str, tools: list) -> Tuple[Optional[object], Optional[str]]:
    """
    Resolve a tool object from an LLM-supplied name, tolerating underscore noise.

    The requested name is normalized by dropping empty segments between
    underscores (which removes leading/trailing underscores and collapses
    repeated ones). Lookup then tries, in order:

    1. the normalized name prefixed with a single underscore,
    2. the normalized name without a prefix,
    3. the name exactly as provided.

    Within each step the first tool in ``tools`` whose ``name`` matches wins.

    Args:
        tool_name: Tool name as provided by the LLM. A non-string value
            (for example ``None``) is treated as "not found" instead of raising.
        tools: Available tool objects, each expected to expose a ``name``
            attribute. ``None`` is treated as an empty list, and objects
            without ``name`` are skipped.

    Returns:
        Tuple of (tool_object, actual_tool_name), where actual_tool_name is the
        matched tool's own ``name``. Returns (None, None) if nothing matches.

    Examples:
        "_find_lawyers"  -> finds "_find_lawyers", returns "_find_lawyers"
        "__find_lawyers" -> finds "_find_lawyers", returns "_find_lawyers"
        "find_lawyers"   -> finds "_find_lawyers", returns "_find_lawyers"
        "search_web"     -> finds "search_web", returns "search_web"
    """
    logger = logging.getLogger(__name__)

    # Tolerate a missing tool list instead of failing while iterating None.
    available = list(tools) if tools else []

    # (candidate name, log prefix) pairs in priority order. Left empty for a
    # non-string request so the function falls through to the not-found path.
    candidates = []
    if isinstance(tool_name, str):
        tool_name_normalized = "_".join(part for part in tool_name.split("_") if part)
        tool_name_with_underscore = "_" + tool_name_normalized
        logger.info(f"π§ Normalized tool name: {tool_name} β {tool_name_with_underscore}")
        candidates = [
            (tool_name_with_underscore, "β Found tool: "),
            (tool_name_normalized, "π§ Found tool without underscore: "),
            (tool_name, "π§ Found tool with original name: "),
        ]

    for candidate, found_prefix in candidates:
        match = next(
            (t for t in available if getattr(t, "name", None) == candidate),
            None,
        )
        # Compare against None explicitly so a tool object that happens to be
        # falsy is still returned.
        if match is not None:
            logger.info(f"{found_prefix}{match.name}")
            return match, match.name

    # No tool found
    available_names = [getattr(t, "name", None) for t in available]
    logger.error(f"β Tool function not found for: {tool_name}")
    logger.error(f"π Available tools: {available_names}")
    return None, None
class PerformanceMonitor:
    """
    Collect simple timing metrics for named operations.

    All values live in the ``metrics`` dict under two keys per operation:
    ``"<operation>_start"`` (the ``time.time()`` value captured by
    ``start_timer``) and ``"<operation>_duration"`` (elapsed seconds computed
    by ``end_timer``).
    """

    def __init__(self):
        # Maps "<operation>_start" / "<operation>_duration" to float seconds.
        self.metrics = {}

    def start_timer(self, operation: str) -> None:
        """
        Record the current time as the start of ``operation``.

        Calling this again for the same operation overwrites the earlier start.
        """
        self.metrics[f"{operation}_start"] = time.time()

    def end_timer(self, operation: str) -> float:
        """
        Compute, store and return the seconds elapsed since ``start_timer``.

        Returns 0.0, and stores nothing, if ``operation`` was never started.
        The start entry is kept, so calling this again measures from the same
        start.
        """
        start_time = self.metrics.get(f"{operation}_start")
        # Check for None explicitly: a stored start of 0.0 is a valid
        # timestamp and must not be mistaken for "timer not started".
        if start_time is None:
            return 0.0
        duration = time.time() - start_time
        self.metrics[f"{operation}_duration"] = duration
        return duration

    def get_metrics(self) -> dict:
        """
        Return a shallow copy of all collected metrics.

        Mutating the returned dict does not affect the monitor.
        """
        return self.metrics.copy()

    def reset(self) -> None:
        """
        Discard every recorded start time and duration.
        """
        self.metrics.clear()