"""Event models (bubus ``BaseEvent`` subclasses) describing agent sessions, tasks, steps and output files.

Each ``from_agent*`` factory builds an event from a live agent object; ``user_id`` is left empty
for the cloud handler to fill in.
"""

import base64
import logging
import os
from datetime import datetime, timezone
from pathlib import Path

import anyio
from bubus import BaseEvent
from pydantic import Field, field_validator
from uuid_extensions import uuid7str

logger = logging.getLogger(__name__)

MAX_STRING_LENGTH = 100000  # 100K chars ~ 25k tokens should be enough
MAX_URL_LENGTH = 100000
MAX_TASK_LENGTH = 100000
MAX_COMMENT_LENGTH = 2000
MAX_FILE_CONTENT_SIZE = 50 * 1024 * 1024  # 50MB


def _get_device_id(agent) -> str | None:
	"""Return the device ID of the agent's cloud-sync auth client, or None when there is none.

	Returns None if the agent has no ``cloud_sync`` attribute, or if ``cloud_sync`` or its
	``auth_client`` is falsy. Shared by every factory below so they resolve the ID identically.
	"""
	cloud_sync = getattr(agent, 'cloud_sync', None)
	if cloud_sync and cloud_sync.auth_client:
		return cloud_sync.auth_client.device_id
	return None


def _estimate_decoded_size(base64_length: int) -> float:
	"""Estimate the decoded byte size of a base64 string of ``base64_length`` characters.

	Base64 encodes 3 bytes as 4 characters, so the decoded size is ~3/4 of the encoded length.
	"""
	return base64_length * 3 / 4


class UpdateAgentTaskEvent(BaseEvent):
	"""Event carrying updates to an existing agent task, identified by ``id``.

	The optional fields default to None. Presumably None means "not updated" for the consumer
	(TODO confirm against the cloud handler).
	"""

	# Required fields for identification
	id: str  # The task ID to update
	user_id: str = Field(max_length=255)  # For authorization
	device_id: str | None = Field(None, max_length=255)  # Device ID for auth lookup

	# Optional fields that can be updated
	stopped: bool | None = None
	paused: bool | None = None
	done_output: str | None = Field(None, max_length=MAX_STRING_LENGTH)
	finished_at: datetime | None = None
	agent_state: dict | None = None
	user_feedback_type: str | None = Field(None, max_length=10)  # UserFeedbackType enum value as string
	user_comment: str | None = Field(None, max_length=MAX_COMMENT_LENGTH)
	gif_url: str | None = Field(None, max_length=MAX_URL_LENGTH)

	@classmethod
	def from_agent(cls, agent) -> 'UpdateAgentTaskEvent':
		"""Create an UpdateAgentTaskEvent from an Agent instance.

		``done_output`` is the history's final result. ``finished_at`` is set to the current UTC
		time only when the history reports it is done.

		Raises:
			ValueError: if the agent has no ``_task_start_time`` attribute.
		"""
		if not hasattr(agent, '_task_start_time'):
			raise ValueError('Agent must have _task_start_time attribute')

		history = agent.history
		state = agent.state
		done_output = history.final_result() if history else None
		finished_at = datetime.now(timezone.utc) if history and history.is_done() else None

		return cls(
			id=str(agent.task_id),
			user_id='',  # To be filled by cloud handler
			device_id=_get_device_id(agent),
			stopped=getattr(state, 'stopped', False),
			paused=getattr(state, 'paused', False),
			done_output=done_output,
			finished_at=finished_at,
			agent_state=state.model_dump() if hasattr(state, 'model_dump') else {},
			# user_feedback_type and user_comment would be set by the API/frontend;
			# gif_url would be set after GIF generation if needed
			user_feedback_type=None,
			user_comment=None,
			gif_url=None,
		)


