|
|
""" |
|
|
Thinking Module for MiniMind Max2 |
|
|
Interleaved and Sequential Thinking for complex reasoning and tool interactions. |
|
|
""" |
|
|
|
|
|
from dataclasses import dataclass, field |
|
|
from typing import List, Optional, Dict, Any, Callable, Tuple, Generator |
|
|
from enum import Enum |
|
|
import time |
|
|
import json |
|
|
import re |
|
|
|
|
|
|
|
|
class ThinkingMode(Enum):
    """Enumerates the supported strategies for running a thinking session."""

    # Reason between each tool interaction, adapting as results arrive.
    INTERLEAVED = "interleaved"
    # Plan every step up front, then execute the plan in order.
    SEQUENTIAL = "sequential"
    # Emit thinking steps incrementally as they are produced.
    STREAMING = "streaming"
    # Think internally but omit the trace from the final output.
    HIDDEN = "hidden"
|
|
|
|
|
|
|
|
@dataclass
class ThinkingStep:
    """One unit of the thinking trace: a thought, tool call, or conclusion."""

    step_id: int  # position of this step within the session
    content: str  # human-readable description of the step
    step_type: str = "reasoning"  # "reasoning" | "reflection" | "tool_use" | "conclusion"
    confidence: float = 1.0  # confidence attached to this step, 0..1
    duration_ms: int = 0  # elapsed session time when the step was recorded
    tool_call: Optional[Dict[str, Any]] = None  # {"name": ..., "args": ...} when a tool ran
    tool_result: Optional[Any] = None  # raw result returned by the tool, if any
    is_final: bool = False  # True only for the concluding step of a session
|
|
|
|
|
|
|
|
@dataclass
class ThinkingConfig:
    """Tunable settings that govern how a thinking session behaves."""

    mode: ThinkingMode = ThinkingMode.INTERLEAVED  # default thinking strategy
    max_thinking_steps: int = 10  # hard cap on recorded steps per session
    min_confidence_threshold: float = 0.7  # below this, responses are not approved
    enable_self_reflection: bool = True  # allow "reflect" steps
    enable_step_verification: bool = True  # verify each step before proceeding
    show_thinking_to_user: bool = False  # surface the trace in user output
    thinking_budget_ms: int = 30000  # wall-clock budget for a session

    # Markers used when rendering the thinking trace as text.
    think_start: str = "<Thinking>"
    think_end: str = "</Thinking>"
    step_marker: str = "<step>"
    reflect_marker: str = "<reflect>"
    conclude_marker: str = "<conclude>"
