MiniMind / capabilities /thinking.py
fariasultana's picture
feat: Add capabilities/thinking.py
36c0c61 verified
"""
Thinking Module for MiniMind Max2
Interleaved and Sequential Thinking for complex reasoning and tool interactions.
"""
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any, Callable, Tuple, Generator
from enum import Enum
import time
import json
import re
class ThinkingMode(Enum):
    """Strategies the engine can use to schedule its reasoning."""

    # Reason again between each tool call.
    INTERLEAVED = "interleaved"
    # Think through every step up front before acting.
    SEQUENTIAL = "sequential"
    # Emit thoughts in real time as they are produced.
    STREAMING = "streaming"
    # Reason internally and expose only the final answer.
    HIDDEN = "hidden"
@dataclass
class ThinkingStep:
    """One recorded unit of a thinking trace.

    Only ``step_id`` and ``content`` are required; every other field has a
    default so callers can fill in just what applies to the step.
    """

    # Identifier assigned by whoever creates the step.
    step_id: int
    # Text of the step.
    content: str
    # Category label. The original comment lists reasoning, evaluation,
    # planning and reflection; this file also emits "tool_use" and "conclusion".
    step_type: str = "reasoning"
    # Confidence attached to this step.
    confidence: float = 1.0
    # Timing in milliseconds; overwritten by ThinkingContext.add_step when
    # the step is recorded.
    duration_ms: int = 0
    # Description of the tool invocation, when the step is a tool call.
    tool_call: Optional[Dict[str, Any]] = None
    # Whatever the tool returned (or an error string), when applicable.
    tool_result: Optional[Any] = None
    # True for the step that carries the final answer.
    is_final: bool = False
@dataclass
class ThinkingConfig:
    """Tunable settings that govern how a thinking session runs."""

    # ---- Behavior ----
    # Which thinking strategy to use when the caller does not pick one.
    mode: ThinkingMode = ThinkingMode.INTERLEAVED
    # Step-count budget checked by ThinkingContext.can_continue().
    max_thinking_steps: int = 10
    # Responses evaluated below this confidence are not approved.
    min_confidence_threshold: float = 0.7
    # Feature switches (not read by the code visible in this file).
    enable_self_reflection: bool = True
    enable_step_verification: bool = True
    show_thinking_to_user: bool = False
    # Maximum thinking time, in milliseconds.
    thinking_budget_ms: int = 30000

    # ---- Special tokens used when rendering a thinking trace ----
    think_start: str = "<Thinking>"
    think_end: str = "</Thinking>"
    step_marker: str = "<step>"
    reflect_marker: str = "<reflect>"
    conclude_marker: str = "<conclude>"
class ThinkingContext:
    """Maintains context across thinking steps.

    Holds the recorded steps, the tool-call history and the session start
    time, and enforces the step-count and time budgets taken from ``config``.
    """

    def __init__(self, config: ThinkingConfig):
        """Create an idle context bound to ``config``.

        Args:
            config: Settings object; ``max_thinking_steps`` and
                ``thinking_budget_ms`` are read by ``can_continue``.
        """
        self.config = config
        self.steps: List[ThinkingStep] = []
        self.tool_history: List[Dict[str, Any]] = []
        # 0 doubles as the "session not started yet" marker (see elapsed_ms).
        self.start_time: float = 0
        self.total_tokens: int = 0
        self.current_confidence: float = 1.0
        # Elapsed-ms reading taken when the previous step was recorded; used
        # to turn the running clock into a per-step duration.
        self._last_step_mark_ms: int = 0

    def start(self):
        """Start a thinking session, discarding earlier steps and tool calls."""
        self.start_time = time.time()
        self.steps = []
        self.tool_history = []
        self._last_step_mark_ms = 0

    def elapsed_ms(self) -> int:
        """Return milliseconds since ``start()``, or 0 if not started.

        Without the guard an unstarted context would report the time since
        the epoch and look as if its time budget were already exhausted.
        """
        if not self.start_time:
            return 0
        # Clamp at zero in case the wall clock was adjusted backwards.
        return max(0, int((time.time() - self.start_time) * 1000))

    def can_continue(self) -> bool:
        """Return True while both the step and the time budget have room."""
        if len(self.steps) >= self.config.max_thinking_steps:
            return False
        if self.elapsed_ms() > self.config.thinking_budget_ms:
            return False
        return True

    def add_step(self, step: ThinkingStep):
        """Record ``step`` and stamp it with its own duration.

        The duration is the time since the previous step was recorded (or
        since the session started for the first step), so per-step values
        add up to roughly the total reported by ``to_dict``.
        """
        now_ms = self.elapsed_ms()
        step.duration_ms = max(0, now_ms - self._last_step_mark_ms)
        self._last_step_mark_ms = now_ms
        self.steps.append(step)

    def add_tool_call(self, tool_name: str, arguments: Dict[str, Any], result: Any):
        """Record a tool call together with the step count at call time."""
        self.tool_history.append({
            "tool": tool_name,
            "arguments": arguments,
            "result": result,
            "step": len(self.steps),
        })

    def get_summary(self) -> str:
        """Return one line per step with the first 100 characters of its content."""
        return "\n".join(
            f"Step {number} ({step.step_type}): {step.content[:100]}..."
            for number, step in enumerate(self.steps, start=1)
        )

    def to_dict(self) -> Dict[str, Any]:
        """Convert the session to a plain dictionary.

        Returns:
            Dict with the keys ``steps``, ``tool_history``, ``total_time_ms``
            and ``total_steps``.
        """
        return {
            "steps": [
                {
                    "id": s.step_id,
                    "content": s.content,
                    "type": s.step_type,
                    "confidence": s.confidence,
                    "duration_ms": s.duration_ms,
                    "is_final": s.is_final,
                }
                for s in self.steps
            ],
            "tool_history": self.tool_history,
            "total_time_ms": self.elapsed_ms(),
            "total_steps": len(self.steps),
        }
