# Source: dromero-nttd — commit d81c484 "Add MarkItDown retrieval with browser headers and content cleanup"
from config import OPENAI_MODELS
import argparse
import os
import gradio as gr
import requests
import logging
import re
import warnings
from typing import Optional
import base64
import urllib.parse
# Suppress Pydantic UserWarning about "model_name" conflict
warnings.filterwarnings("ignore", category=UserWarning, module="pydantic")
# Suppress Pydantic serializer warnings
warnings.filterwarnings("ignore", message=".*Pydantic serializer warnings.*", category=UserWarning)
from openai import AzureOpenAI, OpenAI
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import tool
from duckduckgo_search import DDGS
from newspaper import Article, Config as NewspaperConfig
from bs4 import BeautifulSoup
try:
import trafilatura
except Exception:
trafilatura = None
try:
from readability import Document
except Exception:
Document = None
try:
from playwright.sync_api import sync_playwright
except Exception:
sync_playwright = None
try:
from markitdown import MarkItDown
except Exception:
MarkItDown = None
import fitz # PyMuPDF
from io import BytesIO, StringIO
import sys
import asyncio
import threading
import time
# Basic logger configuration
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Silence per-request INFO logs from httpx (used by the OpenAI SDK).
logging.getLogger("httpx").setLevel(logging.WARNING)
def setup_logging():
    """Configure and return the module logger, writing to stdout.

    Clears any previously attached handlers first so repeated calls do not
    duplicate output lines.
    """
    log = logging.getLogger(__name__)
    log.setLevel(logging.INFO)
    # Make reconfiguration idempotent.
    if log.hasHandlers():
        log.handlers.clear()
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(
        logging.Formatter('%(asctime)s | %(levelname)-8s | %(message)s', datefmt='%H:%M:%S')
    )
    log.addHandler(stream_handler)
    return log
# Global variables
# Running total of tokens spent by summarization calls made in web_scrapper.
TOKENS_SUMMARIZATION = 0
# Active provider ("azure" | "openai" | "groq"); overwritten by kickoff_crew.
MODEL_CHOICE = "openai"
# Budget/usage (estimated tokens) for retrieval summaries kept in agent context;
# both are reset per run by kickoff_crew.
RETRIEVAL_CONTEXT_BUDGET_TOKENS = 0
RETRIEVAL_CONTEXT_USED_TOKENS = 0
# Model-id substrings used to filter the Groq model catalog in list_groq_models.
GROQ_MODEL_PREFIXES = ("llama", "mixtral", "gemma", "qwen", "deepseek")
# The only OpenAI models the app permits selecting.
OPENAI_ALLOWED_MODELS = ("gpt-5.2", "gpt-5-mini", "gpt-5-nano")
# Desktop-Chrome-like headers used for retrieval requests to reduce bot blocking.
STANDARD_BROWSER_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9,es;q=0.8",
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
    "Upgrade-Insecure-Requests": "1",
}
def list_openai_models(api_key: str) -> list[str]:
    """Return allowed OpenAI model ids present in the account."""
    catalog = OpenAI(api_key=api_key).models.list()
    present = {entry.id for entry in catalog.data}
    # Preserve the preference order declared in OPENAI_ALLOWED_MODELS.
    return [candidate for candidate in OPENAI_ALLOWED_MODELS if candidate in present]
def list_groq_models(api_key: str) -> list[str]:
    """Return sorted Groq model ids using the OpenAI-compatible models endpoint."""
    client = OpenAI(api_key=api_key, base_url="https://api.groq.com/openai/v1")
    matching: set[str] = set()
    for entry in client.models.list().data:
        # Keep only ids whose name contains a known open-model family prefix.
        if any(prefix in entry.id.lower() for prefix in GROQ_MODEL_PREFIXES):
            matching.add(entry.id)
    return sorted(matching)
def list_azure_models(api_key: str, api_base: str, api_version: str) -> list[str]:
    """
    Return sorted Azure model ids available in the resource.

    Depending on resource configuration, Azure may expose model families
    instead of deployment names.
    """
    catalog = AzureOpenAI(
        api_key=api_key,
        azure_endpoint=api_base,
        api_version=api_version,
    ).models.list()
    return sorted({entry.id for entry in catalog.data})
def get_model_token_limits(model_id: str) -> dict[str, int]:
    """
    Heuristic output-token limits for the selected model family.

    Returns per-use caps keyed by "base" (base agent), "advanced"
    (advanced agent) and "summary" (retrieval summarization).
    """
    name = (model_id or "").lower()

    def caps(base: int, advanced: int, summary: int) -> dict[str, int]:
        return {"base": base, "advanced": advanced, "summary": summary}

    if name.startswith(("o1", "o3", "o4")):
        # Reasoning-style models.
        return caps(2500, 3500, 3000)
    if "nano" in name:
        # Smallest/cheapest tier.
        return caps(800, 1200, 1200)
    if "mini" in name or "8b" in name:
        # Tool-heavy multi-agent runs need more completion headroom than
        # single-turn chat.
        return caps(6000, 8000, 2200)
    if "32b" in name or "70b" in name:
        # Mid-size open models.
        return caps(2500, 4000, 3000)
    # Default for strong general models (e.g., gpt-4o class).
    return caps(3000, 5000, 3500)
