Spaces:
Running
Running
Pulastya B
Added query awareness and success/failure marking, further improving pipeline accuracy
| """ | |
| Findings Accumulator - Core state for the Reasoning Loop. | |
| Tracks everything discovered during investigation: | |
| - Individual findings (action + result + interpretation) | |
| - Hypotheses being tested | |
| - Decision ledger (why each action was taken) | |
| - Confidence tracking | |
| This replaces the need for separate "step tracker" and "decision ledger" - | |
| they're natural byproducts of the accumulated findings. | |
| Architecture: | |
| ReasoningLoop iteration 1: Reason → Act → Evaluate → Finding #1 | |
| ReasoningLoop iteration 2: Reason → Act → Evaluate → Finding #2 | |
| ... | |
| Synthesizer reads all findings → produces final answer | |
| """ | |
| from dataclasses import dataclass, field | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime | |
| import json | |
@dataclass
class Finding:
    """A single finding from one reasoning loop iteration.

    Bundles what was attempted (hypothesis, action, arguments), what came
    back (result_summary), and how it was judged (interpretation, confidence,
    answered_question).

    NOTE(review): the original class declared dataclass-style fields and used
    ``field(default_factory=...)`` without the ``@dataclass`` decorator, so no
    ``__init__`` was generated and the ``field(...)`` sentinel leaked as a
    class attribute. The decorator restores the clearly intended behavior.
    """

    iteration: int                # Reasoning-loop iteration this came from
    hypothesis: str               # What we were testing
    action: str                   # Tool name executed
    arguments: Dict[str, Any]     # Tool arguments used
    result_summary: str           # Compressed result (what the tool returned)
    interpretation: str           # What we learned from this result
    confidence: float             # 0.0-1.0 confidence in this finding
    answered_question: bool       # Did this iteration answer the user's question?
    next_questions: List[str]     # Follow-up questions generated
    success: bool = True          # Whether the tool execution succeeded
    error_message: str = ""       # Error message if the tool failed
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict.

        Note: ``answered_question`` intentionally maps to the shorter key
        ``"answered"`` in the serialized form.
        """
        return {
            "iteration": self.iteration,
            "hypothesis": self.hypothesis,
            "action": self.action,
            "arguments": self.arguments,
            "result_summary": self.result_summary,
            "interpretation": self.interpretation,
            "confidence": self.confidence,
            "answered": self.answered_question,
            "next_questions": self.next_questions,
            "success": self.success,
            "error_message": self.error_message,
            "timestamp": self.timestamp
        }
@dataclass
class Hypothesis:
    """A hypothesis being tested during exploration.

    NOTE(review): as with ``Finding``, the ``@dataclass`` decorator was
    missing even though the class relies on ``field(default_factory=list)``;
    without it every instance would have shared no generated ``__init__`` at
    all. The decorator also guarantees each instance gets independent
    evidence lists.
    """

    text: str                     # The hypothesis statement
    status: str = "untested"      # untested, testing, supported, refuted, inconclusive
    evidence_for: List[str] = field(default_factory=list)
    evidence_against: List[str] = field(default_factory=list)
    priority: float = 0.5         # 0.0-1.0, higher = investigate first
    source_iteration: int = 0     # Which iteration generated this hypothesis

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict for API responses / storage."""
        return {
            "text": self.text,
            "status": self.status,
            "evidence_for": self.evidence_for,
            "evidence_against": self.evidence_against,
            "priority": self.priority,
            "source_iteration": self.source_iteration
        }
