Flickinshots's picture
Deploy Project Epsilon Space bundle
38c9982 verified
from __future__ import annotations
import re
from src.executive_assistant.config import OpenRouterConfig
from src.executive_assistant.llm_service import OpenRouterLLMService
from src.executive_assistant.models import AssistantAction, PolicyDecision, WorkspaceObservation
from src.executive_assistant.runner import EpisodeRunner, EpisodeTrace, run_policy_suite
class ActionCatalog:
"""Finite action templates for smoke-testing and future policy indexing."""
@staticmethod
def enumerate_actions(observation: WorkspaceObservation) -> list[AssistantAction]:
actions: list[AssistantAction] = []
for email in observation.unread_emails:
actions.append(AssistantAction(action_type="read_email", target_id=email.id))
actions.append(AssistantAction(action_type="archive", target_id=email.id))
actions.append(
AssistantAction(
action_type="forward",
target_id=email.id,
secondary_payload="manager@company.com",
payload="Escalating this for review.",
)
)
if observation.current_email is not None:
actions.append(
AssistantAction(
action_type="reply",
target_id=observation.current_email.id,
payload="Hello, I will follow up shortly.\nRegards, Executive Assistant",
)
)
actions.extend(
[
AssistantAction(action_type="search_files", payload="Q3 Architecture"),
AssistantAction(action_type="search_files", payload="architecture metrics"),
]
)
return actions
class BaselineAgent:
"""Deterministic baseline policy for seeded scenarios and training-pipeline smoke tests."""
def __init__(self, model_name: str = "deterministic-baseline-v1") -> None:
self.model_name = model_name
def choose_action(self, task_name: str, observation: WorkspaceObservation) -> PolicyDecision:
if task_name == "easy_deadline_extraction":
return self._choose_easy_action(observation)
if task_name == "medium_triage_and_negotiation":
return self._choose_medium_action(observation)
if task_name == "hard_rag_reply":
return self._choose_hard_action(observation)
raise ValueError(f"Unsupported task: {task_name}")
def _choose_easy_action(self, observation: WorkspaceObservation) -> PolicyDecision:
if observation.current_email is None:
email = observation.unread_emails[0]
return PolicyDecision(
reasoning="Read the seeded deadline email before extracting any tasks.",
action=AssistantAction(action_type="read_email", target_id=email.id),
)
deadlines = self._extract_deadlines(observation.current_email.body)
existing = {todo.strip().lower() for todo in observation.active_todos}
for task_name, deadline_date in deadlines:
if task_name.lower() not in existing:
return PolicyDecision(
reasoning=f"Add the missing todo '{task_name}' with deadline {deadline_date}.",
action=AssistantAction(
action_type="add_todo",
payload=task_name,
secondary_payload=deadline_date,
),
)
return PolicyDecision(
reasoning="All deadlines are captured, so archive the source email.",
action=AssistantAction(action_type="archive", target_id=observation.current_email.id),
)
def _choose_medium_action(self, observation: WorkspaceObservation) -> PolicyDecision:
newsletters = {
"news@updates.example",
"promotions@vendor.example",
"events@community.example",
}
action_history = " ".join(observation.action_history).lower()
for email in observation.unread_emails:
if email.sender in newsletters:
return PolicyDecision(
reasoning=f"Archive non-actionable newsletter from {email.sender}.",
action=AssistantAction(action_type="archive", target_id=email.id),
)
client_email = next(
(email for email in observation.unread_emails if email.sender == "client@company.com"),
None,
)
if client_email is not None and "forward: forwarded to manager@company.com" not in action_history:
return PolicyDecision(
reasoning="Escalate the urgent client complaint to the manager.",
action=AssistantAction(
action_type="forward",
target_id=client_email.id,
secondary_payload="manager@company.com",
payload="Urgent client complaint. Please take over immediately.",
),
)
teammate_email = next(
(email for email in observation.unread_emails if email.sender == "teammate@company.com"),
None,
)
if teammate_email is not None and "reply: reply drafted" not in action_history:
return PolicyDecision(
reasoning="Reply to the reschedule request with a concrete proposed time.",
action=AssistantAction(
action_type="reply",
target_id=teammate_email.id,
payload="Hello, 3:30 PM IST works for me. Regards, Executive Assistant",
),
)
if observation.current_email is not None:
return PolicyDecision(
reasoning="Archive the currently open message to reduce inbox clutter.",
action=AssistantAction(action_type="archive", target_id=observation.current_email.id),
)
raise RuntimeError("No valid medium-task action available")
def _choose_hard_action(self, observation: WorkspaceObservation) -> PolicyDecision:
if observation.current_email is None:
email = observation.unread_emails[0]
return PolicyDecision(
reasoning="Read the stakeholder email to ground the response request.",
action=AssistantAction(action_type="read_email", target_id=email.id),
)
if not observation.search_results:
return PolicyDecision(
reasoning="Search the local report store for the Q3 architecture document.",
action=AssistantAction(action_type="search_files", payload="Q3 Architecture"),
)
metrics = self._extract_report_metrics(observation.search_results[0].snippet)
payload = (
"Hello,\n"
f"Here are the requested Q3 architecture metrics: availability {metrics['availability']}, "
f"mean API latency {metrics['latency']}, and infrastructure cost reduction {metrics['cost_reduction']}.\n"
"Regards,\nExecutive Assistant"
)
return PolicyDecision(
reasoning="Reply with the three requested metrics pulled from the report search results.",
action=AssistantAction(
action_type="reply",
target_id=observation.current_email.id,
payload=payload,
),
)
@staticmethod
def _extract_deadlines(email_body: str) -> list[tuple[str, str]]:
pattern = re.compile(r"([a-z ]+ due)\s+(\d{4}-\d{2}-\d{2})", re.IGNORECASE)
cleaned: list[tuple[str, str]] = []
for task, date in pattern.findall(email_body):
normalized_task = re.sub(r"^(and\s+)", "", task.strip(), flags=re.IGNORECASE)
cleaned.append((normalized_task.title(), date))
return cleaned
@staticmethod
def _extract_report_metrics(snippet: str) -> dict[str, str]:
metrics = {
"availability": re.search(r"(\d+\.\d+%)", snippet),
"latency": re.search(r"(\d+ms)", snippet),
"cost_reduction": re.search(r"(\d+%)", snippet.split("Infrastructure cost reduction:")[-1]),
}
return {
"availability": metrics["availability"].group(1) if metrics["availability"] else "unknown",
"latency": metrics["latency"].group(1) if metrics["latency"] else "unknown",
"cost_reduction": (
metrics["cost_reduction"].group(1) if metrics["cost_reduction"] else "unknown"
),
}
class OpenRouterPolicy:
def __init__(
self,
config: OpenRouterConfig | None = None,
service: OpenRouterLLMService | None = None,
) -> None:
self.config = config or OpenRouterConfig.from_env()
self.service = service or OpenRouterLLMService(self.config)
def choose_action(self, task_name: str, observation: WorkspaceObservation) -> PolicyDecision:
decision = self.service.generate_policy_decision(task_name, observation)
return self._sanitize_decision(task_name, observation, decision)
def _sanitize_decision(
self,
task_name: str,
observation: WorkspaceObservation,
decision: PolicyDecision,
) -> PolicyDecision:
action = decision.action
if action.action_type == "add_todo":
action = self._normalize_easy_todo_action(task_name, observation, action)
elif action.action_type == "search_files":
action = AssistantAction(
action_type=action.action_type,
target_id=None,
payload=action.payload,
secondary_payload=None,
)
elif action.action_type == "add_todo":
action = AssistantAction(
action_type=action.action_type,
target_id=None,
payload=action.payload,
secondary_payload=action.secondary_payload,
)
elif action.action_type in {"read_email", "archive"}:
action = AssistantAction(
action_type=action.action_type,
target_id=action.target_id,
payload=None,
secondary_payload=None,
)
elif action.action_type == "forward":
action = self._normalize_forward_action(task_name, observation, action)
if action.action_type == "reply" and action.payload:
payload = action.payload.strip()
target_id = action.target_id
if task_name == "hard_rag_reply":
if not payload.lower().startswith("hello"):
payload = f"Hello,\n{payload}"
if "regards" not in payload.lower():
payload = f"{payload}\nRegards,\nExecutive Assistant"
elif task_name == "medium_triage_and_negotiation":
if not re.search(r"\b\d{1,2}(:\d{2})?\s?(AM|PM|am|pm)\b", payload):
payload = "Hello, 3:30 PM IST works for me."
if "regards" not in payload.lower():
payload = f"{payload}\nRegards,\nExecutive Assistant"
target_id = self._resolve_teammate_email_id(observation, action.target_id)
action = AssistantAction(
action_type=action.action_type,
target_id=target_id,
payload=payload,
secondary_payload=action.secondary_payload,
)
return PolicyDecision(reasoning=decision.reasoning, action=action)
def _normalize_easy_todo_action(
self,
task_name: str,
observation: WorkspaceObservation,
action: AssistantAction,
) -> AssistantAction:
if task_name != "easy_deadline_extraction":
return AssistantAction(
action_type=action.action_type,
target_id=None,
payload=action.payload,
secondary_payload=action.secondary_payload,
)
canonical_todos = [
("proposal", "Proposal Due", "2026-04-10"),
("prototype", "Prototype Due", "2026-04-20"),
("final report", "Final Report Due", "2026-04-30"),
]
payload = (action.payload or "").strip()
payload_lower = payload.lower()
for marker, canonical_name, canonical_deadline in canonical_todos:
if marker in payload_lower:
return AssistantAction(
action_type="add_todo",
target_id=None,
payload=canonical_name,
secondary_payload=canonical_deadline,
)
existing = {todo.strip().lower() for todo in observation.active_todos}
for _, canonical_name, canonical_deadline in canonical_todos:
if canonical_name.lower() not in existing:
return AssistantAction(
action_type="add_todo",
target_id=None,
payload=canonical_name,
secondary_payload=canonical_deadline,
)
return AssistantAction(
action_type="add_todo",
target_id=None,
payload=payload,
secondary_payload=action.secondary_payload,
)
def _normalize_forward_action(
self,
task_name: str,
observation: WorkspaceObservation,
action: AssistantAction,
) -> AssistantAction:
target_id = action.target_id
recipient = action.secondary_payload
note = action.payload
if task_name == "medium_triage_and_negotiation":
if target_id is None and observation.current_email is not None:
target_id = observation.current_email.id
if recipient is None:
recipient = "manager@company.com"
if note is None or not note.strip():
note = "Urgent client complaint. Please take over immediately."
return AssistantAction(
action_type="forward",
target_id=target_id,
payload=note,
secondary_payload=recipient,
)
@staticmethod
def _resolve_teammate_email_id(
observation: WorkspaceObservation,
target_id: int | None,
) -> int | None:
if target_id is not None:
return target_id
if observation.current_email and observation.current_email.sender == "teammate@company.com":
return observation.current_email.id
teammate_email = next(
(email for email in observation.unread_emails if email.sender == "teammate@company.com"),
None,
)
return teammate_email.id if teammate_email is not None else None
OpenAIResponsesPolicy = OpenRouterPolicy
def run_episode(task_name: str, max_steps: int = 12) -> EpisodeTrace:
runner = EpisodeRunner(policy=BaselineAgent(), max_steps=max_steps)
return runner.run(task_name)
def smoke_test_training_pipeline() -> dict[str, EpisodeTrace]:
return run_policy_suite(
policy=BaselineAgent(),
task_names=[
"easy_deadline_extraction",
"medium_triage_and_negotiation",
"hard_rag_reply",
],
)