Spaces:
Sleeping
Sleeping
File size: 11,317 Bytes
d7b3d84 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 | import base64
import os
from datetime import datetime, timezone
from pathlib import Path
import anyio
from bubus import BaseEvent
from pydantic import Field, field_validator
from uuid_extensions import uuid7str
# Size limits applied to event fields in this module.
MAX_STRING_LENGTH = 100_000  # 100K chars ~ 25k tokens should be enough
MAX_URL_LENGTH = 100_000
MAX_TASK_LENGTH = 100_000
MAX_COMMENT_LENGTH = 2_000
MAX_FILE_CONTENT_SIZE = 50 * 1024**2  # 50MB, expressed in bytes
class UpdateAgentTaskEvent(BaseEvent):
    """Event carrying updated values for an existing agent task."""

    # Identification (required)
    id: str  # ID of the task being updated
    user_id: str = Field(max_length=255)  # used for authorization
    device_id: str | None = Field(default=None, max_length=255)  # device ID for auth lookup

    # Updatable values (all optional)
    stopped: bool | None = None
    paused: bool | None = None
    done_output: str | None = Field(default=None, max_length=MAX_STRING_LENGTH)
    finished_at: datetime | None = None
    agent_state: dict | None = None
    user_feedback_type: str | None = Field(default=None, max_length=10)  # UserFeedbackType enum value as string
    user_comment: str | None = Field(default=None, max_length=MAX_COMMENT_LENGTH)
    gif_url: str | None = Field(default=None, max_length=MAX_URL_LENGTH)

    @classmethod
    def from_agent(cls, agent) -> 'UpdateAgentTaskEvent':
        """Build an update event from the current state of an Agent instance.

        ``user_id`` is left empty here (to be filled by the cloud handler), and
        the feedback / GIF fields are passed explicitly as ``None``.

        Raises:
            ValueError: If ``agent`` has no ``_task_start_time`` attribute.
        """
        if not hasattr(agent, '_task_start_time'):
            raise ValueError('Agent must have _task_start_time attribute')

        history = agent.history
        final_output = history.final_result() if history else None

        # Device ID is only available when cloud sync and its auth client are set up.
        device_id = None
        cloud_sync = getattr(agent, 'cloud_sync', None)
        if cloud_sync and cloud_sync.auth_client:
            device_id = cloud_sync.auth_client.device_id

        state = agent.state

        # Only stamp a finish time once the history reports completion.
        finished_at = None
        if history and history.is_done():
            finished_at = datetime.now(timezone.utc)

        state_snapshot = state.model_dump() if hasattr(state, 'model_dump') else {}

        return cls(
            id=str(agent.task_id),
            user_id='',  # To be filled by cloud handler
            device_id=device_id,
            stopped=getattr(state, 'stopped', False),
            paused=getattr(state, 'paused', False),
            done_output=final_output,
            finished_at=finished_at,
            agent_state=state_snapshot,
            # user_feedback_type and user_comment would be set by the API/frontend
            user_feedback_type=None,
            user_comment=None,
            # gif_url would be set after GIF generation if needed
            gif_url=None,
        )
