import gradio as gr
from dotenv import load_dotenv
from research_manager import ResearchManager
import markdown
import re
from file_processor import process_file, get_file_icon, format_file_size
load_dotenv(override=True)
async def run_research(query: str, model_choice: str, conversation_history: list, attachments: list, progress=gr.Progress()):
    """Run research and yield HTML updates for both the report and the references.

    This is an async generator. For every chunk received from
    ``ResearchManager(model_choice).run(...)`` that is not the final report it
    yields ``(status_html, "")``. When a ``REPORT_READY|<markdown>`` chunk
    arrives it yields ``(report_html, references_html)`` once and returns
    immediately, so any later chunks from the manager are not consumed.

    Chunks are expected in the form ``TYPE|data``; the message types handled
    are ``INIT``, ``PLANNING_COMPLETE``, ``SEARCH_PROGRESS`` (``current|total``),
    ``SEARCH_COMPLETE``, ``WRITING_START`` and ``REPORT_READY``. Any other
    chunk only triggers a re-yield of the current status.

    Args:
        query: The research question. It is HTML-escaped before being echoed
            into the output.
        model_choice: Passed to the ``ResearchManager`` constructor.
        conversation_history: Passed through to ``ResearchManager.run``.
        attachments: Passed through to ``ResearchManager.run``.
        progress: Progress tracker, invoked as ``progress(fraction, desc=...)``.

    Yields:
        ``(report_or_status_html, references_html)`` tuples.
    """
    # Function-scope imports keep this block self-contained.
    import asyncio
    import html

    status_messages = []
    writing_progress = 0.75  # Track dummy progress during writing
    writing_task = None  # Handle on the background progress simulation

    progress(0, desc="🚀 Starting research...")

    # Show the query at the top of every update. The query is user input that
    # ends up inside HTML, so it must be escaped.
    query_display = f'''
Your Research Query:
{html.escape(query)}
'''

    async def simulate_writing_progress():
        """Nudge the progress bar from 75% towards 85% while the report is written."""
        nonlocal writing_progress
        for _ in range(3):
            await asyncio.sleep(2)  # Update every 2 seconds
            writing_progress = min(0.85, writing_progress + 0.05)
            progress(writing_progress, desc="✍️ Writing comprehensive report...")

    try:
        async for chunk in ResearchManager(model_choice).run(query, conversation_history, attachments):
            # Parse structured messages (format: TYPE|data)
            if "|" in chunk:
                msg_type, msg_data = chunk.split("|", 1)

                if msg_type == "INIT":
                    progress(0.10, desc="🤖 Agents initialized")
                    status_messages.append(msg_data)

                elif msg_type == "PLANNING_COMPLETE":
                    total_searches = int(msg_data)
                    progress(0.20, desc=f"📋 Planning complete - {total_searches} searches queued")
                    status_messages.append(f"Planning complete - {total_searches} searches to perform")

                elif msg_type == "SEARCH_PROGRESS":
                    parts = msg_data.split("|")
                    current = int(parts[0])
                    total = int(parts[1])
                    # Searches occupy the 25%..70% band of the bar. Guard
                    # against total == 0 and against current overshooting.
                    fraction = min(current / total, 1.0) if total > 0 else 0.0
                    search_progress = 0.25 + fraction * 0.45
                    progress(search_progress, desc=f"🔍 Searching {current}/{total}...")
                    status_messages.append(f"Searching {current}/{total}...")

                elif msg_type == "SEARCH_COMPLETE":
                    progress(0.70, desc="✅ All searches complete")
                    status_messages.append(msg_data)

                elif msg_type == "WRITING_START":
                    # Start writing at 75%
                    writing_progress = 0.75
                    progress(writing_progress, desc="✍️ Writing comprehensive report...")
                    status_messages.append("Writing report...")
                    # Keep a reference to the task: the event loop only holds
                    # weak references, and we need the handle to cancel it.
                    if writing_task is not None:
                        writing_task.cancel()
                    writing_task = asyncio.create_task(simulate_writing_progress())

                elif msg_type == "REPORT_READY":
                    # Stop the simulated progress first so it cannot move the
                    # bar backwards after it has been set to 100%.
                    if writing_task is not None:
                        writing_task.cancel()

                    final_report_md = msg_data
                    # Extract references, then drop that section from the body
                    references_list = extract_references(final_report_md)
                    report_without_refs = remove_references_section(final_report_md)

                    # Convert markdown to HTML for better rendering
                    final_html = markdown.markdown(
                        report_without_refs,
                        extensions=['extra', 'codehilite', 'tables', 'fenced_code']
                    )
                    # Make inline citations clickable
                    final_html = make_citations_clickable(final_html)
                    # Format references as clean HTML list
                    references_html = format_references_html(references_list)

                    progress(1.0, desc="✅ Research complete!")

                    report_actions = '''
'''
                    # Yield the final report and stop here; later messages
                    # from the manager (e.g. EMAIL_START, COMPLETE) are
                    # intentionally not awaited.
                    yield query_display + final_html + report_actions, references_html
                    return

            # Still processing: show the accumulated status messages in the
            # report area and keep the references pane empty.
            status_items = "".join(f'\n• {msg}\n' for msg in status_messages)
            status_html = query_display + '\n' + status_items + '\n'
            yield status_html, ""
    finally:
        # Also covers the manager finishing or failing without REPORT_READY,
        # and the consumer closing this generator early.
        if writing_task is not None:
            writing_task.cancel()
