Spaces:
Sleeping
Sleeping
| """ | |
| Typed Pydantic models for the incident operations OpenEnv environment. | |
| """ | |
| from __future__ import annotations | |
| from typing import Literal, Optional | |
| from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator | |
| class ChunkSummary(BaseModel): | |
| model_config = ConfigDict( | |
| json_schema_extra={ | |
| "examples": [ | |
| { | |
| "chunk_id": "support_003", | |
| "domain": "Customer Support Operations", | |
| "tokens": 132, | |
| "keywords": ["refund policy", "incident timeline", "billing ledger"], | |
| } | |
| ] | |
| } | |
| ) | |
| chunk_id: str = Field(..., description="Unique artifact identifier exposed to the agent.") | |
| domain: str = Field(..., description="High-level source domain for the artifact.") | |
| tokens: int = Field(..., ge=1, description="Approximate token cost for including the artifact.") | |
| keywords: list[str] = Field(..., min_length=1, description="Important artifact hints available before inspection.") | |
| def validate_non_empty_text(cls, value: str) -> str: | |
| value = value.strip() | |
| if not value: | |
| raise ValueError("Value must not be empty.") | |
| return value | |
| def validate_keywords(cls, value: list[str]) -> list[str]: | |
| cleaned = [keyword.strip() for keyword in value if keyword.strip()] | |
| if not cleaned: | |
| raise ValueError("keywords must contain at least one non-empty entry.") | |
| return cleaned | |
| class RagObservation(BaseModel): | |
| model_config = ConfigDict( | |
| json_schema_extra={ | |
| "examples": [ | |
| { | |
| "case_id": "case-refund-triage-001", | |
| "case_summary": "A business customer requests a refund after a confirmed outage.", | |
| "objective": "Prepare a refund triage memo grounded in support evidence.", | |
| "workflow_stage": "triage", | |
| "customer_tier": "business", | |
| "incident_severity": "sev2", | |
| "available_artifacts": [ | |
| { | |
| "chunk_id": "support_003", | |
| "domain": "Customer Support Operations", | |
| "tokens": 132, | |
| "keywords": ["refund policy", "incident timeline", "billing ledger"], | |
| } | |
| ], | |
| "reviewed_artifacts": [], | |
| "prioritized_artifacts": [], | |
| "plan_draft": None, | |
| "report_requirements": ["State whether the case should proceed to refund review."], | |
| "total_tokens_used": 0, | |
| "token_budget": 850, | |
| "step_number": 0, | |
| "task_name": "refund_triage_easy", | |
| "last_action_feedback": None, | |
| "query": "Prepare an incident-linked refund triage memo.", | |
| "available_chunks": [], | |
| "selected_chunks": [], | |
| } | |
| ] | |
| } | |
| ) | |
| case_id: str = Field(..., description="Unique identifier for the active simulated incident case.") | |
| case_summary: str = Field(..., description="Short real-world case summary presented to the agent.") | |
| objective: str = Field(..., description="The operational deliverable the agent must produce.") | |
| workflow_stage: Literal["triage", "analysis", "resolution", "submitted"] = Field( | |
| ..., description="Current workflow stage in the incident operations process." | |
| ) | |
| customer_tier: Literal["standard", "business", "enterprise"] = Field( | |
| ..., description="Customer tier for the active case." | |
| ) | |
| incident_severity: Literal["sev3", "sev2", "sev1"] = Field( | |
| ..., description="Severity of the active incident." | |
| ) | |
| available_artifacts: list[ChunkSummary] = Field( | |
| ..., description="Artifacts that can be inspected, prioritized, or summarized." | |
| ) | |
| reviewed_artifacts: list[str] = Field( | |
| default_factory=list, | |
| description="Artifact ids the agent has inspected so far.", | |
| ) | |
| prioritized_artifacts: list[str] = Field( | |
| default_factory=list, | |
| description="Artifact ids currently included in the working resolution set.", | |
| ) | |
| plan_draft: Optional[str] = Field( | |
| default=None, | |
| description="Current draft of the resolution plan or operational recommendation.", | |
| ) | |
| report_requirements: list[str] = Field( | |
| default_factory=list, | |
| description="Deterministic requirements the final report must satisfy.", | |
| ) | |
| progress_signals: dict[str, float] = Field( | |
| default_factory=dict, | |
| description="Normalized progress metrics for artifact coverage, planning, and workflow readiness.", | |
| ) | |
| total_tokens_used: int = Field(..., ge=0, description="Current token cost of the prioritized working set.") | |
| token_budget: int = Field(..., ge=1, description="Maximum allowed token budget for the current task.") | |
| step_number: int = Field(..., ge=0, description="Current step number in the episode.") | |
| task_name: str = Field(..., description="Active task identifier.") | |
| last_action_feedback: Optional[str] = Field(default=None, description="Outcome of the previous action.") | |
| query: str = Field(..., description="Compatibility mirror of objective for legacy clients.") | |
| available_chunks: list[ChunkSummary] = Field( | |
| default_factory=list, | |
| description="Compatibility mirror of available_artifacts for legacy clients.", | |
| ) | |
| selected_chunks: list[str] = Field( | |
| default_factory=list, | |
| description="Compatibility mirror of prioritized_artifacts for legacy clients.", | |
| ) | |
| def validate_required_strings(cls, value: str) -> str: | |
| value = value.strip() | |
| if not value: | |
| raise ValueError("Value must not be empty.") | |
| return value | |
| def validate_ids(cls, value: list[str]) -> list[str]: | |
| cleaned = [artifact_id.strip() for artifact_id in value if artifact_id.strip()] | |
| if len(cleaned) != len(set(cleaned)): | |
| raise ValueError("Artifact id lists must not contain duplicates.") | |
| return cleaned | |
| def validate_report_requirements(cls, value: list[str]) -> list[str]: | |
| cleaned = [item.strip() for item in value if item.strip()] | |
| return cleaned | |
| def validate_feedback(cls, value: Optional[str]) -> Optional[str]: | |
| if value is None: | |
| return value | |
| value = value.strip() | |
| return value or None | |
| def validate_budget_and_aliases(self) -> "RagObservation": | |
| if self.total_tokens_used > self.token_budget: | |
| raise ValueError("total_tokens_used cannot exceed token_budget.") | |
| if self.query != self.objective: | |
| raise ValueError("query must mirror objective.") | |
| if self.selected_chunks != self.prioritized_artifacts: | |
| raise ValueError("selected_chunks must mirror prioritized_artifacts.") | |
| if len(self.available_chunks) != len(self.available_artifacts): | |
| raise ValueError("available_chunks must mirror available_artifacts.") | |
| return self | |
| class RagAction(BaseModel): | |
| model_config = ConfigDict( | |
| json_schema_extra={ | |
| "examples": [ | |
| {"action_type": "inspect_artifact", "artifact_id": "support_003"}, | |
| {"action_type": "summarize_artifact", "artifact_id": "support_003", "compression_ratio": 0.55}, | |
| {"action_type": "set_resolution_plan", "plan": "Verify outage evidence and route manual exceptions to finance review."}, | |
| {"action_type": "submit_report", "answer": "Proceed to refund review only after outage and billing evidence are confirmed. [support_001] [support_003]"}, | |
| ] | |
| } | |
| ) | |
| action_type: Literal[ | |
| "inspect_artifact", | |
| "prioritize_artifact", | |
| "summarize_artifact", | |
| "set_resolution_plan", | |
| "submit_report", | |
| "select_chunk", | |
| "deselect_chunk", | |
| "compress_chunk", | |
| "submit_answer", | |
| ] = Field(..., description="The environment action the agent wants to perform.") | |
| artifact_id: Optional[str] = Field(default=None, description="Target artifact id for artifact actions.") | |
| chunk_id: Optional[str] = Field(default=None, description="Legacy alias for artifact_id.") | |
| compression_ratio: Optional[float] = Field(default=None, ge=0.3, le=0.9) | |
| plan: Optional[str] = Field(default=None, description="Draft of the current operational resolution plan.") | |
| answer: Optional[str] = Field(default=None, description="Final report or resolution memo to submit.") | |
| def normalize_optional_strings(cls, value: Optional[str]) -> Optional[str]: | |
| if value is None: | |
| return value | |
| value = value.strip() | |
| return value or None | |
| def validate_action_semantics(self) -> "RagAction": | |
| normalized_artifact_id = self.artifact_id or self.chunk_id | |
| if self.action_type in {"inspect_artifact", "prioritize_artifact", "select_chunk", "deselect_chunk"}: | |
| if normalized_artifact_id is None: | |
| raise ValueError("artifact_id or chunk_id is required for artifact selection actions.") | |
| elif self.action_type in {"summarize_artifact", "compress_chunk"}: | |
| if normalized_artifact_id is None: | |
| raise ValueError("artifact_id or chunk_id is required for summarize actions.") | |
| if self.compression_ratio is None: | |
| raise ValueError("compression_ratio is required for summarize actions.") | |
| elif self.action_type == "set_resolution_plan": | |
| if self.plan is None: | |
| raise ValueError("plan is required for set_resolution_plan.") | |
| elif self.action_type in {"submit_report", "submit_answer"}: | |
| if self.answer is None: | |
| raise ValueError("answer is required for submit_report/submit_answer.") | |
| return self | |
| class RagReward(BaseModel): | |
| total: float = Field(..., ge=0.0, le=1.0) | |
| token_efficiency: float = Field(..., ge=0.0, le=1.0) | |
| answer_quality: float = Field(..., ge=0.0, le=1.0) | |
| retrieval_precision: float = Field(..., ge=0.0, le=1.0) | |
| penalty: float = Field(..., ge=0.0, le=1.0) | |
| def validate_total_bound(self) -> "RagReward": | |
| if self.total > 1.0 or self.total < 0.0: | |
| raise ValueError("total must remain within [0.0, 1.0].") | |
| return self | |