class CreateAgentOutputFileEvent(BaseEvent):
    """Event describing an output file produced for an agent task."""

    # Model fields
    id: str = Field(default_factory=uuid7str)
    user_id: str = Field(max_length=255)
    device_id: str | None = Field(None, max_length=255)  # Device ID for auth lookup
    task_id: str
    file_name: str = Field(max_length=255)
    file_content: str | None = None  # Base64 encoded file content
    content_type: str | None = Field(None, max_length=100)  # MIME type for file uploads
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    @field_validator('file_content')
    @classmethod
    def validate_file_size(cls, v: str | None) -> str | None:
        """Validate the decoded size of base64 file content.

        If the value contains a comma, everything up to and including the first
        comma is treated as a data-URL prefix and dropped; the stripped payload
        is the value that is returned.

        Args:
            v: Base64 text, optionally with a data-URL prefix, or ``None``.

        Returns:
            ``None`` unchanged, otherwise the payload without any data-URL prefix.

        Raises:
            ValueError: If the decoded size exceeds ``MAX_FILE_CONTENT_SIZE``.
        """
        if v is None:
            return v

        # Remove data URL prefix if present. Split only on the first comma so
        # the payload is never truncated.
        if ',' in v:
            v = v.split(',', 1)[1]

        # Four base64 characters decode to three bytes, and each trailing '='
        # stands for one byte that is not there. Ignoring the padding would
        # overestimate the size and reject a payload that is just under the limit.
        padding = min(len(v) - len(v.rstrip('=')), 2)
        decoded_size = len(v) * 3 // 4 - padding
        if decoded_size > MAX_FILE_CONTENT_SIZE:
            raise ValueError(f'File content exceeds maximum size of {MAX_FILE_CONTENT_SIZE / 1024 / 1024}MB')

        return v

    @classmethod
    async def from_agent_and_file(cls, agent, output_path: str) -> 'CreateAgentOutputFileEvent':
        """Create a CreateAgentOutputFileEvent from a file path.

        The file is read and base64-encoded only when it is smaller than
        ``MAX_FILE_CONTENT_SIZE``; otherwise ``file_content`` is left as ``None``.
        ``content_type`` is always set to ``'image/gif'`` and ``user_id`` is left
        empty (to be filled by the cloud handler).

        Args:
            agent: Object providing ``task_id`` and, optionally, ``cloud_sync``.
            output_path: Path of the file to attach.

        Raises:
            FileNotFoundError: If ``output_path`` does not exist.
        """
        gif_path = Path(output_path)
        if not gif_path.exists():
            raise FileNotFoundError(f'File not found: {output_path}')

        gif_size = os.path.getsize(gif_path)

        # Read GIF content for base64 encoding if needed. The threshold is the
        # same constant the validator enforces, so the two cannot drift apart.
        gif_content = None
        if gif_size < MAX_FILE_CONTENT_SIZE:
            async with await anyio.open_file(gif_path, 'rb') as f:
                gif_bytes = await f.read()
                gif_content = base64.b64encode(gif_bytes).decode('utf-8')

        return cls(
            user_id='',  # To be filled by cloud handler
            device_id=agent.cloud_sync.auth_client.device_id
            if hasattr(agent, 'cloud_sync') and agent.cloud_sync and agent.cloud_sync.auth_client
            else None,
            task_id=str(agent.task_id),
            file_name=gif_path.name,
            file_content=gif_content,  # Base64 encoded
            content_type='image/gif',
        )
