#!/usr/bin/env python3
"""
Utility functions for agent operations
"""
import time
from typing import Tuple, Optional
import logging
import os
import httpx
# Configure logging.
# Runs at import time: sets up root logging at INFO level via basicConfig and
# creates the module-level logger, named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def push_document_update(
    document_id: str,
    content: str,
    user_id: str,
    endpoint_url: Optional[str] = None,
    timeout: float = 5.0
) -> bool:
    """
    Push document update to the external endpoint.

    Sends the updated document content as a JSON POST to the
    update-document-content endpoint for real-time synchronization via
    Supabase Realtime. The request is authenticated with the ``x-api-key``
    header, whose value is read from the ``CYBERLGL_API_KEY`` environment
    variable.

    Args:
        document_id: UUID of the document to update
        content: Full HTML document content
        user_id: User ID for authentication
        endpoint_url: URL of the update endpoint. When omitted, it is built as
            ``$SUPABASE_BASE_URL/functions/v1/update-document-content``.
        timeout: Request timeout in seconds (default: 5.0)

    Returns:
        bool: True only if the endpoint answered HTTP 200. False when a
        required argument is empty, the base URL or API key is not configured,
        the endpoint answered with any other status, or the request failed.

    Note:
        This function handles errors gracefully and logs them without raising
        exceptions. The document editing workflow should continue regardless
        of push success/failure.
    """
    # logging.getLogger returns the same per-module logger on every call.
    log = logging.getLogger(__name__)

    if not document_id or not content or not user_id:
        log.warning("⚠️ Missing required parameters for document update")
        return False

    # Get endpoint URL from parameter, falling back to the environment
    if not endpoint_url:
        base_url = os.getenv("SUPABASE_BASE_URL")
        if not base_url:
            log.warning("⚠️ No base URL configured (SUPABASE_BASE_URL not set)")
            return False
        # Tolerate a trailing slash so the joined URL never contains "//functions".
        endpoint_url = f"{base_url.rstrip('/')}/functions/v1/update-document-content"

    # Get API key for authentication
    api_key = os.getenv("CYBERLGL_API_KEY")
    if not api_key:
        log.warning("⚠️ No API key configured (CYBERLGL_API_KEY not set)")
        return False

    payload = {
        "documentId": document_id,
        "content": content,
        "userId": user_id,
    }
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json",
    }

    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            log.info("📤 Pushing document update for %s...", document_id)
            # len() of a str counts characters, not encoded bytes.
            log.info("   Content size: %d characters", len(content))
            response = await client.post(
                endpoint_url,
                json=payload,
                headers=headers,
            )
            if response.status_code == 200:
                log.info("✅ Document update pushed successfully for %s", document_id)
                return True
            log.warning(
                "⚠️ Document update failed for %s: HTTP %s - %s",
                document_id,
                response.status_code,
                response.text,
            )
            return False
    # TimeoutException is a subclass of RequestError, so it must be caught first.
    except httpx.TimeoutException:
        log.error("❌ Document update timeout for %s (after %ss)", document_id, timeout)
        return False
    except httpx.RequestError as e:
        log.error("❌ Document update request failed for %s: %s", document_id, e)
        return False
    except Exception:
        # Deliberate best-effort boundary: never propagate, but keep the
        # traceback so unexpected failures can still be diagnosed.
        log.exception("❌ Unexpected error pushing document update for %s", document_id)
        return False
