# Commit note (kept as a comment so the module stays importable):
# Pulastya B — added query awareness and success/failure marking,
# further improving pipeline accuracy (7502356).
"""
Findings Accumulator - Core state for the Reasoning Loop.
Tracks everything discovered during investigation:
- Individual findings (action + result + interpretation)
- Hypotheses being tested
- Decision ledger (why each action was taken)
- Confidence tracking
This replaces the need for separate "step tracker" and "decision ledger" -
they're natural byproducts of the accumulated findings.
Architecture:
ReasoningLoop iteration 1: Reason β†’ Act β†’ Evaluate β†’ Finding #1
ReasoningLoop iteration 2: Reason β†’ Act β†’ Evaluate β†’ Finding #2
...
Synthesizer reads all findings β†’ produces final answer
"""
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from datetime import datetime
import json
@dataclass
class Finding:
    """One record of a completed reasoning-loop iteration.

    Captures what was attempted (hypothesis / action / arguments), what
    came back (result_summary), and how the result was judged
    (interpretation, confidence, answered_question).
    """
    iteration: int                 # 1-based loop iteration this finding came from
    hypothesis: str                # What we were testing
    action: str                    # Tool name executed
    arguments: Dict[str, Any]      # Tool arguments used
    result_summary: str            # Compressed result (what the tool returned)
    interpretation: str            # What we learned from this result
    confidence: float              # 0.0-1.0 confidence in this finding
    answered_question: bool        # Did this iteration answer the user's question?
    next_questions: List[str]      # Follow-up questions generated
    success: bool = True           # Whether the tool execution succeeded
    error_message: str = ""        # Error message if the tool failed
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict.

        Note: the ``answered_question`` field is exported under the shorter
        key "answered"; every other key mirrors its field name.
        """
        return dict(
            iteration=self.iteration,
            hypothesis=self.hypothesis,
            action=self.action,
            arguments=self.arguments,
            result_summary=self.result_summary,
            interpretation=self.interpretation,
            confidence=self.confidence,
            answered=self.answered_question,
            next_questions=self.next_questions,
            success=self.success,
            error_message=self.error_message,
            timestamp=self.timestamp,
        )
@dataclass
class Hypothesis:
    """A candidate explanation tracked during exploratory analysis."""
    text: str
    status: str = "untested"   # untested | testing | supported | refuted | inconclusive
    evidence_for: List[str] = field(default_factory=list)
    evidence_against: List[str] = field(default_factory=list)
    priority: float = 0.5      # 0.0-1.0; higher = investigate first
    source_iteration: int = 0  # Iteration that generated this hypothesis

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict; keys mirror the field names."""
        return dict(
            text=self.text,
            status=self.status,
            evidence_for=self.evidence_for,
            evidence_against=self.evidence_against,
            priority=self.priority,
            source_iteration=self.source_iteration,
        )