class CreateAgentStepEvent(BaseEvent):
    """Event describing a single step taken by an agent within a task."""

    # Model fields
    id: str = Field(default_factory=uuid7str)
    user_id: str = Field(max_length=255)  # Added for authorization checks
    device_id: str | None = Field(None, max_length=255)  # Device ID for auth lookup
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    agent_task_id: str
    step: int
    evaluation_previous_goal: str = Field(max_length=MAX_STRING_LENGTH)
    memory: str = Field(max_length=MAX_STRING_LENGTH)
    next_goal: str = Field(max_length=MAX_STRING_LENGTH)
    actions: list[dict]
    # NOTE(review): max_length here counts characters, and the base64 text of a
    # MAX_FILE_CONTENT_SIZE-byte image is longer than MAX_FILE_CONTENT_SIZE
    # characters - confirm which of this limit and the validator's is intended.
    screenshot_url: str | None = Field(None, max_length=MAX_FILE_CONTENT_SIZE)  # ~50MB for base64 images
    url: str = Field(default='', max_length=MAX_URL_LENGTH)

    @field_validator('screenshot_url')
    @classmethod
    def validate_screenshot_size(cls, v: str | None) -> str | None:
        """Validate screenshot URL or base64 content size.

        Values that are ``None`` or do not start with ``'data:'`` are returned
        without any size check. The value itself is never modified.

        Raises:
            ValueError: If the estimated decoded size of a data URL's payload
                exceeds ``MAX_FILE_CONTENT_SIZE``.
        """
        if v is None or not v.startswith('data:'):
            return v

        # It's base64 data, check size. Split only on the first comma so the
        # whole payload after the data-URL header is measured.
        if ',' in v:
            base64_part = v.split(',', 1)[1]
            estimated_size = len(base64_part) * 3 / 4
            if estimated_size > MAX_FILE_CONTENT_SIZE:
                raise ValueError(f'Screenshot content exceeds maximum size of {MAX_FILE_CONTENT_SIZE / 1024 / 1024}MB')

        return v

    @classmethod
    def from_agent_step(
        cls, agent, model_output, result: list, actions_data: list[dict], browser_state_summary
    ) -> 'CreateAgentStepEvent':
        """Create a CreateAgentStepEvent from agent step data.

        Args:
            agent: Object providing ``task_id``, ``state.n_steps`` and,
                optionally, ``cloud_sync``.
            model_output: Step output; its ``current_state`` attribute (when
                present and truthy) supplies the goal and memory fields.
            result: Accepted for interface compatibility; not read here.
            actions_data: List of action dicts stored as ``actions``.
            browser_state_summary: Provides ``url`` and ``screenshot``.
        """
        # Imported locally, once, instead of separately in each branch below.
        import logging

        logger = logging.getLogger(__name__)

        # Extract current state from model output
        current_state = model_output.current_state if hasattr(model_output, 'current_state') else None

        # Capture screenshot as base64 data URL if available
        screenshot_url = None
        if browser_state_summary.screenshot:
            screenshot_url = f'data:image/jpeg;base64,{browser_state_summary.screenshot}'
            logger.debug(
                '📸 Including screenshot in CreateAgentStepEvent, length: %s',
                len(browser_state_summary.screenshot),
            )
        else:
            logger.debug('📸 No screenshot in browser_state_summary for CreateAgentStepEvent')

        return cls(
            user_id='',  # To be filled by cloud handler
            device_id=agent.cloud_sync.auth_client.device_id
            if hasattr(agent, 'cloud_sync') and agent.cloud_sync and agent.cloud_sync.auth_client
            else None,
            agent_task_id=str(agent.task_id),
            step=agent.state.n_steps,
            evaluation_previous_goal=current_state.evaluation_previous_goal if current_state else '',
            memory=current_state.memory if current_state else '',
            next_goal=current_state.next_goal if current_state else '',
            actions=actions_data,  # List of action dicts
            url=browser_state_summary.url,
            screenshot_url=screenshot_url,
        )
class CreateAgentTaskEvent(BaseEvent):
    """Event announcing a newly created agent task."""

    # Identity and ownership
    id: str = Field(default_factory=uuid7str)
    user_id: str = Field(max_length=255)  # needed for authorization checks
    device_id: str | None = Field(default=None, max_length=255)  # device ID for auth lookup
    agent_session_id: str

    # Task definition and status
    llm_model: str = Field(max_length=200)  # LLMModel enum value as string
    stopped: bool = False
    paused: bool = False
    task: str = Field(max_length=MAX_TASK_LENGTH)
    done_output: str | None = Field(default=None, max_length=MAX_STRING_LENGTH)
    scheduled_task_id: str | None = None
    started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    finished_at: datetime | None = None
    agent_state: dict = Field(default_factory=dict)

    # Feedback and artifacts
    user_feedback_type: str | None = Field(default=None, max_length=10)  # UserFeedbackType enum value as string
    user_comment: str | None = Field(default=None, max_length=MAX_COMMENT_LENGTH)
    gif_url: str | None = Field(default=None, max_length=MAX_URL_LENGTH)

    @classmethod
    def from_agent(cls, agent) -> 'CreateAgentTaskEvent':
        """Build a task-creation event from an Agent instance.

        The event reuses ``agent.task_id`` as its ID, starts out neither stopped
        nor paused, and takes ``started_at`` from ``agent._task_start_time``
        read as a timestamp in UTC. ``user_id`` is left empty for the cloud
        handler to fill in.
        """
        # Device ID is only available when cloud sync and its auth client are set up.
        device_id = None
        cloud_sync = getattr(agent, 'cloud_sync', None)
        if cloud_sync and cloud_sync.auth_client:
            device_id = cloud_sync.auth_client.device_id

        state = agent.state
        state_snapshot = state.model_dump() if hasattr(state, 'model_dump') else {}
        task_started = datetime.fromtimestamp(agent._task_start_time, tz=timezone.utc)

        return cls(
            id=str(agent.task_id),
            user_id='',  # To be filled by cloud handler
            device_id=device_id,
            agent_session_id=str(agent.session_id),
            task=agent.task,
            llm_model=agent.llm.model_name,
            agent_state=state_snapshot,
            stopped=False,
            paused=False,
            done_output=None,
            started_at=task_started,
            finished_at=None,
            user_feedback_type=None,
            user_comment=None,
            gif_url=None,
        )
