import os import json import asyncio from typing import Dict, List, Tuple from dotenv import load_dotenv import gradio as gr import pdb from agents import gen_trace_id from section_agent import SectionResearchManager from summarize_agent import generate_final_report from frameworks.big_idea_framework import big_idea_sections from frameworks.specific_idea_framework import specific_idea_sections load_dotenv(override=True) # ------------- Framework → Section descriptors (loaded from framework files) ------------- # ------------- Shared run params ------------- DEFAULT_RUN_PARAMS = { "depth": "standard", "lookback_days": 540, "langs": ["en"], "k_per_query": 6, "max_queries": 12 } # ------------- Helper: Make full section_details for SectionResearchManager ------------- def build_section_details(framework: str, topic: str, raw_desc: Dict, run_params: Dict) -> Dict: # Replace placeholders in example queries ex_queries = [q.replace("", topic) for q in raw_desc.get("example_queries", [])] section_descriptor = { "section": raw_desc["section"], "description": raw_desc["description"], "facets": raw_desc["facets"], "example_queries": ex_queries } return { "framework": framework, "topic_or_idea": topic, "section_descriptor": section_descriptor, "run_params": run_params } # ------------- Orchestrator (parallel) with streaming logs ------------- async def run_framework_parallel_stream(framework: str, topic: str): """ Async generator that yields (chat_text, partial_results_json) tuples as the run progresses. """ if framework not in ("big-idea", "specific-idea"): yield (f"āŒ Unknown framework: {framework}", None) return section_defs = big_idea_sections() if framework == "big-idea" else specific_idea_sections() trace_id = gen_trace_id() trace_name = f"{framework} {topic}" # Initial setup message with time estimation num_sections = len(section_defs) framework_name = "Big-Idea Exploration" if framework == "big-idea" else "Specific-Idea Validation" est_time_min = num_sections * 1.5 # ~2-3 min per section minimum est_time_max = num_sections * 3 # ~3-5 min per section with iterations yield (f"""šŸš€ **Starting {framework_name}** **Topic**: {topic} **Sections to analyze**: {num_sections} **Estimated time**: {est_time_min}-{est_time_max} minutes (depends on complexity) Each section goes through a 7-step research pipeline: 1. Complexity assessment 2. Query generation 3. Web research 4. Deep analysis 5. Quality check 6. Self-healing (if gaps found) 7. Final synthesis _Research is running in parallel across all sections..._ """, None) # Use asyncio.Queue to collect progress messages from all sections progress_queue = asyncio.Queue() # Create a progress callback that adds messages to the queue async def progress_callback(message: str): await progress_queue.put(message) # Show section overview yield (f"\nšŸ“‹ **Research Sections:**", None) for sec_name, desc in section_defs.items(): display_name = desc.get("display_name", sec_name.replace("_", " ").title()) section_desc = desc.get("description", "")[:100] + "..." if len(desc.get("description", "")) > 100 else desc.get("description", "") yield (f"• **{display_name}**: {section_desc}", None) yield (f"\nā³ **Launching parallel research agents...**\n", None) # Kick off all sections in parallel tasks = [] mgrs = {} for sec_name, desc in section_defs.items(): details = build_section_details(framework, topic, desc, DEFAULT_RUN_PARAMS) mgr = SectionResearchManager(sec_name, enable_critic=False) mgrs[sec_name] = mgr tasks.append(asyncio.create_task(mgr.run_section_manager(trace_id, details, trace_name, progress_callback))) # Monitor both task completion and progress messages active_tasks = set(tasks) section_results = {} while active_tasks or not progress_queue.empty(): # Check for completed tasks done_tasks = {task for task in active_tasks if task.done()} for task in done_tasks: active_tasks.remove(task) try: res = await task sec = res["section"] brief = res["section_brief"] section_results[sec] = res # stream per-section done conf = brief.get("confidence", 0.0) hl_count = len(brief.get("highlights", [])) # Get display name and color for completion message display_name = res.get("display_name", sec.replace("_", " ").title()) section_color = res.get("section_color", "#95a5a6") yield (f'āœ… {display_name} complete — {hl_count} insights extracted (confidence: {conf:.0%})', None) except Exception as e: print("Something went wrong") yield (f"āš ļø A section failed: {e}", None) # Check for progress messages try: while True: message = progress_queue.get_nowait() yield (message, None) except asyncio.QueueEmpty: pass # Brief sleep to prevent busy waiting if active_tasks: await asyncio.sleep(0.1) # Generate comprehensive final report using summarize_agent yield (f"\nšŸŽÆ **All {num_sections} sections complete!**", None) yield ("šŸ”„ Synthesizing final report with cross-section fact verification and deduplication...", None) yield ("ā±ļø _This may take 1-2 minutes..._", None) report_data = await generate_final_report(framework, topic, section_results, trace_id, trace_name) # Format the final output - this will be handled by the improved UI yield ("\n✨ **Research Complete!** Your comprehensive report is ready below.", report_data) # ------------- Gradio UI ------------- CSS = """ #chat {height: 400px} .json-display {font-family: 'Monaco', 'Consolas', monospace; font-size: 12px;} .metadata-display {background: #f8f9fa; padding: 10px; border-radius: 5px;} """ with gr.Blocks(css=CSS, fill_height=True, theme=gr.themes.Soft()) as demo: gr.Markdown("""## šŸ”Ž ReallyDeepResearch **Deep, multi-agent research system** — Parallel exploration with self-healing quality checks Choose your framework: - 🌐 **Big-Idea**: Market landscape, tech stack, research frontier, opportunities - šŸŽÆ **Specific-Idea**: Problem validation, ROI, competition, GTM, risks _ā±ļø Research typically takes 8-10 minutes depending on complexity_ """) with gr.Row(): topic_in = gr.Textbox( label="Topic / Idea", placeholder="e.g., AI music • or • Agents to clear IT backlog", lines=1 ) with gr.Row(): btn_big = gr.Button("🌐 Run Big-Idea Exploration", variant="primary") btn_specific = gr.Button("šŸŽÆ Run Specific-Idea Exploration") # Progress chat at the top chat = gr.Chatbot(label="šŸ”„ Research Progress", height=400, elem_id="chat") # Organized results in tabs with gr.Tabs(): with gr.TabItem("šŸ“„ Executive Report"): narrative_display = gr.Markdown( label="Executive Summary", value="Research results will appear here...", elem_classes=["narrative-display"] ) metadata_display = gr.Markdown( label="Research Statistics", value="", elem_classes=["metadata-display"] ) with gr.TabItem("šŸ“Š Structured Data"): json_display = gr.Code( label="Section Analysis (JSON)", language="json", value="{}", elem_classes=["json-display"] ) with gr.TabItem("šŸ’¾ Export"): download_data = gr.JSON(label="Full Research Data", visible=False) gr.Markdown("**Export Options:**") gr.Markdown("_Click a button below to download your research report._") with gr.Row(): export_json_btn = gr.DownloadButton("šŸ“„ Download JSON", variant="primary") export_md_btn = gr.DownloadButton("šŸ“ Download Markdown", variant="secondary") # Hidden state for messages and data state_msgs = gr.State([]) # List[Tuple[str,str]] async def _start_run(framework: str, topic: str, msgs: List[Tuple[str, str]]): if not topic or not topic.strip(): msgs = msgs + [("user", f"{framework}"), ("assistant", "āŒ Please enter a topic/idea first.")] # Clear all outputs and return yield msgs, "", "", "", {}, msgs return # Add user's "start" message msgs = msgs + [("user", f"{framework}: {topic}")] # Clear previous outputs current_json = "" current_narrative = "" current_metadata = "" # Stream updates as they arrive async for text, report_data in run_framework_parallel_stream(framework, topic.strip()): msgs = msgs + [("assistant", text)] if report_data is not None: # Extract different parts of the report if isinstance(report_data, dict): # Format structured summary as JSON structured_summary = report_data.get("structured_summary", {}) current_json = json.dumps(structured_summary, indent=2, ensure_ascii=False) # Extract narrative report current_narrative = report_data.get("narrative_report", "") # Format metadata metadata = report_data.get("metadata", {}) current_metadata = f"""**Research Metadata:** - Total Facts: {metadata.get('total_facts', 0)} - Average Confidence: {metadata.get('avg_confidence', 0):.2f} - Sections Analyzed: {metadata.get('sections_count', 0)}""" yield msgs, current_json, current_narrative, current_metadata, report_data or {}, msgs # Final yield to ensure last state is displayed yield msgs, current_json, current_narrative, current_metadata, report_data or {}, msgs # Download functions def download_json(report_data): if not report_data or not isinstance(report_data, dict): # Return a placeholder file if no data import tempfile with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump({"error": "No research data available yet"}, f, indent=2) return f.name import tempfile # Create temporary file for JSON download with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False, encoding='utf-8') as f: json.dump(report_data, f, indent=2, ensure_ascii=False) return f.name def download_markdown(report_data): if not report_data or not isinstance(report_data, dict): # Return a placeholder file if no data import tempfile with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding='utf-8') as f: f.write("# No research data available yet\n\nPlease run a research query first.") return f.name import tempfile # Get the narrative report narrative = report_data.get("narrative_report", "# No report available") # Create temporary file for Markdown download with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding='utf-8') as f: f.write(narrative) return f.name # Button handlers (streaming) btn_big.click( _start_run, inputs=[gr.State("big-idea"), topic_in, state_msgs], outputs=[chat, json_display, narrative_display, metadata_display, download_data, state_msgs], queue=True ) btn_specific.click( _start_run, inputs=[gr.State("specific-idea"), topic_in, state_msgs], outputs=[chat, json_display, narrative_display, metadata_display, download_data, state_msgs], queue=True ) # Download button handlers - DownloadButton automatically triggers download when function returns a file path export_json_btn.click( fn=download_json, inputs=[download_data], outputs=export_json_btn ) export_md_btn.click( fn=download_markdown, inputs=[download_data], outputs=export_md_btn ) if __name__ == "__main__": # Launch Gradio demo.queue() # enables concurrency/streaming demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")))