class FindingsAccumulator:
"""
Accumulates findings across the reasoning loop.
This is the central state object that the Reasoner reads from and
the Evaluator writes to. It serves as:
- Step tracker (each finding records what was done)
- Decision ledger (each finding records WHY it was done)
- Evidence accumulator (interpretations build the answer)
- Hypothesis manager (for exploratory analysis)
Usage:
findings = FindingsAccumulator(question="Why are customers churning?")
# After each iteration:
findings.add_finding(Finding(
iteration=1,
hypothesis="High churn correlates with low engagement",
action="analyze_correlations",
arguments={"file_path": "data.csv", "target_col": "churn"},
result_summary="Found 0.72 correlation between login_frequency and churn",
interpretation="Strong evidence: infrequent logins predict churn",
confidence=0.8,
answered_question=False,
next_questions=["Is there a threshold for login frequency?"]
))
# For the Reasoner prompt:
context = findings.get_context_for_reasoning()
# For the Synthesizer:
all_findings = findings.get_all_findings()
"""
def __init__(self, question: str, mode: str = "investigative"):
"""
Initialize findings accumulator.
Args:
question: The user's original question
mode: "investigative" or "exploratory"
"""
self.question = question
self.mode = mode
self.findings: List[Finding] = []
self.hypotheses: List[Hypothesis] = []
self.tools_used: List[str] = []
self.files_produced: List[str] = []
self.failed_tools: Dict[str, str] = {} # tool_name β†’ error message
self.is_answered = False
self.answer_confidence = 0.0
self.started_at = datetime.now().isoformat()
@property
def iteration_count(self) -> int:
"""Number of completed iterations."""
return len(self.findings)
def add_finding(self, finding: Finding):
"""Add a finding from a completed iteration."""
self.findings.append(finding)
if finding.action not in self.tools_used:
self.tools_used.append(finding.action)
# Track answer progress
if finding.answered_question:
self.is_answered = True
self.answer_confidence = max(self.answer_confidence, finding.confidence)
# Add new hypotheses from next_questions
for q in finding.next_questions:
if not any(h.text == q for h in self.hypotheses):
self.hypotheses.append(Hypothesis(
text=q,
status="untested",
priority=0.5,
source_iteration=finding.iteration
))
def add_failed_tool(self, tool_name: str, error_message: str):
"""Record a tool that failed so the Reasoner avoids retrying it."""
self.failed_tools[tool_name] = error_message
def get_failed_tools_context(self) -> str:
"""Build context string listing tools that failed."""
if not self.failed_tools:
return ""
parts = ["\n**FAILED TOOLS (do NOT retry these)**:"]
for tool, error in self.failed_tools.items():
parts.append(f" - `{tool}`: {error[:150]}")
return "\n".join(parts)
def get_successful_findings(self) -> List[Finding]:
"""Return only findings from successful tool executions."""
return [f for f in self.findings if f.success]
def add_hypothesis(self, text: str, priority: float = 0.5, source_iteration: int = 0):
"""Add a hypothesis to test."""
if not any(h.text == text for h in self.hypotheses):
self.hypotheses.append(Hypothesis(
text=text,
status="untested",
priority=priority,
source_iteration=source_iteration
))
def update_hypothesis(self, text: str, status: str, evidence: str, is_supporting: bool = True):
"""Update a hypothesis with new evidence."""
for h in self.hypotheses:
if h.text == text:
h.status = status
if is_supporting:
h.evidence_for.append(evidence)
else:
h.evidence_against.append(evidence)
return
def get_untested_hypotheses(self) -> List[Hypothesis]:
"""Get hypotheses that haven't been tested yet, sorted by priority."""
untested = [h for h in self.hypotheses if h.status == "untested"]
return sorted(untested, key=lambda h: h.priority, reverse=True)
def get_last_output_file(self) -> Optional[str]:
"""Get the most recent output file from tool results."""
for finding in reversed(self.findings):
# Check if result mentions an output file
result = finding.result_summary
if "output_file" in result or "output_path" in result:
try:
# Try to parse as JSON
result_dict = json.loads(result) if isinstance(result, str) else result
return result_dict.get("output_file") or result_dict.get("output_path")
except (json.JSONDecodeError, TypeError):
pass
# Check arguments for file paths
for key in ["file_path", "input_path"]:
if key in finding.arguments:
return finding.arguments[key]
return None
def get_context_for_reasoning(self, max_findings: int = 5) -> str:
"""
Build context string for the Reasoner's prompt.
Returns a concise summary of what's been discovered so far,
formatted for LLM consumption.
Args:
max_findings: Maximum number of recent findings to include
"""
if not self.findings:
return "No investigations completed yet. This is the first step."
parts = []
# Summary of what's been done
parts.append(f"**Investigations completed**: {len(self.findings)}")
parts.append(f"**Tools used**: {', '.join(self.tools_used)}")
# Failed tools warning (critical for avoiding retries)
failed_ctx = self.get_failed_tools_context()
if failed_ctx:
parts.append(failed_ctx)
# Recent findings (most relevant for next decision)
recent = self.findings[-max_findings:]
parts.append("\n**Recent findings**:")
for f in recent:
status_tag = "" if f.success else " [FAILED]"
parts.append(
f" Step {f.iteration}: Ran `{f.action}`{status_tag} to test: \"{f.hypothesis}\"\n"
f" β†’ Result: {f.interpretation}\n"
f" β†’ Confidence: {f.confidence:.0%}"
)
# Unanswered questions
untested = self.get_untested_hypotheses()
if untested:
parts.append(f"\n**Open questions** ({len(untested)} remaining):")
for h in untested[:3]:
parts.append(f" - {h.text} (priority: {h.priority:.1f})")
# Overall progress
if self.is_answered:
parts.append(f"\n**Status**: Question partially answered (confidence: {self.answer_confidence:.0%})")
else:
parts.append(f"\n**Status**: Still investigating")
return "\n".join(parts)
def get_context_for_synthesis(self) -> str:
"""
Build context string for the Synthesizer.
Returns the complete investigative history with all findings
and hypothesis statuses.
"""
parts = []
parts.append(f"**Original question**: {self.question}")
parts.append(f"**Mode**: {self.mode}")
parts.append(f"**Total iterations**: {len(self.findings)}")
parts.append(f"**Tools used**: {', '.join(self.tools_used)}")
# All findings in order
parts.append("\n## Investigation Steps\n")
for f in self.findings:
status_label = "\u2705 SUCCESS" if f.success else "\u274c FAILED"
parts.append(
f"### Step {f.iteration}: {f.action} [{status_label}]\n"
f"**Hypothesis**: {f.hypothesis}\n"
f"**Arguments**: {json.dumps(f.arguments, default=str)}\n"
f"**Result**: {f.result_summary}\n"
f"**Interpretation**: {f.interpretation}\n"
f"**Confidence**: {f.confidence:.0%}\n"
)
# Hypothesis outcomes (for exploratory mode)
if self.hypotheses:
parts.append("\n## Hypothesis Outcomes\n")
for h in self.hypotheses:
status_emoji = {
"supported": "βœ…",
"refuted": "❌",
"inconclusive": "❓",
"testing": "πŸ”„",
"untested": "⬜"
}.get(h.status, "⬜")
parts.append(f"{status_emoji} **{h.text}** β†’ {h.status}")
if h.evidence_for:
parts.append(f" Evidence for: {'; '.join(h.evidence_for)}")
if h.evidence_against:
parts.append(f" Evidence against: {'; '.join(h.evidence_against)}")
return "\n".join(parts)
def to_dict(self) -> Dict[str, Any]:
"""Serialize for API response / session storage."""
return {
"question": self.question,
"mode": self.mode,
"iteration_count": self.iteration_count,
"is_answered": self.is_answered,
"answer_confidence": self.answer_confidence,
"tools_used": self.tools_used,
"files_produced": self.files_produced,
"findings": [f.to_dict() for f in self.findings],
"hypotheses": [h.to_dict() for h in self.hypotheses],
"started_at": self.started_at
}