class CreateAgentSessionEvent(BaseEvent):
    """Event announcing a newly created agent session and its browser settings."""

    # Identity and ownership
    id: str = Field(default_factory=uuid7str)
    user_id: str = Field(max_length=255)
    device_id: str | None = Field(default=None, max_length=255)  # device ID for auth lookup

    # Browser session details
    browser_session_id: str = Field(max_length=255)
    browser_session_live_url: str = Field(max_length=MAX_URL_LENGTH)
    browser_session_cdp_url: str = Field(max_length=MAX_URL_LENGTH)
    browser_session_stopped: bool = False
    browser_session_stopped_at: datetime | None = None
    is_source_api: bool | None = None
    browser_state: dict = Field(default_factory=dict)
    browser_session_data: dict | None = None

    @classmethod
    def from_agent(cls, agent) -> 'CreateAgentSessionEvent':
        """Build a session-creation event from an Agent instance.

        The event reuses ``agent.session_id`` as its ID. Browser settings come
        from ``agent.browser_profile`` when it is truthy; otherwise fixed
        fallbacks are used (1280x720 viewport, no user agent, headless, no
        allowed domains). ``user_id`` and both session URLs are left empty for
        the cloud handler to fill in.
        """
        # Device ID is only available when cloud sync and its auth client are set up.
        device_id = None
        cloud_sync = getattr(agent, 'cloud_sync', None)
        if cloud_sync and cloud_sync.auth_client:
            device_id = cloud_sync.auth_client.device_id

        # Resolve every profile-dependent value in one place.
        profile = agent.browser_profile
        if profile:
            viewport = profile.viewport
            user_agent = profile.user_agent
            headless = profile.headless
            allowed_domains = profile.allowed_domains
        else:
            viewport = {'width': 1280, 'height': 720}
            user_agent = None
            headless = True
            allowed_domains = []

        browser_state = {
            'viewport': viewport,
            'user_agent': user_agent,
            'headless': headless,
            # The remaining entries are placeholders that will be updated during execution.
            'initial_url': None,
            'final_url': None,
            'total_pages_visited': 0,
            'session_duration_seconds': 0,
        }
        session_data = {
            'cookies': [],
            'secrets': {},
            # TODO: send secrets safely so tasks can be replayed on cloud seamlessly
            # 'secrets': dict(agent.sensitive_data) if agent.sensitive_data else {},
            'allowed_domains': allowed_domains,
        }

        return cls(
            id=str(agent.session_id),
            user_id='',  # To be filled by cloud handler
            device_id=device_id,
            browser_session_id=agent.browser_session.id,
            browser_session_live_url='',  # To be filled by cloud handler
            browser_session_cdp_url='',  # To be filled by cloud handler
            browser_state=browser_state,
            browser_session_data=session_data,
        )
class UpdateAgentSessionEvent(BaseEvent):
    """Event to update an existing agent session"""

    # Identification
    id: str  # ID of the session being updated
    user_id: str = Field(max_length=255)
    device_id: str | None = Field(default=None, max_length=255)

    # Updatable values (all optional)
    browser_session_stopped: bool | None = None
    browser_session_stopped_at: datetime | None = None
    end_reason: str | None = Field(default=None, max_length=100)  # reason the session ended
|