class FindingsAccumulator:
    """
    Accumulates findings across the reasoning loop.

    This is the central state object that the Reasoner reads from and
    the Evaluator writes to. It serves as:
    - Step tracker (each finding records what was done)
    - Decision ledger (each finding records WHY it was done)
    - Evidence accumulator (interpretations build the answer)
    - Hypothesis manager (for exploratory analysis)

    Usage:
        findings = FindingsAccumulator(question="Why are customers churning?")

        # After each iteration:
        findings.add_finding(Finding(
            iteration=1,
            hypothesis="High churn correlates with low engagement",
            action="analyze_correlations",
            arguments={"file_path": "data.csv", "target_col": "churn"},
            result_summary="Found 0.72 correlation between login_frequency and churn",
            interpretation="Strong evidence: infrequent logins predict churn",
            confidence=0.8,
            answered_question=False,
            next_questions=["Is there a threshold for login frequency?"]
        ))

        # For the Reasoner prompt:
        context = findings.get_context_for_reasoning()

        # For the Synthesizer:
        all_findings = findings.get_all_findings()
    """

    def __init__(self, question: str, mode: str = "investigative"):
        """
        Initialize findings accumulator.

        Args:
            question: The user's original question
            mode: "investigative" or "exploratory"
        """
        self.question = question
        self.mode = mode
        self.findings: List["Finding"] = []
        self.hypotheses: List["Hypothesis"] = []
        self.tools_used: List[str] = []
        self.files_produced: List[str] = []
        self.failed_tools: Dict[str, str] = {}  # tool_name -> error message
        self.is_answered = False
        self.answer_confidence = 0.0
        self.started_at = datetime.now().isoformat()

    def iteration_count(self) -> int:
        """Number of completed iterations."""
        return len(self.findings)

    def add_finding(self, finding: "Finding"):
        """Add a finding from a completed iteration.

        Side effects: records the tool in ``tools_used``, promotes
        ``is_answered``/``answer_confidence`` if the finding answered the
        question, and turns each new ``next_questions`` entry into an
        untested Hypothesis.
        """
        self.findings.append(finding)
        if finding.action not in self.tools_used:
            self.tools_used.append(finding.action)

        # Track answer progress: confidence only ratchets upward.
        if finding.answered_question:
            self.is_answered = True
            self.answer_confidence = max(self.answer_confidence, finding.confidence)

        # Add new hypotheses from next_questions (deduplicated by text).
        for q in finding.next_questions:
            if not any(h.text == q for h in self.hypotheses):
                self.hypotheses.append(Hypothesis(
                    text=q,
                    status="untested",
                    priority=0.5,
                    source_iteration=finding.iteration
                ))

    def add_failed_tool(self, tool_name: str, error_message: str):
        """Record a tool that failed so the Reasoner avoids retrying it."""
        self.failed_tools[tool_name] = error_message

    def get_failed_tools_context(self) -> str:
        """Build context string listing tools that failed (empty if none)."""
        if not self.failed_tools:
            return ""
        parts = ["\n**FAILED TOOLS (do NOT retry these)**:"]
        for tool, error in self.failed_tools.items():
            # Truncate long error messages to keep the prompt compact.
            parts.append(f" - `{tool}`: {error[:150]}")
        return "\n".join(parts)

    def get_successful_findings(self) -> List["Finding"]:
        """Return only findings from successful tool executions."""
        return [f for f in self.findings if f.success]

    def add_hypothesis(self, text: str, priority: float = 0.5, source_iteration: int = 0):
        """Add a hypothesis to test (no-op if an identical text already exists)."""
        if not any(h.text == text for h in self.hypotheses):
            self.hypotheses.append(Hypothesis(
                text=text,
                status="untested",
                priority=priority,
                source_iteration=source_iteration
            ))

    def update_hypothesis(self, text: str, status: str, evidence: str, is_supporting: bool = True):
        """Update a hypothesis with new evidence.

        Matches on exact hypothesis text; silently does nothing if no
        hypothesis matches.
        """
        for h in self.hypotheses:
            if h.text == text:
                h.status = status
                if is_supporting:
                    h.evidence_for.append(evidence)
                else:
                    h.evidence_against.append(evidence)
                return

    def get_untested_hypotheses(self) -> List["Hypothesis"]:
        """Get hypotheses that haven't been tested yet, sorted by priority (desc)."""
        untested = [h for h in self.hypotheses if h.status == "untested"]
        return sorted(untested, key=lambda h: h.priority, reverse=True)

    def get_last_output_file(self) -> Optional[str]:
        """Get the most recent output file from tool results, or None.

        Scans findings newest-first; tries to parse the result summary as
        JSON for an output path, then falls back to well-known argument keys.
        """
        for finding in reversed(self.findings):
            # Check if result mentions an output file
            result = finding.result_summary
            if "output_file" in result or "output_path" in result:
                try:
                    # Try to parse as JSON
                    result_dict = json.loads(result) if isinstance(result, str) else result
                    # Guard: parsed JSON may be a list/scalar, which has no .get()
                    # (previously raised an uncaught AttributeError).
                    if isinstance(result_dict, dict):
                        return result_dict.get("output_file") or result_dict.get("output_path")
                except (json.JSONDecodeError, TypeError):
                    pass
            # Check arguments for file paths
            for key in ["file_path", "input_path"]:
                if key in finding.arguments:
                    return finding.arguments[key]
        return None

    def get_context_for_reasoning(self, max_findings: int = 5) -> str:
        """
        Build context string for the Reasoner's prompt.

        Returns a concise summary of what's been discovered so far,
        formatted for LLM consumption.

        Args:
            max_findings: Maximum number of recent findings to include
        """
        if not self.findings:
            return "No investigations completed yet. This is the first step."

        parts = []

        # Summary of what's been done
        parts.append(f"**Investigations completed**: {len(self.findings)}")
        parts.append(f"**Tools used**: {', '.join(self.tools_used)}")

        # Failed tools warning (critical for avoiding retries)
        failed_ctx = self.get_failed_tools_context()
        if failed_ctx:
            parts.append(failed_ctx)

        # Recent findings (most relevant for next decision)
        recent = self.findings[-max_findings:]
        parts.append("\n**Recent findings**:")
        for f in recent:
            status_tag = "" if f.success else " [FAILED]"
            parts.append(
                f" Step {f.iteration}: Ran `{f.action}`{status_tag} to test: \"{f.hypothesis}\"\n"
                f" → Result: {f.interpretation}\n"
                f" → Confidence: {f.confidence:.0%}"
            )

        # Unanswered questions
        untested = self.get_untested_hypotheses()
        if untested:
            parts.append(f"\n**Open questions** ({len(untested)} remaining):")
            for h in untested[:3]:
                parts.append(f" - {h.text} (priority: {h.priority:.1f})")

        # Overall progress
        if self.is_answered:
            parts.append(f"\n**Status**: Question partially answered (confidence: {self.answer_confidence:.0%})")
        else:
            parts.append(f"\n**Status**: Still investigating")

        return "\n".join(parts)

    def get_context_for_synthesis(self) -> str:
        """
        Build context string for the Synthesizer.

        Returns the complete investigative history with all findings
        and hypothesis statuses.
        """
        parts = []
        parts.append(f"**Original question**: {self.question}")
        parts.append(f"**Mode**: {self.mode}")
        parts.append(f"**Total iterations**: {len(self.findings)}")
        parts.append(f"**Tools used**: {', '.join(self.tools_used)}")

        # All findings in order
        parts.append("\n## Investigation Steps\n")
        for f in self.findings:
            status_label = "\u2705 SUCCESS" if f.success else "\u274c FAILED"
            parts.append(
                f"### Step {f.iteration}: {f.action} [{status_label}]\n"
                f"**Hypothesis**: {f.hypothesis}\n"
                f"**Arguments**: {json.dumps(f.arguments, default=str)}\n"
                f"**Result**: {f.result_summary}\n"
                f"**Interpretation**: {f.interpretation}\n"
                f"**Confidence**: {f.confidence:.0%}\n"
            )

        # Hypothesis outcomes (for exploratory mode).
        # NOTE(review): the status markers were mojibake in the original;
        # restored using the same escape style as status_label above.
        if self.hypotheses:
            parts.append("\n## Hypothesis Outcomes\n")
            for h in self.hypotheses:
                status_emoji = {
                    "supported": "\u2705",     # check mark
                    "refuted": "\u274c",       # cross mark
                    "inconclusive": "\u2753",  # question mark
                    "testing": "\U0001f504",   # counterclockwise arrows
                    "untested": "\u2b1c"       # white square
                }.get(h.status, "\u2b1c")
                parts.append(f"{status_emoji} **{h.text}** → {h.status}")
                if h.evidence_for:
                    parts.append(f" Evidence for: {'; '.join(h.evidence_for)}")
                if h.evidence_against:
                    parts.append(f" Evidence against: {'; '.join(h.evidence_against)}")

        return "\n".join(parts)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for API response / session storage."""
        return {
            "question": self.question,
            "mode": self.mode,
            # BUG FIX: original serialized the bound method object
            # (self.iteration_count) instead of calling it, which also
            # made the dict non-JSON-serializable.
            "iteration_count": self.iteration_count(),
            "is_answered": self.is_answered,
            "answer_confidence": self.answer_confidence,
            "tools_used": self.tools_used,
            "files_produced": self.files_produced,
            # Included so restored sessions keep the do-not-retry list
            # (backward-compatible additional key).
            "failed_tools": self.failed_tools,
            "findings": [f.to_dict() for f in self.findings],
            "hypotheses": [h.to_dict() for h in self.hypotheses],
            "started_at": self.started_at
        }