Spaces:
Runtime error
Runtime error
File size: 6,127 Bytes
66227af | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 | """Work unit utility functions for generating and parsing work unit keys."""
from typing import Any
from pydantic import BaseModel
class ParsedWorkUnit(BaseModel):
"""Parsed work unit components."""
task_type: str
workspace_name: str | None
session_name: str | None
observer: str | None
observed: str | None
dream_type: str | None = None
def construct_work_unit_key(
workspace_name: str, payload: dict[str, Any] | ParsedWorkUnit
) -> str:
"""
Generate a work unit key for a given task type, workspace name, and event type.
Args:
workspace_name: The name of the workspace the work unit belongs to
payload: Dictionary containing work unit information
Returns:
Formatted work unit key string
Raises:
ValueError: If required fields are missing or task type is invalid
"""
if isinstance(payload, ParsedWorkUnit):
payload = payload.model_dump()
task_type: str | None = payload.get("task_type")
if not workspace_name or not task_type:
raise ValueError(
"workspace_name and task_type are required to generate a work_unit_key"
)
if task_type in ["representation", "summary", "dream"]:
observer = payload.get("observer", "None")
observed = payload.get("observed", "None")
session_name = payload.get("session_name", "None")
if task_type == "dream":
dream_type = payload.get("dream_type")
if not dream_type:
raise ValueError("dream_type is required for dream tasks")
return f"{task_type}:{dream_type}:{workspace_name}:{observer}:{observed}"
if task_type == "representation":
# Representation tasks don't include observer in the key since
# we process once and save to multiple collections
return f"{task_type}:{workspace_name}:{session_name}:{observed}"
return f"{task_type}:{workspace_name}:{session_name}:{observer}:{observed}"
if task_type == "webhook":
return f"webhook:{workspace_name}"
if task_type == "deletion":
deletion_type = payload.get("deletion_type")
resource_id = payload.get("resource_id")
if not deletion_type or not resource_id:
raise ValueError(
"deletion_type and resource_id are required for deletion tasks"
)
return f"deletion:{workspace_name}:{deletion_type}:{resource_id}"
if task_type == "reconciler":
reconciler_type = payload.get("reconciler_type")
if not reconciler_type:
raise ValueError("reconciler_type is required for reconciler tasks")
return f"reconciler:{reconciler_type}"
raise ValueError(f"Invalid task type: {task_type}")
def parse_work_unit_key(work_unit_key: str) -> ParsedWorkUnit:
"""
Parse a work unit key to extract its components.
Args:
work_unit_key: The work unit key string to parse
Returns:
ParsedWorkUnit with extracted components
Raises:
ValueError: If the work unit key format is invalid
"""
parts = work_unit_key.split(":")
task_type = parts[0]
if task_type == "representation":
if len(parts) == 4:
# New format: representation:{workspace}:{session}:{observed}
return ParsedWorkUnit(
task_type=task_type,
workspace_name=parts[1],
session_name=parts[2],
observer=None,
observed=parts[3],
)
elif len(parts) == 5:
# Legacy format: representation:{workspace}:{session}:{observer}:{observed}
return ParsedWorkUnit(
task_type=task_type,
workspace_name=parts[1],
session_name=parts[2],
observer=parts[3],
observed=parts[4],
)
else:
raise ValueError(
f"Invalid work_unit_key format for task_type {task_type}: {work_unit_key}"
)
if task_type == "summary":
if len(parts) != 5:
raise ValueError(
f"Invalid work_unit_key format for task_type {task_type}: {work_unit_key}"
)
return ParsedWorkUnit(
task_type=task_type,
workspace_name=parts[1],
session_name=parts[2],
observer=parts[3],
observed=parts[4],
)
if task_type == "dream":
if len(parts) != 5:
raise ValueError(
f"Invalid work_unit_key format for task_type {task_type}: {work_unit_key}"
)
return ParsedWorkUnit(
task_type=task_type,
workspace_name=parts[2],
session_name=None,
observer=parts[3],
observed=parts[4],
dream_type=parts[1],
)
if task_type == "webhook":
if len(parts) != 2:
raise ValueError(
f"Invalid work_unit_key format for task_type {task_type}: {work_unit_key}"
)
return ParsedWorkUnit(
task_type=task_type,
workspace_name=parts[1],
session_name=None,
observer=None,
observed=None,
)
if task_type == "deletion":
if len(parts) != 4:
raise ValueError(
f"Invalid work_unit_key format for task_type {task_type}: {work_unit_key}"
)
return ParsedWorkUnit(
task_type=task_type,
workspace_name=parts[1],
session_name=None,
observer=None,
observed=None,
)
if task_type == "reconciler":
if len(parts) != 2:
raise ValueError(
f"Invalid work_unit_key format for task_type {task_type}: {work_unit_key}"
)
return ParsedWorkUnit(
task_type=task_type,
workspace_name=None,
session_name=None,
observer=None,
observed=None,
)
raise ValueError(f"Invalid task type in work_unit_key: {task_type}")
|