File size: 6,424 Bytes
02117ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""
ReasoningAgent v7 — Complex reasoning, analysis, and multi-step problem solving
GOD AGENT OS — Using DeepSeek R1, Qwen QwQ, o1-mini style reasoning
"""

import asyncio
import json
from typing import Dict, Any, List, Optional

import structlog

from .base_agent import BaseAgent

# Module-level structlog logger; used below to report agent failures.
log = structlog.get_logger()

# System prompt sent as the "system" message of every LLM call issued by
# ReasoningAgent (analysis, decomposition, sub-problem solving, synthesis).
REASONING_SYSTEM = """You are an elite autonomous reasoning and analysis agent.
You excel at:
- Multi-step reasoning with chain-of-thought
- Complex problem decomposition and analysis
- Mathematical and logical reasoning
- Strategic planning and decision making
- Root cause analysis

Always think step by step, show your reasoning, and provide confident conclusions.
"""


class ReasoningAgent(BaseAgent):
    """
    Specialized agent for complex reasoning, analysis, and problem-solving tasks.

    Pipeline implemented by ``run``:
    1. Analyze the task with one LLM call.
    2. If the task has more than ``_DECOMPOSE_WORD_THRESHOLD`` words, ask the
       LLM to decompose it, solve up to ``_MAX_SUB_PROBLEMS`` sub-problems
       one after another, then synthesize a final answer.
    3. Otherwise return the analysis itself as the result.

    Capabilities:
    - Multi-step reasoning with chain-of-thought
    - Complex problem decomposition
    - Mathematical reasoning
    - Logical analysis
    - Strategic planning

    NOTE(review): ``self.emit``, ``self.llm`` and ``self.name`` are not defined
    in this class; presumably they are provided by ``BaseAgent`` — confirm there.
    """

    # Tasks with more whitespace-separated words than this get decomposed.
    _DECOMPOSE_WORD_THRESHOLD = 30
    # Upper bound on how many parsed sub-problems are actually solved.
    _MAX_SUB_PROBLEMS = 3
    # Line prefixes ("1." .. "9.") that mark a sub-problem in the LLM reply.
    # Built once here instead of on every parsed line.
    _NUMBERED_PREFIXES = tuple(str(i) + "." for i in range(1, 10))

    def __init__(self, ws_manager=None, ai_router=None):
        """Create the agent and set its reasoning parameters.

        Args:
            ws_manager: Forwarded unchanged to ``BaseAgent.__init__``.
            ai_router: Forwarded unchanged to ``BaseAgent.__init__``.
        """
        super().__init__("ReasoningAgent", ws_manager, ai_router)
        # Reported in the "reasoning_complete" event and in get_status();
        # not otherwise consulted inside this class.
        self.reasoning_depth = 3
        # Token budget passed as max_tokens to the initial analysis call.
        self.max_reasoning_tokens = 16000

    async def run(self, task: str, context: Optional[Dict] = None, **kwargs) -> str:
        """Execute a reasoning task and return the final answer text.

        Args:
            task: The problem statement to reason about.
            context: Optional caller-supplied context. Defaults to a fresh
                empty dict per call (a shared mutable default is avoided).
            **kwargs: ``session_id`` and ``task_id`` are read from here
                (both default to ``""``); other keys are ignored.

        Returns:
            The synthesized answer for long tasks, the direct analysis for
            short ones, or an error message string if any step raised.
        """
        if context is None:
            context = {}

        session_id = kwargs.get("session_id", "")
        task_id = kwargs.get("task_id", "")

        await self.emit(task_id, "agent_start", {
            "agent": "ReasoningAgent",
            "task": task[:80],
        }, session_id)

        try:
            # Step 1: Analyze the problem
            analysis = await self._analyze_problem(task, context, task_id, session_id)

            # Step 2: Decompose if complex (word count is the complexity proxy)
            if len(task.split()) > self._DECOMPOSE_WORD_THRESHOLD:
                sub_problems = await self._decompose_problem(task, analysis, task_id, session_id)
                solutions = []
                for sub in sub_problems[:self._MAX_SUB_PROBLEMS]:
                    sol = await self._solve_sub_problem(sub, context, task_id, session_id)
                    solutions.append(sol)
                result = await self._synthesize_answer(
                    task, analysis, sub_problems, solutions, task_id, session_id
                )
            else:
                result = analysis

            await self.emit(task_id, "reasoning_complete", {
                "agent": "ReasoningAgent",
                "reasoning_depth": self.reasoning_depth,
            }, session_id)

            return result

        except Exception as e:
            # Top-level boundary: callers receive an error string instead of
            # an exception. exc_info keeps the traceback available to the
            # logging pipeline rather than only the message text.
            log.error("ReasoningAgent failed", error=str(e), exc_info=True)
            return "Reasoning agent encountered an error: " + str(e)

    async def _analyze_problem(self, problem: str, context: Dict, task_id: str, session_id: str) -> str:
        """Ask the LLM for a step-by-step analysis of ``problem``.

        ``context`` is accepted for signature symmetry but is not included in
        the prompt. The call uses temperature 0.2 and
        ``self.max_reasoning_tokens`` as the token limit.
        """
        messages = [
            {"role": "system", "content": REASONING_SYSTEM},
            {"role": "user", "content": (
                "Analyze this problem step by step:\n\n"
                + problem + "\n\n"
                "Provide:\n"
                "1. Problem type and complexity\n"
                "2. Key constraints and requirements\n"
                "3. Step-by-step solution\n"
                "4. Final answer/recommendation"
            )},
        ]
        return await self.llm(
            messages,
            task_id=task_id,
            session_id=session_id,
            temperature=0.2,
            max_tokens=self.max_reasoning_tokens,
        )

    async def _decompose_problem(self, problem: str, analysis: str, task_id: str, session_id: str) -> List[str]:
        """Break a complex problem into numbered sub-problems via the LLM.

        Only the first 500 characters of ``analysis`` are included in the
        prompt. The reply is scanned line by line and every stripped line
        starting with ``"1."`` .. ``"9."`` is kept (numbering included).

        Returns:
            The matching lines in reply order, or ``[problem]`` when the reply
            is empty/``None`` or contains no numbered line.
        """
        messages = [
            {"role": "system", "content": REASONING_SYSTEM},
            {"role": "user", "content": (
                "Break this complex problem into 3 specific sub-problems:\n\n"
                + problem + "\n\n"
                "Initial analysis: " + analysis[:500] + "\n\n"
                "List each sub-problem on a new line starting with '1.', '2.', '3.'"
            )},
        ]
        raw = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.3, max_tokens=2000)
        # Parse numbered sub-problems; `raw or ""` makes an empty/None reply
        # take the fallback path instead of raising on .splitlines().
        stripped_lines = (line.strip() for line in (raw or "").splitlines())
        sub_problems = [
            line for line in stripped_lines
            if line.startswith(self._NUMBERED_PREFIXES)
        ]
        return sub_problems if sub_problems else [problem]

    async def _solve_sub_problem(self, sub_problem: str, context: Dict, task_id: str, session_id: str) -> str:
        """Solve one sub-problem with a dedicated LLM call.

        ``context`` is accepted but not included in the prompt.
        """
        messages = [
            {"role": "system", "content": REASONING_SYSTEM},
            {"role": "user", "content": "Solve this specific problem:\n\n" + sub_problem},
        ]
        return await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.2, max_tokens=4096)

    async def _synthesize_answer(
        self,
        original: str,
        analysis: str,
        sub_problems: List[str],
        solutions: List[str],
        task_id: str,
        session_id: str,
    ) -> str:
        """Synthesize the final answer from all reasoning steps.

        The prompt contains the original problem, the first 800 characters of
        ``analysis`` and the first 300 characters of each solution, labelled
        by position. ``sub_problems`` is accepted but not used in the prompt.
        """
        solutions_text = "\n".join(
            "Sub-problem " + str(i + 1) + ": " + sol[:300]
            for i, sol in enumerate(solutions)
        )
        messages = [
            {"role": "system", "content": REASONING_SYSTEM},
            {"role": "user", "content": (
                "Original problem: " + original + "\n\n"
                "Analysis: " + analysis[:800] + "\n\n"
                "Solutions to sub-problems:\n" + solutions_text + "\n\n"
                "Provide a comprehensive final answer that integrates all insights."
            )},
        ]
        return await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.3, max_tokens=8192)

    def get_status(self) -> Dict[str, Any]:
        """Return a static status snapshot for this agent.

        The ``status`` value is always ``"ready"``; it is not derived from any
        runtime state in this class.
        """
        return {
            "name": self.name,
            "status": "ready",
            "capabilities": [
                "Multi-step reasoning",
                "Problem decomposition",
                "Mathematical reasoning",
                "Logical analysis",
                "Strategic planning",
            ],
            "reasoning_depth": self.reasoning_depth,
        }


# Public API of this module: only the agent class is exported.
__all__ = ["ReasoningAgent"]