import json
import re

from schemas.agent import Task, TaskResult

# Markdown code fence (optionally tagged "json") at the start or end of a reply.
_CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*|\s*```$")
# Everything before the first "{" or after the last "}".
_OUTSIDE_BRACES_RE = re.compile(r"^[^{]*|[^}]*$")


class ReflectionEngine:
    """Decide whether a finished task should continue, be retried, or escalate.

    The engine first looks at ``result.error``; when there is none it asks the
    injected LLM callable to grade the output and bases the decision on the
    returned score.
    """

    # Class-level fallbacks so instances created without ``__init__`` still work.
    _max_retries = 3
    _min_score = 0.5

    def __init__(self, llm_call_fn, *, max_retries: int = 3, min_score: float = 0.5):
        """Store the LLM callable and the decision thresholds.

        Args:
            llm_call_fn: Awaitable callable invoked as
                ``llm_call_fn(prompt, model_hint="fast")``; its result is
                parsed as the verifier's JSON reply.
            max_retries: ``task.retry_count`` at or above which a failing task
                is escalated instead of retried.
            min_score: Quality scores strictly below this value count as a
                failure.
        """
        self._llm = llm_call_fn
        self._max_retries = max_retries
        self._min_score = min_score

    async def reflect(self, task: Task, result: TaskResult) -> dict:
        """Return the next-step decision for ``task`` given ``result``.

        Returns:
            A dict with ``"decision"`` (``"retry"``, ``"escalate"`` or
            ``"continue"``) and ``"reason"``; retry decisions also carry the
            incremented ``"retry_count"``.
        """
        if result.error:
            return self._retry_or_escalate(
                task,
                retry_reason=result.error,
                escalate_reason=(
                    f"Max retries ({self._max_retries}) exceeded: {result.error}"
                ),
            )

        quality = await self._check_quality(task, result)
        if self._score_of(quality) < self._min_score:
            return self._retry_or_escalate(
                task,
                retry_reason=quality.get("reason", "Quality too low"),
                escalate_reason=(
                    f"Low quality after {self._max_retries} retries: "
                    f"{quality.get('reason', '')}"
                ),
            )

        if quality.get("success", True):
            return {"decision": "continue", "reason": "Task completed successfully"}
        # Score is acceptable although the verifier reported failure: proceed,
        # but surface the verifier's explanation.
        return {
            "decision": "continue",
            "reason": quality.get("reason", "Proceeding with current result"),
        }

    def _retry_or_escalate(
        self, task: Task, *, retry_reason, escalate_reason
    ) -> dict:
        """Build a retry decision, or an escalation once the retry limit is hit."""
        if task.retry_count >= self._max_retries:
            return {"decision": "escalate", "reason": escalate_reason}
        return {
            "decision": "retry",
            "reason": retry_reason,
            "retry_count": task.retry_count + 1,
        }

    @staticmethod
    def _score_of(quality: dict) -> float:
        """Return the verifier's score as a float, defaulting to 1.0.

        The score comes from model-generated JSON, so it may be ``null``, a
        numeric string, or some other type. Values that cannot be converted
        fall back to 1.0, the same default used when the key is missing.
        """
        try:
            return float(quality.get("score", 1.0))
        except (TypeError, ValueError):
            return 1.0

    async def _check_quality(self, task: Task, result: TaskResult) -> dict:
        """Ask the LLM to grade ``result`` and return its parsed JSON verdict.

        Only the first 1000 characters of the output are sent. An empty or
        unparseable reply yields a passing default verdict.
        """
        prompt = f"""You are a quality verifier. Given a task and its result, determine:
1. Did the task complete successfully?
2. Should we retry with a different approach?
3. What went wrong if it failed?
Rate quality 0.0-1.0. Return ONLY a JSON object:
{{"success": true/false, "score": 0.0-1.0, "should_retry": true/false, "reason": "explanation"}}
Task: {task.description}
Output: {result.output[:1000] if result.output else '(empty)'}
Error: {result.error or 'none'}
"""
        raw = await self._llm(prompt, model_hint="fast")
        return self._parse_json(
            raw,
            {"success": True, "score": 1.0, "should_retry": False, "reason": "ok"},
        )

    def _parse_json(self, text: str, default: dict) -> dict:
        """Extract a JSON object from an LLM reply, or return ``default``.

        Code fences are removed first, then everything outside the outermost
        braces is dropped before decoding. ``default`` is returned for empty
        input and for text that does not decode as JSON.
        """
        if not text:
            return default
        unfenced = _CODE_FENCE_RE.sub("", text.strip())
        candidate = _OUTSIDE_BRACES_RE.sub("", unfenced)
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            return default