| """ |
| Typed Pydantic models for the incident operations OpenEnv environment. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from typing import Literal, Optional |
|
|
| from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator |
|
|
|
|
| class ChunkSummary(BaseModel): |
| model_config = ConfigDict( |
| json_schema_extra={ |
| "examples": [ |
| { |
| "chunk_id": "support_003", |
| "domain": "Customer Support Operations", |
| "tokens": 132, |
| "keywords": ["refund policy", "incident timeline", "billing ledger"], |
| } |
| ] |
| } |
| ) |
|
|
| chunk_id: str = Field(..., description="Unique artifact identifier exposed to the agent.") |
| domain: str = Field(..., description="High-level source domain for the artifact.") |
| tokens: int = Field(..., ge=1, description="Approximate token cost for including the artifact.") |
| keywords: list[str] = Field(..., min_length=1, description="Important artifact hints available before inspection.") |
|
|
| @field_validator("chunk_id", "domain") |
| @classmethod |
| def validate_non_empty_text(cls, value: str) -> str: |
| value = value.strip() |
| if not value: |
| raise ValueError("Value must not be empty.") |
| return value |
|
|
| @field_validator("keywords") |
| @classmethod |
| def validate_keywords(cls, value: list[str]) -> list[str]: |
| cleaned = [keyword.strip() for keyword in value if keyword.strip()] |
| if not cleaned: |
| raise ValueError("keywords must contain at least one non-empty entry.") |
| return cleaned |
|
|
|
|
| class RagObservation(BaseModel): |
| model_config = ConfigDict( |
| json_schema_extra={ |
| "examples": [ |
| { |
| "case_id": "case-refund-triage-001", |
| "case_summary": "A business customer requests a refund after a confirmed outage.", |
| "objective": "Prepare a refund triage memo grounded in support evidence.", |
| "workflow_stage": "triage", |
| "customer_tier": "business", |
| "incident_severity": "sev2", |
| "available_artifacts": [ |
| { |
| "chunk_id": "support_003", |
| "domain": "Customer Support Operations", |
| "tokens": 132, |
| "keywords": ["refund policy", "incident timeline", "billing ledger"], |
| } |
| ], |
| "reviewed_artifacts": [], |
| "prioritized_artifacts": [], |
| "plan_draft": None, |
| "report_requirements": ["State whether the case should proceed to refund review."], |
| "total_tokens_used": 0, |
| "token_budget": 850, |
| "step_number": 0, |
| "task_name": "refund_triage_easy", |
| "last_action_feedback": None, |
| "query": "Prepare an incident-linked refund triage memo.", |
| "available_chunks": [], |
| "selected_chunks": [], |
| } |
| ] |
| } |
| ) |
|
|
| case_id: str = Field(..., description="Unique identifier for the active simulated incident case.") |
| case_summary: str = Field(..., description="Short real-world case summary presented to the agent.") |
| objective: str = Field(..., description="The operational deliverable the agent must produce.") |
| workflow_stage: Literal["triage", "analysis", "resolution", "submitted"] = Field( |
| ..., description="Current workflow stage in the incident operations process." |
| ) |
| customer_tier: Literal["standard", "business", "enterprise"] = Field( |
| ..., description="Customer tier for the active case." |
| ) |
| incident_severity: Literal["sev3", "sev2", "sev1"] = Field( |
| ..., description="Severity of the active incident." |
| ) |
| available_artifacts: list[ChunkSummary] = Field( |
| ..., description="Artifacts that can be inspected, prioritized, or summarized." |
| ) |
| reviewed_artifacts: list[str] = Field( |
| default_factory=list, |
| description="Artifact ids the agent has inspected so far.", |
| ) |
| prioritized_artifacts: list[str] = Field( |
| default_factory=list, |
| description="Artifact ids currently included in the working resolution set.", |
| ) |
| plan_draft: Optional[str] = Field( |
| default=None, |
| description="Current draft of the resolution plan or operational recommendation.", |
| ) |
| report_requirements: list[str] = Field( |
| default_factory=list, |
| description="Deterministic requirements the final report must satisfy.", |
| ) |
| progress_signals: dict[str, float] = Field( |
| default_factory=dict, |
| description="Normalized progress metrics for artifact coverage, planning, and workflow readiness.", |
| ) |
| total_tokens_used: int = Field(..., ge=0, description="Current token cost of the prioritized working set.") |
| token_budget: int = Field(..., ge=1, description="Maximum allowed token budget for the current task.") |
| step_number: int = Field(..., ge=0, description="Current step number in the episode.") |
| task_name: str = Field(..., description="Active task identifier.") |
| last_action_feedback: Optional[str] = Field(default=None, description="Outcome of the previous action.") |
|
|
| query: str = Field(..., description="Compatibility mirror of objective for legacy clients.") |
| available_chunks: list[ChunkSummary] = Field( |
| default_factory=list, |
| description="Compatibility mirror of available_artifacts for legacy clients.", |
| ) |
| selected_chunks: list[str] = Field( |
| default_factory=list, |
| description="Compatibility mirror of prioritized_artifacts for legacy clients.", |
| ) |
|
|
| @field_validator("case_id", "case_summary", "objective", "task_name", "query") |
| @classmethod |
| def validate_required_strings(cls, value: str) -> str: |
| value = value.strip() |
| if not value: |
| raise ValueError("Value must not be empty.") |
| return value |
|
|
| @field_validator("reviewed_artifacts", "prioritized_artifacts", "selected_chunks") |
| @classmethod |
| def validate_ids(cls, value: list[str]) -> list[str]: |
| cleaned = [artifact_id.strip() for artifact_id in value if artifact_id.strip()] |
| if len(cleaned) != len(set(cleaned)): |
| raise ValueError("Artifact id lists must not contain duplicates.") |
| return cleaned |
|
|
| @field_validator("report_requirements") |
| @classmethod |
| def validate_report_requirements(cls, value: list[str]) -> list[str]: |
| cleaned = [item.strip() for item in value if item.strip()] |
| return cleaned |
|
|
| @field_validator("last_action_feedback") |
| @classmethod |
| def validate_feedback(cls, value: Optional[str]) -> Optional[str]: |
| if value is None: |
| return value |
| value = value.strip() |
| return value or None |
|
|
| @model_validator(mode="after") |
| def validate_budget_and_aliases(self) -> "RagObservation": |
| if self.total_tokens_used > self.token_budget: |
| raise ValueError("total_tokens_used cannot exceed token_budget.") |
| if self.query != self.objective: |
| raise ValueError("query must mirror objective.") |
| if self.selected_chunks != self.prioritized_artifacts: |
| raise ValueError("selected_chunks must mirror prioritized_artifacts.") |
| if len(self.available_chunks) != len(self.available_artifacts): |
| raise ValueError("available_chunks must mirror available_artifacts.") |
| return self |
|
|
|
|
| class RagAction(BaseModel): |
| model_config = ConfigDict( |
| json_schema_extra={ |
| "examples": [ |
| {"action_type": "inspect_artifact", "artifact_id": "support_003"}, |
| {"action_type": "summarize_artifact", "artifact_id": "support_003", "compression_ratio": 0.55}, |
| {"action_type": "set_resolution_plan", "plan": "Verify outage evidence and route manual exceptions to finance review."}, |
| {"action_type": "submit_report", "answer": "Proceed to refund review only after outage and billing evidence are confirmed. [support_001] [support_003]"}, |
| ] |
| } |
| ) |
|
|
| action_type: Literal[ |
| "inspect_artifact", |
| "prioritize_artifact", |
| "summarize_artifact", |
| "set_resolution_plan", |
| "submit_report", |
| "select_chunk", |
| "deselect_chunk", |
| "compress_chunk", |
| "submit_answer", |
| ] = Field(..., description="The environment action the agent wants to perform.") |
| artifact_id: Optional[str] = Field(default=None, description="Target artifact id for artifact actions.") |
| chunk_id: Optional[str] = Field(default=None, description="Legacy alias for artifact_id.") |
| compression_ratio: Optional[float] = Field(default=None, ge=0.3, le=0.9) |
| plan: Optional[str] = Field(default=None, description="Draft of the current operational resolution plan.") |
| answer: Optional[str] = Field(default=None, description="Final report or resolution memo to submit.") |
|
|
| @field_validator("artifact_id", "chunk_id", "plan", "answer") |
| @classmethod |
| def normalize_optional_strings(cls, value: Optional[str]) -> Optional[str]: |
| if value is None: |
| return value |
| value = value.strip() |
| return value or None |
|
|
| @model_validator(mode="after") |
| def validate_action_semantics(self) -> "RagAction": |
| normalized_artifact_id = self.artifact_id or self.chunk_id |
| if self.action_type in {"inspect_artifact", "prioritize_artifact", "select_chunk", "deselect_chunk"}: |
| if normalized_artifact_id is None: |
| raise ValueError("artifact_id or chunk_id is required for artifact selection actions.") |
| elif self.action_type in {"summarize_artifact", "compress_chunk"}: |
| if normalized_artifact_id is None: |
| raise ValueError("artifact_id or chunk_id is required for summarize actions.") |
| if self.compression_ratio is None: |
| raise ValueError("compression_ratio is required for summarize actions.") |
| elif self.action_type == "set_resolution_plan": |
| if self.plan is None: |
| raise ValueError("plan is required for set_resolution_plan.") |
| elif self.action_type in {"submit_report", "submit_answer"}: |
| if self.answer is None: |
| raise ValueError("answer is required for submit_report/submit_answer.") |
| return self |
|
|
|
|
| class RagReward(BaseModel): |
| total: float = Field(..., ge=0.0, le=1.0) |
| token_efficiency: float = Field(..., ge=0.0, le=1.0) |
| answer_quality: float = Field(..., ge=0.0, le=1.0) |
| retrieval_precision: float = Field(..., ge=0.0, le=1.0) |
| penalty: float = Field(..., ge=0.0, le=1.0) |
|
|
| @model_validator(mode="after") |
| def validate_total_bound(self) -> "RagReward": |
| if self.total > 1.0 or self.total < 0.0: |
| raise ValueError("total must remain within [0.0, 1.0].") |
| return self |
|
|