Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import asyncio | |
| from typing import Dict, List, Tuple | |
| from dotenv import load_dotenv | |
| import gradio as gr | |
| import pdb | |
| from agents import gen_trace_id | |
| from section_agent import SectionResearchManager | |
| from summarize_agent import generate_final_report | |
| from frameworks.big_idea_framework import big_idea_sections | |
| from frameworks.specific_idea_framework import specific_idea_sections | |
| load_dotenv(override=True) | |
# ------------- Framework → Section descriptors (loaded from framework files) -------------
# ------------- Shared run params -------------
# Default research knobs applied to every section run.
# NOTE(review): key semantics inferred from names; they are passed through
# build_section_details() into SectionResearchManager — confirm meanings there.
DEFAULT_RUN_PARAMS = {
    "depth": "standard",    # research depth preset
    "lookback_days": 540,   # presumably max age of sources, in days
    "langs": ["en"],        # source languages to include
    "k_per_query": 6,       # presumably results fetched per search query
    "max_queries": 12       # presumably cap on search queries per section
}
# ------------- Helper: Make full section_details for SectionResearchManager -------------
def build_section_details(framework: str, topic: str, raw_desc: Dict, run_params: Dict) -> Dict:
    """Assemble the section_details payload for SectionResearchManager.

    Substitutes the concrete topic for every <TOPIC> placeholder in the
    descriptor's example queries, then bundles the descriptor together with
    the framework name, the topic, and the shared run parameters.
    """
    filled_queries = []
    for template in raw_desc.get("example_queries", []):
        filled_queries.append(template.replace("<TOPIC>", topic))
    descriptor = dict(
        section=raw_desc["section"],
        description=raw_desc["description"],
        facets=raw_desc["facets"],
        example_queries=filled_queries,
    )
    return dict(
        framework=framework,
        topic_or_idea=topic,
        section_descriptor=descriptor,
        run_params=run_params,
    )
# ------------- Orchestrator (parallel) with streaming logs -------------
async def run_framework_parallel_stream(framework: str, topic: str):
    """
    Async generator that yields (chat_text, partial_results_json) tuples as the run progresses.

    chat_text is a human-readable progress line. partial_results_json is None
    for every intermediate update; only the final yield carries the report
    dict produced by generate_final_report().
    """
    if framework not in ("big-idea", "specific-idea"):
        yield (f"β Unknown framework: {framework}", None)
        return
    section_defs = big_idea_sections() if framework == "big-idea" else specific_idea_sections()
    trace_id = gen_trace_id()
    trace_name = f"{framework} {topic}"
    # Initial setup message with time estimation
    num_sections = len(section_defs)
    framework_name = "Big-Idea Exploration" if framework == "big-idea" else "Specific-Idea Validation"
    est_time_min = num_sections * 1.5  # ~1.5 min per section minimum
    est_time_max = num_sections * 3    # ~3 min per section with iterations
    yield (f"""π **Starting {framework_name}**
**Topic**: {topic}
**Sections to analyze**: {num_sections}
**Estimated time**: {est_time_min}-{est_time_max} minutes (depends on complexity)
Each section goes through a 7-step research pipeline:
1. Complexity assessment
2. Query generation
3. Web research
4. Deep analysis
5. Quality check
6. Self-healing (if gaps found)
7. Final synthesis
_Research is running in parallel across all sections..._
""", None)
    # asyncio.Queue collects progress messages pushed by the per-section
    # coroutines; this generator drains it and re-yields each message.
    progress_queue = asyncio.Queue()

    async def progress_callback(message: str):
        await progress_queue.put(message)

    # Show section overview
    yield ("\nπ **Research Sections:**", None)
    for sec_name, desc in section_defs.items():
        display_name = desc.get("display_name", sec_name.replace("_", " ").title())
        full_desc = desc.get("description", "")
        # Truncate long descriptions for the one-line overview
        section_desc = full_desc[:100] + "..." if len(full_desc) > 100 else full_desc
        yield (f"β’ **{display_name}**: {section_desc}", None)
    yield ("\nβ³ **Launching parallel research agents...**\n", None)
    # Kick off all sections in parallel
    tasks = []
    mgrs = {}
    for sec_name, desc in section_defs.items():
        details = build_section_details(framework, topic, desc, DEFAULT_RUN_PARAMS)
        mgr = SectionResearchManager(sec_name, enable_critic=False)
        mgrs[sec_name] = mgr
        tasks.append(asyncio.create_task(mgr.run_section_manager(trace_id, details, trace_name, progress_callback)))
    # Monitor both task completion and progress messages
    active_tasks = set(tasks)
    section_results = {}
    while active_tasks or not progress_queue.empty():
        # Check for completed tasks
        done_tasks = {task for task in active_tasks if task.done()}
        for task in done_tasks:
            active_tasks.remove(task)
            try:
                res = await task
                sec = res["section"]
                brief = res["section_brief"]
                section_results[sec] = res
                # stream per-section done
                conf = brief.get("confidence", 0.0)
                hl_count = len(brief.get("highlights", []))
                # Get display name and color for completion message
                display_name = res.get("display_name", sec.replace("_", " ").title())
                section_color = res.get("section_color", "#95a5a6")
                yield (f'<span style="color: {section_color}; font-weight: bold;">β {display_name} complete β {hl_count} insights extracted (confidence: {conf:.0%})</span>', None)
            except Exception as e:
                # Log the actual exception instead of a generic message so
                # failures are diagnosable from the server console.
                print(f"Section task failed: {e!r}")
                yield (f"β οΈ A section failed: {e}", None)
        # Drain any progress messages queued by the section callbacks
        try:
            while True:
                message = progress_queue.get_nowait()
                yield (message, None)
        except asyncio.QueueEmpty:
            pass
        # Brief sleep to prevent busy waiting
        if active_tasks:
            await asyncio.sleep(0.1)
    # Generate comprehensive final report using summarize_agent
    yield (f"\nπ― **All {num_sections} sections complete!**", None)
    yield ("π Synthesizing final report with cross-section fact verification and deduplication...", None)
    yield ("β±οΈ _This may take 1-2 minutes..._", None)
    report_data = await generate_final_report(framework, topic, section_results, trace_id, trace_name)
    # Format the final output - this will be handled by the improved UI
    yield ("\n⨠**Research Complete!** Your comprehensive report is ready below.", report_data)