def estimate_tokens(text: str) -> int:
    """Rough token estimate (~4 chars per token); conservative for budgeting."""
    # Empty/None input costs nothing; anything else counts at least one token.
    return max(1, len(text) // 4) if text else 0
def trim_text_to_token_budget(text: str, max_tokens: int) -> str:
    """Trim text to roughly max_tokens using the 4-chars-per-token estimate."""
    if max_tokens <= 0:
        return ""
    char_limit = 4 * max_tokens
    # Short enough: return untouched; otherwise cut and drop trailing whitespace.
    return text if len(text) <= char_limit else text[:char_limit].rstrip()
def get_retrieval_context_budget_tokens(model_id: str) -> int:
    """
    Token budget for accumulated retrieval summaries passed through agent context.

    Deliberately conservative to avoid overflow when the final report is
    assembled from all collected summaries.
    """
    advanced_cap = get_model_token_limits(model_id)["advanced"]
    # Envelope is 3x the advanced output cap, clamped to [3500, 14000].
    return min(14000, max(3500, advanced_cap * 3))
def extract_responses_text(response: object) -> str:
    """Robustly extract text from OpenAI Responses API payloads.

    Prefers the aggregated `output_text`; falls back to walking the
    structured `output` items and joining their text parts.
    """
    direct = getattr(response, "output_text", "") or ""
    if direct.strip():
        return direct
    pieces: list[str] = []
    for output_item in (getattr(response, "output", None) or []):
        for part in (getattr(output_item, "content", None) or []):
            fragment = getattr(part, "text", None)
            if fragment:
                pieces.append(fragment)
    return "\n".join(pieces).strip()
def export_to_markdown(result):
    """
    Write the final result to outputs/output.md and return its path.

    Returns "outputs/output.md" on success, or an "Error exporting: ..."
    message string on failure (never raises).
    """
    try:
        os.makedirs("outputs", exist_ok=True)
        # Explicit UTF-8: the default encoding is platform-dependent and can
        # raise/corrupt on non-ASCII report content (e.g. on Windows cp1252).
        with open("outputs/output.md", "w", encoding="utf-8") as file:
            file.write(result)
        return "outputs/output.md"
    except Exception as e:
        logger.error("Error exporting to markdown: %s", str(e))
        return f"Error exporting: {e}"
def _fetch_html_with_retry(url: str, headers: dict[str, str], timeout: int = 15) -> Optional[str]:
    """Fetch HTML, retrying once without certificate verification on SSL errors."""
    try:
        resp = requests.get(url, headers=headers, timeout=timeout)
        resp.raise_for_status()
        return resp.text
    except requests.exceptions.RequestException as first_err:
        # Only TLS-related failures get the insecure retry; everything else
        # propagates unchanged.
        if "SSL" not in str(first_err).upper():
            raise
        resp = requests.get(url, headers=headers, timeout=timeout, verify=False)
        resp.raise_for_status()
        return resp.text
def _extract_with_newspaper(url: str, config: NewspaperConfig) -> str:
    """Download and parse an article with newspaper; return its plain text."""
    piece = Article(url, config=config)
    piece.download()
    piece.parse()
    body = piece.text or ""
    return body.strip()
def _extract_with_trafilatura(html: str, url: str) -> str:
    """Main-content extraction via trafilatura; "" if the lib is unavailable."""
    if trafilatura is None:
        # Optional dependency did not import at module load.
        return ""
    result = trafilatura.extract(
        html,
        url=url,
        include_links=False,
        include_images=False,
        favor_precision=True,
        output_format="txt",
    )
    return (result or "").strip()
def _extract_with_readability(html: str) -> str:
    """Reader-mode extraction via readability; "" if the lib is unavailable."""
    if Document is None:
        # Optional dependency did not import at module load.
        return ""
    summary_html = Document(html).summary()
    parsed = BeautifulSoup(summary_html, "lxml")
    # Drop non-content elements before flattening to text.
    for junk in parsed(["script", "style", "noscript", "svg"]):
        junk.decompose()
    return " ".join(parsed.stripped_strings).strip()
def _extract_with_bs4(html: str) -> str:
    """Last-resort plain-text flattening of the whole page with BeautifulSoup."""
    parsed = BeautifulSoup(html, "lxml")
    # Remove scripts/styles plus the obvious chrome (header/footer).
    for junk in parsed(["script", "style", "noscript", "svg", "footer", "header"]):
        junk.decompose()
    return " ".join(parsed.stripped_strings).strip()
def _extract_with_markitdown(url: str, headers: dict[str, str]) -> str:
    """Fetch URL and convert it to markdown using Microsoft MarkItDown."""
    if MarkItDown is None:
        # Optional dependency did not import at module load.
        return ""
    # Give MarkItDown a session carrying browser-like headers.
    http = requests.Session()
    http.headers.update(headers)
    converted = MarkItDown(requests_session=http).convert_uri(url)
    markdown_text = getattr(converted, "text_content", "") or ""
    return markdown_text.strip()
def _clean_extracted_content(text: str) -> str:
    """
    Remove common navigation/boilerplate noise from markdown-like extraction output.
    Falls back to original content if cleaning is too aggressive.
    """
    if not text:
        return ""
    # Footer phrases (Spanish media sites, e.g. Xataka network): the first
    # occurrence of any of these truncates everything after it.
    stop_markers = (
        "explora en nuestros medios",
        "más sitios que te gustarán",
        "ediciones internacionales",
        "preferencias de privacidad",
        "xatakaletter",
    )
    # Lines containing any of these substrings are dropped outright
    # (share buttons / "see more" navigation widgets).
    drop_contains = (
        "ver más artículos",
        "ver más vídeos",
        "facebook",
        "twitter",
        "flipboard",
        "temas de interés",
    )
    link_re = re.compile(r"\[[^\]]+\]\([^)]+\)")    # markdown links
    image_re = re.compile(r"!\[[^\]]*\]\([^)]+\)")  # markdown images
    multi_space_re = re.compile(r"[ \t]{2,}")
    def plain_without_links(line: str) -> str:
        # Visible text of a line once images/links are stripped out.
        line = image_re.sub(" ", line)
        line = link_re.sub(" ", line)
        line = multi_space_re.sub(" ", line)
        return line.strip()
    working = text
    lowered = working.lower()
    # Truncate at the earliest stop marker, if any (case-insensitive).
    cut_at = None
    for marker in stop_markers:
        pos = lowered.find(marker)
        if pos != -1 and (cut_at is None or pos < cut_at):
            cut_at = pos
    if cut_at is not None:
        working = working[:cut_at]
    kept_lines: list[str] = []
    dropped = 0  # counter kept for symmetry/debugging; not otherwise used
    testimonial_cards_seen = 0
    for raw_line in working.splitlines():
        line = raw_line.rstrip()
        stripped = line.strip()
        lower = stripped.lower()
        if not stripped:
            continue
        if any(p in lower for p in drop_contains):
            dropped += 1
            continue
        links = len(link_re.findall(stripped))
        plain = plain_without_links(stripped)
        # Link-farm heuristic: several links but almost no plain text.
        if links >= 3 and len(plain) < 30:
            dropped += 1
            continue
        # Avatar-image testimonial cards: keep the first 10, drop the rest.
        if ("unavatar.io" in lower or "dicebear.com" in lower) and "[![" in lower:
            testimonial_cards_seen += 1
            if testimonial_cards_seen > 10:
                dropped += 1
                continue
        kept_lines.append(line)
    # Collapse runs of blank lines left behind by removals.
    cleaned = re.sub(r"\n{3,}", "\n\n", "\n".join(kept_lines)).strip()
    # Guardrail: avoid over-trimming useful content.
    if len(cleaned) < max(200, int(len(text) * 0.15)):
        return text.strip()
    return cleaned
def _extract_with_playwright_reader(url: str, user_agent: str) -> str:
    """
    Render page with Playwright and extract reader-style main content.
    Returns empty string if Playwright is unavailable or extraction fails.
    """
    if sync_playwright is None:
        return ""
    try:
        with sync_playwright() as pw:
            chromium = pw.chromium.launch(headless=True)
            ctx = chromium.new_context(user_agent=user_agent, locale="en-US")
            tab = ctx.new_page()
            tab.goto(url, wait_until="domcontentloaded", timeout=30000)
            # Best effort: let JS-heavy pages settle before capturing the DOM.
            try:
                tab.wait_for_load_state("networkidle", timeout=10000)
            except Exception:
                pass
            dom_html = tab.content()
            ctx.close()
            chromium.close()
            # Reader-like extraction on the rendered DOM, most precise first.
            extracted = _extract_with_readability(dom_html)
            if not extracted:
                extracted = _extract_with_trafilatura(dom_html, url)
            if not extracted:
                extracted = _extract_with_bs4(dom_html)
            return extracted
    except Exception as e:
        logger.warning("Playwright reader extraction failed: %s", str(e))
        return ""
def fetch_content(url):
    """
    Fetch the content from a URL, handling either PDFs or normal web articles.
    - url: The URL to fetch the content from.

    Returns extracted text/markdown on success, or an error-message string
    on failure (this function never raises to its caller).
    """
    try:
        # Best-effort HEAD request to check content type.
        # Some sites block/handshake-fail on HEAD while GET works.
        content_type = ""
        try:
            response = requests.head(url, allow_redirects=True, timeout=10)
            content_type = response.headers.get('Content-Type', '').lower()
        except requests.exceptions.RequestException as head_err:
            logger.warning("HEAD request failed, continuing with GET/article extraction: %s", str(head_err))
        if 'application/pdf' in content_type:
            # The URL points to a PDF; download and extract text
            pdf_response = requests.get(url, stream=True, timeout=10)
            pdf_response.raise_for_status()
            pdf_file = BytesIO(pdf_response.content)
            with fitz.open(stream=pdf_file, filetype="pdf") as doc:
                text = ""
                # Concatenate text page by page; warn on pages with no text
                # (e.g. scanned images).
                for page_num, page in enumerate(doc, start=1):
                    page_text = page.get_text()
                    if page_text:
                        text += page_text
                    else:
                        logger.warning(f"Unable to extract text from page {page_num} of the PDF.")
            return text.strip()
        else:
            # Not a PDF; try extractor chain with progressive fallbacks.
            article_config = NewspaperConfig()
            article_config.allow_binary_content = True
            article_config.browser_user_agent = STANDARD_BROWSER_HEADERS["User-Agent"]
            article_config.request_timeout = 15
            headers = dict(STANDARD_BROWSER_HEADERS)
            # Fetch the raw HTML once so the HTML-based extractors can share it.
            html_text = ""
            try:
                html_text = _fetch_html_with_retry(url, headers=headers, timeout=15) or ""
            except Exception as html_err:
                logger.warning("Direct HTML fetch failed: %s", str(html_err))
            # Fallback chain: newspaper -> markitdown -> trafilatura ->
            # readability -> beautifulsoup -> playwright (rendered DOM).
            try:
                article_text = _extract_with_newspaper(url, article_config)
                if article_text:
                    logger.info("Retrieval extractor used: newspaper")
                    return _clean_extracted_content(article_text)
            except Exception as article_err:
                logger.warning("Newspaper extraction failed: %s", str(article_err))
            try:
                markitdown_text = _extract_with_markitdown(url, headers)
                if markitdown_text:
                    logger.info("Retrieval extractor used: markitdown")
                    return _clean_extracted_content(markitdown_text)
            except Exception as md_err:
                logger.warning("MarkItDown extraction failed: %s", str(md_err))
            if html_text:
                trafilatura_text = _extract_with_trafilatura(html_text, url)
                if trafilatura_text:
                    logger.info("Retrieval extractor used: trafilatura")
                    return _clean_extracted_content(trafilatura_text)
                readability_text = _extract_with_readability(html_text)
                if readability_text:
                    logger.info("Retrieval extractor used: readability")
                    return _clean_extracted_content(readability_text)
                bs4_text = _extract_with_bs4(html_text)
                if bs4_text:
                    logger.info("Retrieval extractor used: beautifulsoup")
                    return _clean_extracted_content(bs4_text)
            # Playwright renders the page itself, so it can still work when
            # the direct HTML fetch failed.
            playwright_text = _extract_with_playwright_reader(url, article_config.browser_user_agent)
            if playwright_text:
                logger.info("Retrieval extractor used: playwright-reader")
                return _clean_extracted_content(playwright_text)
            raise requests.exceptions.RequestException("Unable to extract readable content with available extractors.")
    except requests.exceptions.RequestException as req_err:
        logger.error("Error in the HTTP request: %s", str(req_err))
        return f"Error in the HTTP request: {req_err}"
    except Exception as e:
        logger.error("Error getting the content: %s", str(e))
        return f"Error getting the content: {e}"
# Tools
@tool('DuckDuckGoSearchResults')
def search_results(search_query: str) -> list:
    """
    Performs a web search to gather and return a collection of search results with this structure:
    - title: The title of the search result.
    - snippet: A short snippet of the search result.
    - link: The link to the search result.
    """
    def _decode_bing_redirect_url(url: str) -> str:
        # Bing redirect links carry the real target in the "u" query param,
        # typically as "a1" + base64url; recover the underlying URL.
        parsed = urllib.parse.urlparse(url)
        query = urllib.parse.parse_qs(parsed.query)
        encoded = query.get("u", [""])[0]
        if not encoded:
            return url
        try:
            if encoded.startswith("a1"):
                encoded = encoded[2:]
            # Restore stripped base64 padding before decoding.
            padded = encoded + ("=" * (-len(encoded) % 4))
            decoded = base64.urlsafe_b64decode(padded.encode()).decode("utf-8", errors="ignore")
            return decoded or url
        except Exception:
            return url
    def _bing_fallback_search(query: str, max_results: int = 5) -> list[dict]:
        # HTML-scraping fallback used when DuckDuckGo returns nothing.
        try:
            search_url = "https://www.bing.com/search?q=" + urllib.parse.quote_plus(query)
            response = requests.get(
                search_url,
                timeout=15,
                headers={"User-Agent": "Mozilla/5.0"},
            )
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "lxml")
            out: list[dict] = []
            # One <li class="b_algo"> per organic result.
            for item in soup.select("li.b_algo"):
                a_tag = item.select_one("h2 a")
                if not a_tag:
                    continue
                href = (a_tag.get("href") or "").strip()
                if not href:
                    continue
                title = a_tag.get_text(" ", strip=True)
                snippet_tag = item.select_one(".b_caption p")
                snippet = snippet_tag.get_text(" ", strip=True) if snippet_tag else ""
                out.append(
                    {
                        "title": title,
                        "snippet": snippet,
                        "link": _decode_bing_redirect_url(href),
                    }
                )
                if len(out) >= max_results:
                    break
            return out
        except Exception as e:
            logger.error("Bing fallback search failed: %s", str(e))
            return []
    # Run the plain query plus two refinements, de-duplicating by link and
    # capping the combined result list at 5 entries.
    candidates = [search_query, f"{search_query} official", f"{search_query} github"]
    combined: list[dict] = []
    seen_links: set[str] = set()
    for candidate in candidates:
        ddg_results: list[dict] = []
        try:
            # timelimit='m' restricts DDG results to roughly the last month.
            results = DDGS().text(candidate, max_results=5, timelimit='m')
            ddg_results = [
                {
                    "title": result.get("title", ""),
                    "snippet": result.get("body", ""),
                    "link": result.get("href", ""),
                }
                for result in results
            ]
        except Exception as e:
            logger.error("Error performing DDG search: %s", str(e))
        if not ddg_results:
            ddg_results = _bing_fallback_search(candidate, max_results=5)
        for row in ddg_results:
            link = (row.get("link") or "").strip()
            if not link or link in seen_links:
                continue
            seen_links.add(link)
            combined.append(row)
            if len(combined) >= 5:
                return combined
    return combined