class InterleavedThinking:
    """
    Interleaved Thinking: Reason between each tool interaction.
    Enables the model to adapt strategy based on intermediate results.
    """

    def __init__(
        self,
        model,
        tool_registry,
        config: Optional[ThinkingConfig] = None,
    ):
        """Store the collaborators.

        Args:
            model: Model handle; stored but not called by the placeholder
                helpers in this class.
            tool_registry: Object whose ``execute(name, **kwargs)`` runs a tool.
            config: Thinking settings; an INTERLEAVED-mode default is created
                when omitted.
        """
        self.model = model
        self.tools = tool_registry
        self.config = config or ThinkingConfig(mode=ThinkingMode.INTERLEAVED)

    def think_and_act(
        self,
        query: str,
        context: Optional[ThinkingContext] = None,
    ) -> Generator[ThinkingStep, None, str]:
        """
        Think and act in interleaved fashion.

        Each round produces a reasoning step and then either answers, calls
        a tool, or reflects. The run always ends with a step whose
        ``is_final`` flag is set, including when the step or time budget
        runs out, so callers that only iterate still receive the answer.

        Args:
            query: The user query to reason about.
            context: Session state to use; it is (re)started here. A fresh
                context is created when omitted.

        Yields:
            ThinkingStep objects as thinking progresses

        Returns:
            Final answer string
        """
        ctx = context if context is not None else ThinkingContext(self.config)
        ctx.start()
        step_id = 0
        current_state = query

        while ctx.can_continue():
            # Step 1: Think about current state
            thinking = self._generate_thought(current_state, ctx)
            step = ThinkingStep(
                step_id=step_id,
                content=thinking["thought"],
                step_type="reasoning",
                confidence=thinking.get("confidence", 0.9),
            )
            ctx.add_step(step)
            yield step
            step_id += 1

            # Step 2: Decide on action
            action = self._decide_action(thinking, ctx)
            kind = action.get("type")

            if kind == "answer":
                final_step = ThinkingStep(
                    step_id=step_id,
                    content=action["content"],
                    step_type="conclusion",
                    is_final=True,
                )
                ctx.add_step(final_step)
                yield final_step
                return action["content"]

            if kind == "tool_call":
                tool_name = action["tool"]
                # A tool that takes no arguments may omit the key entirely.
                tool_args = action.get("arguments") or {}
                try:
                    result = self.tools.execute(tool_name, **tool_args)
                except Exception as e:
                    # Deliberately broad: a failing tool must not abort the
                    # session; the error text is fed back as the tool result.
                    result = f"Error: {str(e)}"
                ctx.add_tool_call(tool_name, tool_args, result)

                tool_step = ThinkingStep(
                    step_id=step_id,
                    content=f"Called {tool_name}",
                    step_type="tool_use",
                    tool_call={"name": tool_name, "args": tool_args},
                    tool_result=result,
                )
                ctx.add_step(tool_step)
                yield tool_step
                step_id += 1
                # Update state with result
                current_state = f"{current_state}\n\nTool result: {result}"
            elif kind == "reflect":
                reflect_step = ThinkingStep(
                    step_id=step_id,
                    content=action["content"],
                    step_type="reflection",
                )
                ctx.add_step(reflect_step)
                yield reflect_step
                step_id += 1
            # Any other action type is ignored and the loop thinks again.

        # Reached limit - provide best answer. It is emitted as a final step
        # as well as returned, mirroring the normal "answer" path.
        final_answer = self._generate_final_answer(ctx)
        budget_step = ThinkingStep(
            step_id=step_id,
            content=final_answer,
            step_type="conclusion",
            is_final=True,
        )
        ctx.add_step(budget_step)
        yield budget_step
        return final_answer

    def _generate_thought(
        self,
        state: str,
        context: ThinkingContext,
    ) -> Dict[str, Any]:
        """Generate a thought about current state.

        Placeholder: returns a fixed-shape dict built from the first 100
        characters of ``state`` instead of calling the model.
        """
        return {
            "thought": f"Analyzing: {state[:100]}...",
            "confidence": 0.85,
            "next_action": "continue",
        }

    def _decide_action(
        self,
        thinking: Dict[str, Any],
        context: ThinkingContext,
    ) -> Dict[str, Any]:
        """Decide next action based on thinking.

        Placeholder policy: answer right away above 0.95 confidence,
        otherwise issue up to three search calls, then answer.
        """
        if thinking.get("confidence", 0) > 0.95:
            return {"type": "answer", "content": "Final answer based on analysis"}
        if len(context.tool_history) < 3:
            return {
                "type": "tool_call",
                "tool": "search",
                "arguments": {"query": "relevant information"},
            }
        return {"type": "answer", "content": "Answer after tool use"}

    def _generate_final_answer(self, context: ThinkingContext) -> str:
        """Generate final answer from context (placeholder text)."""
        return f"Based on {len(context.steps)} thinking steps and {len(context.tool_history)} tool calls: [Final Answer]"
