| """Claude-powered agents used in the deployment readiness workflow.""" |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import os |
| from dataclasses import asdict |
| from typing import Dict, List, Optional |
|
|
| import anthropic |
|
|
| from enhanced_mcp_client import EnhancedMCPClient |
| from schemas import ( |
| ChecklistItem, |
| DocumentationBundle, |
| EvidencePacket, |
| ReadinessPlan, |
| ReadinessRequest, |
| ReviewFinding, |
| ReviewReport, |
| ) |
| from sponsor_llms import SponsorLLMClient |
|
|
| MODEL_ID = os.getenv("CLAUDE_MODEL", "claude-3-5-sonnet-20241022") |
| DEFAULT_MAX_TOKENS = int(os.getenv("CLAUDE_MAX_TOKENS", "1500")) |
|
|
|
|
| class ClaudeAgent: |
| """Base helper that wraps Anthropic's Messages API with graceful fallbacks.""" |
|
|
| def __init__(self, name: str, system_prompt: str): |
| self.name = name |
| self.system_prompt = system_prompt |
| api_key = os.getenv("ANTHROPIC_API_KEY") |
| self.client: Optional[anthropic.Anthropic] = None |
| if api_key: |
| self.client = anthropic.Anthropic(api_key=api_key) |
|
|
| def _call_claude(self, user_prompt: str) -> str: |
| if not self.client: |
| return ( |
| f"[offline-mode] {self.name} would respond to: {user_prompt[:180]}..." |
| ) |
|
|
| response = self.client.messages.create( |
| model=MODEL_ID, |
| max_tokens=DEFAULT_MAX_TOKENS, |
| temperature=0.2, |
| system=self.system_prompt, |
| messages=[{"role": "user", "content": user_prompt}] |
| ) |
| return response.content[0].text.strip() |
|
|
|
|
| class PlannerAgent(ClaudeAgent): |
| def __init__(self) -> None: |
| super().__init__( |
| name="Planner", |
| system_prompt=( |
| "You are a release engineer. Return JSON with a summary and a list of" |
| " checklist items (title, description, category, owners, status)." |
| " Categories should cover tests, infra, observability, docs, risk mitigation." |
| ), |
| ) |
|
|
| def run(self, request: ReadinessRequest) -> ReadinessPlan: |
| prompt = ( |
| "Build a release readiness plan for the following data:\n" |
| f"Project: {request.project_name}\n" |
| f"Goal: {request.release_goal}\n" |
| f"Code summary: {request.code_summary}\n" |
| f"Infra notes: {request.infra_notes or 'n/a'}\n" |
| f"Stakeholders: {', '.join(request.stakeholders or ['eng'])}" |
| ) |
| raw = self._call_claude(prompt) |
| plan_dict = _safe_json(raw, fallback={}) |
| summary = plan_dict.get("summary", raw[:200]) |
| items_payload: List[Dict] = plan_dict.get("items", []) |
| items = [ |
| ChecklistItem( |
| title=item.get("title", "Untitled"), |
| description=item.get("description", ""), |
| category=item.get("category", "general"), |
| owners=item.get("owners", []), |
| status=item.get("status", "todo"), |
| ) |
| for item in items_payload |
| ] |
| return ReadinessPlan(summary=summary, items=items) |
|
|
|
|
| class EvidenceAgent(ClaudeAgent): |
| def __init__(self) -> None: |
| super().__init__( |
| name="Evidence", |
| system_prompt=( |
| "You operate like a DevOps SRE. When given a plan, produce three lists:" |
| " findings (signals that support shipping), gaps (missing data), and" |
| " signals (calls you would make to MCP tools or logs). Output JSON." |
| ), |
| ) |
| self.mcp_client = EnhancedMCPClient() |
|
|
| def run(self, plan: ReadinessPlan, project_name: str = "") -> EvidencePacket: |
| |
| mcp_signals = [] |
| try: |
| |
| try: |
| loop = asyncio.get_event_loop() |
| if loop.is_running(): |
| |
| import concurrent.futures |
| with concurrent.futures.ThreadPoolExecutor() as executor: |
| future = executor.submit( |
| asyncio.run, |
| self.mcp_client.gather_deployment_signals( |
| project_name or "project", [item.title for item in plan.items] |
| ) |
| ) |
| mcp_signals = future.result(timeout=5) |
| else: |
| mcp_signals = loop.run_until_complete( |
| self.mcp_client.gather_deployment_signals( |
| project_name or "project", [item.title for item in plan.items] |
| ) |
| ) |
| except RuntimeError: |
| |
| mcp_signals = asyncio.run( |
| self.mcp_client.gather_deployment_signals( |
| project_name or "project", [item.title for item in plan.items] |
| ) |
| ) |
| except Exception as e: |
| mcp_signals = [f"MCP signal gathering: {str(e)[:100]}"] |
|
|
| prompt = ( |
| "Given this deployment plan, synthesize evidence:" |
| f"\n{plan.summary}\nItems: {[_safe_truncate(asdict(item)) for item in plan.items]}" |
| f"\n\nMCP Tool Signals: {', '.join(mcp_signals)}" |
| ) |
| raw = self._call_claude(prompt) |
| payload = _safe_json(raw, fallback={}) |
| return EvidencePacket( |
| findings=payload.get("findings", [raw[:200]]), |
| gaps=payload.get("gaps", []), |
| signals=mcp_signals + payload.get("signals", []), |
| ) |
|
|
|
|
| class DocumentationAgent(ClaudeAgent): |
| def __init__(self) -> None: |
| super().__init__( |
| name="Documentation", |
| system_prompt=( |
| "You are a technical writer. Create JSON with changelog_entry," |
| " readme_snippet, and announcement_draft. Be concise but specific." |
| ), |
| ) |
|
|
| def run(self, request: ReadinessRequest, evidence: EvidencePacket) -> DocumentationBundle: |
| prompt = ( |
| "Author deployment communications. Project: {project}. Goal: {goal}." |
| " Use this evidence: {evidence}." |
| ).format( |
| project=request.project_name, |
| goal=request.release_goal, |
| evidence=evidence.findings, |
| ) |
| raw = self._call_claude(prompt) |
| payload = _safe_json(raw, fallback={}) |
| return DocumentationBundle( |
| changelog_entry=payload.get("changelog_entry", raw[:200]), |
| readme_snippet=payload.get("readme_snippet", ""), |
| announcement_draft=payload.get("announcement_draft", ""), |
| ) |
|
|
|
|
| class SynthesisAgent: |
| """Uses sponsor LLMs (Gemini/OpenAI) to cross-validate evidence.""" |
|
|
| def __init__(self) -> None: |
| self.sponsor_client = SponsorLLMClient() |
|
|
| def run( |
| self, |
| evidence: EvidencePacket, |
| plan_summary: str, |
| preferred_llms: Optional[List[str]] = None, |
| ) -> Dict[str, str]: |
| """Synthesize evidence using sponsor LLMs for bonus points.""" |
| all_evidence = evidence.findings + evidence.signals |
| synthesis = self.sponsor_client.cross_validate_evidence( |
| "\n".join(all_evidence[:5]), |
| plan_summary, |
| preferred_llms, |
| ) |
| return synthesis |
|
|
|
|
| class ReviewerAgent(ClaudeAgent): |
| def __init__(self) -> None: |
| super().__init__( |
| name="Reviewer", |
| system_prompt=( |
| "You chair a release board. Compare plans, evidence, and docs." |
| " Respond with JSON: decision (approve/block/needs_info), confidence" |
| " 0-1, findings (severity+note)." |
| ), |
| ) |
|
|
| def run( |
| self, |
| plan: ReadinessPlan, |
| evidence: EvidencePacket, |
| docs: DocumentationBundle, |
| sponsor_synthesis: Optional[Dict[str, str]] = None, |
| ) -> ReviewReport: |
| synthesis_context = "" |
| if sponsor_synthesis: |
| synthesis_context = f"\nSponsor LLM Synthesis: {sponsor_synthesis}" |
|
|
| prompt = ( |
| "Review release package. Plan: {plan}. Evidence: {evidence}. Docs: {docs}." |
| "{synthesis}" |
| ).format( |
| plan=plan.summary, |
| evidence=evidence.findings + evidence.gaps, |
| docs=docs.changelog_entry, |
| synthesis=synthesis_context, |
| ) |
| raw = self._call_claude(prompt) |
| payload = _safe_json(raw, fallback={}) |
| findings_payload = payload.get("findings", []) |
| findings = [ |
| ReviewFinding( |
| severity=item.get("severity", "medium"), |
| note=item.get("note", "") |
| ) |
| for item in findings_payload |
| ] |
| return ReviewReport( |
| decision=payload.get("decision", "needs_info"), |
| confidence=float(payload.get("confidence", 0.4)), |
| findings=findings, |
| ) |
|
|
|
|
| def _safe_json(text: str, fallback: Dict) -> Dict: |
| import json |
|
|
| try: |
| return json.loads(text) |
| except json.JSONDecodeError: |
| return fallback |
|
|
|
|
| def _safe_truncate(value: Dict, limit: int = 240) -> str: |
| text = str(value) |
| return text if len(text) <= limit else text[:limit] + "…" |
|
|