| from config import OPENAI_MODELS |
|
|
| import argparse |
| import os |
| import gradio as gr |
| import requests |
| import logging |
| import re |
| import warnings |
| from typing import Optional |
| import base64 |
| import urllib.parse |
|
|
| |
| warnings.filterwarnings("ignore", category=UserWarning, module="pydantic") |
| |
| warnings.filterwarnings("ignore", message=".*Pydantic serializer warnings.*", category=UserWarning) |
|
|
| from openai import AzureOpenAI, OpenAI |
|
|
| from crewai import Agent, Task, Crew, Process, LLM |
| from crewai.tools import tool |
| from duckduckgo_search import DDGS |
| from newspaper import Article, Config as NewspaperConfig |
| from bs4 import BeautifulSoup |
| try: |
| import trafilatura |
| except Exception: |
| trafilatura = None |
|
|
| try: |
| from readability import Document |
| except Exception: |
| Document = None |
|
|
| try: |
| from playwright.sync_api import sync_playwright |
| except Exception: |
| sync_playwright = None |
| try: |
| from markitdown import MarkItDown |
| except Exception: |
| MarkItDown = None |
| import fitz |
| from io import BytesIO, StringIO |
| import sys |
|
|
| import asyncio |
| import threading |
| import time |
|
|
| |
| logger = logging.getLogger(__name__) |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
| logging.getLogger("httpx").setLevel(logging.WARNING) |
|
|
| def setup_logging(): |
| """Set up logging for better error tracking.""" |
| logger = logging.getLogger(__name__) |
| logger.setLevel(logging.INFO) |
|
|
| |
| if logger.hasHandlers(): |
| logger.handlers.clear() |
|
|
| |
| handler = logging.StreamHandler(sys.stdout) |
| formatter = logging.Formatter('%(asctime)s | %(levelname)-8s | %(message)s', datefmt='%H:%M:%S') |
| handler.setFormatter(formatter) |
| logger.addHandler(handler) |
|
|
| return logger |
|
|
| |
| TOKENS_SUMMARIZATION = 0 |
| MODEL_CHOICE = "openai" |
| RETRIEVAL_CONTEXT_BUDGET_TOKENS = 0 |
| RETRIEVAL_CONTEXT_USED_TOKENS = 0 |
|
|
| GROQ_MODEL_PREFIXES = ("llama", "mixtral", "gemma", "qwen", "deepseek") |
| OPENAI_ALLOWED_MODELS = ("gpt-5.2", "gpt-5-mini", "gpt-5-nano") |
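| # Desktop-Chrome-style headers reduce the chance of being served bot-detection or consent pages during retrieval. |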
| STANDARD_BROWSER_HEADERS = { |
| "User-Agent": ( |
| "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " |
| "AppleWebKit/537.36 (KHTML, like Gecko) " |
| "Chrome/124.0.0.0 Safari/537.36" |
| ), |
| "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", |
| "Accept-Language": "en-US,en;q=0.9,es;q=0.8", |
| "Cache-Control": "no-cache", |
| "Pragma": "no-cache", |
| "Upgrade-Insecure-Requests": "1", |
| } |
|
|
|
|
| def list_openai_models(api_key: str) -> list[str]: |
| """Return allowed OpenAI model ids present in the account.""" |
| client = OpenAI(api_key=api_key) |
| models = client.models.list() |
| available = {model.id for model in models.data} |
| return [model for model in OPENAI_ALLOWED_MODELS if model in available] |
|
|
|
|
| def list_groq_models(api_key: str) -> list[str]: |
| """Return sorted Groq model ids using the OpenAI-compatible models endpoint.""" |
| client = OpenAI(api_key=api_key, base_url="https://api.groq.com/openai/v1") |
| models = client.models.list() |
| model_ids = sorted( |
| { |
| model.id |
| for model in models.data |
| if any(prefix in model.id.lower() for prefix in GROQ_MODEL_PREFIXES) |
| } |
| ) |
| return model_ids |
|
|
|
|
| def list_azure_models(api_key: str, api_base: str, api_version: str) -> list[str]: |
| """ |
| Return sorted Azure model ids available in the resource. |
| Azure may expose model families instead of deployment names depending on config. |
| """ |
| client = AzureOpenAI( |
| api_key=api_key, |
| azure_endpoint=api_base, |
| api_version=api_version, |
| ) |
| models = client.models.list() |
| model_ids = sorted({model.id for model in models.data}) |
| return model_ids |
|
|
|
|
| def get_model_token_limits(model_id: str) -> dict[str, int]: |
| """ |
| Heuristic output-token limits based on the selected model family. |
| Returns per-use caps for: base agent, advanced agent, and retrieval summary. |
| """ |
| model = (model_id or "").lower() |
|
|
| |
| if model.startswith(("o1", "o3", "o4")): |
| return {"base": 2500, "advanced": 3500, "summary": 3000} |
|
|
| |
| if "nano" in model: |
| return {"base": 800, "advanced": 1200, "summary": 1200} |
| if "mini" in model or "8b" in model: |
| |
| return {"base": 6000, "advanced": 8000, "summary": 2200} |
|
|
| |
| if "32b" in model or "70b" in model: |
| return {"base": 2500, "advanced": 4000, "summary": 3000} |
|
|
| |
| return {"base": 3000, "advanced": 5000, "summary": 3500} |
|
|
|
|
| def estimate_tokens(text: str) -> int: |
| """Rough token estimator (conservative enough for budgeting).""" |
| if not text: |
| return 0 |
| return max(1, len(text) // 4) |
|
|
|
|
| def trim_text_to_token_budget(text: str, max_tokens: int) -> str: |
| """Trim text to an approximate token budget using char-based estimation.""" |
| if max_tokens <= 0: |
| return "" |
| max_chars = max_tokens * 4 |
| if len(text) <= max_chars: |
| return text |
| return text[:max_chars].rstrip() |
|
|
|
|
| def get_retrieval_context_budget_tokens(model_id: str) -> int: |
| """ |
| Budget for accumulated retrieval summaries passed through the agent context. |
| Keep it conservative to avoid overflow when assembling the final report. |
| """ |
| limits = get_model_token_limits(model_id) |
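| # Clamp to a 3.5k-14k token window, scaled to three times the advanced agent's output cap. |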
| |
| return max(3500, min(14000, limits["advanced"] * 3)) |
|
|
|
|
| def extract_responses_text(response: object) -> str: |
| """Robustly extract text from OpenAI Responses API payloads.""" |
| text = getattr(response, "output_text", "") or "" |
| if text.strip(): |
| return text |
|
|
| chunks: list[str] = [] |
| for item in (getattr(response, "output", None) or []): |
| for content in (getattr(item, "content", None) or []): |
| maybe_text = getattr(content, "text", None) |
| if maybe_text: |
| chunks.append(maybe_text) |
| return "\n".join(chunks).strip() |
|
|
| def export_to_markdown(result): |
| """Utility to export the final result to an output.md file.""" |
| try: |
| os.makedirs("outputs", exist_ok=True) |
| with open("outputs/output.md", "w") as file: |
| file.write(result) |
| return "outputs/output.md" |
| except Exception as e: |
| logger.error("Error exporting to markdown: %s", str(e)) |
| return f"Error exporting: {e}" |
|
|
|
|
| def _fetch_html_with_retry(url: str, headers: dict[str, str], timeout: int = 15) -> Optional[str]: |
| """Fetch HTML with SSL fallback for endpoints with TLS quirks.""" |
| try: |
| response = requests.get(url, headers=headers, timeout=timeout) |
| response.raise_for_status() |
| return response.text |
| except requests.exceptions.RequestException as get_err: |
| if "SSL" in str(get_err).upper(): |
| response = requests.get(url, headers=headers, timeout=timeout, verify=False) |
| response.raise_for_status() |
| return response.text |
| raise |
|
|
|
|
| def _extract_with_newspaper(url: str, config: NewspaperConfig) -> str: |
| article = Article(url, config=config) |
| article.download() |
| article.parse() |
| return (article.text or "").strip() |
|
|
|
|
| def _extract_with_trafilatura(html: str, url: str) -> str: |
| if trafilatura is None: |
| return "" |
| extracted = trafilatura.extract( |
| html, |
| url=url, |
| include_links=False, |
| include_images=False, |
| favor_precision=True, |
| output_format="txt", |
| ) |
| return (extracted or "").strip() |
|
|
|
|
| def _extract_with_readability(html: str) -> str: |
| if Document is None: |
| return "" |
| doc = Document(html) |
| main_html = doc.summary() |
| soup = BeautifulSoup(main_html, "lxml") |
| for tag in soup(["script", "style", "noscript", "svg"]): |
| tag.decompose() |
| return " ".join(soup.stripped_strings).strip() |
|
|
|
|
| def _extract_with_bs4(html: str) -> str: |
| soup = BeautifulSoup(html, "lxml") |
| for tag in soup(["script", "style", "noscript", "svg", "footer", "header"]): |
| tag.decompose() |
| return " ".join(soup.stripped_strings).strip() |
|
|
|
|
| def _extract_with_markitdown(url: str, headers: dict[str, str]) -> str: |
| """Fetch URL and convert to markdown using Microsoft MarkItDown.""" |
| if MarkItDown is None: |
| return "" |
| session = requests.Session() |
| session.headers.update(headers) |
| md = MarkItDown(requests_session=session) |
| result = md.convert_uri(url) |
| return (getattr(result, "text_content", "") or "").strip() |
|
|
|
|
| def _clean_extracted_content(text: str) -> str: |
| """ |
| Remove common navigation/boilerplate noise from markdown-like extraction output. |
| Falls back to original content if cleaning is too aggressive. |
| """ |
| if not text: |
| return "" |
|
|
| stop_markers = ( |
| "explora en nuestros medios", |
| "más sitios que te gustarán", |
| "ediciones internacionales", |
| "preferencias de privacidad", |
| "xatakaletter", |
| ) |
| drop_contains = ( |
| "ver más artículos", |
| "ver más vídeos", |
| "facebook", |
| "twitter", |
| "flipboard", |
| "temas de interés", |
| ) |
|
|
| link_re = re.compile(r"\[[^\]]+\]\([^)]+\)") |
| image_re = re.compile(r"!\[[^\]]*\]\([^)]+\)") |
| multi_space_re = re.compile(r"[ \t]{2,}") |
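| # The regexes above match markdown links/images so each line can be measured by its remaining plain text. |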
|
|
| def plain_without_links(line: str) -> str: |
| line = image_re.sub(" ", line) |
| line = link_re.sub(" ", line) |
| line = multi_space_re.sub(" ", line) |
| return line.strip() |
|
|
| working = text |
| lowered = working.lower() |
| cut_at = None |
| for marker in stop_markers: |
| pos = lowered.find(marker) |
| if pos != -1 and (cut_at is None or pos < cut_at): |
| cut_at = pos |
| if cut_at is not None: |
| working = working[:cut_at] |
|
|
| kept_lines: list[str] = [] |
| dropped = 0 |
| testimonial_cards_seen = 0 |
|
|
| for raw_line in working.splitlines(): |
| line = raw_line.rstrip() |
| stripped = line.strip() |
| lower = stripped.lower() |
|
|
| if not stripped: |
| continue |
| if any(p in lower for p in drop_contains): |
| dropped += 1 |
| continue |
|
|
| links = len(link_re.findall(stripped)) |
| plain = plain_without_links(stripped) |
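| # Link-farm heuristic: three or more markdown links with under 30 characters of plain text is almost always navigation. |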
| if links >= 3 and len(plain) < 30: |
| dropped += 1 |
| continue |
| if ("unavatar.io" in lower or "dicebear.com" in lower) and "[![" in lower: |
| testimonial_cards_seen += 1 |
| if testimonial_cards_seen > 10: |
| dropped += 1 |
| continue |
|
|
| kept_lines.append(line) |
|
|
| cleaned = re.sub(r"\n{3,}", "\n\n", "\n".join(kept_lines)).strip() |
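| # Safety valve: if cleaning stripped too much (under 200 chars or less than ~15% of the input), keep the original text. |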
|
|
| |
| if len(cleaned) < max(200, int(len(text) * 0.15)): |
| return text.strip() |
| return cleaned |
|
|
|
|
| def _extract_with_playwright_reader(url: str, user_agent: str) -> str: |
| """ |
| Render page with Playwright and extract reader-style main content. |
| Returns empty string if Playwright is unavailable or extraction fails. |
| """ |
| if sync_playwright is None: |
| return "" |
|
|
| try: |
| with sync_playwright() as p: |
| browser = p.chromium.launch(headless=True) |
| context = browser.new_context(user_agent=user_agent, locale="en-US") |
| page = context.new_page() |
| page.goto(url, wait_until="domcontentloaded", timeout=30000) |
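| # "networkidle" is best-effort; pages with long-polling connections never settle, so a timeout here is non-fatal. |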
| try: |
| page.wait_for_load_state("networkidle", timeout=10000) |
| except Exception: |
| pass |
|
|
| rendered_html = page.content() |
| context.close() |
| browser.close() |
|
|
| |
| text = _extract_with_readability(rendered_html) |
| if text: |
| return text |
|
|
| text = _extract_with_trafilatura(rendered_html, url) |
| if text: |
| return text |
|
|
| return _extract_with_bs4(rendered_html) |
| except Exception as e: |
| logger.warning("Playwright reader extraction failed: %s", str(e)) |
| return "" |
|
|
|
|
| def fetch_content(url): |
| """ |
| Fetch the content from a URL, handling either PDFs or normal web articles. |
| - url: The URL to fetch the content from. |
| """ |
| try: |
| |
| |
| content_type = "" |
| try: |
| response = requests.head(url, allow_redirects=True, timeout=10) |
| content_type = response.headers.get('Content-Type', '').lower() |
| except requests.exceptions.RequestException as head_err: |
| logger.warning("HEAD request failed, continuing with GET/article extraction: %s", str(head_err)) |
| |
| if 'application/pdf' in content_type: |
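| # PDF branch: download the file and extract text page by page with PyMuPDF (fitz). |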
| |
| pdf_response = requests.get(url, stream=True, timeout=10) |
| pdf_response.raise_for_status() |
| |
| pdf_file = BytesIO(pdf_response.content) |
| with fitz.open(stream=pdf_file, filetype="pdf") as doc: |
| text = "" |
| for page_num, page in enumerate(doc, start=1): |
| page_text = page.get_text() |
| if page_text: |
| text += page_text |
| else: |
| logger.warning(f"Unable to extract text from page {page_num} of the PDF.") |
| return text.strip() |
| else: |
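| # HTML branch: try extractors from most to least precise: newspaper -> markitdown -> trafilatura -> readability -> beautifulsoup -> playwright. |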
| |
| article_config = NewspaperConfig() |
| article_config.allow_binary_content = True |
| article_config.browser_user_agent = STANDARD_BROWSER_HEADERS["User-Agent"] |
| article_config.request_timeout = 15 |
|
|
| headers = dict(STANDARD_BROWSER_HEADERS) |
| html_text = "" |
| try: |
| html_text = _fetch_html_with_retry(url, headers=headers, timeout=15) or "" |
| except Exception as html_err: |
| logger.warning("Direct HTML fetch failed: %s", str(html_err)) |
|
|
| try: |
| article_text = _extract_with_newspaper(url, article_config) |
| if article_text: |
| logger.info("Retrieval extractor used: newspaper") |
| return _clean_extracted_content(article_text) |
| except Exception as article_err: |
| logger.warning("Newspaper extraction failed: %s", str(article_err)) |
|
|
| try: |
| markitdown_text = _extract_with_markitdown(url, headers) |
| if markitdown_text: |
| logger.info("Retrieval extractor used: markitdown") |
| return _clean_extracted_content(markitdown_text) |
| except Exception as md_err: |
| logger.warning("MarkItDown extraction failed: %s", str(md_err)) |
|
|
| if html_text: |
| trafilatura_text = _extract_with_trafilatura(html_text, url) |
| if trafilatura_text: |
| logger.info("Retrieval extractor used: trafilatura") |
| return _clean_extracted_content(trafilatura_text) |
|
|
| readability_text = _extract_with_readability(html_text) |
| if readability_text: |
| logger.info("Retrieval extractor used: readability") |
| return _clean_extracted_content(readability_text) |
|
|
| bs4_text = _extract_with_bs4(html_text) |
| if bs4_text: |
| logger.info("Retrieval extractor used: beautifulsoup") |
| return _clean_extracted_content(bs4_text) |
|
|
| playwright_text = _extract_with_playwright_reader(url, article_config.browser_user_agent) |
| if playwright_text: |
| logger.info("Retrieval extractor used: playwright-reader") |
| return _clean_extracted_content(playwright_text) |
|
|
| raise requests.exceptions.RequestException("Unable to extract readable content with available extractors.") |
| except requests.exceptions.RequestException as req_err: |
| logger.error("Error in the HTTP request: %s", str(req_err)) |
| return f"Error in the HTTP request: {req_err}" |
| except Exception as e: |
| logger.error("Error getting the content: %s", str(e)) |
| return f"Error getting the content: {e}" |
|
|
| |
| @tool('DuckDuckGoSearchResults') |
| def search_results(search_query: str) -> list: |
| """ |
| Performs a web search to gather and return a collection of search results with this structure: |
| - title: The title of the search result. |
| - snippet: A short snippet of the search result. |
| - link: The link to the search result. |
| """ |
| def _decode_bing_redirect_url(url: str) -> str: |
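| # Bing wraps result links in a redirect whose "u" parameter carries a base64url-encoded target, typically prefixed with "a1". |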
| parsed = urllib.parse.urlparse(url) |
| query = urllib.parse.parse_qs(parsed.query) |
| encoded = query.get("u", [""])[0] |
| if not encoded: |
| return url |
| try: |
| if encoded.startswith("a1"): |
| encoded = encoded[2:] |
| padded = encoded + ("=" * (-len(encoded) % 4)) |
| decoded = base64.urlsafe_b64decode(padded.encode()).decode("utf-8", errors="ignore") |
| return decoded or url |
| except Exception: |
| return url |
|
|
| def _bing_fallback_search(query: str, max_results: int = 5) -> list[dict]: |
| try: |
| search_url = "https://www.bing.com/search?q=" + urllib.parse.quote_plus(query) |
| response = requests.get( |
| search_url, |
| timeout=15, |
| headers={"User-Agent": "Mozilla/5.0"}, |
| ) |
| response.raise_for_status() |
| soup = BeautifulSoup(response.text, "lxml") |
| out: list[dict] = [] |
| for item in soup.select("li.b_algo"): |
| a_tag = item.select_one("h2 a") |
| if not a_tag: |
| continue |
| href = (a_tag.get("href") or "").strip() |
| if not href: |
| continue |
| title = a_tag.get_text(" ", strip=True) |
| snippet_tag = item.select_one(".b_caption p") |
| snippet = snippet_tag.get_text(" ", strip=True) if snippet_tag else "" |
| out.append( |
| { |
| "title": title, |
| "snippet": snippet, |
| "link": _decode_bing_redirect_url(href), |
| } |
| ) |
| if len(out) >= max_results: |
| break |
| return out |
| except Exception as e: |
| logger.error("Bing fallback search failed: %s", str(e)) |
| return [] |
|
|
| candidates = [search_query, f"{search_query} official", f"{search_query} github"] |
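| # Query variants ("official", "github") broaden recall; results are de-duplicated by link across variants below. |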
| combined: list[dict] = [] |
| seen_links: set[str] = set() |
|
|
| for candidate in candidates: |
| ddg_results: list[dict] = [] |
| try: |
| results = DDGS().text(candidate, max_results=5, timelimit='m') |
| ddg_results = [ |
| { |
| "title": result.get("title", ""), |
| "snippet": result.get("body", ""), |
| "link": result.get("href", ""), |
| } |
| for result in results |
| ] |
| except Exception as e: |
| logger.error("Error performing DDG search: %s", str(e)) |
|
|
| if not ddg_results: |
| ddg_results = _bing_fallback_search(candidate, max_results=5) |
|
|
| for row in ddg_results: |
| link = (row.get("link") or "").strip() |
| if not link or link in seen_links: |
| continue |
| seen_links.add(link) |
| combined.append(row) |
| if len(combined) >= 5: |
| return combined |
|
|
| return combined |
|
|
| @tool('WebScrapper') |
| def web_scrapper(url: str, topic: str) -> str: |
| """ |
| Extract and read the content of a specified link and generate a summary on a specific topic. |
| - url: The URL to extract the content from. |
| - topic: String with the topic to generate a summary on. |
| """ |
| global TOKENS_SUMMARIZATION, RETRIEVAL_CONTEXT_BUDGET_TOKENS, RETRIEVAL_CONTEXT_USED_TOKENS |
|
|
| try: |
| content = fetch_content(url) |
| selected_model = os.getenv("ANALYSIS_MODEL", OPENAI_MODELS["base"]) |
| token_limits = get_model_token_limits(selected_model) |
| summary_max_tokens = token_limits["summary"] |
| prompt = f""" |
| # OBJECTIVE |
| Generate an in-depth summary of the following CONTENT on the topic "{topic}" |
| |
| # INSTRUCTIONS |
| - Provide in-depth insights based on the following CONTENT. |
| - If the following CONTENT is not directly related to the topic "{topic}", you MUST respond with INVALID CONTENT. |
| - Include insights about why the content is important for the topic, possible challenges and advances... |
| - The format will be markdown. |
| - Avoid making up anything. Every insight MUST be based on the content. |
| |
| # CONTENT: |
| "{content}" |
| """ |
|
|
| context_messages = [ |
| { |
| "role": "system", |
| "content": "You are an expert summarizing content for use as context. Focus on the main points." |
| }, |
| { |
| "role": "user", |
| "content": str(prompt) |
| } |
| ] |
|
|
| |
| if MODEL_CHOICE == "azure": |
| client = AzureOpenAI( |
| azure_endpoint=os.getenv('AZURE_API_BASE'), |
| azure_deployment=os.getenv('AZURE_DEPLOYMENT_ID'), |
| api_key=os.getenv('AZURE_OPENAI_KEY'), |
| api_version=os.getenv('AZURE_API_VERSION') |
| ) |
| response = client.chat.completions.create( |
| model=os.getenv('AZURE_DEPLOYMENT_ID'), |
| messages=context_messages, |
| temperature=0.7, |
| max_tokens=summary_max_tokens |
| ) |
|
|
| elif MODEL_CHOICE == "openai": |
| client = OpenAI(api_key=os.getenv('OPENAI_API_KEY')) |
| openai_model = os.getenv("ANALYSIS_MODEL", OPENAI_MODELS['base']) |
| response = client.responses.create( |
| model=openai_model, |
| instructions="You are an expert summarizing content for use as context. Focus on the main points.", |
| input=str(prompt), |
| max_output_tokens=summary_max_tokens, |
| ) |
|
|
| elif MODEL_CHOICE == "groq": |
| client = OpenAI( |
| api_key=os.getenv('GROQ_API_KEY'), |
| base_url="https://api.groq.com/openai/v1" |
| ) |
| response = client.chat.completions.create( |
| model=os.getenv("ANALYSIS_MODEL", "llama-3.1-8b-instant"), |
| messages=context_messages, |
| temperature=0.7, |
| max_tokens=summary_max_tokens |
| ) |
| else: |
| return "Error: Invalid model choice. Please select 'azure', 'openai', or 'groq'." |
|
|
| if MODEL_CHOICE == "openai": |
| summary = extract_responses_text(response) |
| usage = getattr(response, "usage", None) |
| TOKENS_SUMMARIZATION += getattr(usage, "total_tokens", 0) or 0 |
| else: |
| summary = response.choices[0].message.content or "" |
| TOKENS_SUMMARIZATION += response.usage.total_tokens |
|
|
| if not summary.strip(): |
| return f"""<article_summary> |
| # SUMMARY: |
| Empty summary generated. IGNORE THIS OUTPUT. |
| |
| # URL: {url} |
| </article_summary> |
| """ |
|
|
| remaining_budget = RETRIEVAL_CONTEXT_BUDGET_TOKENS - RETRIEVAL_CONTEXT_USED_TOKENS |
| if remaining_budget <= 0: |
| return f"""<article_summary> |
| # SUMMARY: |
| Retrieval context budget reached. IGNORE THIS OUTPUT. |
| |
| # URL: {url} |
| </article_summary> |
| """ |
|
|
| summary = trim_text_to_token_budget(summary, remaining_budget) |
| consumed_tokens = estimate_tokens(summary) |
| RETRIEVAL_CONTEXT_USED_TOKENS += consumed_tokens |
| |
| summary_response = f"""<article_summary> |
| # SUMMARY: |
| {summary} |
| |
| # URL: {url} |
| </article_summary> |
| """ |
| |
| if MODEL_CHOICE == "groq": |
| time.sleep(10) |
| |
| return summary_response |
| |
| except Exception as e: |
| logger.error("Error generating summary: %s", str(e)) |
| return f"""<article_summary> |
| # SUMMARY: |
| Error generating summary. |
| IGNORE THIS OUTPUT. |
| |
| # URL: {url} |
| </article_summary> |
| """ |
|
|
|
|
| def capture_verbose_output( |
| agent_input, |
| provider_choice, |
| analysis_model, |
| azure_openai_key, |
| azure_api_base, |
| azure_api_version, |
| openai_api_key, |
| groq_api_key |
| ): |
| """ |
| Generator that captures stdout from the multi-agent run in real time, |
| streams it to the Gradio interface as logs, and yields the final result and statistics when the crew finishes. |
| """ |
| old_stdout = sys.stdout |
| mystdout = StringIO() |
| sys.stdout = mystdout |
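| # Redirect stdout so CrewAI's verbose console output can be streamed to the UI while the crew runs in a background thread. |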
|
|
| result_container = [("", "")] |
|
|
| def run_kickoff(): |
| |
| loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(loop) |
| try: |
| result_container[0] = kickoff_crew( |
| topic=agent_input, |
| provider_choice=provider_choice, |
| analysis_model=analysis_model, |
| azure_openai_key=azure_openai_key, |
| azure_api_base=azure_api_base, |
| azure_api_version=azure_api_version, |
| openai_api_key=openai_api_key, |
| groq_api_key=groq_api_key |
| ) |
| finally: |
| try: |
| loop.run_until_complete(loop.shutdown_asyncgens()) |
| except Exception: |
| pass |
| loop.close() |
|
|
| kickoff_thread = threading.Thread(target=run_kickoff) |
| kickoff_thread.start() |
|
|
| verbose_output = "" |
| result_output = "" |
| stats_output = "" |
|
|
| |
| yield result_output, verbose_output, stats_output |
|
|
| |
| ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') |
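| # CrewAI colorizes its console output; the pattern above strips ANSI escape sequences before logs reach the UI. |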
|
|
| while kickoff_thread.is_alive(): |
| |
| new_output = mystdout.getvalue() |
| if new_output != verbose_output: |
| verbose_output = new_output |
| clean_verbose = ansi_escape.sub('', verbose_output) |
| yield result_output, clean_verbose, stats_output |
| time.sleep(0.1) |
|
|
| |
| kickoff_thread.join() |
| sys.stdout = old_stdout |
| result_output, stats_output = result_container[0] |
|
|
| verbose_output = mystdout.getvalue() |
| clean_verbose = ansi_escape.sub('', verbose_output) |
| yield result_output, clean_verbose, stats_output |
|
|
|
|
| def kickoff_crew( |
| topic: str, |
| provider_choice: str, |
| analysis_model: str, |
| azure_openai_key: str, |
| azure_api_base: str, |
| azure_api_version: str, |
| openai_api_key: str, |
| groq_api_key: str |
| ) -> tuple[str, str]: |
| """ |
| Kick off the multi-agent pipeline. |
| """ |
| try: |
| global TOKENS_SUMMARIZATION, MODEL_CHOICE, RETRIEVAL_CONTEXT_BUDGET_TOKENS, RETRIEVAL_CONTEXT_USED_TOKENS |
| |
| TOKENS_SUMMARIZATION = 0 |
| MODEL_CHOICE = provider_choice |
| os.environ["ANALYSIS_MODEL"] = analysis_model |
| token_limits = get_model_token_limits(analysis_model) |
| RETRIEVAL_CONTEXT_BUDGET_TOKENS = get_retrieval_context_budget_tokens(analysis_model) |
| RETRIEVAL_CONTEXT_USED_TOKENS = 0 |
|
|
| |
| if not topic.strip(): |
| return "Error: The topic cannot be empty. Please provide a valid topic.", "" |
| if not analysis_model: |
| return "Error: Select an analysis model before starting.", "" |
|
|
| |
| |
| azure_llm_base = None |
| azure_llm_advanced = None |
| openai_llm_base = None |
| openai_llm_advanced = None |
| groq_llm_base = None |
| groq_llm_advanced = None |
|
|
| if provider_choice == "azure": |
| if not azure_openai_key or not azure_api_base or not azure_api_version: |
| return "Error: Please provide all the required Azure OpenAI API details.", "" |
| else: |
| os.environ['AZURE_API_BASE']=azure_api_base |
| os.environ['AZURE_API_VERSION']=azure_api_version |
| os.environ['AZURE_DEPLOYMENT_ID']=analysis_model |
| os.environ['AZURE_OPENAI_KEY']=azure_openai_key |
| |
| azure_llm_base = LLM( |
| temperature=0.3, |
| model=f"azure/{analysis_model}", |
| api_key=azure_openai_key, |
| base_url=azure_api_base, |
| api_version=azure_api_version, |
| max_tokens=token_limits["base"] |
| ) |
| azure_llm_advanced = LLM( |
| temperature=0.6, |
| model=f"azure/{analysis_model}", |
| api_key=azure_openai_key, |
| base_url=azure_api_base, |
| api_version=azure_api_version, |
| max_tokens=token_limits["advanced"] |
| ) |
| elif provider_choice == "openai": |
| if not openai_api_key: |
| return "Error: Please provide the OpenAI API key.", "" |
| if not openai_api_key.strip().startswith("sk-"): |
| return "Error: Invalid OpenAI API key format. It should start with 'sk-'.", "" |
| if analysis_model not in OPENAI_ALLOWED_MODELS: |
| return ( |
| f"Error: OpenAI model '{analysis_model}' is not allowed. " |
| f"Use one of: {', '.join(OPENAI_ALLOWED_MODELS)}.", |
| "", |
| ) |
| else: |
| os.environ['OPENAI_API_KEY']=openai_api_key |
| |
| openai_llm_base = LLM( |
| provider="openai", |
| model=analysis_model, |
| api_key=openai_api_key, |
| max_completion_tokens=token_limits["base"] |
| ) |
| openai_llm_advanced = LLM( |
| provider="openai", |
| model=analysis_model, |
| api_key=openai_api_key, |
| max_completion_tokens=token_limits["advanced"] |
| ) |
| elif provider_choice == "groq": |
| if not groq_api_key: |
| return "Error: Please provide the GROQ API key.", "" |
| if not groq_api_key.strip().startswith("gsk_"): |
| return "Error: Invalid GROQ API key format. It should start with 'gsk_'.", "" |
| else: |
| os.environ['GROQ_API_KEY']=groq_api_key |
| |
| groq_llm_base = LLM( |
| model=analysis_model, |
| provider="openai", |
| api_key=groq_api_key, |
| base_url="https://api.groq.com/openai/v1", |
| temperature=0.3, |
| max_tokens=token_limits["base"] |
| ) |
| groq_llm_advanced = LLM( |
| model=analysis_model, |
| provider="openai", |
| api_key=groq_api_key, |
| base_url="https://api.groq.com/openai/v1", |
| temperature=0.6, |
| max_tokens=token_limits["advanced"] |
| ) |
|
|
|
|
| |
| |
| llms = { |
| "azure": { |
| "base": azure_llm_base, |
| "advanced": azure_llm_advanced |
| }, |
| "openai": { |
| "base": openai_llm_base, |
| "advanced": openai_llm_advanced |
| }, |
| "groq": { |
| "base": groq_llm_base, |
| "advanced": groq_llm_advanced |
| } |
| } |
|
|
| |
| if provider_choice not in llms: |
| return f"Error: Invalid model choice. Please select from {list(llms.keys())}.", "" |
|
|
| selected_llm = llms[provider_choice] |
|
|
| |
| researcher = Agent( |
| role='Researcher', |
| goal=f'Search and collect detailed information on topic ## {topic} ##', |
| tools=[search_results, web_scrapper], |
| llm=selected_llm["base"], |
| backstory=( |
| "You are a meticulous researcher, skilled at navigating vast amounts of information to extract " |
| "essential insights on any given topic. Your dedication to detail ensures the reliability and " |
| "thoroughness of your findings." |
| ), |
| allow_delegation=False, |
| max_iter=8, |
| max_rpm=5 if provider_choice == "groq" else 120, |
| verbose=True |
| ) |
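| # The editor has no tools; it only refines the researcher's draft using the advanced (higher-budget) LLM. |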
|
|
| editor = Agent( |
| role='Editor', |
| goal=f'Compile and refine the information into a comprehensive report on topic ## {topic} ##', |
| llm=selected_llm["advanced"], |
| backstory=( |
| "As an expert editor, you specialize in transforming raw data into clear, engaging reports. " |
| "Your strong command of language and attention to detail ensure that each report not only conveys " |
| "essential insights but is also easily understandable to diverse audiences." |
| ), |
| allow_delegation=False, |
| max_iter=5, |
| max_rpm=10 if provider_choice == "groq" else 120, |
| verbose=True |
| ) |
| |
| |
| research_task = Task( |
| description=( |
| "Be sure to translate the topic into English first. " |
| "Use the DuckDuckGoSearchResults tool to collect initial search snippets on ## {topic} ##. " |
| "If more detailed searches are required, generate and execute new searches related to ## {topic} ##. " |
| "Prioritize high-signal sources and limit deep scraping to the 3 most relevant URLs. " |
| "Subsequently, employ the WebScrapper tool to extract information from significant URLs, " |
| "extracting further insights. Compile these findings into a preliminary draft, documenting all " |
| "relevant sources, titles, and links associated with the topic. " |
| "Ensure high accuracy throughout the process and avoid any fabrication of information. " |
| "Never output tool invocation syntax or ask for user input; always return a written draft, " |
| "and if evidence is limited, clearly mark uncertainties." |
| ), |
| expected_output=( |
| "A structured draft report about the topic, featuring an introduction, a detailed main body, " |
| "and a conclusion. Properly cite sources. Provide a thorough overview of the info gathered." |
| ), |
| agent=researcher |
| ) |
|
|
| edit_task = Task( |
| description=( |
| "Review and refine the initial draft report from the research task. Organize the content logically. " |
| "Elaborate on each section to provide in-depth information and insights. " |
| "Verify the accuracy of all data, correct discrepancies, update info to ensure currency, " |
| "and maintain a consistent tone. Include a section listing all sources used, formatted as bullet points. " |
| "Do not ask the user for additional information; produce the best possible final report with available context." |
| ), |
| expected_output=( |
| "A polished, comprehensive report on topic ## {topic} ##, with a clear, professional narrative. " |
| "Include an introduction, an extensive discussion, a concise conclusion, and a source list with references." |
| ), |
| agent=editor, |
| context=[research_task] |
| ) |
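| # Sequential process: the edit task receives the research task's output as context. |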
| |
| |
| crew = Crew( |
| agents=[researcher, editor], |
| tasks=[research_task, edit_task], |
| process=Process.sequential |
| ) |
| |
| |
| total_time_start = time.perf_counter() |
| result = crew.kickoff(inputs={'topic': topic}) |
| total_time_seconds = time.perf_counter() - total_time_start |
| |
| |
| total_tokens = 0 |
| token_usage = getattr(result, "token_usage", None) |
| usage_metrics = getattr(result, "usage_metrics", None) |
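| # CrewAI versions differ in where token usage lives (token_usage vs usage_metrics, object or dict), so check both. |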
|
|
| if token_usage is not None and hasattr(token_usage, "total_tokens"): |
| total_tokens = token_usage.total_tokens |
| elif usage_metrics is not None: |
| if isinstance(usage_metrics, dict): |
| total_tokens = usage_metrics.get("total_tokens", 0) |
| elif hasattr(usage_metrics, "total_tokens"): |
| total_tokens = usage_metrics.total_tokens |
|
|
| tokens = total_tokens / 1_000 |
| tokens_summ = TOKENS_SUMMARIZATION / 1_000 |
|
|
| def task_duration_seconds(task: Task) -> float: |
| # Task timing attributes are not guaranteed on every CrewAI version, so read them defensively. |
| start = getattr(task, "start_time", None) |
| end = getattr(task, "end_time", None) |
| if start and end: |
| return (end - start).total_seconds() |
| return 0.0 |
|
|
| researcher_seconds = task_duration_seconds(research_task) |
| editor_seconds = task_duration_seconds(edit_task) |
| stats_output = ( |
| "### Statistics\n" |
| f"- Provider: `{provider_choice}`\n" |
| f"- Analysis model: `{analysis_model}`\n" |
| f"- Estimated tokens (Agents): `{tokens:.5f} k`\n" |
| f"- Estimated tokens (Summarization): `{tokens_summ:.5f} k`\n" |
| f"- Retrieval context used: `{RETRIEVAL_CONTEXT_USED_TOKENS}` tokens\n" |
| f"- Retrieval context budget: `{RETRIEVAL_CONTEXT_BUDGET_TOKENS}` tokens\n" |
| f"- Researcher time: `{researcher_seconds:.2f} s`\n" |
| f"- Editor time: `{editor_seconds:.2f} s`\n" |
| f"- Total execution time: `{total_time_seconds:.2f} s`" |
| ) |
| |
| if not isinstance(result, str): |
| result = str(result) |
|
|
| return result, stats_output |
| except Exception as e: |
| logger.error("Error in kickoff_crew: %s", str(e)) |
| return f"Error in kickoff_crew: {str(e)}", "" |
|
|
| |
| description_demo = """# Automatic Insights Generation with Multi-Agents (CrewAI) |
| - **Multi-agent framework**: CrewAI |
| - **Multi-agents**: Two agents, Researcher and Editor, working together to extract information from the internet and compile a report on the topic of choice. |
| - **Search tool**: Duck-Duck-Go-Search |
| - **Web Retrieval**: Newspaper4k and PDF |
| """ |
|
|
| APP_CSS = """ |
| #verbose_output { font-family: monospace; } |
| #provider-block { |
| border: 1px solid var(--border-color-primary) !important; |
| border-radius: 12px !important; |
| padding: 12px !important; |
| background: var(--block-background-fill) !important; |
| } |
| #provider-block h3, |
| #provider-block label, |
| #provider-block .label-wrap, |
| #provider-block .label-text, |
| #provider-block .wrap > label { |
| color: #111827 !important; |
| opacity: 1 !important; |
| } |
| #provider-block .prose p, |
| #provider-block .prose strong, |
| #provider-block .prose em { |
| color: #111827 !important; |
| } |
| """ |
|
|
| with gr.Blocks(css=APP_CSS) as demo: |
| gr.Markdown(description_demo) |
| |
| with gr.Row(): |
| with gr.Column(scale=1): |
| with gr.Column(elem_id="provider-block"): |
| gr.Markdown("### Choose Provider") |
| provider_choice = gr.Radio( |
| choices=["azure", "openai", "groq"], |
| label="Provider", |
| value="openai", |
| interactive=True |
| ) |
|
|
| |
| |
| |
| |
| azure_api_base_input = gr.Textbox(label="Azure API Base (url)", type="password", visible=False, interactive=True) |
| azure_openai_key_input = gr.Textbox(label="Azure API Key", type="password", visible=False, interactive=True) |
| azure_api_version_input = gr.Textbox(label="Azure API Version", type="text", visible=False, interactive=True) |
|
|
| |
| openai_api_key_input = gr.Textbox(label="OpenAI API Key", type="password", visible=True, interactive=True) |
|
|
| |
| groq_api_key_input = gr.Textbox(label="GROQ API Key", type="password", visible=False, interactive=False) |
|
|
| load_models_button = gr.Button("Load Available Models", interactive=False) |
| model_status = gr.Markdown("Load models to enable research.") |
|
|
| analysis_model = gr.Dropdown( |
| choices=[], |
| value=None, |
| label="Analysis Model", |
| info="Load available models for the selected provider and keys.", |
| interactive=True |
| ) |
|
|
| export_button = gr.Button("Export to Markdown", interactive=True) |
| file_output = gr.File(label="Download Markdown File") |
| credits = gr.Markdown( |
| label="Credits", |
|
|
| value="This tool is powered by [CrewAI](https://crewai.com), " |
| "[OpenAI](https://openai.com), " |
| "[Azure OpenAI Services](https://azure.microsoft.com/en-us/products/ai-services/openai-service), " |
| "and [GROQ](https://console.groq.com/playground).", |
| ) |
|
|
| with gr.Column(scale=2): |
| topic_input = gr.Textbox( |
| label="Enter Topic", |
| placeholder="Type here the topic of interest...", |
| interactive=True |
| ) |
| submit_button = gr.Button("Start Research", interactive=False) |
| output = gr.Markdown( |
| label="Result", |
| value="The generated insighsts will appear here...", |
| latex_delimiters=[ |
| {"left": "\\[", "right": "\\]", "display": True}, |
| {"left": "\\(", "right": "\\)", "display": False}, |
| ] |
| ) |
|
|
| verbose_output = gr.Textbox( |
| label="Verbose Output", |
| placeholder="Verbose logs will appear here...", |
| lines=10, |
| interactive=False, |
| elem_id="verbose_output" |
| ) |
| stats_output = gr.Markdown( |
| label="Statistics", |
| value="Statistics will appear after execution." |
| ) |
|
|
| |
| |
| |
| def update_provider_inputs(provider): |
| """Update visibility of config inputs based on the selected LLM.""" |
| azure_visibility = False |
| openai_visibility = False |
| groq_visibility = False |
|
|
| if provider == "azure": |
| azure_visibility = True |
| elif provider == "openai": |
| openai_visibility = True |
| elif provider == "groq": |
| groq_visibility = True |
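| # Return order must match the outputs wired to provider_choice.change: three Azure fields, OpenAI key, GROQ key, model dropdown, status, start button. |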
|
|
| return ( |
| gr.update(visible=azure_visibility, interactive=azure_visibility), |
| gr.update(visible=azure_visibility, interactive=azure_visibility), |
| gr.update(visible=azure_visibility, interactive=azure_visibility), |
| gr.update(visible=openai_visibility, interactive=openai_visibility), |
| gr.update(visible=groq_visibility, interactive=groq_visibility), |
| gr.update(choices=[], value=None), |
| gr.update(value="Load models to enable research."), |
| gr.update(interactive=False), |
| ) |
|
|
| def invalidate_model_selection(provider, azure_key, azure_base, azure_version, openai_key, groq_key): |
| """Disable start until model list is loaded again with current credentials.""" |
| load_enabled = False |
| status_message = "Credentials changed. Reload available models." |
|
|
| if provider == "openai": |
| load_enabled = bool(openai_key) |
| status_message = ( |
| "Credentials ready. Click Load Available Models." |
| if load_enabled |
| else "Enter OpenAI API key to load models." |
| ) |
| elif provider == "azure": |
| load_enabled = bool(azure_key and azure_base and azure_version) |
| status_message = ( |
| "Credentials ready. Click Load Available Models." |
| if load_enabled |
| else "Enter Azure API base, version and API key to load models." |
| ) |
| elif provider == "groq": |
| load_enabled = bool(groq_key) |
| status_message = ( |
| "Credentials ready. Click Load Available Models." |
| if load_enabled |
| else "Enter GROQ API key to load models." |
| ) |
|
|
| return ( |
| gr.update(choices=[], value=None), |
| gr.update(value=status_message), |
| gr.update(interactive=False), |
| gr.update(interactive=load_enabled), |
| ) |
|
|
| def toggle_submit_from_model(selected_model): |
| """Start is enabled only if an analysis model is selected.""" |
| return gr.update(interactive=bool(selected_model)) |
|
|
| def load_available_models(provider, azure_key, azure_base, azure_version, openai_key, groq_key): |
| """Fetch provider models and enable Start Research only with valid credentials + selected model.""" |
| try: |
| model_ids = [] |
| if provider == "openai": |
| if not openai_key: |
| return ( |
| gr.update(choices=[], value=None), |
| gr.update(value="Enter OpenAI API key and load models."), |
| gr.update(interactive=False) |
| ) |
| if not openai_key.strip().startswith("sk-"): |
| return ( |
| gr.update(choices=[], value=None), |
| gr.update(value="Invalid OpenAI API key format. It should start with 'sk-'."), |
| gr.update(interactive=False) |
| ) |
| model_ids = list_openai_models(openai_key) |
| elif provider == "azure": |
| if not azure_key or not azure_base or not azure_version: |
| return ( |
| gr.update(choices=[], value=None), |
| gr.update(value="Enter Azure API base, version and key, then load models."), |
| gr.update(interactive=False) |
| ) |
| model_ids = list_azure_models(azure_key, azure_base, azure_version) |
| elif provider == "groq": |
| if not groq_key: |
| return ( |
| gr.update(choices=[], value=None), |
| gr.update(value="Enter GROQ API key and load models."), |
| gr.update(interactive=False) |
| ) |
| if not groq_key.strip().startswith("gsk_"): |
| return ( |
| gr.update(choices=[], value=None), |
| gr.update(value="Invalid GROQ API key format. It should start with 'gsk_'."), |
| gr.update(interactive=False) |
| ) |
| model_ids = list_groq_models(groq_key) |
| else: |
| return ( |
| gr.update(choices=[], value=None), |
| gr.update(value="Select a provider first."), |
| gr.update(interactive=False) |
| ) |
|
|
| if not model_ids: |
| return ( |
| gr.update(choices=[], value=None), |
| gr.update(value="No models found for the selected provider."), |
| gr.update(interactive=False) |
| ) |
|
|
| return ( |
| gr.update(choices=model_ids, value=model_ids[0]), |
| gr.update(value=f"Loaded {len(model_ids)} models. Ready to start."), |
| gr.update(interactive=True) |
| ) |
| except Exception as e: |
| return ( |
| gr.update(choices=[], value=None), |
| gr.update(value=f"Error loading models: {str(e)}"), |
| gr.update(interactive=False) |
| ) |
|
|
| provider_choice.change( |
| fn=update_provider_inputs, |
| inputs=[provider_choice], |
| outputs=[ |
| azure_openai_key_input, |
| azure_api_base_input, |
| azure_api_version_input, |
| openai_api_key_input, |
| groq_api_key_input, |
| analysis_model, |
| model_status, |
| submit_button, |
| ] |
| ) |
|
|
| provider_choice.change( |
| fn=invalidate_model_selection, |
| inputs=[ |
| provider_choice, |
| azure_openai_key_input, |
| azure_api_base_input, |
| azure_api_version_input, |
| openai_api_key_input, |
| groq_api_key_input, |
| ], |
| outputs=[analysis_model, model_status, submit_button, load_models_button] |
| ) |
|
|
| for input_component in [ |
| azure_openai_key_input, |
| azure_api_base_input, |
| azure_api_version_input, |
| openai_api_key_input, |
| groq_api_key_input, |
| ]: |
| input_component.change( |
| fn=invalidate_model_selection, |
| inputs=[ |
| provider_choice, |
| azure_openai_key_input, |
| azure_api_base_input, |
| azure_api_version_input, |
| openai_api_key_input, |
| groq_api_key_input, |
| ], |
| outputs=[analysis_model, model_status, submit_button, load_models_button] |
| ) |
|
|
| load_models_button.click( |
| fn=load_available_models, |
| inputs=[ |
| provider_choice, |
| azure_openai_key_input, |
| azure_api_base_input, |
| azure_api_version_input, |
| openai_api_key_input, |
| groq_api_key_input |
| ], |
| outputs=[analysis_model, model_status, submit_button] |
| ) |
|
|
| analysis_model.change( |
| fn=toggle_submit_from_model, |
| inputs=[analysis_model], |
| outputs=[submit_button] |
| ) |
|
|
|
|
| submit_button.click( |
| fn=capture_verbose_output, |
| inputs=[ |
| topic_input, |
| provider_choice, |
| analysis_model, |
| azure_openai_key_input, |
| azure_api_base_input, |
| azure_api_version_input, |
| openai_api_key_input, |
| groq_api_key_input |
| ], |
| outputs=[output, verbose_output, stats_output] |
| ) |
|
|
| export_button.click( |
| fn=export_to_markdown, |
| inputs=output, |
| outputs=file_output |
| ) |
|
|
| demo.queue(max_size=3) |
|
|
|
|
| def launch_app(debug_mode: bool = False) -> None: |
| """Launch Gradio app with optional debug settings.""" |
| demo.launch( |
| ssr_mode=False, |
| debug=debug_mode, |
| show_error=debug_mode, |
| ) |
|
|
|
|
| def str_to_bool(value: str) -> bool: |
| return value.strip().lower() in {"1", "true", "yes", "on"} |
|
|
|
|
| if __name__ == "__main__": |
| parser = argparse.ArgumentParser() |
| parser.add_argument( |
| "--debug", |
| action="store_true", |
| help="Enable debug mode with local auto-reload when available.", |
| ) |
| args = parser.parse_args() |
| debug_mode = args.debug or str_to_bool(os.getenv("DEBUG", "0")) |
|
|
| if debug_mode: |
| try: |
| from watchfiles import DefaultFilter, run_process |
|
|
| class AppWatchFilter(DefaultFilter): |
| def __call__(self, change, path): |
| excluded = ("/venv/", "/.venv/", "/.git/", "__pycache__", ".pytest_cache") |
| if any(token in path for token in excluded): |
| return False |
| return super().__call__(change, path) |
|
|
| print("Debug auto-reload enabled. Watching project files for changes...") |
| run_process( |
| ".", |
| target=launch_app, |
| kwargs={"debug_mode": True}, |
| watch_filter=AppWatchFilter(), |
| ) |
| except Exception as e: |
| print(f"Auto-reload not available ({e}). Starting in debug mode without watcher.") |
| launch_app(debug_mode=True) |
| else: |
| launch_app(debug_mode=False) |
|
|