@tool('WebScrapper')
def web_scrapper(url: str, topic: str) -> str:
    """
    Extract and read the content of a specified link and generate a summary on a specific topic.
    - url: The URL to extract the content from.
    - topic: String with the topic to generate a summary on.
    """
    global TOKENS_SUMMARIZATION, RETRIEVAL_CONTEXT_BUDGET_TOKENS, RETRIEVAL_CONTEXT_USED_TOKENS
    try:
        content = fetch_content(url)
        # Size the summary cap to the model actually selected for this run.
        selected_model = os.getenv("ANALYSIS_MODEL", OPENAI_MODELS["base"])
        token_limits = get_model_token_limits(selected_model)
        summary_max_tokens = token_limits["summary"]
        prompt = f"""
# OBJECTIVE
Generate an in-depth summary of the following CONTENT on the topic "{topic}"
# INSTRUCTIONS
- Provide in-depth insights based on the following CONTENT.
- If the following CONTENT is not directly related to the topic "{topic}", you MUST respond with INVALID CONTENT.
- Include insights about why the content is important for the topic, possible challenges and advances...
- The format will be markdown.
- Avoid making up anything. Every insight MUST be based on the content.
# CONTENT:
"{content}"
"""
        context_messages = [
            {
                "role": "system",
                "content": "You are an expert summarizing content for use as context. Focus on the main points."
            },
            {
                "role": "user",
                "content": str(prompt)
            }
        ]
        # Use AzureOpenAI, OpenAI or Groq based on model_choice
        if MODEL_CHOICE == "azure":
            client = AzureOpenAI(
                azure_endpoint=os.getenv('AZURE_API_BASE'),
                azure_deployment=os.getenv('AZURE_DEPLOYMENT_ID'),
                api_key=os.getenv('AZURE_OPENAI_KEY'),
                api_version=os.getenv('AZURE_API_VERSION')
            )
            response = client.chat.completions.create(
                model=os.getenv('AZURE_DEPLOYMENT_ID'),
                messages=context_messages,
                temperature=0.7,
                max_tokens=summary_max_tokens
            )
        elif MODEL_CHOICE == "openai":
            client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
            openai_model = os.getenv("ANALYSIS_MODEL", OPENAI_MODELS['base'])
            # The OpenAI path uses the Responses API, not chat.completions.
            response = client.responses.create(
                model=openai_model,
                instructions="You are an expert summarizing content for use as context. Focus on the main points.",
                input=str(prompt),
                max_output_tokens=summary_max_tokens,
            )
        elif MODEL_CHOICE == "groq":
            client = OpenAI(
                api_key=os.getenv('GROQ_API_KEY'),
                base_url="https://api.groq.com/openai/v1"
            )
            response = client.chat.completions.create(
                model=os.getenv("ANALYSIS_MODEL", "llama-3.1-8b-instant"),
                messages=context_messages,
                temperature=0.7,
                max_tokens=summary_max_tokens
            )
        else:
            return "Error: Invalid model choice. Please select 'azure', 'openai', or 'groq'."
        # The Responses API and chat.completions expose text/usage differently.
        if MODEL_CHOICE == "openai":
            summary = extract_responses_text(response)
            usage = getattr(response, "usage", None)
            TOKENS_SUMMARIZATION += getattr(usage, "total_tokens", 0) or 0
        else:
            summary = response.choices[0].message.content or ""
            TOKENS_SUMMARIZATION += response.usage.total_tokens
        if not summary.strip():
            return f"""<article_summary>
# SUMMARY:
Empty summary generated. IGNORE THIS OUTPUT.
# URL: {url}
</article_summary>
"""
        # Enforce the shared retrieval-context budget across all tool calls.
        remaining_budget = RETRIEVAL_CONTEXT_BUDGET_TOKENS - RETRIEVAL_CONTEXT_USED_TOKENS
        if remaining_budget <= 0:
            return f"""<article_summary>
# SUMMARY:
Retrieval context budget reached. IGNORE THIS OUTPUT.
# URL: {url}
</article_summary>
"""
        summary = trim_text_to_token_budget(summary, remaining_budget)
        consumed_tokens = estimate_tokens(summary)
        RETRIEVAL_CONTEXT_USED_TOKENS += consumed_tokens
        summary_response = f"""<article_summary>
# SUMMARY:
{summary}
# URL: {url}
</article_summary>
"""
        # Include a delay to reduce likelihood of Groq rate limiting.
        if MODEL_CHOICE == "groq":
            time.sleep(10)
        return summary_response
    except Exception as e:
        logger.error("Error generating summary: %s", str(e))
        return f"""<article_summary>
# SUMMARY:
Error generating summary.
IGNORE THIS OUTPUT.
# URL: {url}
</article_summary>
"""
def capture_verbose_output(
    agent_input,
    provider_choice,
    analysis_model,
    azure_openai_key,
    azure_api_base,
    azure_api_version,
    openai_api_key,
    groq_api_key
):
    """
    This generator captures stdout produced by the multi-agent process in real time,
    updating the Gradio interface with logs, while returning the final result once done.

    Yields (result_markdown, verbose_log, stats_markdown) tuples for the
    three Gradio outputs.
    """
    # Redirect stdout into an in-memory buffer so CrewAI's verbose prints can
    # be polled while the crew runs on a worker thread.
    old_stdout = sys.stdout
    mystdout = StringIO()
    sys.stdout = mystdout
    # One-slot container lets the worker thread hand its result back.
    result_container = [("", "")]
    def run_kickoff():
        # Create a new event loop for this thread to ensure clean async handling
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result_container[0] = kickoff_crew(
                topic=agent_input,
                provider_choice=provider_choice,
                analysis_model=analysis_model,
                azure_openai_key=azure_openai_key,
                azure_api_base=azure_api_base,
                azure_api_version=azure_api_version,
                openai_api_key=openai_api_key,
                groq_api_key=groq_api_key
            )
        finally:
            # Drain pending async generators before closing the loop.
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            except Exception:
                pass
            loop.close()
    kickoff_thread = threading.Thread(target=run_kickoff)
    kickoff_thread.start()
    verbose_output = ""
    result_output = ""
    stats_output = ""
    # Initialize outputs
    yield result_output, verbose_output, stats_output
    # Regex to match ANSI escape codes
    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    # Poll the buffer while the crew runs, streaming only new log output.
    while kickoff_thread.is_alive():
        # Read new output from mystdout
        new_output = mystdout.getvalue()
        if new_output != verbose_output:
            verbose_output = new_output
            # Strip ANSI color codes so logs render cleanly in the UI.
            clean_verbose = ansi_escape.sub('', verbose_output)
            yield result_output, clean_verbose, stats_output
        time.sleep(0.1)
    # Once done, get final result
    kickoff_thread.join()
    sys.stdout = old_stdout
    result_output, stats_output = result_container[0]
    verbose_output = mystdout.getvalue()
    clean_verbose = ansi_escape.sub('', verbose_output)
    yield result_output, clean_verbose, stats_output