def remove_references_section(markdown_text):
    """Remove the References section, and everything after it, from markdown.

    The heading is matched case-insensitively at the start of a line, at
    heading levels 2 through 6 (``## References``, ``### References``, ...).
    It is also recognised when it is the very last line of the text with no
    trailing newline. Headings that merely start with the word, such as
    ``## References and Notes``, are left alone.

    Args:
        markdown_text: The report markdown.

    Returns:
        The markdown up to (not including) the References heading, with
        leading and trailing whitespace stripped. If there is no References
        heading the input is returned stripped.
    """
    # ^ with MULTILINE anchors the hashes to a line start so the match cannot
    # begin in the middle of "### References" and leave a stray "#" behind.
    # [^\S\n]* consumes trailing spaces/tabs/CR on the heading line; the
    # section then runs to the end of the text, or the heading is the end.
    ref_pattern = r'^[ \t]{0,3}#{2,6}\s*References[^\S\n]*(?:\n.*|\Z)'
    cleaned = re.sub(
        ref_pattern,
        '',
        markdown_text,
        count=1,
        flags=re.DOTALL | re.IGNORECASE | re.MULTILINE,
    )
    return cleaned.strip()
def extract_references(markdown_text):
    """Extract ``(title, url)`` pairs from the References section of markdown.

    The section starts at a ``## References`` heading (case-insensitive) and
    runs to the next line starting with ``##`` or to the end of the text.

    Two formats are recognised, in this order:

    1. List items containing a markdown link, numbered or bulleted:
       ``1. [Title](URL)`` or ``- [Title](URL)``.
    2. Fallback, used only when no such list item exists: bare ``http(s)``
       URLs, where the URL doubles as the title. Trailing sentence
       punctuation and unbalanced closing parentheses are trimmed.

    Args:
        markdown_text: The report markdown.

    Returns:
        A list of ``(title, url)`` tuples in document order; empty when there
        is no References section or it contains nothing recognisable.
    """
    section = re.search(
        r'##\s*References\s*\n(.*?)(?=\n##|\Z)',
        markdown_text,
        re.DOTALL | re.IGNORECASE,
    )
    if not section:
        return []
    ref_section = section.group(1)

    # Accept bullet markers as well as numbers; otherwise a bulleted list
    # drops to the URL fallback, which glues the link's ")" onto the URL.
    list_items = re.findall(
        r'(?:\d+\.|[-*+])\s*\[([^\]]+)\]\(([^\)]+)\)',
        ref_section,
    )
    if list_items:
        return [(title.strip(), url.strip()) for title, url in list_items]

    # Fallback: extract plain URLs
    trailing_punctuation = '.,;:!?'
    references = []
    for raw_url in re.findall(r'https?://[^\s<>"{}|\\^`\[\]]+', ref_section):
        url = raw_url.rstrip(trailing_punctuation)
        # Drop a closing paren only when it has no partner inside the URL, so
        # URLs such as .../wiki/Foo_(bar) survive intact.
        while url.endswith(')') and url.count(')') > url.count('('):
            url = url[:-1].rstrip(trailing_punctuation)
        if url:
            references.append((url, url))
    return references
def make_citations_clickable(html_text: str) -> str:
    """Convert inline citations [1], [2], etc. to clickable links that switch to References tab

    Every bracketed group of comma-separated integers in ``html_text``
    (``[1]``, ``[2, 3]``, ...) is rewritten by ``replace_citation``: each
    number is replaced by a generated fragment, and the fragments are
    re-joined with ``', '`` and wrapped in ``[`` ... ``]`` again.

    Args:
        html_text: Rendered report HTML to scan for citations.

    Returns:
        ``html_text`` with each citation group substituted.

    NOTE(review): the fragment built below starts with `` 1) {`` and ends with
    ``...">{num}`` - it contains no opening tag and never defines ``tabs``.
    It looks as though the leading markup (and any closing tag) of this
    literal has been lost; confirm against the intended output before
    relying on the links actually being clickable.
    """
    # Pattern to match [1], [2, 3], [1, 2, 3], etc.
    def replace_citation(match):
        # NOTE(review): full_match is assigned but never used.
        full_match = match.group(0)
        numbers = match.group(1)
        # Split by comma and convert to clickable links
        nums = [n.strip() for n in numbers.split(',')]
        clickable_nums = []
        for num in nums:
            # Create a link that triggers tab switch via JavaScript
            # Using a more reliable Gradio tab selector
            # The JavaScript text clicks tabs[1] and, 300 ms later, looks up
            # the element with id "ref-<num>", scrolls it into view and gives
            # it a box-shadow that is cleared after 2000 ms.
            # Doubled braces are literal braces in the f-string output.
            clickable_nums.append(
                f' 1) {{ '
                f'tabs[1].click(); '
                f'setTimeout(() => {{ '
                f'const ref = document.getElementById(\'ref-{num}\'); '
                f'if (ref) {{ '
                f'ref.scrollIntoView({{ behavior: \'smooth\', block: \'center\' }}); '
                f'ref.style.transition = \'all 0.3s\'; '
                f'ref.style.boxShadow = \'0 0 20px rgba(16, 185, 129, 0.5)\'; '
                f'setTimeout(() => {{ ref.style.boxShadow = \'\'; }}, 2000); '
                f'}} '
                f'}}, 300); '
                f'}} '
                f'" '
                f'title="Click to view reference {num}">{num}'
            )
        return '[' + ', '.join(clickable_nums) + ']'
    # Match patterns like [1], [2, 3], [1, 2, 3, 4], etc.
    # The single capture group holds the comma-separated numbers without the
    # surrounding brackets.
    citation_pattern = r'\[(\d+(?:\s*,\s*\d+)*)\]'
    return re.sub(citation_pattern, replace_citation, html_text)
def format_references_html(references):
"""Format references as clean HTML with IDs for linking"""
if not references:
return '
No references found
'
html = '
'
for idx, (title, url) in enumerate(references, 1):
html += f'''