from copy import deepcopy
from agents import Runner, Agent, trace, gen_trace_id
from dotenv import load_dotenv
from prompts.agent_prompts import *
from utils import *
from tools.serper_tool import serper_search
from tools.playwright_tool import playwright_web_read
import os
import pdb
import json
# Load variables from the .env file before any configuration is read;
# override=True lets .env values replace variables already set in the process.
load_dotenv(override=True)

# Model identifier handed to every Agent constructed in this module.
# Evaluates to None when DEFAULT_MODEL_NAME is not set.
default_model_name = os.getenv('DEFAULT_MODEL_NAME')
class SectionResearchManager:
    """Run the multi-agent research pipeline for a single report section.

    Six agents are created per section: complexity assessment, query
    generation, researcher (with the ``serper_search`` tool), analyst (with
    the ``playwright_web_read`` tool), critic, and editor. When the critic is
    enabled, asks for another iteration and supplies gap queries, one extra
    research + analysis round runs before the editor.

    NOTE(review): the leading glyphs in the progress messages look like
    mis-decoded emoji. They are kept verbatim here; confirm the intended
    characters against the original file encoding.
    """

    # Accent color per section id; sections not listed use the default grey.
    _SECTION_COLORS = {
        "landscape": "#3498db", "product_categories": "#9b59b6", "tech_stack": "#e74c3c",
        "research_frontier": "#f39c12", "market_signals": "#1abc9c", "unmet_needs": "#e67e22",
        "opportunity_theses": "#2ecc71", "problem_pain": "#3498db", "buyer_budget_owner": "#9b59b6",
        "roi_story": "#e74c3c", "defensibility": "#f39c12", "comp_landscape": "#1abc9c",
        "gtm_channels": "#e67e22", "risks": "#e74c3c",
    }
    _DEFAULT_SECTION_COLOR = "#95a5a6"

    # Maximum number of critic-supplied gap queries used in the follow-up round.
    _MAX_GAP_QUERIES = 5

    def __init__(self, section_name: str, enable_critic: bool = True) -> None:
        """Create the agents used to research ``section_name``.

        Args:
            section_name: Label appended to every agent name.
            enable_critic: When False, the critic step and the follow-up
                research round are skipped.
        """
        self.section_name = section_name
        self.enable_critic = enable_critic
        self.complexity_agent = Agent(
            name=f"Complexity Agent: {section_name}",
            instructions=complexity_agent_system_prompt,
            model=default_model_name,
        )
        self.query_gen_agent = Agent(
            name=f"Query Gen Agent: {section_name}",
            instructions=query_gen_agent_system_prompt,
            model=default_model_name,
        )
        self.researcher_agent = Agent(
            name=f"Researcher agent: {section_name}",
            instructions=researcher_agent_system_prompt,
            tools=[serper_search],
            model=default_model_name,
        )
        self.analyst_agent = Agent(
            name=f"Analyst agent: {section_name}",
            instructions=analyst_agent_system_prompt,
            tools=[playwright_web_read],
            model=default_model_name,
        )
        self.critic_agent = Agent(
            name=f"Critic agent: {section_name}",
            instructions=critic_agent_system_prompt,
            model=default_model_name,
        )
        self.editor_agent = Agent(
            name=f"Editor agent: {section_name}",
            instructions=editor_agent_system_prompt,
            model=default_model_name,
        )

    @staticmethod
    def _as_list(value):
        """Return ``value`` when it is a list, otherwise an empty list."""
        return value if isinstance(value, list) else []

    @staticmethod
    def _coerce_confidence(value, default=0.5):
        """Return ``value`` as a number, or ``default`` when it is not numeric."""
        if isinstance(value, (int, float)):
            return value
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _parse_json_object(raw_output, fallback, label, section):
        """Decode ``raw_output`` as a JSON object, returning ``fallback`` on failure.

        A failure is anything that is not valid JSON text (including a
        non-string input) or valid JSON whose top-level value is not an
        object; later steps call ``.get`` on the result.
        """
        try:
            parsed = json.loads(raw_output)
        except (ValueError, TypeError) as e:
            # ValueError covers json.JSONDecodeError; TypeError covers non-text input.
            print(f"Error parsing {label} JSON for {section}: {e}")
            return fallback
        if not isinstance(parsed, dict):
            print(
                f"Error parsing {label} JSON for {section}: "
                f"expected a JSON object, got {type(parsed).__name__}"
            )
            return fallback
        return parsed

    @staticmethod
    def _index_fact_sources(facts, mapping):
        """Append each fact's ``source_url`` to ``mapping[fact_id]`` in place.

        Entries that are not dicts or that lack either key are skipped.
        """
        for fact in facts:
            if not isinstance(fact, dict):
                continue
            if "fact_id" not in fact or "source_url" not in fact:
                continue
            mapping.setdefault(fact["fact_id"], []).append(fact["source_url"])

    @staticmethod
    def _fact_key(fact):
        """Return the (entity, claim, source_url) dedup key, or None for non-dicts."""
        if not isinstance(fact, dict):
            return None
        # A tuple keeps the three fields separate, so values that contain the
        # old "-" delimiter can no longer collide with each other.
        return tuple(str(fact.get(field, "")) for field in ("entity", "claim", "source_url"))

    @classmethod
    def _select_new_facts(cls, known_facts, candidate_facts):
        """Return the candidates whose dedup key is not already present."""
        seen_keys = {cls._fact_key(fact) for fact in known_facts}
        fresh = []
        for fact in candidate_facts:
            key = cls._fact_key(fact)
            if key is None or key in seen_keys:
                continue
            seen_keys.add(key)
            fresh.append(fact)
        return fresh

    @staticmethod
    def _resolve_fact_refs(fact_refs, mapping):
        """Map each referenced fact id found in ``mapping`` to a copy of its URLs."""
        resolved = {}
        for ref in fact_refs:
            try:
                if ref in mapping:
                    resolved[ref] = mapping[ref]
            except TypeError:
                # Unhashable reference (e.g. a nested list); it cannot be a key.
                continue
        # Deep copy so the section brief does not alias the artifact mapping.
        return deepcopy(resolved)

    @staticmethod
    async def _notify(progress_callback, message):
        """Await ``progress_callback(message)`` when a callback was supplied."""
        if progress_callback:
            await progress_callback(message)

    async def _run_json_agent(self, agent, payload, label, section, fallback):
        """Run ``agent`` on ``payload`` and decode its final output as a JSON object."""
        run_result = await Runner.run(agent, as_messages(payload))
        return self._parse_json_object(run_result.final_output, fallback, label, section)

    async def run_section_manager(
        self,
        trace_id: str,
        section_details: dict,
        trace_name: str,
        progress_callback=None,
    ) -> dict:
        """Research one section end to end and return its brief plus artifacts.

        Args:
            trace_id: Identifier passed to ``trace``.
            section_details: Must contain ``"framework"``, ``"topic_or_idea"``
                and ``"section_descriptor"`` (which must contain
                ``"section"``); ``"run_params"`` is optional.
            trace_name: Prefix of the trace name (``"<trace_name> trace"``).
            progress_callback: Optional callable taking one message string
                and returning an awaitable.

        Returns:
            A dict with ``section``, ``section_brief``, ``display_name``,
            ``section_color`` and ``artifacts``. ``artifacts`` holds
            ``complexity``, ``queries``, ``facts``, ``analysis``, ``critic``,
            ``facts_to_url_mapping`` and ``iteration_triggered``.

        Raises:
            KeyError: If a required key is missing from ``section_details``.
        """
        descriptor = section_details["section_descriptor"]
        section = descriptor["section"]
        display_name = descriptor.get("display_name", section.replace("_", " ").title())
        topic = section_details["topic_or_idea"]
        section_color = self._SECTION_COLORS.get(section, self._DEFAULT_SECTION_COLOR)
        # Tolerate an explicit ``"run_params": None`` as well as a missing key.
        run_params = dict(section_details.get("run_params") or {})

        with trace(f"{trace_name} trace", trace_id=trace_id):
            base_payload = {
                "framework": section_details["framework"],
                "topic_or_idea": topic,
                "section_descriptor": descriptor,
                "run_params": run_params,
            }

            # ---------- Step 1: Complexity Assessment ----------
            await self._notify(progress_callback, f'๐ง {display_name} for "{topic}": Analyzing research complexity...')
            print(f"[{section}] Running Complexity Assessment")
            complexity_result = await self._run_json_agent(
                self.complexity_agent,
                base_payload,
                "complexity",
                section,
                fallback={
                    "complexity": "moderate",
                    "reasoning": "fallback due to parsing error",
                    "recommended_query_count": 12,
                    "search_strategy_notes": "standard approach",
                },
            )
            complexity_level = complexity_result.get("complexity", "moderate")
            recommended_count = complexity_result.get("recommended_query_count", 12)
            strategy_notes = complexity_result.get("search_strategy_notes", "")
            print(f"[{section}] Complexity: {complexity_level}, Recommended queries: {recommended_count}")
            await self._notify(progress_callback, f'๐ {display_name} for "{topic}": Planning {recommended_count} search queries (complexity: {complexity_level})')

            # ---------- Step 2: Query Generation ----------
            query_payload = {
                **base_payload,
                "complexity_level": complexity_level,
                "recommended_query_count": recommended_count,
                "search_strategy_notes": strategy_notes,
            }
            await self._notify(progress_callback, f"๐ Generating search queries for **{section}**...")
            print(f"[{section}] Running Query Generation")
            query_gen_result = await self._run_json_agent(
                self.query_gen_agent,
                query_payload,
                "query_gen",
                section,
                fallback={"queries": []},
            )
            queries = self._as_list(query_gen_result.get("queries"))
            actual_queries = len(queries)
            print(f"[{section}] Generated {actual_queries} queries (target: {recommended_count})")
            await self._notify(progress_callback, f'๐ {display_name} for "{topic}": Searching the web with {actual_queries} targeted queries...')

            # The researcher receives the recommended count as its query budget.
            dynamic_run_params = {**run_params, "max_queries": recommended_count}

            # ---------- Step 3: Research ----------
            researcher_payload = {
                **base_payload,
                "queries": queries,
                "run_params": dynamic_run_params,
            }
            print(f"[{section}] Running Researcher")
            researcher_result = await self._run_json_agent(
                self.researcher_agent,
                researcher_payload,
                "researcher",
                section,
                fallback={"facts": [], "domains_seen": [], "gap_flags": []},
            )
            facts = self._as_list(researcher_result.get("facts"))
            facts_to_url_mapping = {}
            self._index_fact_sources(facts, facts_to_url_mapping)

            # ---------- Step 4: Analysis ----------
            await self._notify(progress_callback, f'๐งช {display_name} for "{topic}": Deep-analyzing {len(facts)} facts and sources...')
            analyst_payload = {
                **base_payload,
                "facts": facts,
                "domains_seen": researcher_result.get("domains_seen", []),
                "gap_flags": researcher_result.get("gap_flags", []),
            }
            print(f"[{section}] Running Analyst")
            analyst_result = await self._run_json_agent(
                self.analyst_agent,
                analyst_payload,
                "analyst",
                section,
                fallback={"section": section, "bullets": [], "mini_takeaways": [], "conflicts": [], "gaps_next": []},
            )

            # ---------- Step 5: Quality Assessment (Critic) ----------
            critic_result = {}
            if self.enable_critic:
                await self._notify(progress_callback, f'๐ฌ {display_name} for "{topic}": Checking quality & identifying gaps...')
                critic_payload = {
                    **base_payload,
                    "facts": facts,
                    "analyst_json": analyst_result,
                }
                print(f"[{section}] Running Quality Assessment (Critic)")
                critic_result = await self._run_json_agent(
                    self.critic_agent,
                    critic_payload,
                    "critic",
                    section,
                    fallback={
                        "needs_iteration": False,
                        "iteration_reason": "JSON parse error",
                        "quality_issues": [],
                        "gap_queries": [],
                        "confidence_assessment": 0.5,
                    },
                )

            # Extract the iteration decision; with the critic disabled these
            # all take their defaults because critic_result is empty.
            needs_iteration = critic_result.get("needs_iteration", False)
            iteration_reason = critic_result.get("iteration_reason", "")
            critic_confidence = self._coerce_confidence(critic_result.get("confidence_assessment", 0.5))
            gap_queries = self._as_list(critic_result.get("gap_queries"))[: self._MAX_GAP_QUERIES]
            print(f"[{section}] Critic assessment - Needs iteration: {needs_iteration}, Confidence: {critic_confidence:.2f}")

            # Decided once so the status messages and the returned flag agree.
            iteration_triggered = bool(self.enable_critic and needs_iteration and gap_queries)

            # ---------- Step 6: Self-Healing Research Loop (if needed) ----------
            if iteration_triggered:
                await self._notify(progress_callback, f'๐ {display_name} for "{topic}": Filling knowledge gaps with {len(gap_queries)} additional searches...')
                print(f"[{section}] Triggering self-healing loop: {iteration_reason}")

                # Second research round, driven by the critic's gap queries.
                iteration_payload = {
                    **base_payload,
                    "queries": gap_queries,
                    "run_params": {**dynamic_run_params, "max_queries": len(gap_queries)},
                }
                print(f"[{section}] Running iteration research with {len(gap_queries)} gap queries")
                iteration_researcher_result = await self._run_json_agent(
                    self.researcher_agent,
                    iteration_payload,
                    "iteration researcher",
                    section,
                    fallback={"facts": [], "domains_seen": [], "gap_flags": []},
                )

                # Merge the two rounds, dropping facts already seen.
                new_facts = self._select_new_facts(
                    facts, self._as_list(iteration_researcher_result.get("facts"))
                )
                merged_facts = facts + new_facts
                # dict.fromkeys de-duplicates while keeping first-seen order.
                merged_domains = list(dict.fromkeys(
                    self._as_list(researcher_result.get("domains_seen"))
                    + self._as_list(iteration_researcher_result.get("domains_seen"))
                ))
                merged_researcher_result = {
                    **researcher_result,
                    "facts": merged_facts,
                    "domains_seen": merged_domains,
                }
                print(f"[{section}] Merged {len(new_facts)} new facts, total: {len(merged_facts)}")
                await self._notify(progress_callback, f'๐ฌ {display_name} for "{topic}": Re-analyzing with {len(merged_facts)} facts (+{len(new_facts)} new)...')

                # Re-run the analyst over original + newly found facts.
                iteration_analyst_payload = {
                    **base_payload,
                    "facts": merged_facts,
                    "domains_seen": merged_domains,
                    "gap_flags": merged_researcher_result.get("gap_flags", []),
                }
                print(f"[{section}] Re-running Analyst with expanded facts (total: {len(merged_facts)} facts)")
                analyst_result = await self._run_json_agent(
                    self.analyst_agent,
                    iteration_analyst_payload,
                    "iteration analyst",
                    section,
                    fallback=analyst_result,  # keep the first-round analysis
                )

                print(f"[{section}] Iteration complete - updated facts and analysis ready for Editor")
                researcher_result = merged_researcher_result
                facts = merged_facts
                self._index_fact_sources(new_facts, facts_to_url_mapping)

            # ---------- Step 7: Editor (Always Runs Once at the End) ----------
            progress_status = "enhanced with gap-fill" if iteration_triggered else "ready"
            await self._notify(progress_callback, f'โ๏ธ {display_name} for "{topic}": Finalizing insights ({progress_status})...')
            editor_payload = {
                **base_payload,
                "analyst_json": analyst_result,  # first-round or iteration-enhanced
                "facts": facts,  # first-round or merged facts
                "critic_json": critic_result,
            }
            log_status = "after iteration" if iteration_triggered else "no iteration"
            print(f"[{section}] Running Editor ({log_status})")
            editor_section = await self._run_json_agent(
                self.editor_agent,
                editor_payload,
                "editor",
                section,
                fallback={"section": section, "highlights": [], "facts_ref": [], "gaps_next": [], "confidence": critic_confidence},
            )

            # Replace the editor's list of fact ids with an id -> source URLs
            # mapping; an empty or missing facts_ref is left untouched.
            fact_refs = editor_section.get("facts_ref")
            if isinstance(fact_refs, (list, tuple, set, dict)) and fact_refs:
                editor_section["facts_ref"] = self._resolve_fact_refs(fact_refs, facts_to_url_mapping)

            return {
                "section": section,
                "section_brief": editor_section,
                "display_name": display_name,
                "section_color": section_color,
                "artifacts": {
                    "complexity": complexity_result,
                    "queries": query_gen_result,
                    "facts": researcher_result,
                    "analysis": analyst_result,
                    "critic": critic_result,
                    "facts_to_url_mapping": facts_to_url_mapping,
                    "iteration_triggered": iteration_triggered,
                },
            }