def kickoff_crew(
    topic: str,
    provider_choice: str,
    analysis_model: str,
    azure_openai_key: str,
    azure_api_base: str,
    azure_api_version: str,
    openai_api_key: str,
    groq_api_key: str
) -> tuple[str, str]:
    """
    Kick off the multi-agent pipeline.

    Returns a (report_markdown, stats_markdown) tuple; on validation or
    runtime failure the first element is an error message and the second "".
    """
    try:
        # Reset per-run global accounting before the crew starts.
        global TOKENS_SUMMARIZATION, MODEL_CHOICE, RETRIEVAL_CONTEXT_BUDGET_TOKENS, RETRIEVAL_CONTEXT_USED_TOKENS
        TOKENS_SUMMARIZATION = 0
        MODEL_CHOICE = provider_choice
        # Expose the selected model to the tools via the environment.
        os.environ["ANALYSIS_MODEL"] = analysis_model
        token_limits = get_model_token_limits(analysis_model)
        RETRIEVAL_CONTEXT_BUDGET_TOKENS = get_retrieval_context_budget_tokens(analysis_model)
        RETRIEVAL_CONTEXT_USED_TOKENS = 0
        # Basic checks
        if not topic.strip():
            return "Error: The topic cannot be empty. Please provide a valid topic.", ""
        if not analysis_model:
            return "Error: Select an analysis model before starting.", ""
        # ---- Define LLMs based on the user-provided inputs ----
        # Initialize all provider LLM slots with None.
        azure_llm_base = None
        azure_llm_advanced = None
        openai_llm_base = None
        openai_llm_advanced = None
        groq_llm_base = None
        groq_llm_advanced = None
        if provider_choice == "azure":
            if not azure_openai_key or not azure_api_base or not azure_api_version:
                return "Error: Please provide all the required Azure OpenAI API details.", ""
            else:
                # Persist credentials in the environment for the tools.
                os.environ['AZURE_API_BASE']=azure_api_base
                os.environ['AZURE_API_VERSION']=azure_api_version
                os.environ['AZURE_DEPLOYMENT_ID']=analysis_model
                os.environ['AZURE_OPENAI_KEY']=azure_openai_key
                # Azure
                azure_llm_base = LLM(
                    temperature=0.3,
                    model=f"azure/{analysis_model}",
                    api_key=azure_openai_key,
                    base_url=azure_api_base,
                    api_version=azure_api_version,
                    max_tokens=token_limits["base"]
                )
                azure_llm_advanced = LLM(
                    temperature=0.6,
                    model=f"azure/{analysis_model}",
                    api_key=azure_openai_key,
                    base_url=azure_api_base,
                    api_version=azure_api_version,
                    max_tokens=token_limits["advanced"]
                )
        elif provider_choice == "openai":
            if not openai_api_key:
                return "Error: Please provide the OpenAI API key.", ""
            if not openai_api_key.strip().startswith("sk-"):
                return "Error: Invalid OpenAI API key format. It should start with 'sk-'.", ""
            if analysis_model not in OPENAI_ALLOWED_MODELS:
                return (
                    f"Error: OpenAI model '{analysis_model}' is not allowed. "
                    f"Use one of: {', '.join(OPENAI_ALLOWED_MODELS)}.",
                    "",
                )
            else:
                os.environ['OPENAI_API_KEY']=openai_api_key
                # OpenAI
                openai_llm_base = LLM(
                    provider="openai",
                    model=analysis_model,
                    api_key=openai_api_key,
                    max_completion_tokens=token_limits["base"]
                )
                openai_llm_advanced = LLM(
                    provider="openai",
                    model=analysis_model,
                    api_key=openai_api_key,
                    max_completion_tokens=token_limits["advanced"]
                )
        elif provider_choice == "groq":
            if not groq_api_key:
                return "Error: Please provide the GROQ API key.", ""
            if not groq_api_key.strip().startswith("gsk_"):
                return "Error: Invalid GROQ API key format. It should start with 'gsk_'.", ""
            else:
                os.environ['GROQ_API_KEY']=groq_api_key
                # Route Groq through OpenAI-compatible native provider to avoid LiteLLM fallback.
                groq_llm_base = LLM(
                    model=analysis_model,
                    provider="openai",
                    api_key=groq_api_key,
                    base_url="https://api.groq.com/openai/v1",
                    temperature=0.3,
                    max_tokens=token_limits["base"]
                )
                groq_llm_advanced = LLM(
                    model=analysis_model,
                    provider="openai",
                    api_key=groq_api_key,
                    base_url="https://api.groq.com/openai/v1",
                    temperature=0.6,
                    max_tokens=token_limits["advanced"]
                )
        # Group the LLMs per provider for selection below.
        llms = {
            "azure": {
                "base": azure_llm_base,
                "advanced": azure_llm_advanced
            },
            "openai": {
                "base": openai_llm_base,
                "advanced": openai_llm_advanced
            },
            "groq": {
                "base": groq_llm_base,
                "advanced": groq_llm_advanced
            }
        }
        # Obtain the selected LLM set
        if provider_choice not in llms:
            return f"Error: Invalid model choice. Please select from {list(llms.keys())}.", ""
        selected_llm = llms[provider_choice]
        # Define Agents
        researcher = Agent(
            role='Researcher',
            goal=f'Search and collect detailed information on topic ## {topic} ##',
            tools=[search_results, web_scrapper],
            llm=selected_llm["base"],
            backstory=(
                "You are a meticulous researcher, skilled at navigating vast amounts of information to extract "
                "essential insights on any given topic. Your dedication to detail ensures the reliability and "
                "thoroughness of your findings."
            ),
            allow_delegation=False,
            max_iter=8,
            # Tighter rate limit for Groq to avoid 429s.
            max_rpm=5 if provider_choice == "groq" else 120,
            verbose=True
        )
        editor = Agent(
            role='Editor',
            goal=f'Compile and refine the information into a comprehensive report on topic ## {topic} ##',
            llm=selected_llm["advanced"],
            backstory=(
                "As an expert editor, you specialize in transforming raw data into clear, engaging reports. "
                "Your strong command of language and attention to detail ensure that each report not only conveys "
                "essential insights but is also easily understandable to diverse audiences."
            ),
            allow_delegation=False,
            max_iter=5,
            max_rpm=10 if provider_choice == "groq" else 120,
            verbose=True
        )
        # Define Tasks
        research_task = Task(
            description=(
                "Be sure to translate the topic into English first. "
                "Use the DuckDuckGoSearchResults tool to collect initial search snippets on ## {topic} ##. "
                "If more detailed searches are required, generate and execute new searches related to ## {topic} ##. "
                "Prioritize high-signal sources and limit deep scraping to the 3 most relevant URLs. "
                "Subsequently, employ the WebScrapper tool to extract information from significant URLs, "
                "extracting further insights. Compile these findings into a preliminary draft, documenting all "
                "relevant sources, titles, and links associated with the topic. "
                "Ensure high accuracy throughout the process and avoid any fabrication of information. "
                "Never output tool invocation syntax or ask for user input; always return a written draft, "
                "and if evidence is limited, clearly mark uncertainties."
            ),
            expected_output=(
                "A structured draft report about the topic, featuring an introduction, a detailed main body, "
                "and a conclusion. Properly cite sources. Provide a thorough overview of the info gathered."
            ),
            agent=researcher
        )
        edit_task = Task(
            description=(
                "Review and refine the initial draft report from the research task. Organize the content logically. "
                "Elaborate on each section to provide in-depth information and insights. "
                "Verify the accuracy of all data, correct discrepancies, update info to ensure currency, "
                "and maintain a consistent tone. Include a section listing all sources used, formatted as bullet points. "
                "Do not ask the user for additional information; produce the best possible final report with available context."
            ),
            expected_output=(
                "A polished, comprehensive report on topic ## {topic} ##, with a clear, professional narrative. "
                "Include an introduction, an extensive discussion, a concise conclusion, and a source list with references."
            ),
            agent=editor,
            # Editor receives the researcher's draft as context.
            context=[research_task]
        )
        # Form the Crew
        crew = Crew(
            agents=[researcher, editor],
            tasks=[research_task, edit_task],
            process=Process.sequential
        )
        # Kick off
        total_time_start = time.perf_counter()
        result = crew.kickoff(inputs={'topic': topic})
        total_time_seconds = time.perf_counter() - total_time_start
        # Compute token usage with backward/forward compatibility across CrewAI versions.
        total_tokens = 0
        token_usage = getattr(result, "token_usage", None)
        usage_metrics = getattr(result, "usage_metrics", None)
        if token_usage is not None and hasattr(token_usage, "total_tokens"):
            total_tokens = token_usage.total_tokens
        elif usage_metrics is not None:
            if isinstance(usage_metrics, dict):
                total_tokens = usage_metrics.get("total_tokens", 0)
            elif hasattr(usage_metrics, "total_tokens"):
                total_tokens = usage_metrics.total_tokens
        # Report token counts in thousands.
        tokens = total_tokens / 1_000
        tokens_summ = TOKENS_SUMMARIZATION / 1_000
        def task_duration_seconds(task: Task) -> float:
            # NOTE(review): assumes the installed CrewAI version populates
            # Task.start_time/end_time as datetimes — confirm for the pinned
            # version; an AttributeError here is caught by the outer except.
            if task.start_time and task.end_time:
                return (task.end_time - task.start_time).total_seconds()
            return 0.0
        researcher_seconds = task_duration_seconds(research_task)
        editor_seconds = task_duration_seconds(edit_task)
        stats_output = (
            "### Statistics\n"
            f"- Provider: `{provider_choice}`\n"
            f"- Analysis model: `{analysis_model}`\n"
            f"- Estimated tokens (Agents): `{tokens:.5f} k`\n"
            f"- Estimated tokens (Summarization): `{tokens_summ:.5f} k`\n"
            f"- Retrieval context used: `{RETRIEVAL_CONTEXT_USED_TOKENS}` tokens\n"
            f"- Retrieval context budget: `{RETRIEVAL_CONTEXT_BUDGET_TOKENS}` tokens\n"
            f"- Researcher time: `{researcher_seconds:.2f} s`\n"
            f"- Editor time: `{editor_seconds:.2f} s`\n"
            f"- Total execution time: `{total_time_seconds:.2f} s`"
        )
        # CrewAI may return a CrewOutput object; the UI expects a string.
        if not isinstance(result, str):
            result = str(result)
        return result, stats_output
    except Exception as e:
        logger.error("Error in kickoff_crew: %s", str(e))
        return f"Error in kickoff_crew: {str(e)}", ""
