File size: 6,424 Bytes
02117ee | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 | """
ReasoningAgent v7 — Complex reasoning, analysis, and multi-step problem solving
GOD AGENT OS — Using DeepSeek R1, Qwen QwQ, o1-mini style reasoning
"""
import asyncio
import json
from typing import Dict, Any, List, Optional
import structlog
from .base_agent import BaseAgent
log = structlog.get_logger()
REASONING_SYSTEM = """You are an elite autonomous reasoning and analysis agent.
You excel at:
- Multi-step reasoning with chain-of-thought
- Complex problem decomposition and analysis
- Mathematical and logical reasoning
- Strategic planning and decision making
- Root cause analysis
Always think step by step, show your reasoning, and provide confident conclusions.
"""
class ReasoningAgent(BaseAgent):
"""
Specialized agent for complex reasoning, analysis, and problem-solving tasks.
Capabilities:
- Multi-step reasoning with chain-of-thought
- Complex problem decomposition
- Mathematical reasoning
- Logical analysis
- Strategic planning
"""
def __init__(self, ws_manager=None, ai_router=None):
super().__init__("ReasoningAgent", ws_manager, ai_router)
self.reasoning_depth = 3
self.max_reasoning_tokens = 16000
async def run(self, task: str, context: Dict = {}, **kwargs) -> str:
"""Execute reasoning task."""
session_id = kwargs.get("session_id", "")
task_id = kwargs.get("task_id", "")
await self.emit(task_id, "agent_start", {
"agent": "ReasoningAgent",
"task": task[:80],
}, session_id)
try:
# Step 1: Analyze the problem
analysis = await self._analyze_problem(task, context, task_id, session_id)
# Step 2: Decompose if complex
if len(task.split()) > 30:
sub_problems = await self._decompose_problem(task, analysis, task_id, session_id)
solutions = []
for sub in sub_problems[:3]:
sol = await self._solve_sub_problem(sub, context, task_id, session_id)
solutions.append(sol)
result = await self._synthesize_answer(task, analysis, sub_problems, solutions, task_id, session_id)
else:
result = analysis
await self.emit(task_id, "reasoning_complete", {
"agent": "ReasoningAgent",
"reasoning_depth": self.reasoning_depth,
}, session_id)
return result
except Exception as e:
log.error("ReasoningAgent failed", error=str(e))
return "Reasoning agent encountered an error: " + str(e)
async def _analyze_problem(self, problem: str, context: Dict, task_id: str, session_id: str) -> str:
"""Analyze the problem using reasoning model."""
messages = [
{"role": "system", "content": REASONING_SYSTEM},
{"role": "user", "content": (
"Analyze this problem step by step:\n\n"
+ problem + "\n\n"
"Provide:\n"
"1. Problem type and complexity\n"
"2. Key constraints and requirements\n"
"3. Step-by-step solution\n"
"4. Final answer/recommendation"
)},
]
return await self.llm(
messages,
task_id=task_id,
session_id=session_id,
temperature=0.2,
max_tokens=self.max_reasoning_tokens,
)
async def _decompose_problem(self, problem: str, analysis: str, task_id: str, session_id: str) -> List[str]:
"""Break down complex problem into sub-problems."""
messages = [
{"role": "system", "content": REASONING_SYSTEM},
{"role": "user", "content": (
"Break this complex problem into 3 specific sub-problems:\n\n"
+ problem + "\n\n"
"Initial analysis: " + analysis[:500] + "\n\n"
"List each sub-problem on a new line starting with '1.', '2.', '3.'"
)},
]
raw = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.3, max_tokens=2000)
# Parse numbered sub-problems
lines = raw.split("\n")
sub_problems = []
for line in lines:
line = line.strip()
if line and any(line.startswith(str(i) + ".") for i in range(1, 10)):
sub_problems.append(line)
return sub_problems if sub_problems else [problem]
async def _solve_sub_problem(self, sub_problem: str, context: Dict, task_id: str, session_id: str) -> str:
"""Solve individual sub-problem."""
messages = [
{"role": "system", "content": REASONING_SYSTEM},
{"role": "user", "content": "Solve this specific problem:\n\n" + sub_problem},
]
return await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.2, max_tokens=4096)
async def _synthesize_answer(
self,
original: str,
analysis: str,
sub_problems: List[str],
solutions: List[str],
task_id: str,
session_id: str,
) -> str:
"""Synthesize final answer from all reasoning steps."""
solutions_text = "\n".join(
"Sub-problem " + str(i + 1) + ": " + sol[:300]
for i, sol in enumerate(solutions)
)
messages = [
{"role": "system", "content": REASONING_SYSTEM},
{"role": "user", "content": (
"Original problem: " + original + "\n\n"
"Analysis: " + analysis[:800] + "\n\n"
"Solutions to sub-problems:\n" + solutions_text + "\n\n"
"Provide a comprehensive final answer that integrates all insights."
)},
]
return await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.3, max_tokens=8192)
def get_status(self) -> Dict[str, Any]:
"""Get agent status."""
return {
"name": self.name,
"status": "ready",
"capabilities": [
"Multi-step reasoning",
"Problem decomposition",
"Mathematical reasoning",
"Logical analysis",
"Strategic planning",
],
"reasoning_depth": self.reasoning_depth,
}
__all__ = ["ReasoningAgent"]
|