def find_tool(tool_name: str, tools: list) -> Tuple[Optional[object], Optional[str]]:
    """
    Smart tool lookup that handles various naming conventions.

    Tool names coming from the LLM may or may not carry underscore prefixes.
    The name is normalized by dropping empty segments between underscores
    (which removes leading, trailing and repeated underscores), and candidates
    are then tried in this order:

    1. the normalized name with exactly one leading underscore,
    2. the normalized name without a leading underscore,
    3. the name exactly as given.

    Args:
        tool_name: The tool name as provided by the LLM (may have underscores or not)
        tools: List of available tool objects; each is matched on its ``name``
            attribute. Objects without a string ``name`` are ignored, and
            ``None`` is treated as an empty list.

    Returns:
        Tuple of (tool_function, actual_tool_name)
        The actual_tool_name is the real name from the tool object (for comparison/injection)
        Returns (None, None) if no tool is found or tool_name is not a string

    Examples:
        "_find_lawyers" → finds "_find_lawyers", returns "_find_lawyers"
        "__find_lawyers" → finds "_find_lawyers", returns "_find_lawyers"
        "find_lawyers" → finds "_find_lawyers", returns "_find_lawyers"
        "search_web" → finds "search_web", returns "search_web"
    """
    logger = logging.getLogger(__name__)

    # Index the tools once. setdefault keeps the first tool when a name repeats,
    # which is the same winner a front-to-back scan would pick.
    by_name = {}
    for tool in tools or ():
        name = getattr(tool, "name", None)
        if isinstance(name, str):
            by_name.setdefault(name, tool)

    if isinstance(tool_name, str):
        # "_find_lawyers", "__find_lawyers" and "find_lawyers" all normalize to
        # "find_lawyers"; the prefixed form then has exactly one leading underscore.
        normalized = "_".join(part for part in tool_name.split("_") if part)
        prefixed = "_" + normalized
        logger.info("🔧 Normalized tool name: %s → %s", tool_name, prefixed)

        attempts = (
            (prefixed, "✅ Found tool: %s"),
            (normalized, "🔧 Found tool without underscore: %s"),
            (tool_name, "🔧 Found tool with original name: %s"),
        )
        for candidate, message in attempts:
            match = by_name.get(candidate)
            # Compare against None so a tool object that happens to be falsy
            # is still returned.
            if match is not None:
                logger.info(message, match.name)
                return match, match.name

    # No tool found
    logger.error("❌ Tool function not found for: %s", tool_name)
    logger.error("📋 Available tools: %s", list(by_name))
    return None, None
class PerformanceMonitor:
    """
    Monitor agent performance and timing.

    Metrics are kept in a flat dict. For an operation ``op``, ``start_timer``
    stores the wall-clock start under ``"op_start"`` and ``end_timer`` stores
    the elapsed seconds under ``"op_duration"``.
    """

    def __init__(self):
        self.metrics = {}
        # perf_counter readings used to compute durations. Kept out of
        # ``metrics`` because the values are only meaningful as differences.
        self._monotonic_starts = {}

    def start_timer(self, operation: str) -> None:
        """
        Start timing an operation.

        Calling this again for the same operation restarts its timer.
        """
        self.metrics[f"{operation}_start"] = time.time()
        self._monotonic_starts[operation] = time.perf_counter()

    def end_timer(self, operation: str) -> float:
        """
        End timing an operation and return duration.

        Returns:
            Elapsed seconds since the matching ``start_timer`` call, also
            recorded under ``"<operation>_duration"``. Returns 0.0 (and records
            nothing) when the operation was never started. The start entry is
            kept, so a later call measures from the same start again.
        """
        wall_start = self.metrics.get(f"{operation}_start")
        # Explicit None check: a stored start of 0.0 is still a valid start.
        if wall_start is None:
            return 0.0

        mono_start = self._monotonic_starts.get(operation)
        if mono_start is not None:
            # Preferred path: perf_counter is unaffected by system clock changes.
            duration = time.perf_counter() - mono_start
        else:
            # Start was written into ``metrics`` directly; fall back to wall clock.
            duration = time.time() - wall_start

        self.metrics[f"{operation}_duration"] = duration
        return duration

    def get_metrics(self) -> dict:
        """
        Get all collected metrics.

        Returns a shallow copy, so changes to it do not affect the monitor.
        """
        return self.metrics.copy()

    def reset(self) -> None:
        """
        Reset all metrics, including any timers that are still running.
        """
        self.metrics.clear()
        self._monotonic_starts.clear()
|