# Set up the Gradio interface for the CrewAI Research Tool.
# Markdown banner shown at the top of the app describing the stack.
description_demo = """# Automatic Insights Generation with Multi-Agents (CrewAI)
- **Multi-agent framework**: CrewAI
- **Multi-agents**: Two agents, Researcher and Editor, working together to extract information from the internet and compile a report on the topic of choice.
- **Search tool**: Duck-Duck-Go-Search
- **Web Retrieval**: Newspaper4k and PDF
"""
# Custom CSS for the provider panel and the verbose-log area.
# NOTE(review): APP_CSS is defined but the `gr.Blocks()` call below does not
# pass css=APP_CSS — confirm whether this is intentional.
APP_CSS = """
#verbose_output { font-family: monospace; }
#provider-block {
border: 1px solid var(--border-color-primary) !important;
border-radius: 12px !important;
padding: 12px !important;
background: var(--block-background-fill) !important;
}
#provider-block h3,
#provider-block label,
#provider-block .label-wrap,
#provider-block .label-text,
#provider-block .wrap > label {
color: #111827 !important;
opacity: 1 !important;
}
#provider-block .prose p,
#provider-block .prose strong,
#provider-block .prose em {
color: #111827 !important;
}
"""
# ---------------------------------------------------------------------------
# Gradio UI definition.
# Left column: provider selection, credential inputs, model loading, export
# controls.  Widget visibility/interactivity is driven by the callbacks
# wired further below in this file.
# ---------------------------------------------------------------------------
with gr.Blocks() as demo:
    gr.Markdown(description_demo)
    with gr.Row():
        with gr.Column(scale=1):
            # NOTE(review): the widgets below are assumed to sit inside the
            # #provider-block container (APP_CSS targets labels inside it) —
            # confirm against the rendered layout.
            with gr.Column(elem_id="provider-block"):
                gr.Markdown("### Choose Provider")
                provider_choice = gr.Radio(
                    choices=["azure", "openai", "groq"],
                    label="Provider",
                    value="openai",
                    interactive=True
                )
                # ------------
                # LLM config inputs
                # ------------
                # Azure: hidden until the "azure" provider is selected.
                azure_api_base_input = gr.Textbox(label="Azure API Base (url)", type="password", visible=False, interactive=True)
                azure_openai_key_input = gr.Textbox(label="Azure API Key", type="password", visible=False, interactive=True)
                azure_api_version_input = gr.Textbox(label="Azure API Version", type="text", visible=False, interactive=True)
                # OpenAI: default provider, so its key box starts visible.
                openai_api_key_input = gr.Textbox(label="OpenAI API Key", type="password", visible=True, interactive=True)
                # GROQ: hidden and disabled until "groq" is selected.
                groq_api_key_input = gr.Textbox(label="GROQ API Key", type="password", visible=False, interactive=False)
                # Disabled until credentials for the chosen provider exist.
                load_models_button = gr.Button("Load Available Models", interactive=False)
                model_status = gr.Markdown("Load models to enable research.")
                # Populated by load_available_models once credentials validate.
                analysis_model = gr.Dropdown(
                    choices=[],
                    value=None,
                    label="Analysis Model",
                    info="Load available models for the selected provider and keys.",
                    interactive=True
                )
                export_button = gr.Button("Export to Markdown", interactive=True)
                file_output = gr.File(label="Download Markdown File")
                credits = gr.Markdown(
                    label="Credits",
                    value="This tool is powered by [CrewAI](https://crewai.com), "
                    "[OpenAI](https://openai.com), "
                    "[Azure OpenAI Services](https://azure.microsoft.com/en-us/products/ai-services/openai-service), "
                    "and [GROQ](https://console.groq.com/playground).",
                )
