"""Tools MCP: a Gradio app that exposes web-page fetching, DuckDuckGo search,
Python code execution, and Kokoro text-to-speech, both as a tabbed UI and as
MCP tools."""

from __future__ import annotations

import re
import json
import sys
from io import StringIO
from typing import List, Dict, Tuple, Annotated

import gradio as gr
import requests
from bs4 import BeautifulSoup
from readability import Document
from urllib.parse import urljoin, urldefrag, urlparse
from duckduckgo_search import DDGS

import numpy as np

# Optional TTS dependencies; the rest of the app works without them.
try:
    import torch
except Exception:
    torch = None

try:
    from kokoro import KModel, KPipeline
except Exception:
    KModel = None
    KPipeline = None


# --------------------------- Fetch Webpage tool ----------------------------


def _http_get(url: str) -> requests.Response:
    """
    Download the page politely with a short timeout and realistic headers.
    (Layman's terms: grab the web page like a normal browser would, but quickly.)
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; WebMCP/1.0; +https://example.com)",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    }
    return requests.get(url, headers=headers, timeout=15)


def _normalize_whitespace(text: str) -> str:
    """
    Squeeze extra spaces and blank lines to keep things compact.
    (Layman's terms: tidy up the text so it’s not full of weird spacing.)
    """
    text = re.sub(r"[ \t\u00A0]+", " ", text)
    text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text.strip())
    return text.strip()
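
# Example (derived from the two regexes above):
#   _normalize_whitespace("a \t b\n\n\n\nc")  ->  "a b\n\nc"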


def _truncate(text: str, max_chars: int) -> Tuple[str, bool]:
    """
    Cut text if it gets too long; return the text and whether we trimmed.
    (Layman's terms: shorten long text and tell us if we had to cut it.)
    """
    if max_chars is None or max_chars <= 0 or len(text) <= max_chars:
        return text, False
    return text[:max_chars].rstrip() + " …", True
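
# Example:
#   _truncate("hello world", 5)  ->  ("hello …", True)
#   _truncate("hi", 5)           ->  ("hi", False)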


def _shorten(text: str, limit: int) -> str:
    """
    Hard cap a string with an ellipsis to keep tokens small.
    (Layman's terms: force a string to a max length with an ellipsis.)
    """
    if limit <= 0 or len(text) <= limit:
        return text
    return text[: max(0, limit - 1)].rstrip() + "…"
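
# Example:
#   _shorten("abcdefgh", 5)  ->  "abcd…"  (4 chars plus the ellipsis fit the cap)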


def _domain_of(url: str) -> str:
    """
    Show a friendly site name like "example.com".
    (Layman's terms: pull the website's domain.)
    """
    try:
        return urlparse(url).netloc or ""
    except Exception:
        return ""
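
# Example:
#   _domain_of("https://example.com/path?q=1")  ->  "example.com"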


def _meta(soup: BeautifulSoup, name: str) -> str | None:
    """Return the content of a <meta name=...> tag, if present."""
    tag = soup.find("meta", attrs={"name": name})
    return tag.get("content") if tag and tag.has_attr("content") else None


def _og(soup: BeautifulSoup, prop: str) -> str | None:
    """Return the content of an Open Graph <meta property=...> tag, if present."""
    tag = soup.find("meta", attrs={"property": prop})
    return tag.get("content") if tag and tag.has_attr("content") else None


def _extract_metadata(soup: BeautifulSoup, final_url: str) -> Dict[str, str]:
    """
    Pull the useful bits: title, description, site name, canonical URL, language, etc.
    (Layman's terms: gather page basics like title/description/address.)
    """
    meta: Dict[str, str] = {}

    # Title: prefer <title>, then Open Graph, then Twitter card.
    title_candidates = [
        (soup.title.string if soup.title and soup.title.string else None),
        _og(soup, "og:title"),
        _meta(soup, "twitter:title"),
    ]
    meta["title"] = next((t.strip() for t in title_candidates if t and t.strip()), "")

    # Description: same fallback order.
    desc_candidates = [
        _meta(soup, "description"),
        _og(soup, "og:description"),
        _meta(soup, "twitter:description"),
    ]
    meta["description"] = next((d.strip() for d in desc_candidates if d and d.strip()), "")

    # Canonical URL, if the page declares one.
    link_canonical = soup.find("link", rel=lambda v: v and "canonical" in v)
    meta["canonical"] = (link_canonical.get("href") or "").strip() if link_canonical else ""

    # Site name and document language.
    meta["site_name"] = (_og(soup, "og:site_name") or "").strip()
    html_tag = soup.find("html")
    meta["lang"] = (html_tag.get("lang") or "").strip() if html_tag else ""

    # Where we actually ended up after redirects.
    meta["fetched_url"] = final_url
    meta["domain"] = _domain_of(final_url)

    return meta
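
# Illustrative return shape (values depend on the page):
#   {"title": "Example Domain", "description": "", "canonical": "",
#    "site_name": "", "lang": "en", "fetched_url": "https://example.com/",
#    "domain": "example.com"}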


def _extract_main_text(html: str) -> Tuple[str, BeautifulSoup]:
    """
    Use Readability to isolate the main article and turn it into clean text.
    Returns (clean_text, soup_of_readable_html).
    (Layman's terms: find the real article text and clean it.)
    """
    # Readability picks out the main content block.
    doc = Document(html)
    readable_html = doc.summary(html_partial=True)

    # Parse the simplified article HTML.
    s = BeautifulSoup(readable_html, "lxml")

    # Drop non-content tags that survived extraction.
    for sel in ["script", "style", "noscript", "iframe", "svg"]:
        for tag in s.select(sel):
            tag.decompose()

    # Flatten the article into paragraph-sized chunks.
    text_parts: List[str] = []
    for p in s.find_all(["p", "li", "h2", "h3", "h4", "blockquote"]):
        chunk = p.get_text(" ", strip=True)
        if chunk:
            text_parts.append(chunk)

    clean_text = _normalize_whitespace("\n\n".join(text_parts))
    return clean_text, s
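
# Example (illustrative):
#   text, soup = _extract_main_text(resp.text)
#   # `text` is plain paragraphs; `soup` is the readable HTML for link mining.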


def _extract_links(readable_soup: BeautifulSoup, base_url: str, max_links: int) -> List[Tuple[str, str]]:
    """
    Collect clean, unique, absolute links from the readable section only.
    (Layman's terms: pull a tidy list of links from the article body.)
    """
    # Honor the documented contract: max_links=0 means "omit links".
    if max_links <= 0:
        return []

    seen = set()
    links: List[Tuple[str, str]] = []

    for a in readable_soup.find_all("a", href=True):
        href = a.get("href").strip()
        # Skip anchors, mail links, and javascript pseudo-links.
        if not href or href.startswith("#") or href.startswith("mailto:") or href.startswith("javascript:"):
            continue

        # Resolve relative URLs and strip #fragments.
        absolute = urljoin(base_url, href)
        absolute, _ = urldefrag(absolute)

        if absolute in seen:
            continue
        seen.add(absolute)

        text = a.get_text(" ", strip=True)
        if len(text) > 120:
            text = text[:117] + "…"

        links.append((text or absolute, absolute))

        if len(links) >= max_links:
            break

    return links
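
# Example (illustrative URLs):
#   _extract_links(soup, "https://example.com/a/", max_links=2)
#   ->  [("Docs", "https://example.com/docs"), ("About", "https://example.com/about")]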


def _format_markdown(
    meta: Dict[str, str],
    body: str,
    body_truncated: bool,
    links: List[Tuple[str, str]],
    include_text: bool,
    include_metadata: bool,
    include_links: bool,
    verbosity: str,
) -> str:
    """
    Assemble a compact Markdown summary with optional sections.
    (Layman's terms: build the final markdown output with options.)
    """
    lines: List[str] = []

    # H1: title, falling back to the domain.
    title = meta.get("title") or meta.get("domain") or "Untitled"
    lines.append(f"# {title}")

    # Optional metadata section.
    if include_metadata:
        md: List[str] = []
        if meta.get("description"):
            md.append(f"- **Description:** {meta['description']}")
        if meta.get("site_name"):
            md.append(f"- **Site:** {meta['site_name']}")
        if meta.get("canonical"):
            md.append(f"- **Canonical:** {meta['canonical']}")
        if meta.get("lang"):
            md.append(f"- **Language:** {meta['lang']}")
        if meta.get("fetched_url"):
            md.append(f"- **Fetched From:** {meta['fetched_url']}")
        if md:
            lines.append("## Metadata")
            lines.extend(md)

    # Optional body text, with an extra squeeze in Brief mode.
    if include_text and body:
        if verbosity == "Brief":
            brief, was_more = _truncate(body, 800)
            lines.append("## Text")
            lines.append(brief)
            if was_more or body_truncated:
                lines.append("\n> (Trimmed for brevity)")
        else:
            lines.append("## Text")
            lines.append(body)
            if body_truncated:
                lines.append("\n> (Trimmed for brevity)")

    # Optional links section.
    if include_links and links:
        lines.append(f"## Links ({len(links)})")
        for text, url in links:
            lines.append(f"- [{text}]({url})")

    return "\n\n".join(lines).strip()
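
# Output skeleton (sections appear only when enabled and non-empty):
#   # <title>
#   ## Metadata
#   - **Description:** ...
#   ## Text
#   <body, possibly trimmed>
#   ## Links (<n>)
#   - [text](url)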


def Fetch_Webpage(
    url: Annotated[str, "The absolute URL to fetch (must return HTML)."],
    verbosity: Annotated[str, "Controls body length: one of 'Brief', 'Standard', or 'Full'."] = "Standard",
    include_metadata: Annotated[bool, "Include a Metadata section (description, site name, canonical, lang, fetched URL)."] = True,
    include_text: Annotated[bool, "Include the readable main text extracted with Readability."] = True,
    include_links: Annotated[bool, "Include outbound links discovered in the readable section."] = True,
    max_chars: Annotated[int, "Hard cap for body characters after the verbosity preset. Use 0 to disable the cap."] = 3000,
    max_links: Annotated[int, "Maximum number of links to include from the readable content. Set 0 to omit links."] = 20,
) -> str:
    """
    Fetch a web page and return a compact Markdown summary containing title, key
    metadata, readable main text, and outbound links.

    Args:
        url: The absolute URL to fetch (must return HTML).
        verbosity: Controls body length: one of 'Brief', 'Standard', or 'Full'.
        include_metadata: Include a Metadata section (description, site name, canonical, lang, fetched URL).
        include_text: Include the readable main text extracted with Readability.
        include_links: Include outbound links discovered in the readable section.
        max_chars: Hard cap for body characters after the verbosity preset. Use 0 to disable the cap.
        max_links: Maximum number of links to include from the readable content. Set 0 to omit links.

    Returns:
        str: Markdown that may contain the following sections:
            - Title (H1)
            - Metadata (optional)
            - Text (optional, may be trimmed)
            - Links (optional, deduped and absolute)
    """
    if not url or not url.strip():
        return "Please enter a valid URL."

    try:
        resp = _http_get(url)
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        return f"An error occurred: {e}"

    final_url = str(resp.url)
    ctype = resp.headers.get("Content-Type", "")
    if "html" not in ctype.lower():
        return f"Unsupported content type for extraction: {ctype or 'unknown'}"

    # Decode with the declared encoding, falling back to the detected one.
    resp.encoding = resp.encoding or resp.apparent_encoding
    html = resp.text

    # Metadata comes from the full page, not just the readable part.
    full_soup = BeautifulSoup(html, "lxml")
    meta = _extract_metadata(full_soup, final_url)

    # Main text via Readability, with a whole-page fallback.
    body_text, readable_soup = _extract_main_text(html)
    if not body_text:
        fallback_text = full_soup.get_text(" ", strip=True)
        body_text = _normalize_whitespace(fallback_text)

    # Apply the verbosity preset, then the user's hard cap (0 = preset only).
    preset_caps = {"Brief": 1200, "Standard": 3000, "Full": 999_999}
    target_cap = preset_caps.get(verbosity, 3000)
    cap = min(max_chars if max_chars > 0 else target_cap, target_cap)
    body_text, truncated = _truncate(body_text, cap) if include_text else ("", False)

    # Links come from the readable section only (0 disables them).
    links = _extract_links(readable_soup, final_url, max_links=max_links if include_links else 0)

    md = _format_markdown(
        meta=meta,
        body=body_text,
        body_truncated=truncated,
        links=links,
        include_text=include_text,
        include_metadata=include_metadata,
        include_links=include_links,
        verbosity=verbosity,
    )
    return md or "No content could be extracted."
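
# Example call (output depends on the live page):
#   Fetch_Webpage("https://example.com", verbosity="Brief", max_links=5)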


# --------------------------- DuckDuckGo search tool ------------------------


def Search_DuckDuckGo(
    query: Annotated[str, "The search query (supports operators like site:, quotes, OR)."],
    max_results: Annotated[int, "Number of results to return (1–20)."] = 5,
    include_snippets: Annotated[bool, "Include a short snippet for each result (adds tokens)."] = False,
    max_snippet_chars: Annotated[int, "Character cap applied to each snippet when included."] = 80,
    dedupe_domains: Annotated[bool, "If true, only the first result from each domain is kept."] = True,
    title_chars: Annotated[int, "Character cap applied to titles."] = 80,
) -> str:
    """
    Run a DuckDuckGo search and return ultra-compact JSONL with short keys to
    minimize tokens.

    Args:
        query: The search query (supports operators like site:, quotes, OR).
        max_results: Number of results to return (1–20).
        include_snippets: Include a short snippet for each result (adds tokens).
        max_snippet_chars: Character cap applied to each snippet when included.
        dedupe_domains: If true, only the first result from each domain is kept.
        title_chars: Character cap applied to titles.

    Returns:
        str: Newline-delimited JSON (JSONL). Each line has:
            {"t": "title", "u": "url"[, "s": "snippet"]}
    """
    if not query or not query.strip():
        return ""

    try:
        with DDGS() as ddgs:
            raw = ddgs.text(query, max_results=max_results)
    except Exception as e:
        return json.dumps({"error": str(e)[:120]}, ensure_ascii=False, separators=(",", ":"))

    seen_domains = set()
    lines: List[str] = []

    for r in raw or []:
        title = _shorten((r.get("title") or "").strip(), title_chars)
        url = (r.get("href") or r.get("link") or "").strip()
        body = (r.get("body") or r.get("snippet") or "").strip()

        if not url:
            continue

        if dedupe_domains:
            dom = _domain_of(url)
            if dom in seen_domains:
                continue
            seen_domains.add(dom)

        obj = {"t": title or _domain_of(url), "u": url}

        if include_snippets and body:
            obj["s"] = _shorten(body, max_snippet_chars)

        # Compact separators keep each JSONL line as small as possible.
        lines.append(json.dumps(obj, ensure_ascii=False, separators=(",", ":")))

    return "\n".join(lines)
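
# Illustrative output (one JSON object per line; "s" appears only with snippets on):
#   {"t":"Example Domain","u":"https://example.com/"}
#   {"t":"Another Site","u":"https://another.example/","s":"Short snippet…"}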


# --------------------------- Python code executor --------------------------


def Execute_Python(code: Annotated[str, "Python source code to run; stdout is captured and returned."]) -> str:
    """
    Execute arbitrary Python code and return captured stdout or an error message.
    Note: the code runs unsandboxed in this process; only expose this tool in
    trusted environments.

    Args:
        code: Python source code to run; stdout is captured and returned.

    Returns:
        str: Combined stdout produced by the code, or the exception text if
            execution failed.
    """
    if code is None:
        return "No code provided."

    # Run in a fresh module-like namespace so top-level names (variables,
    # functions) can see each other, which bare exec-in-function breaks.
    env: Dict[str, object] = {"__name__": "__main__"}

    # Temporarily redirect stdout so print() output can be captured.
    old_stdout = sys.stdout
    redirected_output = sys.stdout = StringIO()
    try:
        exec(code, env)
        return redirected_output.getvalue()
    except Exception as e:
        return f"{type(e).__name__}: {e}"
    finally:
        sys.stdout = old_stdout
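
# Example:
#   Execute_Python("print(1 + 1)")  ->  "2\n"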


# --------------------------- Kokoro text-to-speech -------------------------


# Lazy global state so the model loads once, on first use.
_KOKORO_STATE = {
    "initialized": False,
    "device": "cpu",
    "model": None,
    "pipelines": {},
}


def _init_kokoro() -> None:
    """Lazy-initialize the Kokoro model and pipelines on first use.

    Tries CUDA if torch is present and available; falls back to CPU. Keeps a
    minimal English pipeline and a custom lexicon tweak for the word "kokoro".
    """
    if _KOKORO_STATE["initialized"]:
        return

    if KModel is None or KPipeline is None:
        raise RuntimeError(
            "Kokoro is not installed. Please install the 'kokoro' package (>=0.9.4)."
        )

    device = "cpu"
    if torch is not None:
        try:
            if torch.cuda.is_available():
                device = "cuda"
        except Exception:
            device = "cpu"

    model = KModel().to(device).eval()
    pipelines = {"a": KPipeline(lang_code="a", model=False)}
    # Pin a known-good pronunciation for the model's own name.
    try:
        pipelines["a"].g2p.lexicon.golds["kokoro"] = "kˈOkəɹO"
    except Exception:
        pass

    _KOKORO_STATE.update(
        {
            "initialized": True,
            "device": device,
            "model": model,
            "pipelines": pipelines,
        }
    )


def Kokoro_TextToAudio(
    text: Annotated[str, "The text to synthesize (English)."],
    speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.0,
    voice: Annotated[str, "Voice identifier. Example: 'af_heart' (US English, female, Heart)."] = "af_heart",
) -> Tuple[int, np.ndarray]:
    """
    Synthesize speech from text using the Kokoro-82M model.

    This function returns raw audio suitable for a Gradio Audio component and is
    also exposed as an MCP tool (per the Hugging Face/Gradio MCP docs, a tool is
    created for each function wired into the app; docstrings and type hints are
    used to describe the tool).

    Args:
        text: The text to synthesize (English).
        speed: Speech speed multiplier in 0.5–2.0; 1.0 = normal speed.
        voice: Voice identifier. Example: 'af_heart' (US English, female, Heart).

    Returns:
        A tuple of (sample_rate_hz, audio_waveform) where:
            - sample_rate_hz: int sample rate in Hz (24_000)
            - audio_waveform: numpy.ndarray float32 mono waveform in range [-1, 1]

    Notes:
        - Requires the 'kokoro' package (>=0.9.4). If unavailable, an error is
          raised with installation guidance.
        - Runs on CUDA if available; otherwise CPU.
    """
    if not text or not text.strip():
        raise gr.Error("Please provide non-empty text to synthesize.")

    _init_kokoro()
    model = _KOKORO_STATE["model"]
    pipelines = _KOKORO_STATE["pipelines"]

    pipeline = pipelines.get("a")
    if pipeline is None:
        raise gr.Error("Kokoro English pipeline not initialized.")

    pack = pipeline.load_voice(voice)
    # Synthesize and return the first chunk only (keeps output short).
    for _, ps, _ in pipeline(text, voice, speed):
        ref_s = pack[len(ps) - 1]
        try:
            audio = model(ps, ref_s, float(speed))
        except Exception as e:
            raise gr.Error(f"Error generating audio: {str(e)}")
        return 24_000, audio.detach().cpu().numpy()

    raise gr.Error("No audio was generated (empty synthesis result).")
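
# Example (requires the 'kokoro' package; downloads voice weights on first use):
#   sr, wav = Kokoro_TextToAudio("Hello there!", speed=1.0, voice="af_heart")
#   # sr == 24_000; wav is a float32 mono numpy array.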


# --------------------------- Gradio UI -------------------------------------


fetch_interface = gr.Interface(
    fn=Fetch_Webpage,
    inputs=[
        gr.Textbox(label="URL", placeholder="https://example.com/article"),
        gr.Dropdown(label="Verbosity", choices=["Brief", "Standard", "Full"], value="Standard"),
        gr.Checkbox(value=True, label="Include Metadata"),
        gr.Checkbox(value=True, label="Include Main Text"),
        gr.Checkbox(value=True, label="Include Links"),
        gr.Slider(400, 12000, value=3000, step=100, label="Max Characters (body text)"),
        gr.Slider(0, 100, value=20, step=1, label="Max Links"),
    ],
    outputs=gr.Markdown(label="Extracted Summary"),
    title="Fetch Webpage",
    description=(
        '<div style="text-align:center">Extract title, key metadata, readable text, and links from webpages. No noisy HTML.</div>'
    ),
    api_description=(
        "Fetch a web page and return a compact Markdown summary with title, key "
        "metadata, readable body text, and outbound links. Parameters let you "
        "control verbosity, whether to include metadata/text/links, and limits "
        "for characters and number of links."
    ),
    allow_flagging="never",
)


concise_interface = gr.Interface(
    fn=Search_DuckDuckGo,
    inputs=[
        gr.Textbox(label="Query", placeholder="topic OR site:example.com"),
        gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max results"),
        gr.Checkbox(value=False, label="Include snippets (adds tokens)"),
        gr.Slider(minimum=20, maximum=200, value=80, step=5, label="Max snippet chars"),
        gr.Checkbox(value=True, label="Dedupe by domain"),
        gr.Slider(minimum=20, maximum=120, value=80, step=5, label="Max title chars"),
    ],
    outputs=gr.Textbox(label="Results (JSONL)", interactive=False),
    title="DuckDuckGo Search",
    description=(
        '<div style="text-align:center">Very concise web search to avoid unnecessary context. Emits JSONL with short keys (t,u[,s]). Defaults avoid snippets and duplicate domains.</div>'
    ),
    api_description=(
        "Run a DuckDuckGo search and return newline-delimited JSON with short keys: "
        "t=title, u=url, optional s=snippet. Options control result count, "
        "snippet inclusion and length, domain deduping, and title length."
    ),
    allow_flagging="never",
    submit_btn="Search",
)


code_interface = gr.Interface(
    fn=Execute_Python,
    inputs=gr.Code(label="Python Code", language="python"),
    outputs=gr.Textbox(label="Output"),
    title="Python Code Executor",
    description=(
        '<div style="text-align:center">Execute Python code and see the output.</div>'
    ),
    api_description=(
        "Execute arbitrary Python code and return captured stdout or an error message.\n\n"
        "Parameters:\n"
        "- code (string): The Python source code to run.\n\n"
        "Returns:\n"
        "- string: Combined stdout produced by the code, or the exception text if execution failed."
    ),
    allow_flagging="never",
)


CSS_STYLES = """
.gradio-container h1 {
    text-align: center;
}

/* Default: add subtitle under titles */
.gradio-container h1::after {
    content: "Fetch Webpage | Search DuckDuckGo | Code Interpreter | Kokoro TTS";
    display: block;
    font-size: 1rem;
    font-weight: 500;
    opacity: 0.9;
    margin-top: 6px;
}

/* But remove it inside tab panels so it doesn't duplicate under each tool title */
.gradio-container [role="tabpanel"] h1::after {
    content: none !important;
}
"""


kokoro_interface = gr.Interface(
    fn=Kokoro_TextToAudio,
    inputs=[
        gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4),
        gr.Slider(minimum=0.5, maximum=2.0, value=1.0, step=0.1, label="Speed"),
        gr.Textbox(label="Voice", value="af_heart", placeholder="e.g., af_heart"),
    ],
    outputs=gr.Audio(label="Audio", type="numpy"),
    title="Kokoro TTS",
    description=(
        '<div style="text-align:center">Generate English speech with Kokoro-82M. 30-second max output. Runs on CPU, or on CUDA if available.</div>'
    ),
    api_description=(
        "Synthesize speech from text using Kokoro-82M. Returns (sample_rate, waveform) suitable for playback."
        " Parameters: text (str), speed (float 0.5–2.0), voice (str)."
    ),
    allow_flagging="never",
)


demo = gr.TabbedInterface(
    interface_list=[fetch_interface, concise_interface, code_interface, kokoro_interface],
    tab_names=[
        "Fetch Webpage",
        "DuckDuckGo Search",
        "Python Code Executor",
        "Kokoro TTS",
    ],
    title="Tools MCP",
    theme="Nymbo/Nymbo_Theme",
    css=CSS_STYLES,
)


if __name__ == "__main__":
    demo.launch(mcp_server=True)
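
# With mcp_server=True, Gradio also serves these functions as MCP tools; per
# the Gradio MCP docs the server lives under /gradio_api/mcp/ (e.g. the SSE
# endpoint at /gradio_api/mcp/sse) alongside the normal web UI.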