"""Work unit utility functions for generating and parsing work unit keys.""" from typing import Any from pydantic import BaseModel class ParsedWorkUnit(BaseModel): """Parsed work unit components.""" task_type: str workspace_name: str | None session_name: str | None observer: str | None observed: str | None dream_type: str | None = None def construct_work_unit_key( workspace_name: str, payload: dict[str, Any] | ParsedWorkUnit ) -> str: """ Generate a work unit key for a given task type, workspace name, and event type. Args: workspace_name: The name of the workspace the work unit belongs to payload: Dictionary containing work unit information Returns: Formatted work unit key string Raises: ValueError: If required fields are missing or task type is invalid """ if isinstance(payload, ParsedWorkUnit): payload = payload.model_dump() task_type: str | None = payload.get("task_type") if not workspace_name or not task_type: raise ValueError( "workspace_name and task_type are required to generate a work_unit_key" ) if task_type in ["representation", "summary", "dream"]: observer = payload.get("observer", "None") observed = payload.get("observed", "None") session_name = payload.get("session_name", "None") if task_type == "dream": dream_type = payload.get("dream_type") if not dream_type: raise ValueError("dream_type is required for dream tasks") return f"{task_type}:{dream_type}:{workspace_name}:{observer}:{observed}" if task_type == "representation": # Representation tasks don't include observer in the key since # we process once and save to multiple collections return f"{task_type}:{workspace_name}:{session_name}:{observed}" return f"{task_type}:{workspace_name}:{session_name}:{observer}:{observed}" if task_type == "webhook": return f"webhook:{workspace_name}" if task_type == "deletion": deletion_type = payload.get("deletion_type") resource_id = payload.get("resource_id") if not deletion_type or not resource_id: raise ValueError( "deletion_type and resource_id are required for deletion tasks" ) return f"deletion:{workspace_name}:{deletion_type}:{resource_id}" if task_type == "reconciler": reconciler_type = payload.get("reconciler_type") if not reconciler_type: raise ValueError("reconciler_type is required for reconciler tasks") return f"reconciler:{reconciler_type}" raise ValueError(f"Invalid task type: {task_type}") def parse_work_unit_key(work_unit_key: str) -> ParsedWorkUnit: """ Parse a work unit key to extract its components. Args: work_unit_key: The work unit key string to parse Returns: ParsedWorkUnit with extracted components Raises: ValueError: If the work unit key format is invalid """ parts = work_unit_key.split(":") task_type = parts[0] if task_type == "representation": if len(parts) == 4: # New format: representation:{workspace}:{session}:{observed} return ParsedWorkUnit( task_type=task_type, workspace_name=parts[1], session_name=parts[2], observer=None, observed=parts[3], ) elif len(parts) == 5: # Legacy format: representation:{workspace}:{session}:{observer}:{observed} return ParsedWorkUnit( task_type=task_type, workspace_name=parts[1], session_name=parts[2], observer=parts[3], observed=parts[4], ) else: raise ValueError( f"Invalid work_unit_key format for task_type {task_type}: {work_unit_key}" ) if task_type == "summary": if len(parts) != 5: raise ValueError( f"Invalid work_unit_key format for task_type {task_type}: {work_unit_key}" ) return ParsedWorkUnit( task_type=task_type, workspace_name=parts[1], session_name=parts[2], observer=parts[3], observed=parts[4], ) if task_type == "dream": if len(parts) != 5: raise ValueError( f"Invalid work_unit_key format for task_type {task_type}: {work_unit_key}" ) return ParsedWorkUnit( task_type=task_type, workspace_name=parts[2], session_name=None, observer=parts[3], observed=parts[4], dream_type=parts[1], ) if task_type == "webhook": if len(parts) != 2: raise ValueError( f"Invalid work_unit_key format for task_type {task_type}: {work_unit_key}" ) return ParsedWorkUnit( task_type=task_type, workspace_name=parts[1], session_name=None, observer=None, observed=None, ) if task_type == "deletion": if len(parts) != 4: raise ValueError( f"Invalid work_unit_key format for task_type {task_type}: {work_unit_key}" ) return ParsedWorkUnit( task_type=task_type, workspace_name=parts[1], session_name=None, observer=None, observed=None, ) if task_type == "reconciler": if len(parts) != 2: raise ValueError( f"Invalid work_unit_key format for task_type {task_type}: {work_unit_key}" ) return ParsedWorkUnit( task_type=task_type, workspace_name=None, session_name=None, observer=None, observed=None, ) raise ValueError(f"Invalid task type in work_unit_key: {task_type}")