with gr.Column(scale=2):
topic_input = gr.Textbox(
label="Enter Topic",
placeholder="Type here the topic of interest...",
interactive=True
)
submit_button = gr.Button("Start Research", interactive=False)
output = gr.Markdown(
label="Result",
value="The generated insighsts will appear here...",
latex_delimiters=[
{"left": "\\[", "right": "\\]", "display": True},
{"left": "\\(", "right": "\\)", "display": False},
]
)
verbose_output = gr.Textbox(
label="Verbose Output",
placeholder="Verbose logs will appear here...",
lines=10,
interactive=False,
elem_id="verbose_output"
)
stats_output = gr.Markdown(
label="Statistics",
value="Statistics will appear after execution."
)
# ---------------
# Dynamic toggling of LLM config boxes
# ---------------
def update_provider_inputs(provider):
"""Update visibility of config inputs based on the selected LLM."""
azure_visibility = False
openai_visibility = False
groq_visibility = False
if provider == "azure":
azure_visibility = True
elif provider == "openai":
openai_visibility = True
elif provider == "groq":
groq_visibility = True
return (
gr.update(visible=azure_visibility, interactive=azure_visibility),
gr.update(visible=azure_visibility, interactive=azure_visibility),
gr.update(visible=azure_visibility, interactive=azure_visibility),
gr.update(visible=openai_visibility, interactive=openai_visibility),
gr.update(visible=groq_visibility, interactive=groq_visibility),
gr.update(choices=[], value=None),
gr.update(value="Load models to enable research."),
gr.update(interactive=False),
)
def invalidate_model_selection(provider, azure_key, azure_base, azure_version, openai_key, groq_key):
"""Disable start until model list is loaded again with current credentials."""
load_enabled = False
status_message = "Credentials changed. Reload available models."
if provider == "openai":
load_enabled = bool(openai_key)
status_message = (
"Credentials ready. Click Load Available Models."
if load_enabled
else "Enter OpenAI API key to load models."
)
elif provider == "azure":
load_enabled = bool(azure_key and azure_base and azure_version)
status_message = (
"Credentials ready. Click Load Available Models."
if load_enabled
else "Enter Azure API base, version and API key to load models."
)
elif provider == "groq":
load_enabled = bool(groq_key)
status_message = (
"Credentials ready. Click Load Available Models."
if load_enabled
else "Enter GROQ API key to load models."
)
return (
gr.update(choices=[], value=None),
gr.update(value=status_message),
gr.update(interactive=False),
gr.update(interactive=load_enabled),
)
def toggle_submit_from_model(selected_model):
"""Start is enabled only if an analysis model is selected."""
return gr.update(interactive=bool(selected_model))
def load_available_models(provider, azure_key, azure_base, azure_version, openai_key, groq_key):
"""Fetch provider models and enable Start Research only with valid credentials + selected model."""
try:
model_ids = []
if provider == "openai":
if not openai_key:
return (
gr.update(choices=[], value=None),
gr.update(value="Enter OpenAI API key and load models."),
gr.update(interactive=False)
)
if not openai_key.strip().startswith("sk-"):
return (
gr.update(choices=[], value=None),
gr.update(value="Invalid OpenAI API key format. It should start with 'sk-'."),
gr.update(interactive=False)
)
model_ids = list_openai_models(openai_key)
elif provider == "azure":
if not azure_key or not azure_base or not azure_version:
return (
gr.update(choices=[], value=None),
gr.update(value="Enter Azure API base, version and key, then load models."),
gr.update(interactive=False)
)
model_ids = list_azure_models(azure_key, azure_base, azure_version)
elif provider == "groq":
if not groq_key:
return (
gr.update(choices=[], value=None),
gr.update(value="Enter GROQ API key and load models."),
gr.update(interactive=False)
)
if not groq_key.strip().startswith("gsk_"):
return (
gr.update(choices=[], value=None),
gr.update(value="Invalid GROQ API key format. It should start with 'gsk_'."),
gr.update(interactive=False)
)
model_ids = list_groq_models(groq_key)
else:
return (
gr.update(choices=[], value=None),
gr.update(value="Select a provider first."),
gr.update(interactive=False)
)
if not model_ids:
return (
gr.update(choices=[], value=None),
gr.update(value="No models found for the selected provider."),
gr.update(interactive=False)
)
return (
gr.update(choices=model_ids, value=model_ids[0]),
gr.update(value=f"Loaded {len(model_ids)} models. Ready to start."),
gr.update(interactive=True)
)
except Exception as e:
return (
gr.update(choices=[], value=None),
gr.update(value=f"Error loading models: {str(e)}"),
gr.update(interactive=False)
)
    # -----------------------------------------------------------------
    # Event wiring
    # -----------------------------------------------------------------
    # Switching provider toggles which credential boxes are visible and
    # clears any previously loaded model selection.
    provider_choice.change(
        fn=update_provider_inputs,
        inputs=[provider_choice],
        outputs=[
            azure_openai_key_input,
            azure_api_base_input,
            azure_api_version_input,
            openai_api_key_input,
            groq_api_key_input,
            analysis_model,
            model_status,
            submit_button,
        ]
    )
    # Switching provider also re-checks whether the Load-Models button may
    # be enabled for the new provider's current credentials.
    provider_choice.change(
        fn=invalidate_model_selection,
        inputs=[
            provider_choice,
            azure_openai_key_input,
            azure_api_base_input,
            azure_api_version_input,
            openai_api_key_input,
            groq_api_key_input,
        ],
        outputs=[analysis_model, model_status, submit_button, load_models_button]
    )
    # Editing any credential field invalidates the loaded model list until
    # models are reloaded with the new values.
    for input_component in [
        azure_openai_key_input,
        azure_api_base_input,
        azure_api_version_input,
        openai_api_key_input,
        groq_api_key_input,
    ]:
        input_component.change(
            fn=invalidate_model_selection,
            inputs=[
                provider_choice,
                azure_openai_key_input,
                azure_api_base_input,
                azure_api_version_input,
                openai_api_key_input,
                groq_api_key_input,
            ],
            outputs=[analysis_model, model_status, submit_button, load_models_button]
        )
    # Fetch the provider's model list; on success enable Start Research.
    load_models_button.click(
        fn=load_available_models,
        inputs=[
            provider_choice,
            azure_openai_key_input,
            azure_api_base_input,
            azure_api_version_input,
            openai_api_key_input,
            groq_api_key_input
        ],
        outputs=[analysis_model, model_status, submit_button]
    )
    # Start stays enabled only while a model remains selected.
    analysis_model.change(
        fn=toggle_submit_from_model,
        inputs=[analysis_model],
        outputs=[submit_button]
    )
    # Run the crew; capture_verbose_output streams logs and statistics
    # alongside the generated report.
    submit_button.click(
        fn=capture_verbose_output,
        inputs=[
            topic_input,
            provider_choice,
            analysis_model,
            azure_openai_key_input,
            azure_api_base_input,
            azure_api_version_input,
            openai_api_key_input,
            groq_api_key_input
        ],
        outputs=[output, verbose_output, stats_output]
    )
    # Offer the rendered report as a downloadable Markdown file.
    export_button.click(
        fn=export_to_markdown,
        inputs=output,
        outputs=file_output
    )