|
|
|
|
|
|
|
|
class ThinkingContext:
    """Accumulates steps, tool calls, and timing for one thinking session."""

    def __init__(self, config: ThinkingConfig):
        self.config = config
        self.steps: List[ThinkingStep] = []
        self.tool_history: List[Dict[str, Any]] = []
        self.start_time: float = 0  # epoch seconds; set by start()
        self.total_tokens: int = 0
        self.current_confidence: float = 1.0

    def start(self):
        """Reset recorded state and begin timing a new session."""
        self.start_time = time.time()
        self.steps = []
        self.tool_history = []

    def elapsed_ms(self) -> int:
        """Milliseconds elapsed since start() was called."""
        delta = time.time() - self.start_time
        return int(delta * 1000)

    def can_continue(self) -> bool:
        """True while neither the step cap nor the time budget is exhausted."""
        under_step_cap = len(self.steps) < self.config.max_thinking_steps
        under_budget = self.elapsed_ms() <= self.config.thinking_budget_ms
        return under_step_cap and under_budget

    def add_step(self, step: ThinkingStep):
        """Record a step, stamping it with the current elapsed time."""
        step.duration_ms = self.elapsed_ms()
        self.steps.append(step)

    def add_tool_call(self, tool_name: str, arguments: Dict[str, Any], result: Any):
        """Record one tool invocation alongside the step index it occurred at."""
        entry = {
            "tool": tool_name,
            "arguments": arguments,
            "result": result,
            "step": len(self.steps),
        }
        self.tool_history.append(entry)

    def get_summary(self) -> str:
        """One line per step, with content truncated to 100 characters."""
        return "\n".join(
            f"Step {i+1} ({step.step_type}): {step.content[:100]}..."
            for i, step in enumerate(self.steps)
        )

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the session (steps, tool history, timing) to plain data."""
        serialized_steps = []
        for s in self.steps:
            serialized_steps.append({
                "id": s.step_id,
                "content": s.content,
                "type": s.step_type,
                "confidence": s.confidence,
                "duration_ms": s.duration_ms,
                "is_final": s.is_final,
            })
        return {
            "steps": serialized_steps,
            "tool_history": self.tool_history,
            "total_time_ms": self.elapsed_ms(),
            "total_steps": len(self.steps),
        }
|
|
|
|
|
|
|
|
class InterleavedThinking:
    """
    Interleaved Thinking: Reason between each tool interaction.
    Enables the model to adapt strategy based on intermediate results.
    """

    def __init__(
        self,
        model,
        tool_registry,
        config: Optional[ThinkingConfig] = None,
    ):
        self.model = model
        self.tools = tool_registry
        self.config = config or ThinkingConfig(mode=ThinkingMode.INTERLEAVED)

    def think_and_act(
        self,
        query: str,
        context: Optional[ThinkingContext] = None,
    ) -> Generator[ThinkingStep, None, str]:
        """
        Think and act in interleaved fashion.

        Yields:
            ThinkingStep objects as thinking progresses

        Returns:
            Final answer string
        """
        ctx = context if context is not None else ThinkingContext(self.config)
        ctx.start()

        counter = 0
        state = query

        while ctx.can_continue():
            # Produce a reasoning step about the current state.
            thinking = self._generate_thought(state, ctx)
            reasoning_step = ThinkingStep(
                step_id=counter,
                content=thinking["thought"],
                step_type="reasoning",
                confidence=thinking.get("confidence", 0.9),
            )
            ctx.add_step(reasoning_step)
            yield reasoning_step
            counter += 1

            # Choose what to do next based on that thought.
            action = self._decide_action(thinking, ctx)
            kind = action["type"]

            if kind == "answer":
                # Conclude the session and surface the final answer.
                conclusion = ThinkingStep(
                    step_id=counter,
                    content=action["content"],
                    step_type="conclusion",
                    is_final=True,
                )
                ctx.add_step(conclusion)
                yield conclusion
                return action["content"]

            if kind == "tool_call":
                name = action["tool"]
                args = action["arguments"]

                # Tool failures are folded into the result instead of aborting.
                try:
                    outcome = self.tools.execute(name, **args)
                except Exception as e:
                    outcome = f"Error: {str(e)}"

                ctx.add_tool_call(name, args, outcome)

                tool_step = ThinkingStep(
                    step_id=counter,
                    content=f"Called {name}",
                    step_type="tool_use",
                    tool_call={"name": name, "args": args},
                    tool_result=outcome,
                )
                ctx.add_step(tool_step)
                yield tool_step
                counter += 1

                # Fold the tool result back into the working state.
                state = f"{state}\n\nTool result: {outcome}"

            elif kind == "reflect":
                reflection = ThinkingStep(
                    step_id=counter,
                    content=action["content"],
                    step_type="reflection",
                )
                ctx.add_step(reflection)
                yield reflection
                counter += 1

        # Step or time budget exhausted without a conclusion.
        return self._generate_final_answer(ctx)

    def _generate_thought(
        self,
        state: str,
        context: ThinkingContext,
    ) -> Dict[str, Any]:
        """Generate a thought about the current state.

        NOTE(review): placeholder heuristic — a real model call would go here.
        """
        return {
            "thought": f"Analyzing: {state[:100]}...",
            "confidence": 0.85,
            "next_action": "continue",
        }

    def _decide_action(
        self,
        thinking: Dict[str, Any],
        context: ThinkingContext,
    ) -> Dict[str, Any]:
        """Decide the next action based on thinking (placeholder heuristic)."""
        # Very high confidence: answer immediately.
        if thinking.get("confidence", 0) > 0.95:
            return {"type": "answer", "content": "Final answer based on analysis"}

        # Otherwise gather more information, up to three tool calls.
        if len(context.tool_history) < 3:
            return {
                "type": "tool_call",
                "tool": "search",
                "arguments": {"query": "relevant information"},
            }

        return {"type": "answer", "content": "Answer after tool use"}

    def _generate_final_answer(self, context: ThinkingContext) -> str:
        """Generate the fallback final answer when the loop exits on budget."""
        return f"Based on {len(context.steps)} thinking steps and {len(context.tool_history)} tool calls: [Final Answer]"
|
|
|
|
|
|
|
|
class SequentialThinking:
    """
    Sequential Thinking: Plan all steps before execution.
    Best for well-defined tasks with predictable steps.
    """

    def __init__(
        self,
        model,
        tool_registry,
        config: Optional[ThinkingConfig] = None,
    ):
        self.model = model
        self.tools = tool_registry
        self.config = config or ThinkingConfig(mode=ThinkingMode.SEQUENTIAL)

    def plan_and_execute(
        self,
        query: str,
    ) -> Tuple[List[Dict[str, Any]], str]:
        """
        Plan all steps then execute sequentially.

        Returns:
            Tuple of (execution_log, final_answer)
        """
        plan = self._generate_plan(query)

        log: List[Dict[str, Any]] = []
        context: Dict[str, Any] = {}

        for planned in plan:
            outcome = self._execute_step(planned, context)
            log.append({"step": planned, "result": outcome})
            # Expose each result to later steps under a positional key.
            context[f"step_{len(log)}"] = outcome

        return log, self._synthesize_answer(query, log)

    def _generate_plan(self, query: str) -> List[Dict[str, Any]]:
        """Generate the execution plan.

        NOTE(review): placeholder — always returns the same four-stage plan.
        """
        return [
            {"action": "analyze", "description": "Understand the query"},
            {"action": "search", "description": "Gather information"},
            {"action": "synthesize", "description": "Combine findings"},
            {"action": "answer", "description": "Formulate response"},
        ]

    def _execute_step(
        self,
        step: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Any:
        """Execute one plan step; only "search" is routed to the tool registry."""
        action = step.get("action", "")
        if action == "search" and self.tools:
            return self.tools.execute("search", query=step.get("query", ""))
        return f"Executed: {action}"

    def _synthesize_answer(
        self,
        query: str,
        execution_log: List[Dict[str, Any]],
    ) -> str:
        """Synthesize the final answer from the execution log."""
        return f"Answer to '{query}' based on {len(execution_log)} execution steps"
|
|
|
|
|
|
|
|
class ThinkingEngine:
    """
    Unified thinking engine supporting multiple modes.

    Dispatches a query to interleaved, sequential, or hidden thinking and
    normalizes the result into a dictionary with the answer and trace.
    """

    def __init__(
        self,
        model,
        tool_registry=None,
        config: Optional[ThinkingConfig] = None,
    ):
        self.model = model
        self.tools = tool_registry
        self.config = config or ThinkingConfig()

        # FIX: pass the resolved self.config (not the raw argument, which may
        # be None) so the engine and its sub-engines share one config object;
        # previously, with config=None, each sub-engine built its own default
        # and changes to self.config silently did not propagate.
        self.interleaved = InterleavedThinking(model, tool_registry, self.config)
        self.sequential = SequentialThinking(model, tool_registry, self.config)

    def think(
        self,
        query: str,
        mode: Optional[ThinkingMode] = None,
        stream: bool = False,
    ) -> Dict[str, Any]:
        """
        Main thinking interface.

        Args:
            query: User query
            mode: Thinking mode (uses config default if None)
            stream: Whether to stream thinking steps

        Returns:
            Dictionary with answer and thinking trace
        """
        mode = mode or self.config.mode

        if mode == ThinkingMode.INTERLEAVED:
            return self._run_interleaved(query, stream)
        elif mode == ThinkingMode.SEQUENTIAL:
            return self._run_sequential(query)
        elif mode == ThinkingMode.HIDDEN:
            return self._run_hidden(query)
        else:
            # STREAMING (and any future mode) falls back to interleaved.
            return self._run_interleaved(query, stream)

    def _run_interleaved(self, query: str, stream: bool) -> Dict[str, Any]:
        """Run interleaved thinking and collect the full step trace.

        NOTE(review): `stream` is currently unused — steps are always fully
        collected before returning. Kept for interface stability.
        """
        context = ThinkingContext(self.config)
        steps = []
        final_answer = ""

        for step in self.interleaved.think_and_act(query, context):
            steps.append(step)
            if step.is_final:
                final_answer = step.content

        return {
            "answer": final_answer,
            "thinking": self._format_thinking(steps),
            "context": context.to_dict(),
        }

    def _run_sequential(self, query: str) -> Dict[str, Any]:
        """Run sequential (plan-then-execute) thinking."""
        execution_log, answer = self.sequential.plan_and_execute(query)

        return {
            "answer": answer,
            "plan": execution_log,
            "thinking": self._format_plan_thinking(execution_log),
        }

    def _run_hidden(self, query: str) -> Dict[str, Any]:
        """Run interleaved thinking but strip the trace from the result."""
        result = self._run_interleaved(query, False)
        return {
            "answer": result["answer"],
            "thinking": None,
        }

    def _format_thinking(self, steps: List[ThinkingStep]) -> str:
        """Format thinking steps for display using the configured markers."""
        cfg = self.config
        lines = [cfg.think_start]

        for step in steps:
            if step.step_type == "reasoning":
                lines.append(f"{cfg.step_marker} {step.content}")
            elif step.step_type == "reflection":
                lines.append(f"{cfg.reflect_marker} {step.content}")
            elif step.step_type == "tool_use":
                lines.append(f"[Tool: {step.tool_call['name']}] → {step.tool_result}")
            elif step.step_type == "conclusion":
                lines.append(f"{cfg.conclude_marker} {step.content}")
            # Unknown step types are silently omitted from the rendered trace.

        lines.append(cfg.think_end)
        return "\n".join(lines)

    def _format_plan_thinking(self, execution_log: List[Dict[str, Any]]) -> str:
        """Format a sequential plan execution as a readable trace."""
        cfg = self.config
        lines = [cfg.think_start]

        for i, entry in enumerate(execution_log):
            step = entry["step"]
            result = entry["result"]
            lines.append(f"{cfg.step_marker} Step {i+1}: {step.get('description', '')}")
            lines.append(f"  Result: {result}")

        lines.append(cfg.think_end)
        return "\n".join(lines)

    def evaluate_response(
        self,
        query: str,
        response: str,
    ) -> Dict[str, Any]:
        """
        Evaluate a response before presenting to user.
        Can reject or warn based on content.

        Returns:
            Dict with "approved", "confidence", "warnings", "suggestions".
        """
        evaluation = {
            "approved": True,
            "confidence": 0.9,
            "warnings": [],
            "suggestions": [],
        }

        # Very short responses are suspicious.
        if len(response) < 10:
            evaluation["warnings"].append("Response is very short")
            evaluation["confidence"] -= 0.2

        # Each uncertainty phrase found lowers confidence a little.
        uncertainty_markers = ["I'm not sure", "I don't know", "maybe", "perhaps"]
        for marker in uncertainty_markers:
            if marker.lower() in response.lower():
                evaluation["warnings"].append(f"Contains uncertainty: '{marker}'")
                evaluation["confidence"] -= 0.1

        # Reject once confidence drops below the configured threshold.
        if evaluation["confidence"] < self.config.min_confidence_threshold:
            evaluation["approved"] = False
            evaluation["suggestions"].append("Consider gathering more information")

        return evaluation
|
|
|
|
|
|
|
|
class MultilingualThinking:
    """
    Multilingual response capability with native thinking.
    """

    # Instruction prepended to the query so the engine thinks in-language.
    LANGUAGE_PROMPTS = {
        "en": "Think and respond in English.",
        "zh": "用中文思考和回答。",
        "es": "Piensa y responde en español.",
        "fr": "Réfléchis et réponds en français.",
        "de": "Denke und antworte auf Deutsch.",
        "ja": "日本語で考えて答えてください。",
        "ko": "한국어로 생각하고 답하세요.",
        "ar": "فكر وأجب بالعربية.",
        "ru": "Думай и отвечай по-русски.",
        "pt": "Pense e responda em português.",
    }

    def __init__(self, thinking_engine: ThinkingEngine):
        self.engine = thinking_engine

    def detect_language(self, text: str) -> str:
        """Detect the language of *text* from its Unicode script ranges.

        Ordered checks; first match wins. NOTE(review): CJK ideographs are
        checked before kana, so kanji-only Japanese text is reported as "zh".
        Latin-script languages all fall through to "en".
        """
        script_checks = (
            (r'[\u4e00-\u9fff]', "zh"),
            (r'[\u3040-\u309f\u30a0-\u30ff]', "ja"),
            (r'[\uac00-\ud7af]', "ko"),
            (r'[\u0600-\u06ff]', "ar"),
            (r'[\u0400-\u04ff]', "ru"),
        )
        for pattern, code in script_checks:
            if re.search(pattern, text):
                return code
        return "en"

    def think_multilingual(
        self,
        query: str,
        target_language: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Think in target language natively.

        Args:
            query: User query
            target_language: Target language code (auto-detect if None)

        Returns:
            Response with thinking in target language
        """
        lang = target_language if target_language else self.detect_language(query)
        lang_prompt = self.LANGUAGE_PROMPTS.get(lang, self.LANGUAGE_PROMPTS["en"])

        # Prepend the language instruction, then run the normal engine.
        result = self.engine.think(f"{lang_prompt}\n\n{query}")
        result["language"] = lang

        return result
|
|
|