class SequentialThinking:
    """
    Sequential Thinking: Plan all steps before execution.
    Best for well-defined tasks with predictable steps.
    """

    def __init__(
        self,
        model,
        tool_registry,
        config: Optional[ThinkingConfig] = None,
    ):
        """Store the collaborators.

        Args:
            model: Model handle; stored but not called by the placeholder
                helpers in this class.
            tool_registry: Object whose ``execute(name, **kwargs)`` runs a
                tool; a falsy value disables tool use.
            config: Thinking settings; a SEQUENTIAL-mode default is created
                when omitted.
        """
        self.model = model
        self.tools = tool_registry
        self.config = config or ThinkingConfig(mode=ThinkingMode.SEQUENTIAL)

    def plan_and_execute(
        self,
        query: str,
    ) -> Tuple[List[Dict[str, Any]], str]:
        """
        Plan all steps then execute sequentially.

        Args:
            query: The user query to plan for.

        Returns:
            Tuple of (execution_log, final_answer)
        """
        # Phase 1: Generate complete plan
        plan = self._generate_plan(query)

        # Phase 2: Execute plan. The shared context starts with the query so
        # steps that need it (such as search) do not run on an empty string.
        execution_log: List[Dict[str, Any]] = []
        context: Dict[str, Any] = {"query": query}
        for position, step in enumerate(plan, start=1):
            result = self._execute_step(step, context)
            execution_log.append({
                "step": step,
                "result": result,
            })
            context[f"step_{position}"] = result

        # Phase 3: Synthesize answer
        final_answer = self._synthesize_answer(query, execution_log)
        return execution_log, final_answer

    def _generate_plan(self, query: str) -> List[Dict[str, Any]]:
        """Generate execution plan (placeholder: a fixed four-step plan)."""
        return [
            {"action": "analyze", "description": "Understand the query"},
            {"action": "search", "description": "Gather information"},
            {"action": "synthesize", "description": "Combine findings"},
            {"action": "answer", "description": "Formulate response"},
        ]

    def _execute_step(
        self,
        step: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Any:
        """Execute a single step.

        A "search" step is sent to the tool registry when one is available.
        The query comes from the step itself, falling back to the ``"query"``
        entry of ``context``. Tool failures are returned as an
        ``"Error: ..."`` string, matching the interleaved runner, so one
        failing tool does not abort the rest of the plan.

        Returns:
            The tool result, an error string, or ``"Executed: <action>"``.
        """
        action = step.get("action", "")
        if action == "search" and self.tools:
            search_query = step.get("query") or context.get("query", "")
            try:
                return self.tools.execute("search", query=search_query)
            except Exception as exc:
                return f"Error: {str(exc)}"
        return f"Executed: {action}"

    def _synthesize_answer(
        self,
        query: str,
        execution_log: List[Dict[str, Any]],
    ) -> str:
        """Synthesize final answer from execution log (placeholder text)."""
        return f"Answer to '{query}' based on {len(execution_log)} execution steps"
class ThinkingEngine:
    """
    Unified thinking engine supporting multiple modes.

    Owns one InterleavedThinking and one SequentialThinking runner and
    routes each query to one of them based on the requested ThinkingMode.
    """

    def __init__(
        self,
        model,
        tool_registry=None,
        config: Optional[ThinkingConfig] = None,
    ):
        """Build the engine and its two runners.

        Args:
            model: Model handle passed through to both runners.
            tool_registry: Optional tool registry passed through to both runners.
            config: Thinking settings; a default ThinkingConfig is used when omitted.
        """
        self.model = model
        self.tools = tool_registry
        self.config = config or ThinkingConfig()
        # The runners receive the caller's value as given (possibly None), so
        # each falls back to its own mode-specific default configuration.
        self.interleaved = InterleavedThinking(model, tool_registry, config)
        self.sequential = SequentialThinking(model, tool_registry, config)

    def think(
        self,
        query: str,
        mode: Optional[ThinkingMode] = None,
        stream: bool = False,
    ) -> Dict[str, Any]:
        """
        Main thinking interface.

        Args:
            query: User query
            mode: Thinking mode (uses config default if None)
            stream: Whether to stream thinking steps (accepted and passed
                along, but the result is currently always returned in one piece)

        Returns:
            Dictionary with answer and thinking trace
        """
        mode = mode or self.config.mode
        if mode == ThinkingMode.SEQUENTIAL:
            return self._run_sequential(query)
        if mode == ThinkingMode.HIDDEN:
            return self._run_hidden(query)
        # INTERLEAVED, plus any mode without a dedicated runner (STREAMING).
        return self._run_interleaved(query, stream)

    def _run_interleaved(self, query: str, stream: bool) -> Dict[str, Any]:
        """Run interleaved thinking and collect its steps and answer.

        The runner is a generator that yields steps and returns the final
        answer. It is driven by hand so that the return value is captured;
        a plain ``for`` loop discards it, which left the answer empty
        whenever no step flagged ``is_final`` was yielded.
        """
        context = ThinkingContext(self.config)
        steps: List[ThinkingStep] = []
        final_answer = ""
        step_iter = iter(self.interleaved.think_and_act(query, context))
        while True:
            try:
                step = next(step_iter)
            except StopIteration as finished:
                # ``value`` holds the generator's return value (None for
                # ordinary iterators).
                if finished.value is not None:
                    final_answer = finished.value
                break
            steps.append(step)
            if step.is_final:
                final_answer = step.content
        return {
            "answer": final_answer,
            "thinking": self._format_thinking(steps),
            "context": context.to_dict(),
        }

    def _run_sequential(self, query: str) -> Dict[str, Any]:
        """Run sequential thinking and return answer, plan log and trace."""
        execution_log, answer = self.sequential.plan_and_execute(query)
        return {
            "answer": answer,
            "plan": execution_log,
            "thinking": self._format_plan_thinking(execution_log),
        }

    def _run_hidden(self, query: str) -> Dict[str, Any]:
        """Run interleaved thinking but return only the answer."""
        result = self._run_interleaved(query, False)
        return {
            "answer": result["answer"],
            "thinking": None,  # Hidden
        }

    def _format_thinking(self, steps: List[ThinkingStep]) -> str:
        """Format thinking steps for display.

        Steps are wrapped in the configured start/end tokens, one line per
        step; steps of an unrecognized type are omitted.
        """
        cfg = self.config
        lines = [cfg.think_start]
        for step in steps:
            if step.step_type == "reasoning":
                lines.append(f"{cfg.step_marker} {step.content}")
            elif step.step_type == "reflection":
                lines.append(f"{cfg.reflect_marker} {step.content}")
            elif step.step_type == "tool_use":
                # tool_call is optional on ThinkingStep; do not crash on None.
                tool_name = (step.tool_call or {}).get("name", "unknown")
                lines.append(f"[Tool: {tool_name}] → {step.tool_result}")
            elif step.step_type == "conclusion":
                lines.append(f"{cfg.conclude_marker} {step.content}")
        lines.append(cfg.think_end)
        return "\n".join(lines)

    def _format_plan_thinking(self, execution_log: List[Dict[str, Any]]) -> str:
        """Format sequential plan execution as a numbered trace."""
        cfg = self.config
        lines = [cfg.think_start]
        for number, entry in enumerate(execution_log, start=1):
            step = entry["step"]
            result = entry["result"]
            lines.append(f"{cfg.step_marker} Step {number}: {step.get('description', '')}")
            lines.append(f" Result: {result}")
        lines.append(cfg.think_end)
        return "\n".join(lines)

    def evaluate_response(
        self,
        query: str,
        response: str,
    ) -> Dict[str, Any]:
        """
        Evaluate a response before presenting to user.
        Can reject or warn based on content.

        Args:
            query: The user query (not inspected by the current checks).
            response: The candidate response text.

        Returns:
            Dict with ``approved``, ``confidence``, ``warnings`` and
            ``suggestions``.
        """
        evaluation = {
            "approved": True,
            "confidence": 0.9,
            "warnings": [],
            "suggestions": [],
        }
        # Check for potential issues
        if len(response) < 10:
            evaluation["warnings"].append("Response is very short")
            evaluation["confidence"] -= 0.2

        # Check for uncertainty markers (case-insensitive substring match).
        uncertainty_markers = ["I'm not sure", "I don't know", "maybe", "perhaps"]
        lowered_response = response.lower()
        for marker in uncertainty_markers:
            if marker.lower() in lowered_response:
                evaluation["warnings"].append(f"Contains uncertainty: '{marker}'")
                evaluation["confidence"] -= 0.1

        # Minimum confidence check
        if evaluation["confidence"] < self.config.min_confidence_threshold:
            evaluation["approved"] = False
            evaluation["suggestions"].append("Consider gathering more information")
        return evaluation
class MultilingualThinking:
    """
    Multilingual response capability with native thinking.
    """

    LANGUAGE_PROMPTS = {
        "en": "Think and respond in English.",
        "zh": "用中文思考和回答。",
        "es": "Piensa y responde en español.",
        "fr": "Réfléchis et réponds en français.",
        "de": "Denke und antworte auf Deutsch.",
        "ja": "日本語で考えて答えてください。",
        "ko": "한국어로 생각하고 답하세요.",
        "ar": "فكر وأجب بالعربية.",
        "ru": "Думай и отвечай по-русски.",
        "pt": "Pense e responda em português.",
    }

    # Script checks in priority order. Kana must be tested before Han
    # ideographs: Japanese text normally mixes kanji (which fall in the Han
    # range) with kana, so testing Han first labels Japanese as Chinese.
    _SCRIPT_PATTERNS = (
        ("ja", re.compile(r'[\u3040-\u309f\u30a0-\u30ff]')),
        ("zh", re.compile(r'[\u4e00-\u9fff]')),
        ("ko", re.compile(r'[\uac00-\ud7af]')),
        ("ar", re.compile(r'[\u0600-\u06ff]')),
        ("ru", re.compile(r'[\u0400-\u04ff]')),
    )

    def __init__(self, thinking_engine: ThinkingEngine):
        """Wrap ``thinking_engine``, whose ``think(query)`` does the reasoning."""
        self.engine = thinking_engine

    def detect_language(self, text: str) -> str:
        """Detect language of input text from the scripts it contains.

        Simple heuristic: returns the code of the first script found
        (kana, Han, Hangul, Arabic, Cyrillic) and "en" otherwise. Text
        written only in Han characters is reported as "zh", and Latin-script
        languages are not told apart.
        """
        for code, pattern in self._SCRIPT_PATTERNS:
            if pattern.search(text):
                return code
        return "en"

    def think_multilingual(
        self,
        query: str,
        target_language: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Think in target language natively.

        Args:
            query: User query
            target_language: Target language code (auto-detect if None)

        Returns:
            The engine's result dict with a ``"language"`` entry added.
            Unknown language codes fall back to the English instruction
            but are still reported as given.
        """
        lang = target_language or self.detect_language(query)
        lang_prompt = self.LANGUAGE_PROMPTS.get(lang, self.LANGUAGE_PROMPTS["en"])
        # Augment query with language instruction
        augmented_query = f"{lang_prompt}\n\n{query}"
        # Run thinking
        result = self.engine.think(augmented_query)
        result["language"] = lang
        return result