# Bound the request queue so at most 3 jobs wait at once.
demo.queue(max_size=3)
def launch_app(debug_mode: bool = False) -> None:
    """Start the Gradio server.

    When *debug_mode* is True, Gradio's debug logging is enabled and
    server errors are surfaced in the UI.
    """
    launch_options = {
        "css": APP_CSS,
        "ssr_mode": False,
        "debug": debug_mode,
        "show_error": debug_mode,
    }
    demo.launch(**launch_options)
def str_to_bool(value: str) -> bool:
    """Interpret "1"/"true"/"yes"/"on" (any case, padded) as True."""
    normalized = value.strip().lower()
    return normalized in {"1", "true", "yes", "on"}
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug mode with local auto-reload when available.",
    )
    args = parser.parse_args()
    # Debug can come from the --debug flag or the DEBUG env var ("1"/"true"/...).
    debug_mode = args.debug or str_to_bool(os.getenv("DEBUG", "0"))
    if debug_mode:
        try:
            # watchfiles is optional; fall back to a plain debug launch below.
            from watchfiles import DefaultFilter, run_process
            class AppWatchFilter(DefaultFilter):
                # Ignore virtualenvs, git metadata and caches so saves there
                # do not trigger a reload.
                def __call__(self, change, path):  # type: ignore[override]
                    excluded = ("/venv/", "/.venv/", "/.git/", "__pycache__", ".pytest_cache")
                    if any(token in path for token in excluded):
                        return False
                    return super().__call__(change, path)
            print("Debug auto-reload enabled. Watching project files for changes...")
            # run_process restarts launch_app in a child process whenever a
            # watched project file changes.
            run_process(
                ".",
                target=launch_app,
                kwargs={"debug_mode": True},
                watch_filter=AppWatchFilter(),
            )
        except Exception as e:
            # Best effort: if watchfiles is missing or fails to start, still
            # launch in debug mode without auto-reload.
            print(f"Auto-reload not available ({e}). Starting in debug mode without watcher.")
            launch_app(debug_mode=True)
    else:
        launch_app(debug_mode=False)