# ------------- Gradio UI -------------
# Inline stylesheet handed to gr.Blocks(css=...); class names referenced via
# elem_id/elem_classes on the components below.
CSS = """
#chat {height: 400px}
.json-display {font-family: 'Monaco', 'Consolas', monospace; font-size: 12px;}
.metadata-display {background: #f8f9fa; padding: 10px; border-radius: 5px;}
"""
with gr.Blocks(css=CSS, fill_height=True, theme=gr.themes.Soft()) as demo:
    gr.Markdown("""## π ReallyDeepResearch
**Deep, multi-agent research system** β Parallel exploration with self-healing quality checks
Choose your framework:
- π **Big-Idea**: Market landscape, tech stack, research frontier, opportunities
- π― **Specific-Idea**: Problem validation, ROI, competition, GTM, risks
_β±οΈ Research typically takes 8-10 minutes depending on complexity_
""")
    with gr.Row():
        topic_in = gr.Textbox(
            label="Topic / Idea",
            placeholder="e.g., AI music β’ or β’ Agents to clear IT backlog",
            lines=1
        )
    with gr.Row():
        btn_big = gr.Button("π Run Big-Idea Exploration", variant="primary")
        btn_specific = gr.Button("π― Run Specific-Idea Exploration")
    # Progress chat at the top
    chat = gr.Chatbot(label="π Research Progress", height=400, elem_id="chat")
    # Organized results in tabs
    with gr.Tabs():
        with gr.TabItem("π Executive Report"):
            narrative_display = gr.Markdown(
                label="Executive Summary",
                value="Research results will appear here...",
                elem_classes=["narrative-display"]
            )
            metadata_display = gr.Markdown(
                label="Research Statistics",
                value="",
                elem_classes=["metadata-display"]
            )
        with gr.TabItem("π Structured Data"):
            json_display = gr.Code(
                label="Section Analysis (JSON)",
                language="json",
                value="{}",
                elem_classes=["json-display"]
            )
        with gr.TabItem("πΎ Export"):
            download_data = gr.JSON(label="Full Research Data", visible=False)
            gr.Markdown("**Export Options:**")
            gr.Markdown("_Click a button below to download your research report._")
            with gr.Row():
                export_json_btn = gr.DownloadButton("π₯ Download JSON", variant="primary")
                export_md_btn = gr.DownloadButton("π Download Markdown", variant="secondary")
    # Hidden state for messages and data
    state_msgs = gr.State([])  # List[Tuple[str,str]]

    async def _start_run(framework: str, topic: str, msgs: List[Tuple[str, str]]):
        """Stream research progress into the chat and the final report into the tabs.

        Yields 6-tuples matching the click handlers' `outputs` list:
        (chat, json_display, narrative_display, metadata_display, download_data, state_msgs).
        """
        if not topic or not topic.strip():
            msgs = msgs + [("user", f"{framework}"), ("assistant", "β Please enter a topic/idea first.")]
            # Clear all outputs and return
            yield msgs, "", "", "", {}, msgs
            return
        # Add user's "start" message
        msgs = msgs + [("user", f"{framework}: {topic}")]
        # Clear previous outputs
        current_json = ""
        current_narrative = ""
        current_metadata = ""
        # Stream updates as they arrive; report_data is None until the final yield
        async for text, report_data in run_framework_parallel_stream(framework, topic.strip()):
            msgs = msgs + [("assistant", text)]
            if report_data is not None:
                # Extract different parts of the report
                if isinstance(report_data, dict):
                    # Format structured summary as JSON
                    structured_summary = report_data.get("structured_summary", {})
                    current_json = json.dumps(structured_summary, indent=2, ensure_ascii=False)
                    # Extract narrative report
                    current_narrative = report_data.get("narrative_report", "")
                    # Format metadata
                    metadata = report_data.get("metadata", {})
                    current_metadata = f"""**Research Metadata:**
- Total Facts: {metadata.get('total_facts', 0)}
- Average Confidence: {metadata.get('avg_confidence', 0):.2f}
- Sections Analyzed: {metadata.get('sections_count', 0)}"""
            yield msgs, current_json, current_narrative, current_metadata, report_data or {}, msgs
        # Final yield to ensure last state is displayed
        yield msgs, current_json, current_narrative, current_metadata, report_data or {}, msgs

    # Download helpers: each writes a temp file and returns its path so the
    # DownloadButton can serve it to the browser.
    def download_json(report_data):
        """Serialize the full research data to a temp .json file (placeholder if empty)."""
        import tempfile
        if not report_data or not isinstance(report_data, dict):
            # No research data yet: explain inside the downloaded file itself.
            report_data = {"error": "No research data available yet"}
        # delete=False so the file survives long enough for Gradio to stream it.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False, encoding='utf-8') as f:
            json.dump(report_data, f, indent=2, ensure_ascii=False)
        return f.name

    def download_markdown(report_data):
        """Write the narrative report to a temp .md file (placeholder if empty)."""
        import tempfile
        if not report_data or not isinstance(report_data, dict):
            narrative = "# No research data available yet\n\nPlease run a research query first."
        else:
            # Get the narrative report
            narrative = report_data.get("narrative_report", "# No report available")
        # delete=False so the file survives long enough for Gradio to stream it.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding='utf-8') as f:
            f.write(narrative)
        return f.name

    # Button handlers (streaming)
    btn_big.click(
        _start_run,
        inputs=[gr.State("big-idea"), topic_in, state_msgs],
        outputs=[chat, json_display, narrative_display, metadata_display, download_data, state_msgs],
        queue=True
    )
    btn_specific.click(
        _start_run,
        inputs=[gr.State("specific-idea"), topic_in, state_msgs],
        outputs=[chat, json_display, narrative_display, metadata_display, download_data, state_msgs],
        queue=True
    )
    # Download button handlers - DownloadButton automatically triggers download when function returns a file path
    export_json_btn.click(
        fn=download_json,
        inputs=[download_data],
        outputs=export_json_btn
    )
    export_md_btn.click(
        fn=download_markdown,
        inputs=[download_data],
        outputs=export_md_btn
    )
if __name__ == "__main__":
    # Launch Gradio
    demo.queue()  # enables concurrency/streaming
    # Bind all interfaces; port from $PORT (Spaces convention), default 7860.
    demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")))