class CreateAgentOutputFileEvent(BaseEvent):
	"""Event describing an output file produced by an agent task, with optional base64 content."""

	# Model fields
	id: str = Field(default_factory=uuid7str)
	user_id: str = Field(max_length=255)
	device_id: str | None = Field(None, max_length=255)  # Device ID for auth lookup
	task_id: str
	file_name: str = Field(max_length=255)
	file_content: str | None = None  # Base64 encoded file content
	content_type: str | None = Field(None, max_length=100)  # MIME type for file uploads
	created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

	@field_validator('file_content')
	@classmethod
	def validate_file_size(cls, v: str | None) -> str | None:
		"""Validate the estimated decoded size of base64 file content.

		If the value contains a comma, everything up to and including the first comma
		(a data URL prefix) is stripped, and the stripped value is what gets stored.

		Raises:
			ValueError: if the estimated decoded size exceeds MAX_FILE_CONTENT_SIZE.
		"""
		if v is None:
			return v

		# Remove data URL prefix if present; split only once so no payload is dropped
		if ',' in v:
			v = v.split(',', 1)[1]

		if _estimate_decoded_size(len(v)) > MAX_FILE_CONTENT_SIZE:
			raise ValueError(f'File content exceeds maximum size of {MAX_FILE_CONTENT_SIZE / 1024 / 1024}MB')

		return v

	@classmethod
	async def from_agent_and_file(cls, agent, output_path: str) -> 'CreateAgentOutputFileEvent':
		"""Create a CreateAgentOutputFileEvent for the file at ``output_path`` (sent as ``image/gif``).

		The file is read and base64-encoded only when the encoded content would pass
		``validate_file_size``; otherwise ``file_content`` is None.

		Raises:
			FileNotFoundError: if ``output_path`` does not exist.
		"""
		gif_path = Path(output_path)
		if not gif_path.exists():
			raise FileNotFoundError(f'File not found: {output_path}')

		gif_size = os.path.getsize(gif_path)

		# Compare using the same estimate the validator applies to the encoded string. The raw size
		# alone is not enough: base64 padding can push the estimate up to 2 bytes above it, which
		# previously made files just under the limit fail validation after being read.
		encoded_length = 4 * ((gif_size + 2) // 3)  # exact length of base64.b64encode output
		gif_content = None
		if _estimate_decoded_size(encoded_length) <= MAX_FILE_CONTENT_SIZE:
			async with await anyio.open_file(gif_path, 'rb') as f:
				gif_bytes = await f.read()
				gif_content = base64.b64encode(gif_bytes).decode('utf-8')

		return cls(
			user_id='',  # To be filled by cloud handler
			device_id=_get_device_id(agent),
			task_id=str(agent.task_id),
			file_name=gif_path.name,
			file_content=gif_content,  # Base64 encoded
			content_type='image/gif',
		)


class CreateAgentStepEvent(BaseEvent):
	"""Event describing a single agent step: goals, memory, actions, URL and optional screenshot."""

	# Model fields
	id: str = Field(default_factory=uuid7str)
	user_id: str = Field(max_length=255)  # Added for authorization checks
	device_id: str | None = Field(None, max_length=255)  # Device ID for auth lookup
	created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
	agent_task_id: str
	step: int
	evaluation_previous_goal: str = Field(max_length=MAX_STRING_LENGTH)
	memory: str = Field(max_length=MAX_STRING_LENGTH)
	next_goal: str = Field(max_length=MAX_STRING_LENGTH)
	actions: list[dict]
	screenshot_url: str | None = Field(None, max_length=MAX_FILE_CONTENT_SIZE)  # ~50MB for base64 images
	url: str = Field(default='', max_length=MAX_URL_LENGTH)

	@field_validator('screenshot_url')
	@classmethod
	def validate_screenshot_size(cls, v: str | None) -> str | None:
		"""Validate the size of an inline ``data:`` screenshot; other values pass through unchanged.

		Raises:
			ValueError: if the estimated decoded size of the base64 payload exceeds MAX_FILE_CONTENT_SIZE.
		"""
		if v is None or not v.startswith('data:'):
			return v

		# It's base64 data, check size of the payload after the first comma
		if ',' in v:
			base64_part = v.split(',', 1)[1]
			if _estimate_decoded_size(len(base64_part)) > MAX_FILE_CONTENT_SIZE:
				raise ValueError(f'Screenshot content exceeds maximum size of {MAX_FILE_CONTENT_SIZE / 1024 / 1024}MB')

		return v

	@classmethod
	def from_agent_step(
		cls, agent, model_output, result: list, actions_data: list[dict], browser_state_summary
	) -> 'CreateAgentStepEvent':
		"""Create a CreateAgentStepEvent from agent step data.

		Args:
			agent: provides task_id, state.n_steps and (optionally) cloud_sync.
			model_output: its ``current_state`` (if present) supplies the goal/memory fields.
			result: currently unused; kept for interface compatibility.
			actions_data: list of action dicts stored as ``actions``.
			browser_state_summary: supplies ``url`` and an optional ``screenshot``. The screenshot is
				embedded as a ``data:image/jpeg;base64,`` URL, so it is presumably a base64 JPEG
				string (TODO confirm).
		"""
		# Extract current state from model output
		current_state = getattr(model_output, 'current_state', None)

		# Capture screenshot as base64 data URL if available
		screenshot = browser_state_summary.screenshot
		if screenshot:
			screenshot_url = f'data:image/jpeg;base64,{screenshot}'
			logger.debug('📸 Including screenshot in CreateAgentStepEvent, length: %d', len(screenshot))
		else:
			screenshot_url = None
			logger.debug('📸 No screenshot in browser_state_summary for CreateAgentStepEvent')

		return cls(
			user_id='',  # To be filled by cloud handler
			device_id=_get_device_id(agent),
			agent_task_id=str(agent.task_id),
			step=agent.state.n_steps,
			evaluation_previous_goal=current_state.evaluation_previous_goal if current_state else '',
			memory=current_state.memory if current_state else '',
			next_goal=current_state.next_goal if current_state else '',
			actions=actions_data,  # List of action dicts
			url=browser_state_summary.url,
			screenshot_url=screenshot_url,
		)


class CreateAgentTaskEvent(BaseEvent):
	"""Event describing a newly started agent task within an agent session."""

	# Model fields
	id: str = Field(default_factory=uuid7str)
	user_id: str = Field(max_length=255)  # Added for authorization checks
	device_id: str | None = Field(None, max_length=255)  # Device ID for auth lookup
	agent_session_id: str
	llm_model: str = Field(max_length=200)  # LLMModel enum value as string
	stopped: bool = False
	paused: bool = False
	task: str = Field(max_length=MAX_TASK_LENGTH)
	done_output: str | None = Field(None, max_length=MAX_STRING_LENGTH)
	scheduled_task_id: str | None = None
	started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
	finished_at: datetime | None = None
	agent_state: dict = Field(default_factory=dict)
	user_feedback_type: str | None = Field(None, max_length=10)  # UserFeedbackType enum value as string
	user_comment: str | None = Field(None, max_length=MAX_COMMENT_LENGTH)
	gif_url: str | None = Field(None, max_length=MAX_URL_LENGTH)

	@classmethod
	def from_agent(cls, agent) -> 'CreateAgentTaskEvent':
		"""Create a CreateAgentTaskEvent from an Agent instance.

		``started_at`` is built from ``agent._task_start_time`` via ``datetime.fromtimestamp``
		(so presumably a POSIX timestamp), interpreted in UTC.
		"""
		state = agent.state
		return cls(
			id=str(agent.task_id),
			user_id='',  # To be filled by cloud handler
			device_id=_get_device_id(agent),
			agent_session_id=str(agent.session_id),
			task=agent.task,
			llm_model=agent.llm.model_name,
			agent_state=state.model_dump() if hasattr(state, 'model_dump') else {},
			stopped=False,
			paused=False,
			done_output=None,
			started_at=datetime.fromtimestamp(agent._task_start_time, tz=timezone.utc),
			finished_at=None,
			user_feedback_type=None,
			user_comment=None,
			gif_url=None,
		)


class CreateAgentSessionEvent(BaseEvent):
	"""Event describing a new agent session and the browser session backing it."""

	# Model fields
	id: str = Field(default_factory=uuid7str)
	user_id: str = Field(max_length=255)
	device_id: str | None = Field(None, max_length=255)  # Device ID for auth lookup
	browser_session_id: str = Field(max_length=255)
	browser_session_live_url: str = Field(max_length=MAX_URL_LENGTH)
	browser_session_cdp_url: str = Field(max_length=MAX_URL_LENGTH)
	browser_session_stopped: bool = False
	browser_session_stopped_at: datetime | None = None
	is_source_api: bool | None = None
	browser_state: dict = Field(default_factory=dict)
	browser_session_data: dict | None = None

	@classmethod
	def from_agent(cls, agent) -> 'CreateAgentSessionEvent':
		"""Create a CreateAgentSessionEvent from an Agent instance.

		Browser settings come from ``agent.browser_profile``. When it is falsy, fallback values are
		used: a 1280x720 viewport, no user agent, headless, and no allowed domains.
		"""
		profile = agent.browser_profile
		return cls(
			id=str(agent.session_id),
			user_id='',  # To be filled by cloud handler
			device_id=_get_device_id(agent),
			browser_session_id=agent.browser_session.id,
			browser_session_live_url='',  # To be filled by cloud handler
			browser_session_cdp_url='',  # To be filled by cloud handler
			browser_state={
				'viewport': profile.viewport if profile else {'width': 1280, 'height': 720},
				'user_agent': profile.user_agent if profile else None,
				'headless': profile.headless if profile else True,
				'initial_url': None,  # Will be updated during execution
				'final_url': None,  # Will be updated during execution
				'total_pages_visited': 0,  # Will be updated during execution
				'session_duration_seconds': 0,  # Will be updated during execution
			},
			browser_session_data={
				'cookies': [],
				# TODO: send secrets safely so tasks can be replayed on cloud seamlessly
				# 'secrets': dict(agent.sensitive_data) if agent.sensitive_data else {},
				'secrets': {},
				'allowed_domains': profile.allowed_domains if profile else [],
			},
		)


class UpdateAgentSessionEvent(BaseEvent):
	"""Event to update an existing agent session"""

	# Model fields
	id: str  # Session ID to update
	user_id: str = Field(max_length=255)
	device_id: str | None = Field(None, max_length=255)
	browser_session_stopped: bool | None = None
	browser_session_stopped_at: datetime | None = None
	end_reason: str | None = Field(None, max_length=100)  # Why the session ended