| |
| |
| |
| |
| |
| |
| |
| |
|
|
| from __future__ import annotations |
|
|
| import re |
| import json |
| import sys |
| import os |
| import random |
| from io import StringIO |
| from typing import List, Dict, Tuple, Annotated, Literal, Optional |
|
|
| import gradio as gr |
| import requests |
| from bs4 import BeautifulSoup |
| from markdownify import markdownify as md |
| from readability import Document |
| from urllib.parse import urlparse |
| from ddgs import DDGS |
| from PIL import Image |
| from huggingface_hub import InferenceClient |
| import time |
| import tempfile |
| import uuid |
| import threading |
| from datetime import datetime |
|
|
| |
| import numpy as np |
| try: |
| import torch |
| except Exception: |
| torch = None |
| try: |
| from kokoro import KModel, KPipeline |
| except Exception: |
| KModel = None |
| KPipeline = None |
|
|
|
|
| |
| |
| |
|
|
def _http_get_enhanced(url: str) -> requests.Response:
    """
    Download a page with browser-like headers and friendly error messages.

    The call is throttled through the module-level ``_fetch_rate_limiter``
    before the request is sent.

    Args:
        url: Absolute URL to fetch.

    Returns:
        The successful ``requests.Response`` (redirects followed, body loaded).

    Raises:
        requests.exceptions.RequestException: For timeouts, connection
            failures and HTTP error statuses, with a human-readable message.
            The original exception is attached as ``__cause__``. Other
            ``requests`` errors (e.g. an invalid URL) propagate unchanged.
    """
    # NOTE: "Accept-Encoding" is deliberately not set here. requests/urllib3
    # advertise exactly the encodings they can decode; hard-coding "br"
    # yields undecodable bodies when no brotli package is installed.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }

    # Throttle outgoing fetches (may block the calling thread).
    _fetch_rate_limiter.acquire()

    try:
        response = requests.get(
            url,
            headers=headers,
            timeout=30,
            allow_redirects=True,
            stream=False,
        )
        response.raise_for_status()
        return response
    except requests.exceptions.Timeout as e:
        raise requests.exceptions.RequestException(
            "Request timed out. The webpage took too long to respond."
        ) from e
    except requests.exceptions.ConnectionError as e:
        raise requests.exceptions.RequestException(
            "Connection error. Please check the URL and your internet connection."
        ) from e
    except requests.exceptions.HTTPError as e:
        # Read the status from the exception itself rather than a local that
        # is only bound when requests.get() returned. Compare against None
        # explicitly: a Response with an error status is falsy.
        status = e.response.status_code if e.response is not None else None
        if status == 403:
            message = "Access forbidden. The website may be blocking automated requests."
        elif status == 404:
            message = "Page not found. Please check the URL."
        elif status == 429:
            message = "Rate limited. Please try again in a few minutes."
        else:
            message = f"HTTP error {status}: {str(e)}"
        raise requests.exceptions.RequestException(message) from e
|
|
| def _normalize_whitespace(text: str) -> str: |
| """ |
| Squeeze extra spaces and blank lines to keep things compact. |
| (Layman's terms: tidy up the text so it’s not full of weird spacing.) |
| """ |
| text = re.sub(r"[ \t\u00A0]+", " ", text) |
| text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text.strip()) |
| return text.strip() |
|
|
|
|
| def _truncate(text: str, max_chars: int) -> Tuple[str, bool]: |
| """ |
| Cut text if it gets too long; return the text and whether we trimmed. |
| (Layman's terms: shorten long text and tell us if we had to cut it.) |
| """ |
| if max_chars is None or max_chars <= 0 or len(text) <= max_chars: |
| return text, False |
| return text[:max_chars].rstrip() + " …", True |
|
|
|
|
| def _shorten(text: str, limit: int) -> str: |
| """ |
| Hard cap a string with an ellipsis to keep tokens small. |
| (Layman's terms: force a string to a max length with an ellipsis.) |
| """ |
| if limit <= 0 or len(text) <= limit: |
| return text |
| return text[: max(0, limit - 1)].rstrip() + "…" |
|
|
|
|
| def _domain_of(url: str) -> str: |
| """ |
| Show a friendly site name like "example.com". |
| (Layman's terms: pull the website's domain.) |
| """ |
| try: |
| return urlparse(url).netloc or "" |
| except Exception: |
| return "" |
|
|
|
|
def _meta(soup: BeautifulSoup, name: str) -> str | None:
    """Return the ``content`` of the first ``<meta name=...>`` tag, or None."""
    tag = soup.find("meta", attrs={"name": name})
    if not tag or not tag.has_attr("content"):
        return None
    return tag.get("content")
|
|
|
|
def _og(soup: BeautifulSoup, prop: str) -> str | None:
    """Return the ``content`` of the first ``<meta property=...>`` tag, or None."""
    tag = soup.find("meta", attrs={"property": prop})
    if not tag or not tag.has_attr("content"):
        return None
    return tag.get("content")
|
|
|
|
def _extract_metadata(soup: BeautifulSoup, final_url: str) -> Dict[str, str]:
    """
    Collect basic page metadata from a parsed document.

    Args:
        soup: Parsed HTML of the page.
        final_url: URL the content was ultimately fetched from.

    Returns:
        Dict with the keys ``title``, ``description``, ``canonical``,
        ``site_name``, ``lang``, ``fetched_url`` and ``domain``. Missing
        values are empty strings.
    """

    def _first_nonblank(candidates) -> str:
        # First candidate that is non-empty after stripping, else "".
        for candidate in candidates:
            if candidate and candidate.strip():
                return candidate.strip()
        return ""

    # Title: <title>, then Open Graph, then the Twitter card.
    page_title = soup.title.string if soup.title and soup.title.string else None
    title = _first_nonblank(
        [page_title, _og(soup, "og:title"), _meta(soup, "twitter:title")]
    )

    # Description: standard meta, then Open Graph, then the Twitter card.
    description = _first_nonblank(
        [
            _meta(soup, "description"),
            _og(soup, "og:description"),
            _meta(soup, "twitter:description"),
        ]
    )

    # Canonical link, if the page declares one.
    canonical_link = soup.find("link", rel=lambda v: v and "canonical" in v)
    if canonical_link:
        canonical = (canonical_link.get("href") or "").strip()
    else:
        canonical = ""

    site_name = (_og(soup, "og:site_name") or "").strip()

    # Language comes from the <html lang="..."> attribute.
    html_tag = soup.find("html")
    if html_tag:
        lang = (html_tag.get("lang") or "").strip()
    else:
        lang = ""

    return {
        "title": title,
        "description": description,
        "canonical": canonical,
        "site_name": site_name,
        "lang": lang,
        "fetched_url": final_url,
        "domain": _domain_of(final_url),
    }
|
|
|
|
def _extract_main_text(html: str) -> Tuple[str, BeautifulSoup]:
    """
    Isolate the main article with Readability and flatten it to plain text.

    Args:
        html: Raw HTML of the page.

    Returns:
        ``(clean_text, soup)`` where ``soup`` is the parsed Readability
        summary with script/style/noscript/iframe/svg elements removed, and
        ``clean_text`` joins the text of its paragraphs, list items, h2-h4
        headings and blockquotes with blank lines.
    """
    readable_html = Document(html).summary(html_partial=True)
    soup = BeautifulSoup(readable_html, "lxml")

    # Drop elements that never carry readable article text.
    for selector in ("script", "style", "noscript", "iframe", "svg"):
        for node in soup.select(selector):
            node.decompose()

    # Gather the text of block-level content elements, skipping empty ones.
    block_texts = (
        element.get_text(" ", strip=True)
        for element in soup.find_all(["p", "li", "h2", "h3", "h4", "blockquote"])
    )
    text_parts: List[str] = [chunk for chunk in block_texts if chunk]

    return _normalize_whitespace("\n\n".join(text_parts)), soup
|
|
|
|
def _fullpage_markdown_from_soup(full_soup: BeautifulSoup, base_url: str) -> str:
    """
    Convert the main content of a parsed page to Markdown.

    Note that ``full_soup`` is modified in place: script, style, nav, footer,
    header and aside elements are removed from it.

    Args:
        full_soup: Parsed HTML of the whole page.
        base_url: URL of the page (accepted but not used by this function).

    Returns:
        Markdown with ATX headings, prefixed by the page title as an H1 when
        the page has one, or a short message when nothing could be extracted.
    """
    # Strip page chrome and non-content elements.
    for clutter in full_soup.select("script, style, nav, footer, header, aside"):
        clutter.decompose()

    # Pick the most specific content container available.
    content_root = full_soup.find("main")
    if not content_root:
        content_root = full_soup.find("article")
    if not content_root:
        content_root = full_soup.find(
            "div", class_=re.compile(r"content|main|post|article", re.I)
        )
    if not content_root:
        content_root = full_soup.find("body")
    if not content_root:
        return "No main content found on the webpage."

    markdown_text = md(str(content_root), heading_style="ATX")

    # Tidy up: squeeze blank-line runs, drop links without text, collapse
    # runs of spaces/tabs.
    cleanup_rules = (
        (r"\n{3,}", "\n\n"),
        (r"\[\s*\]\([^)]*\)", ""),
        (r"[ \t]+", " "),
    )
    for pattern, replacement in cleanup_rules:
        markdown_text = re.sub(pattern, replacement, markdown_text)
    markdown_text = markdown_text.strip()

    # Prepend the document title as a top-level heading.
    title_tag = full_soup.find("title")
    if title_tag:
        title_text = title_tag.get_text(strip=True)
        if title_text:
            markdown_text = f"# {title_text}\n\n{markdown_text}"

    if markdown_text:
        return markdown_text
    return "No content could be extracted."
|
|
|
|
| def _truncate_markdown(markdown: str, max_chars: int) -> str: |
| """ |
| Truncate markdown content to a maximum character count while preserving structure. |
| Tries to break at paragraph boundaries when possible. |
| """ |
| if len(markdown) <= max_chars: |
| return markdown |
| |
| |
| truncated = markdown[:max_chars] |
| |
| |
| last_paragraph = truncated.rfind('\n\n') |
| if last_paragraph > max_chars * 0.7: |
| truncated = truncated[:last_paragraph] |
| |
| |
| elif '.' in truncated[-100:]: |
| last_period = truncated.rfind('.') |
| if last_period > max_chars * 0.8: |
| truncated = truncated[:last_period + 1] |
| |
| return truncated.rstrip() + "\n\n> *[Content truncated for brevity]*" |
|
|
|
|
def Fetch_Webpage(
    url: Annotated[str, "The absolute URL to fetch (must return HTML)."],
    verbosity: Annotated[str, "Controls output length: 'Brief' (1000 chars), 'Standard' (3000 chars), or 'Full' (complete page)."] = "Standard",
) -> str:
    """
    Fetch a web page and return it converted to Markdown format with configurable length.

    This function retrieves a webpage and converts its main content to clean Markdown,
    preserving headings, formatting, and structure. It automatically removes navigation,
    footers, scripts, and other non-content elements to focus on the main article or
    content area.

    Args:
        url (str): The absolute URL to fetch (must return HTML). Surrounding
            whitespace is ignored.
        verbosity (str): Controls output length (matched case-insensitively):
            - "Brief": Truncate to 1000 characters for quick summaries
            - "Standard": Truncate to 3000 characters for balanced content
            - "Full": Return complete page content with no length limit
            Any other value is treated like "Full".

    Returns:
        str: The webpage content converted to Markdown format with:
            - Page title as H1 header
            - Main content converted to clean Markdown
            - Preserved heading hierarchy
            - Clean formatting without navigation/sidebar elements
            - Length controlled by verbosity setting
        On failure (empty URL, request error, non-HTML response) a short
        human-readable message is returned instead of raising.
    """
    _log_call_start("Fetch_Webpage", url=url, verbosity=verbosity)
    if not url or not url.strip():
        result = "Please enter a valid URL."
        _log_call_end("Fetch_Webpage", _truncate_for_log(result))
        return result

    # Fetch the same value that was validated: pasted URLs often carry
    # stray leading/trailing whitespace.
    url = url.strip()

    try:
        # _http_get_enhanced already calls raise_for_status(), so a second
        # status check here is unnecessary.
        resp = _http_get_enhanced(url)
    except requests.exceptions.RequestException as e:
        result = f"An error occurred: {e}"
        _log_call_end("Fetch_Webpage", _truncate_for_log(result))
        return result

    final_url = str(resp.url)
    ctype = resp.headers.get("Content-Type", "")
    if "html" not in ctype.lower():
        result = f"Unsupported content type for extraction: {ctype or 'unknown'}"
        _log_call_end("Fetch_Webpage", _truncate_for_log(result))
        return result

    # For text/* responses without a charset, requests pre-fills
    # resp.encoding with ISO-8859-1, so `resp.encoding or ...` never falls
    # back. Use the detected encoding whenever the server did not declare one.
    if "charset" not in ctype.lower():
        resp.encoding = resp.apparent_encoding or resp.encoding
    html = resp.text

    full_soup = BeautifulSoup(html, "lxml")
    markdown_content = _fullpage_markdown_from_soup(full_soup, final_url)

    # Map the verbosity label to a character budget; unknown labels -> no limit.
    char_limits = {"brief": 1000, "standard": 3000}
    max_chars = char_limits.get(str(verbosity or "").strip().lower())
    if max_chars is None:
        result = markdown_content
    else:
        result = _truncate_markdown(markdown_content, max_chars)

    _log_call_end("Fetch_Webpage", f"markdown_chars={len(result)}")
    return result
|
|
|
|
| |
| |
| |
|
|
| import asyncio |
| from datetime import datetime, timedelta |
|
|
class RateLimiter:
    """
    Sliding-window limiter allowing ``requests_per_minute`` calls per minute.

    ``requests`` holds the ``datetime`` of each call granted within the last
    minute, oldest first. Instances do no locking of their own.
    """

    def __init__(self, requests_per_minute: int = 30):
        self.requests_per_minute = requests_per_minute
        self.requests = []

    def _prune(self, now):
        """Forget calls that are a minute old or older, relative to ``now``."""
        window = timedelta(minutes=1)
        self.requests = [req for req in self.requests if now - req < window]

    def acquire(self):
        """
        Block (synchronously) until another call is allowed, then record it.

        When the window is full, sleeps until the oldest recorded call leaves
        the window (at least one second).
        """
        now = datetime.now()
        self._prune(now)

        # `self.requests and ...` guards the [0] lookup so that a limit <= 0
        # with an empty history does not raise IndexError.
        if self.requests and len(self.requests) >= self.requests_per_minute:
            wait_time = 60 - (now - self.requests[0]).total_seconds()
            if wait_time > 0:
                time.sleep(max(1, wait_time))
                # Re-read the clock after waiting: recording the pre-sleep
                # timestamp would make this slot expire too early and let
                # later callers exceed the limit.
                now = datetime.now()
                self._prune(now)

        self.requests.append(now)


# Shared limiters: one for search calls, one for page fetches.
_search_rate_limiter = RateLimiter(requests_per_minute=20)
_fetch_rate_limiter = RateLimiter(requests_per_minute=25)
|
|
| |
| |
| |
|
|
| def _truncate_for_log(value: str, limit: int = 500) -> str: |
| """Truncate long strings for concise terminal logging.""" |
| if len(value) <= limit: |
| return value |
| return value[:limit - 1] + "…" |
|
|
|
|
| def _serialize_input(val): |
| """Best-effort compact serialization of arbitrary input values for logging.""" |
| try: |
| if isinstance(val, (str, int, float, bool)) or val is None: |
| return val |
| if isinstance(val, (list, tuple)): |
| return [_serialize_input(v) for v in list(val)[:10]] + (["…"] if len(val) > 10 else []) |
| if isinstance(val, dict): |
| out = {} |
| for i, (k, v) in enumerate(val.items()): |
| if i >= 12: |
| out["…"] = "…" |
| break |
| out[str(k)] = _serialize_input(v) |
| return out |
| return repr(val)[:120] |
| except Exception: |
| return "<unserializable>" |
|
|
|
|
def _log_call_start(func_name: str, **kwargs) -> None:
    """Print a one-line "[TOOL CALL]" record with the call's inputs.

    Inputs are compacted with ``_serialize_input`` and the JSON payload is
    capped at 800 characters. Logging failures are reported, never raised.
    """
    try:
        compact = {}
        for key, value in kwargs.items():
            compact[key] = _serialize_input(value)
        payload = json.dumps(compact, ensure_ascii=False)[:800]
        print(f"[TOOL CALL] {func_name} inputs: {payload}", flush=True)
    except Exception as err:
        print(f"[TOOL CALL] {func_name} (failed to log inputs: {err})", flush=True)
|
|
|
|
| def _log_call_end(func_name: str, output_desc: str) -> None: |
| try: |
| print(f"[TOOL RESULT] {func_name} output: {output_desc}", flush=True) |
| except Exception as e: |
| print(f"[TOOL RESULT] {func_name} (failed to log output: {e})", flush=True) |
|
|
def Search_DuckDuckGo(
    query: Annotated[str, "The search query (supports operators like site:, quotes, OR)."],
    max_results: Annotated[int, "Number of results to return (1–20)."] = 5,
) -> str:
    """
    Run a DuckDuckGo search and return numbered results with URLs, titles, and summaries.

    Args:
        query (str): The search query string. Supports operators like site:, quotes for exact matching,
            OR for alternatives, and other DuckDuckGo search syntax.
            Examples:
            - Basic search: "Python programming"
            - Site search: "site:example.com"
            - Exact phrase: "artificial intelligence"
            - Exclude terms: "cats -dogs"
        max_results (int): Number of results to return (1–20). Default: 5.
            Values outside the range are clamped; values that cannot be
            converted to an integer fall back to the default.

    Returns:
        str: Search results in readable format with titles, URLs, and snippets as a numbered list,
            or a human-readable error / "no results" message.
    """
    _log_call_start("Search_DuckDuckGo", query=query, max_results=max_results)
    if not query or not query.strip():
        result = "No search query provided. Please enter a search term."
        _log_call_end("Search_DuckDuckGo", _truncate_for_log(result))
        return result

    # UI sliders and remote clients may deliver floats, strings or None;
    # normalise to an int before clamping to the supported range.
    try:
        max_results = int(max_results)
    except (TypeError, ValueError):
        max_results = 5
    max_results = max(1, min(20, max_results))

    try:
        # Throttle searches (may block the calling thread).
        _search_rate_limiter.acquire()

        with DDGS() as ddgs:
            # Materialise inside the context manager so that a lazily
            # produced result set is fully read before the session closes.
            raw = list(ddgs.text(query, max_results=max_results) or [])

    except Exception as e:
        # Map common failure texts to friendlier messages.
        details = str(e)
        lowered = details.lower()
        error_msg = f"Search failed: {details[:200]}"
        if "blocked" in lowered or "rate" in lowered:
            error_msg = "Search temporarily blocked due to rate limiting. Please try again in a few minutes."
        elif "timeout" in lowered:
            error_msg = "Search timed out. Please try again with a simpler query."
        elif "network" in lowered or "connection" in lowered:
            error_msg = "Network connection error. Please check your internet connection and try again."
        result = f"Error: {error_msg}"
        _log_call_end("Search_DuckDuckGo", _truncate_for_log(result))
        return result

    if not raw:
        result = f"No results found for query: {query}"
        _log_call_end("Search_DuckDuckGo", _truncate_for_log(result))
        return result

    # Keep only entries that carry a URL; fall back to the domain as title.
    results = []
    for r in raw:
        title = (r.get("title") or "").strip()
        url = (r.get("href") or r.get("link") or "").strip()
        body = (r.get("body") or r.get("snippet") or "").strip()
        if not url:
            continue
        results.append(
            {
                "title": title or _domain_of(url),
                "url": url,
                "snippet": body,
            }
        )

    if not results:
        result = f"No valid results found for query: {query}"
        _log_call_end("Search_DuckDuckGo", _truncate_for_log(result))
        return result

    # Render the numbered, human-readable list.
    lines = [f"Found {len(results)} search results for: {query}\n"]
    for i, entry in enumerate(results, 1):
        lines.append(f"{i}. {entry['title']}")
        lines.append(f" URL: {entry['url']}")
        if entry['snippet']:
            lines.append(f" Summary: {entry['snippet']}")
        lines.append("")
    result = "\n".join(lines)
    _log_call_end("Search_DuckDuckGo", f"results={len(results)} chars={len(result)}")
    return result
|
|
|
|
| |
| |
| |
|
|
def Execute_Python(code: Annotated[str, "Python source code to run; stdout is captured and returned."]) -> str:
    """
    Execute arbitrary Python code and return captured stdout or an error message.

    Args:
        code (str): Python source code to run; stdout is captured and returned.

    Returns:
        str: Combined stdout produced by the code, or the exception text if
        execution failed.
    """
    _log_call_start("Execute_Python", code=_truncate_for_log(code or "", 300))
    if code is None:
        result = "No code provided."
        _log_call_end("Execute_Python", result)
        return result

    # SECURITY: exec() runs the submitted code with the full privileges of
    # this process. Only expose this tool to trusted callers or inside a
    # sandbox.
    #
    # Run the snippet in ONE namespace dict (a copy of the module globals).
    # With a bare exec(code) inside a function, top-level names of the snippet
    # land in the function's locals, so functions defined by the snippet
    # cannot see them (NameError), and the snippet can read this helper's
    # locals. The copy keeps module-level names readable as before while
    # preventing the snippet from rebinding them.
    namespace = dict(globals())

    old_stdout = sys.stdout
    redirected_output = sys.stdout = StringIO()
    try:
        exec(code, namespace)
        result = redirected_output.getvalue()
    except Exception as e:
        result = str(e)
    finally:
        # Always restore the real stdout, even on non-Exception exits.
        sys.stdout = old_stdout
    _log_call_end("Execute_Python", _truncate_for_log(result))
    return result
|
|
|
|
| |
| |
| |
|
|
| _KOKORO_STATE = { |
| "initialized": False, |
| "device": "cpu", |
| "model": None, |
| "pipelines": {}, |
| } |
|
|
|
|
def get_kokoro_voices():
    """Return the sorted Kokoro voice IDs found in the model repository.

    Lists the files of the 'hexgrad/Kokoro-82M' repository and keeps the
    ``voices/*.pt`` entries. Falls back to the built-in voice list when the
    listing fails or contains no voices.
    """
    try:
        from huggingface_hub import list_repo_files

        repo_files = list_repo_files('hexgrad/Kokoro-82M')
        voices = []
        for path in repo_files:
            if path.endswith('.pt') and path.startswith('voices/'):
                voices.append(path.replace('voices/', '').replace('.pt', ''))
        if voices:
            return sorted(voices)
        return _get_fallback_voices()
    except Exception:
        return _get_fallback_voices()
|
|
|
|
| def _get_fallback_voices(): |
| """Return comprehensive fallback list of known Kokoro voices (54 total).""" |
| return [ |
| |
| "af_alloy", "af_aoede", "af_bella", "af_heart", "af_jessica", |
| "af_kore", "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky", |
| |
| "am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam", |
| "am_michael", "am_onyx", "am_puck", "am_santa", |
| |
| "bf_alice", "bf_emma", "bf_isabella", "bf_lily", |
| |
| "bm_daniel", "bm_fable", "bm_george", "bm_lewis", |
| |
| "ef_dora", "em_alex", "em_santa", |
| |
| "ff_siwis", |
| |
| "hf_alpha", "hf_beta", "hm_omega", "hm_psi", |
| |
| "if_sara", "im_nicola", |
| |
| "jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo", |
| |
| "pf_dora", "pm_alex", "pm_santa", |
| |
| "zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi", |
| "zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang" |
| ] |
|
|
|
|
def _init_kokoro() -> None:
    """Create the Kokoro model and English pipeline on first use.

    Does nothing when the shared state is already initialised. Uses CUDA when
    torch is importable and reports it as available, otherwise the CPU. Also
    tries to register a pronunciation for the word "kokoro" in the English
    pipeline's lexicon (best effort).

    Raises:
        RuntimeError: If the 'kokoro' package could not be imported.
    """
    if _KOKORO_STATE["initialized"]:
        return

    kokoro_missing = KModel is None or KPipeline is None
    if kokoro_missing:
        raise RuntimeError(
            "Kokoro is not installed. Please install the 'kokoro' package (>=0.9.4)."
        )

    # Prefer the GPU, but never let a broken CUDA probe abort initialisation.
    cuda_ok = False
    if torch is not None:
        try:
            cuda_ok = bool(torch.cuda.is_available())
        except Exception:
            cuda_ok = False
    device = "cuda" if cuda_ok else "cpu"

    model = KModel().to(device).eval()
    english_pipeline = KPipeline(lang_code="a", model=False)
    pipelines = {"a": english_pipeline}

    # Lexicon tweak is optional; ignore pipelines that do not expose it.
    try:
        pipelines["a"].g2p.lexicon.golds["kokoro"] = "kˈOkəɹO"
    except Exception:
        pass

    _KOKORO_STATE.update(
        initialized=True,
        device=device,
        model=model,
        pipelines=pipelines,
    )
|
|
|
|
def List_Kokoro_Voices() -> List[str]:
    """
    Get a list of all available Kokoro voice identifiers.

    This MCP tool lets clients discover the voice options accepted by the
    Generate_Speech tool (54 voices in the built-in list).

    Returns:
        List[str]: Voice identifiers (e.g., ["af_heart", "am_adam", "bf_alice", ...])

    Voice naming convention:
        - First 2 letters: Language/Region and gender (af=American Female,
          am=American Male, bf=British Female, etc.)
        - Remaining letters: Voice name (heart, adam, alice, etc.)

    Categories in the built-in list:
        - American Female/Male (20 voices)
        - British Female/Male (8 voices)
        - European Female/Male (3 voices)
        - French Female (1 voice)
        - Hindi Female/Male (4 voices)
        - Italian Female/Male (2 voices)
        - Japanese Female/Male (5 voices)
        - Portuguese Female/Male (3 voices)
        - Chinese Female/Male (8 voices)
    """
    voices = get_kokoro_voices()
    return voices
|
|
|
|
def Generate_Speech(
    text: Annotated[str, "The text to synthesize (English)."],
    speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.25,
    voice: Annotated[str, "Voice identifier from 54 available options."] = "af_heart",
) -> Tuple[int, np.ndarray]:
    """
    Synthesize speech from text using the Kokoro-82M TTS model.

    This function returns raw audio suitable for a Gradio Audio component and is
    also exposed as an MCP tool. It supports 54 different voices across multiple
    languages and accents including American, British, European, Hindi, Italian,
    Japanese, Portuguese, and Chinese speakers.

    Args:
        text (str): The text to synthesize. Works best with English but supports multiple languages.
        speed (float): Speech speed multiplier in 0.5–2.0; 1.0 = normal speed. Default: 1.25 (slightly brisk).
        voice (str): Voice identifier from 54 available options. Default: 'af_heart'.

    Returns:
        A tuple of (sample_rate_hz, audio_waveform) where:
        - sample_rate_hz: int sample rate in Hz (24_000)
        - audio_waveform: numpy.ndarray with the concatenated waveform of all segments

    Raises:
        gr.Error: If the text is empty, Kokoro cannot be initialised, the
            voice cannot be loaded, or synthesis fails.
    """
    # `text or ""` keeps the logging call from crashing on None before the
    # emptiness check below can reject it.
    _log_call_start("Generate_Speech", text=_truncate_for_log(text or "", 200), speed=speed, voice=voice)
    if not text or not text.strip():
        _log_call_end("Generate_Speech", "error=empty text")
        raise gr.Error("Please provide non-empty text to synthesize.")

    try:
        # Initialisation and voice loading live inside the try so that their
        # failures are logged and reported as gr.Error like every other one.
        _init_kokoro()
        model = _KOKORO_STATE["model"]
        pipeline = _KOKORO_STATE["pipelines"].get("a")
        if pipeline is None:
            raise gr.Error("Kokoro English pipeline not initialized.")

        pack = pipeline.load_voice(voice)

        # Split the text into segments up front so progress can be reported.
        segments = list(pipeline(text, voice, speed))
        total_segments = len(segments)

        audio_segments = []
        for segment_idx, (text_chunk, ps, _) in enumerate(segments):
            # The voice pack is indexed by (phoneme count - 1).
            ref_s = pack[len(ps) - 1]
            try:
                audio = model(ps, ref_s, float(speed))
                audio_segments.append(audio.detach().cpu().numpy())

                # Progress output for long inputs only.
                if total_segments > 10 and (segment_idx + 1) % 5 == 0:
                    print(f"Progress: Generated {segment_idx + 1}/{total_segments} segments...")
            except Exception as e:
                raise gr.Error(f"Error generating audio for segment {segment_idx + 1}: {str(e)}") from e

        if not audio_segments:
            raise gr.Error("No audio was generated (empty synthesis result).")

        if len(audio_segments) == 1:
            final_audio = audio_segments[0]
        else:
            final_audio = np.concatenate(audio_segments, axis=0)

        duration = len(final_audio) / 24_000
        if total_segments > 1:
            print(f"Completed: {total_segments} segments concatenated into {duration:.1f} seconds of audio")

        _log_call_end("Generate_Speech", f"samples={final_audio.shape[0]} duration_sec={duration:.2f}")
        return 24_000, final_audio

    except gr.Error as e:
        _log_call_end("Generate_Speech", f"gr_error={str(e)}")
        raise
    except Exception as e:
        _log_call_end("Generate_Speech", f"error={str(e)[:120]}")
        raise gr.Error(f"Error during speech generation: {str(e)}") from e
|
|
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| MEMORY_FILE = os.path.join(os.path.dirname(__file__), "memories.json") |
| _MEMORY_LOCK = threading.RLock() |
| _MAX_MEMORIES = 10_000 |
|
|
|
|
| def _now_iso() -> str: |
| return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
|
def _load_memories() -> List[Dict[str, str]]:
    """Internal helper: read the memory list from ``MEMORY_FILE``.

    Returns an empty list when the file is missing or its top-level JSON
    value is not a list. List items that are not dicts containing both "id"
    and "text" are dropped. If reading or parsing raises, the file is renamed
    to ``<MEMORY_FILE>.corrupt`` (only when no such backup exists yet) and an
    empty list is returned, so tool calls keep working.
    """
    if not os.path.exists(MEMORY_FILE):
        return []

    try:
        with open(MEMORY_FILE, "r", encoding="utf-8") as handle:
            payload = json.load(handle)
        if not isinstance(payload, list):
            return []
        # Keep only well-formed records.
        return [
            record
            for record in payload
            if isinstance(record, dict) and "id" in record and "text" in record
        ]
    except Exception:
        # Move the unreadable file aside once; never let this step raise.
        try:
            backup_path = MEMORY_FILE + ".corrupt"
            if not os.path.exists(backup_path):
                os.replace(MEMORY_FILE, backup_path)
        except Exception:
            pass
        return []
|
|
|
|
def _save_memories(memories: List[Dict[str, str]]) -> None:
    """Write the memory list to ``MEMORY_FILE`` via a temp file.

    The JSON is written to ``<MEMORY_FILE>.tmp`` first and then moved over
    the real file with ``os.replace``, so the real file is only ever swapped
    in one step.
    """
    staging_path = f"{MEMORY_FILE}.tmp"
    with open(staging_path, "w", encoding="utf-8") as handle:
        json.dump(memories, handle, ensure_ascii=False, indent=2)
    os.replace(staging_path, MEMORY_FILE)
|
|
|
|
def _mem_save(
    text: Annotated[str, "Raw textual content to remember (will be stored verbatim)."],
    tags: Annotated[str, "Optional comma-separated tags for lightweight categorization (e.g. 'user, preference')."] = "",
) -> str:
    """(Internal) Persist a new memory record.

    Summary:
        Adds a memory object to the local JSON store (no external database).

    Stored Fields:
        - id (str, UUID4)
        - text (str, user content with surrounding whitespace trimmed)
        - timestamp (UTC "YYYY-MM-DD HH:MM:SS")
        - tags (str, the comma-separated tag string, trimmed)

    Behavior / Rules:
        1. Whitespace is trimmed; empty text is rejected.
        2. If the most recent existing memory has identical text, the new one
           is skipped (light dedupe heuristic).
        3. When total entries exceed _MAX_MEMORIES, oldest entries are pruned.
        4. The operation runs under the in-process reentrant lock only
           (no cross-process locking).

    Returns:
        str: Human readable confirmation containing the new memory's full
        UUID, or an error / "skipped" message.

    Security / Privacy:
        Data is plaintext JSON on local disk; do NOT store secrets or regulated data.
    """
    text_clean = (text or "").strip()
    if not text_clean:
        return "Error: memory text is empty."

    # Guard tags the same way as text: callers may pass None.
    tags_clean = (tags or "").strip()

    with _MEMORY_LOCK:
        memories = _load_memories()
        if memories and memories[-1].get("text") == text_clean:
            return "Skipped: identical to last stored memory."

        mem_id = str(uuid.uuid4())
        memories.append(
            {
                "id": mem_id,
                "text": text_clean,
                "timestamp": _now_iso(),
                "tags": tags_clean,
            }
        )

        # Enforce the cap by dropping the oldest entries.
        if len(memories) > _MAX_MEMORIES:
            overflow = len(memories) - _MAX_MEMORIES
            memories = memories[overflow:]
        _save_memories(memories)
    return f"Memory saved: {mem_id}"
|
|
|
|
def _mem_list(
    limit: Annotated[int, "Maximum number of most recent memories to return (1–200)."] = 20,
    include_tags: Annotated[bool, "If true, include tags column in output."] = True,
) -> str:
    """(Internal) List most recent memories, newest first.

    Parameters:
        limit (int): Max rows to return; clamped to [1, 200].
        include_tags (bool): Include tags section when True.

    Output Format (one per line):
        <first 8 chars of id> [YYYY-MM-DD HH:MM:SS] <text> | tags: <tag list>
        (Tag column omitted if empty or include_tags=False.)
        A final summary line is added when older memories were left out.

    Returns:
        str: Newline-joined lines, or "No memories stored yet." when the
        store is empty.
    """
    limit = max(1, min(200, limit))
    with _MEMORY_LOCK:
        memories = _load_memories()
    if not memories:
        return "No memories stored yet."

    # Most recent entries sit at the end of the list; show them newest first.
    chosen = memories[-limit:][::-1]
    lines: List[str] = []
    for m in chosen:
        base = f"{m['id'][:8]} [{m.get('timestamp','?')}] {m.get('text','')}"
        if include_tags and m.get("tags"):
            base += f" | tags: {m['tags']}"
        lines.append(base)

    omitted = len(memories) - len(chosen)
    if omitted > 0:
        # Proper singular/plural ("memory" / "memories"); the previous suffix
        # trick produced "memorie" for a single omitted entry.
        noun = "memory" if omitted == 1 else "memories"
        lines.append(f"… ({omitted} older {noun} omitted; total={len(memories)})")
    return "\n".join(lines)
|
|
|
|
def _mem_search(
    query: Annotated[str, "Case-insensitive substring search; space-separated terms are ANDed."],
    limit: Annotated[int, "Maximum number of matches (1–200)."] = 20,
) -> str:
    """(Internal) AND-search across the text and tags of stored memories.

    Search Semantics:
        - The query is split on whitespace into terms.
        - A memory matches only if EVERY term occurs (case-insensitively) in
          its text and tags taken together.
        - Results are returned in reverse storage order (most recently
          stored first).

    Parameters:
        query (str): Raw query; must contain at least one non-space character.
        limit (int): Max rows to return; clamped to [1, 200].

    Returns:
        str: Lines in the same format as _mem_list, followed by a summary
        line when further matches were left out; or a "No matches" / error
        message.
    """
    stripped_query = (query or "").strip()
    if not stripped_query:
        return "Error: empty query."
    terms = [term.lower() for term in stripped_query.split() if term.strip()]
    if not terms:
        return "Error: no valid search terms."
    limit = max(1, min(200, limit))

    with _MEMORY_LOCK:
        memories = _load_memories()

    def _is_hit(memory) -> bool:
        # Every term must occur somewhere in "<text> <tags>".
        haystack = (memory.get("text", "") + " " + memory.get("tags", "")).lower()
        return all(term in haystack for term in terms)

    all_hits = [memory for memory in reversed(memories) if _is_hit(memory)]
    total_matches = len(all_hits)
    matches = all_hits[:limit]
    if not matches:
        return f"No matches for: {query}"

    lines = []
    for m in matches:
        row = f"{m['id'][:8]} [{m.get('timestamp','?')}] {m.get('text','')}"
        if m.get('tags'):
            row += f" | tags: {m['tags']}"
        lines.append(row)

    omitted = total_matches - len(matches)
    if omitted > 0:
        lines.append(f"… ({omitted} additional match{'es' if omitted!=1 else ''} omitted; total_matches={total_matches})")
    return "\n".join(lines)
|
|
|
|
def _mem_delete(
    memory_id: Annotated[str, "Full UUID or a unique prefix (>=4 chars) of the memory id to delete."],
) -> str:
    """(Internal) Delete one memory by id or unique id prefix.

    Parameters:
        memory_id (str): Full id, or a prefix of at least 4 characters.
            Matching is case-insensitive. When the prefix matches several
            ids, nothing is deleted.

    Returns:
        str: A success message with the deleted id, an ambiguity notice
        listing up to five candidates, a not-found message, or an error when
        fewer than 4 characters were supplied.
    """
    needle = (memory_id or "").strip().lower()
    if len(needle) < 4:
        return "Error: supply at least 4 characters of the id."

    with _MEMORY_LOCK:
        memories = _load_memories()
        candidates = [m for m in memories if m["id"].lower().startswith(needle)]
        if not candidates:
            return "Memory not found."

        is_ambiguous = len(candidates) > 1 and needle != candidates[0]["id"].lower()
        if is_ambiguous:
            # Show a few candidates so the caller can extend the prefix.
            preview = ", ".join(m["id"][:8] for m in candidates[:5])
            ellipsis = "…" if len(candidates) > 5 else ""
            return f"Ambiguous prefix (matches {len(candidates)} ids: {preview}{ellipsis}). Provide more characters."

        target_id = candidates[0]["id"]
        remaining = [m for m in memories if m["id"] != target_id]
        _save_memories(remaining)
    return f"Deleted memory: {target_id}"
|
|
|
|
| |
| |
| |
|
|
| |
# Gradio interface for the Fetch_Webpage tool: URL + verbosity in, Markdown out.
fetch_interface = gr.Interface(
    fn=Fetch_Webpage,
    inputs=[
        gr.Textbox(placeholder="https://example.com/article", label="URL"),
        gr.Dropdown(
            choices=["Brief", "Standard", "Full"],
            value="Standard",
            label="Verbosity",
            info="Brief: 1000 chars, Standard: 3000 chars, Full: complete page",
        ),
    ],
    outputs=gr.Markdown(label="Extracted Markdown"),
    title="Fetch Webpage",
    flagging_mode="never",
    description=(
        '<div style="text-align:center">Convert any webpage to clean Markdown format with configurable length, preserving structure and formatting while removing navigation and clutter.</div>'
    ),
    api_description=(
        "Fetch a web page and return it converted to Markdown format with configurable length. "
        "Parameters: url (str - absolute URL), verbosity (str - Brief/Standard/Full controlling output length: Brief=1000 chars, Standard=3000 chars, Full=complete page)."
    ),
)
|
|
| |
# Gradio tab (and API/MCP endpoint) for Search_DuckDuckGo: a query string and a
# result cap go in, the results come back as read-only text.
concise_interface = gr.Interface(
    fn=Search_DuckDuckGo,
    inputs=[
        gr.Textbox(placeholder="topic OR site:example.com", label="Query"),
        gr.Slider(label="Max results", minimum=1, maximum=20, step=1, value=5),
    ],
    outputs=gr.Textbox(interactive=False, label="Search Results"),
    title="DuckDuckGo Search",
    # Centered blurb shown under the tab title in the web UI.
    description=(
        '<div style="text-align:center">'
        "Web search with readable output format. Supports advanced search operators."
        "</div>"
    ),
    # Plain-text description passed to Gradio as the endpoint's API description.
    api_description=(
        "Run a DuckDuckGo search and return numbered results with URLs, titles, and summaries. "
        "Supports advanced search operators: site: for specific domains, quotes for exact phrases, "
        "OR for alternatives, and - to exclude terms. "
        "Examples: 'Python programming', 'site:example.com', "
        "'\"artificial intelligence\"', 'cats -dogs', 'Python OR JavaScript'."
    ),
    flagging_mode="never",
    submit_btn="Search",
)
|
|
| |
|
|
| |
# Gradio tab (and API/MCP endpoint) for Execute_Python: source code in, text out.
# NOTE(review): the description below advertises execution of arbitrary Python
# supplied by the caller; confirm the deployment is sandboxed before exposing it.
code_interface = gr.Interface(
    fn=Execute_Python,
    inputs=gr.Code(language="python", label="Python Code"),
    outputs=gr.Textbox(label="Output"),
    title="Python Code Executor",
    description='<div style="text-align:center">Execute Python code and see the output.</div>',
    # One sentence per entry; joined with single spaces into the final description.
    api_description=" ".join(
        (
            "Execute arbitrary Python code and return captured stdout or an error message.",
            "Supports any valid Python code including imports, variables, functions, loops, and calculations.",
            "Examples: 'print(2+2)', 'import math; print(math.sqrt(16))', 'for i in range(3): print(i)'.",
            "Parameters: code (str - Python source code to execute).",
            "Returns: Combined stdout output or exception text if execution fails.",
        )
    ),
    flagging_mode="never",
)
|
|
# Custom CSS handed to gr.TabbedInterface. It turns the page <h1> into a
# three-row grid: the title itself, a bold list of tools (::before, row 2) and
# an authentication note (::after, row 3). Headings inside tab panels get both
# pseudo-elements removed so the subtitle lines are not repeated per tool.
# Quotes need no escaping inside a triple-quoted string.
CSS_STYLES = """
.gradio-container h1 {
text-align: center;
/* Ensure main title appears first, then our two subtitle lines */
display: grid;
justify-items: center;
}
/* Place bold tools list on line 2, normal auth note on line 3 (below title) */
.gradio-container h1::before {
grid-row: 2;
content: "Fetch Webpage | Search DuckDuckGo | Python Interpreter | Memory Manager | Kokoro TTS | Image Generation | Video Generation";
display: block;
font-size: 1rem;
font-weight: 700;
opacity: 0.9;
margin-top: 6px;
white-space: pre-wrap;
}
.gradio-container h1::after {
grid-row: 3;
content: "Authentication is optional. Image/Video generation require an HF token to function and may be hidden from MCP tools without one — but UI tabs remain visible. Memory is intended for local use and may be hidden from MCP tools.";
display: block;
font-size: 1rem;
font-weight: 400;
opacity: 0.9;
margin-top: 2px;
white-space: pre-wrap;
}

/* Remove inside tab panels so it doesn't duplicate under each tool title */
.gradio-container [role="tabpanel"] h1::before,
.gradio-container [role="tabpanel"] h1::after {
content: none !important;
}
"""
|
|
| |
# Voice ids offered in the dropdown; looked up once when the module is imported.
available_voices = get_kokoro_voices()

# Gradio tab (and API/MCP endpoint) for Generate_Speech: text, speed and voice
# go in, audio comes back as a numpy payload rendered as WAV.
kokoro_interface = gr.Interface(
    fn=Generate_Speech,
    inputs=[
        gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4),
        gr.Slider(minimum=0.5, maximum=2.0, value=1.25, step=0.1, label="Speed"),
        gr.Dropdown(
            label="Voice",
            choices=available_voices,
            value="af_heart",
            info="Select from 54 available voices across multiple languages and accents",
        ),
    ],
    outputs=gr.Audio(label="Audio", type="numpy", format="wav", show_download_button=True),
    title="Kokoro TTS",
    description=(
        "<div style=\"text-align:center\">Generate speech with Kokoro-82M. Supports multiple languages and accents. Runs on CPU or CUDA if available.</div>"
    ),
    # Fixes relative to the previous text: the voice list now ends with a period
    # instead of running into "Parameters:", and the closing instruction names
    # an actual Markdown template instead of an empty pair of backticks.
    # NOTE(review): the template is a best-guess restoration; confirm it is the
    # format MCP clients should use for audio.
    api_description=(
        "Synthesize speech from text using Kokoro-82M TTS model. Returns (sample_rate, waveform) suitable for playback. "
        "Supports unlimited text length by processing all segments. Voice examples: 'af_heart' (US female), 'am_onyx' (US male), "
        "'bf_emma' (British female), 'af_sky' (US female), 'af_nicole' (US female). "
        "Parameters: text (str), speed (float 0.5–2.0, default 1.25x), voice (str from 54 available options, default 'af_heart'). "
        "Return the generated media to the user in this format `![Alt text](URL)`"
    ),
    flagging_mode="never",
)
|
|
def Memory_Manager(
    action: Annotated[Literal["save","list","search","delete"], "Action to perform: save | list | search | delete"],
    text: Annotated[Optional[str], "Text content (Save only)"] = None,
    tags: Annotated[Optional[str], "Comma-separated tags (Save only)"] = None,
    query: Annotated[Optional[str], "Search query terms (Search only)"] = None,
    limit: Annotated[int, "Max results (List/Search only)"] = 20,
    memory_id: Annotated[Optional[str], "Full UUID or unique prefix (Delete only)"] = None,
    include_tags: Annotated[bool, "Include tags (List/Search only)"] = True,
) -> str:
    """Manage lightweight local JSON "memories" (save | list | search | delete) in one MCP tool.

    Overview:
        Single entry point that validates the request and forwards it to the
        private helpers `_mem_save`, `_mem_list`, `_mem_search` and
        `_mem_delete`. Memories are short text snippets with optional tags,
        kept in a local plaintext JSON store; no external database is involved.

    Supported Actions:
        - save   : Store a new memory (requires 'text'; optional 'tags').
        - list   : Return recent memories (uses 'limit' and 'include_tags').
        - search : Match 'query' terms against stored memories (uses 'query' and 'limit').
        - delete : Remove one memory by full UUID or unique prefix (uses 'memory_id').

    Parameters:
        action (Literal[save|list|search|delete]): Operation selector; compared
            case-insensitively after trimming surrounding whitespace.
        text (str): Memory content (save only). Must contain at least one
            non-whitespace character.
        tags (str): Optional comma-separated tags (save only).
        query (str): Search terms (search only). Must not be blank.
        limit (int): Maximum rows for list/search; handed to the helpers
            unchanged (they are expected to clamp it).
        memory_id (str): Full UUID or unique prefix (delete only). Must not be blank.
        include_tags (bool): Forwarded to the list helper only; the search
            helper is called without it.

    Lifecycle & Constraints:
        Pruning of old entries (soft cap `_MAX_MEMORIES`), duplicate detection
        and locking are implemented by the helpers, not here.

    Returns:
        str: Human-readable status / result text produced by the selected
        helper, or one of the error strings below.

    Error Modes:
        - Unknown or missing action -> "Error: invalid action (use save|list|search|delete)."
        - Blank required field for the chosen action -> "Error: '<field>' is required when action=<action>."

    Security & Privacy:
        The store is plaintext JSON; do not save secrets, credentials, or
        regulated personal data.
    """
    operation = (action or "").lower().strip()

    # Unused fields may arrive as None from the UI or an MCP client; coerce
    # them to "" so the blank checks below can call .strip() unconditionally.
    body = text or ""
    tag_csv = tags or ""
    terms = query or ""
    target = memory_id or ""

    # 'list' has no required field, so it needs no guard.
    if operation == "list":
        return _mem_list(limit=limit, include_tags=include_tags)

    if operation == "save":
        if body.strip():
            return _mem_save(text=body, tags=tag_csv)
        return "Error: 'text' is required when action=save."

    if operation == "search":
        if terms.strip():
            return _mem_search(query=terms, limit=limit)
        return "Error: 'query' is required when action=search."

    if operation == "delete":
        if target.strip():
            return _mem_delete(memory_id=target)
        return "Error: 'memory_id' is required when action=delete."

    return "Error: invalid action (use save|list|search|delete)."
|
|
# Gradio tab (and optional API/MCP endpoint) for Memory_Manager. One form
# serves all four actions; fields that do not apply to the chosen action are
# ignored by Memory_Manager.
memory_interface = gr.Interface(
    fn=Memory_Manager,
    inputs=[
        gr.Dropdown(label="Action", choices=["save","list","search","delete"], value="list"),
        gr.Textbox(label="Text", lines=3, placeholder="Memory text (save)"),
        gr.Textbox(label="Tags", placeholder="tag1, tag2"),
        gr.Textbox(label="Query", placeholder="Search terms (search)"),
        gr.Slider(1, 200, value=20, step=1, label="Limit"),
        gr.Textbox(label="Memory ID / Prefix", placeholder="UUID or prefix (delete)"),
        gr.Checkbox(value=True, label="Include Tags"),
    ],
    outputs=gr.Textbox(label="Result", lines=14),
    title="Memory Manager",
    description=(
        "<div style=\"text-align:center\">Lightweight local JSON memory store (no external DB). Choose an Action, fill only the relevant fields, and run.</div>"
    ),
    # Two corrections so the description matches Memory_Manager itself:
    #   * it returns human-readable text, not JSON (JSON is only the storage format);
    #   * include_tags is honoured by `list` only; `search` is invoked without it.
    api_description=(
        "Manage short text memories with optional tags. Actions: save(text,tags), list(limit,include_tags), "
        "search(query,limit), delete(memory_id). Returns human-readable plain text (not raw JSON). "
        "Action parameter is always required. "
        "Use Memory_Manager whenever you are given information worth remembering about the user, and search for memories when relevant."
    ),
    flagging_mode="never",
    # The endpoint is listed in the API only when HF_READ_TOKEN is set in the
    # environment; the UI tab itself is always built.
    show_api=bool(os.getenv("HF_READ_TOKEN")),
)
|
|
| |
| |
| |
|
|
# Hugging Face token used by Generate_Image; None when HF_READ_TOKEN is unset.
HF_API_TOKEN = os.environ.get("HF_READ_TOKEN")
|
|
|
|
def Generate_Image(
    prompt: Annotated[str, "Text description of the image to generate."],
    model_id: Annotated[str, "Hugging Face model id in the form 'creator/model-name' (e.g., black-forest-labs/FLUX.1-Krea-dev)."] = "black-forest-labs/FLUX.1-Krea-dev",
    negative_prompt: Annotated[str, "What should NOT appear in the image."] = (
        "(deformed, distorted, disfigured), poorly drawn, bad anatomy, wrong anatomy, extra limb, "
        "missing limb, floating limbs, (mutated hands and fingers), disconnected limbs, mutation, "
        "mutated, ugly, disgusting, blurry, amputation, misspellings, typos"
    ),
    steps: Annotated[int, "Number of denoising steps (1–100). Higher = slower, potentially higher quality."] = 35,
    cfg_scale: Annotated[float, "Classifier-free guidance scale (1–20). Higher = follow the prompt more closely."] = 7.0,
    sampler: Annotated[str, "Sampling method label (UI only). Common options: 'DPM++ 2M Karras', 'DPM++ SDE Karras', 'Euler', 'Euler a', 'Heun', 'DDIM'."] = "DPM++ 2M Karras",
    seed: Annotated[int, "Random seed for reproducibility. Use -1 for a random seed per call."] = -1,
    width: Annotated[int, "Output width in pixels (64–1216, multiple of 32 recommended)."] = 1024,
    height: Annotated[int, "Output height in pixels (64–1216, multiple of 32 recommended)."] = 1024,
) -> Image.Image:
    """
    Generate a single image from a text prompt using a Hugging Face model via serverless inference.

    The providers "auto", "replicate" and "fal-ai" are tried in that order and
    the first successful result is returned. A fixed quality suffix is appended
    to the prompt before it is sent.

    Args:
        prompt (str): Text description of the image to generate. Must not be blank.
        model_id (str): The Hugging Face model id (creator/model-name). Defaults to "black-forest-labs/FLUX.1-Krea-dev".
        negative_prompt (str): What should NOT appear in the image.
        steps (int): Number of denoising steps (1–100). Higher can improve quality.
        cfg_scale (float): Guidance scale (1–20). Higher = follow the prompt more closely.
        sampler (str): Sampling method label for the UI; accepted but not sent to the provider.
        seed (int): Random seed. Use -1 to randomize; the random value is drawn once per call
            and reused for every provider attempt.
        width (int): Output width in pixels (64–1216; multiples of 32 recommended).
        height (int): Output height in pixels (64–1216; multiples of 32 recommended).

    Returns:
        PIL.Image.Image: The generated image.

    Raises:
        gr.Error: If the prompt is blank, or if every provider fails. The message is
            specialised for model-not-found (404), warm-up (503) and auth (401/403)
            failures, based on the text of the last provider's exception.
    """
    _log_call_start("Generate_Image", prompt=_truncate_for_log(prompt, 200), model_id=model_id, steps=steps, cfg_scale=cfg_scale, seed=seed, size=f"{width}x{height}")
    if not prompt or not prompt.strip():
        _log_call_end("Generate_Image", "error=empty prompt")
        raise gr.Error("Please provide a non-empty prompt.")

    # Quality suffix appended to every prompt.
    enhanced_prompt = f"{prompt} | ultra detail, ultra elaboration, ultra quality, perfect."

    # Resolve the seed once so fallback providers retry the same generation
    # instead of each drawing a fresh random seed.
    effective_seed = seed if seed != -1 else random.randint(1, 1_000_000_000)

    providers = ["auto", "replicate", "fal-ai"]
    last_error: Exception | None = None

    for provider in providers:
        try:
            client = InferenceClient(api_key=HF_API_TOKEN, provider=provider)
            image = client.text_to_image(
                prompt=enhanced_prompt,
                negative_prompt=negative_prompt,
                model=model_id,
                width=width,
                height=height,
                num_inference_steps=steps,
                guidance_scale=cfg_scale,
                seed=effective_seed,
            )
            _log_call_end("Generate_Image", f"provider={provider} size={image.size}")
            return image
        except Exception as e:
            # Deliberately broad: any failure simply moves on to the next provider.
            last_error = e
            continue

    # Every provider failed. Log the end of the call once, for all outcomes,
    # then translate the last error into a user-facing message.
    msg = str(last_error) if last_error else "Unknown error"
    _log_call_end("Generate_Image", f"error={_truncate_for_log(msg, 200)}")
    if "404" in msg:
        raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and your HF token access.") from last_error
    if "503" in msg:
        raise gr.Error("The model is warming up. Please try again shortly.") from last_error
    if "401" in msg or "403" in msg:
        raise gr.Error("Authentication failed. Set HF_READ_TOKEN environment variable with access to the model.") from last_error
    raise gr.Error(f"Image generation failed: {msg}") from last_error
|
|
|
|
# Gradio tab (and optional API/MCP endpoint) for Generate_Image. The input
# order matches Generate_Image's positional parameters, and the defaults here
# repeat the function's own defaults.
image_generation_interface = gr.Interface(
    fn=Generate_Image,
    inputs=[
        gr.Textbox(label="Prompt", placeholder="Enter a prompt", lines=2),
        gr.Textbox(label="Model", value="black-forest-labs/FLUX.1-Krea-dev", placeholder="creator/model-name"),
        gr.Textbox(
            label="Negative Prompt",
            value=(
                "(deformed, distorted, disfigured), poorly drawn, bad anatomy, wrong anatomy, extra limb, "
                "missing limb, floating limbs, (mutated hands and fingers), disconnected limbs, mutation, "
                "mutated, ugly, disgusting, blurry, amputation, misspellings, typos"
            ),
            lines=2,
        ),
        gr.Slider(minimum=1, maximum=100, value=35, step=1, label="Steps"),
        gr.Slider(minimum=1.0, maximum=20.0, value=7.0, step=0.1, label="CFG Scale"),
        gr.Radio(label="Sampler", value="DPM++ 2M Karras", choices=[
            "DPM++ 2M Karras", "DPM++ SDE Karras", "Euler", "Euler a", "Heun", "DDIM"
        ]),
        gr.Slider(minimum=-1, maximum=1_000_000_000, value=-1, step=1, label="Seed (-1 = random)"),
        gr.Slider(minimum=64, maximum=1216, value=1024, step=32, label="Width"),
        gr.Slider(minimum=64, maximum=1216, value=1024, step=32, label="Height"),
    ],
    outputs=gr.Image(label="Generated Image"),
    title="Image Generation",
    description=(
        "<div style=\"text-align:center\">Generate images via Hugging Face serverless inference. "
        "Default model is FLUX.1-Krea-dev.</div>"
    ),
    # The closing instruction previously ended in an empty pair of backticks,
    # leaving the requested format unspecified; it now names the Markdown image
    # syntax explicitly.
    api_description=(
        "Generate a single image from a text prompt using a Hugging Face model via serverless inference. "
        "Supports creative prompts like 'a serene mountain landscape at sunset', 'portrait of a wise owl', "
        "'futuristic city with flying cars'. Default model: FLUX.1-Krea-dev. "
        "Parameters: prompt (str), model_id (str, creator/model-name), negative_prompt (str), steps (int, 1–100), "
        "cfg_scale (float, 1–20), sampler (str), seed (int, -1=random), width/height (int, 64–1216). "
        "Returns a PIL.Image. Return the generated media to the user in this format `![Alt text](URL)`"
    ),
    flagging_mode="never",
    # The endpoint is listed in the API only when HF_READ_TOKEN is set; the UI
    # tab itself is always built.
    show_api=bool(os.getenv("HF_READ_TOKEN")),
)
|
|
| |
| |
| |
|
|
def _write_video_tmp(data_iter_or_bytes: object, suffix: str = ".mp4") -> str:
    """Write a video payload to a new temporary file and return the file's path.

    The file is created with ``tempfile.mkstemp`` in the OS temp location, so
    the project directory is not polluted. The caller owns the returned file.

    Accepted payload shapes, checked in this order:
        1. bytes-like (``bytes``, ``bytearray``, ``memoryview``): written as-is.
        2. object with ``.read()``: the result of ``read()`` is written.
        3. object with ``.content``: that attribute is written.
        4. any other iterable except ``str``/``dict``: each truthy chunk is
           written in order.

    Args:
        data_iter_or_bytes: The payload, in one of the shapes above.
        suffix: File name suffix for the temp file (default ".mp4").

    Returns:
        str: Path of the written file.

    Raises:
        gr.Error: If the payload matches none of the accepted shapes.
        Exception: Any error raised while reading or writing is re-raised
            after the partially written temp file has been removed.
    """
    fd, fname = tempfile.mkstemp(suffix=suffix)
    try:
        with os.fdopen(fd, "wb") as f:
            # memoryview is included here on purpose: it is iterable, so it would
            # otherwise reach the chunk loop below and fail on f.write(<int>).
            if isinstance(data_iter_or_bytes, (bytes, bytearray, memoryview)):
                f.write(data_iter_or_bytes)
            elif hasattr(data_iter_or_bytes, "read"):
                f.write(data_iter_or_bytes.read())
            elif hasattr(data_iter_or_bytes, "content"):
                f.write(data_iter_or_bytes.content)
            elif hasattr(data_iter_or_bytes, "__iter__") and not isinstance(data_iter_or_bytes, (str, dict)):
                for chunk in data_iter_or_bytes:
                    if chunk:  # skip empty chunks
                        f.write(chunk)
            else:
                raise gr.Error("Unsupported video data type returned by provider.")
    except Exception:
        # Best-effort cleanup so a failed write does not leave a stray temp
        # file behind; the original error is what the caller should see.
        try:
            os.remove(fname)
        except Exception:
            pass
        raise
    return fname
|
|
|
|
# Hugging Face token used by Generate_Video: HF_READ_TOKEN takes precedence,
# HF_TOKEN is the fallback when the former is unset or empty.
HF_VIDEO_TOKEN = os.environ.get("HF_READ_TOKEN") or os.environ.get("HF_TOKEN")
|
|
|
|
def Generate_Video(
    prompt: Annotated[str, "Text description of the video to generate (e.g., 'a red fox running through a snowy forest at sunrise')."],
    model_id: Annotated[str, "Hugging Face model id in the form 'creator/model-name'. Defaults to Wan-AI/Wan2.2-T2V-A14B."] = "Wan-AI/Wan2.2-T2V-A14B",
    negative_prompt: Annotated[str, "What should NOT appear in the video."] = "",
    steps: Annotated[int, "Number of denoising steps (1–100). Higher can improve quality but is slower."] = 25,
    cfg_scale: Annotated[float, "Guidance scale (1–20). Higher = follow the prompt more closely, lower = more creative."] = 3.5,
    seed: Annotated[int, "Random seed for reproducibility. Use -1 for a random seed per call."] = -1,
    width: Annotated[int, "Output width in pixels (multiples of 8 recommended)."] = 768,
    height: Annotated[int, "Output height in pixels (multiples of 8 recommended)."] = 768,
    fps: Annotated[int, "Frames per second of the output video (e.g., 24)."] = 24,
    duration: Annotated[float, "Target duration in seconds (provider/model dependent, commonly 2–6s)."] = 4.0,
) -> str:
    """
    Generate a short video from a text prompt using a Hugging Face model via serverless inference.

    The providers "auto", "replicate" and "fal-ai" are tried in that order. The
    first provider whose response can be written to a temp file wins.

    Args:
        prompt (str): Text description of the video to generate. Must not be blank.
        model_id (str): The Hugging Face model id (creator/model-name). Defaults to "Wan-AI/Wan2.2-T2V-A14B".
        negative_prompt (str): What should NOT appear in the video.
        steps (int): Number of denoising steps (1–100). Higher can improve quality but is slower.
        cfg_scale (float): Guidance scale (1–20). Higher = follow the prompt more closely.
        seed (int): Random seed. Use -1 to randomize; the random value is drawn once per call.
        width (int): Output width in pixels.
        height (int): Output height in pixels.
        fps (int): Frames per second.
        duration (float): Target duration in seconds; with fps it also determines the
            requested frame count (int(duration * fps)).

    Returns:
        str: Path to the temporary MP4 file written by `_write_video_tmp`.

    Raises:
        gr.Error: If the prompt is blank, or if every provider fails. The message is
            specialised for model-not-found (404), warm-up (503) and auth (401/403)
            failures, based on the text of the last provider's exception.
    """
    _log_call_start("Generate_Video", prompt=_truncate_for_log(prompt, 160), model_id=model_id, steps=steps, cfg_scale=cfg_scale, fps=fps, duration=duration, size=f"{width}x{height}")
    if not prompt or not prompt.strip():
        _log_call_end("Generate_Video", "error=empty prompt")
        raise gr.Error("Please provide a non-empty prompt.")

    # A missing HF_VIDEO_TOKEN is not rejected up front: the request is still
    # attempted, and an auth failure surfaces through the 401/403 mapping below.

    providers = ["auto", "replicate", "fal-ai"]
    last_error: Exception | None = None

    # Everything below is identical for each provider attempt, so build it once.
    effective_seed = seed if seed != -1 else random.randint(1, 1_000_000_000)

    # Payload for the raw-POST fallback (clients without `text_to_video`).
    # Entries whose value is None are dropped before sending.
    parameters = {
        "negative_prompt": negative_prompt or None,
        "num_inference_steps": steps,
        "guidance_scale": cfg_scale,
        "seed": effective_seed,
        "width": width,
        "height": height,
        "fps": fps,
        "duration": duration,
    }
    post_parameters = {k: v for k, v in parameters.items() if v is not None}

    # Arguments for the `text_to_video` path: the frame count is derived from
    # duration and fps, and the remaining settings are sent via `extra_body`
    # only when set.
    num_frames = int(duration * fps) if duration and fps else None
    candidate_extras = (("width", width), ("height", height), ("fps", fps), ("duration", duration))
    extra_body = {key: value for key, value in candidate_extras if value}

    for provider in providers:
        try:
            client = InferenceClient(api_key=HF_VIDEO_TOKEN, provider=provider)
            if hasattr(client, "text_to_video"):
                result = client.text_to_video(
                    prompt=prompt,
                    model=model_id,
                    guidance_scale=cfg_scale,
                    negative_prompt=[negative_prompt] if negative_prompt else None,
                    num_frames=num_frames,
                    num_inference_steps=steps,
                    seed=effective_seed,
                    extra_body=extra_body or None,
                )
            else:
                # Fallback for clients that do not expose text_to_video.
                result = client.post(
                    model=model_id,
                    json={"inputs": prompt, "parameters": post_parameters},
                )

            # Persist the response to a temp file, whatever shape it came in.
            path = _write_video_tmp(result, suffix=".mp4")
            try:
                size = os.path.getsize(path)
            except OSError:
                size = -1  # size is for logging only; never fail the call over it
            _log_call_end("Generate_Video", f"provider={provider} path={os.path.basename(path)} bytes={size}")
            return path
        except Exception as e:
            # Deliberately broad: any failure simply moves on to the next provider.
            last_error = e
            continue

    # Every provider failed. Log the end of the call once, for all outcomes,
    # then translate the last error into a user-facing message.
    msg = str(last_error) if last_error else "Unknown error"
    _log_call_end("Generate_Video", f"error={_truncate_for_log(msg, 200)}")
    if "404" in msg:
        raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and HF token access.") from last_error
    if "503" in msg:
        raise gr.Error("The model is warming up. Please try again shortly.") from last_error
    if "401" in msg or "403" in msg:
        raise gr.Error("Authentication failed or not permitted. Set HF_READ_TOKEN/HF_TOKEN with inference access.") from last_error
    raise gr.Error(f"Video generation failed: {msg}") from last_error
|
|
|
|
# Gradio tab (and optional API/MCP endpoint) for Generate_Video. The input
# order matches Generate_Video's positional parameters, and the defaults here
# repeat the function's own defaults.
video_generation_interface = gr.Interface(
    fn=Generate_Video,
    inputs=[
        gr.Textbox(label="Prompt", placeholder="Enter a prompt for the video", lines=2),
        gr.Textbox(label="Model", value="Wan-AI/Wan2.2-T2V-A14B", placeholder="creator/model-name"),
        gr.Textbox(label="Negative Prompt", value="", lines=2),
        gr.Slider(minimum=1, maximum=100, value=25, step=1, label="Steps"),
        gr.Slider(minimum=1.0, maximum=20.0, value=3.5, step=0.1, label="CFG Scale"),
        gr.Slider(minimum=-1, maximum=1_000_000_000, value=-1, step=1, label="Seed (-1 = random)"),
        gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Width"),
        gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Height"),
        gr.Slider(minimum=4, maximum=60, value=24, step=1, label="FPS"),
        gr.Slider(minimum=1.0, maximum=10.0, value=4.0, step=0.5, label="Duration (s)"),
    ],
    outputs=gr.Video(label="Generated Video", show_download_button=True, format="mp4"),
    title="Video Generation",
    description=(
        "<div style=\"text-align:center\">Generate short videos via Hugging Face serverless inference. "
        "Default model is Wan2.2-T2V-A14B.</div>"
    ),
    # The closing instruction previously ended in an empty pair of backticks,
    # leaving the requested format unspecified; it now names a Markdown template.
    # NOTE(review): the template is a best-guess restoration; confirm it is the
    # format MCP clients should use for video.
    api_description=(
        "Generate a short video from a text prompt using a Hugging Face model via serverless inference. "
        "Create dynamic scenes like 'a red fox running through a snowy forest at sunrise', 'waves crashing on a rocky shore', "
        "'time-lapse of clouds moving across a blue sky'. Default model: Wan2.2-T2V-A14B (2-6 second videos). "
        "Parameters: prompt (str), model_id (str), negative_prompt (str), steps (int), cfg_scale (float), seed (int), "
        "width/height (int), fps (int), duration (float in seconds). Returns MP4 file path. "
        "Return the generated media to the user in this format `![Alt text](URL)`"
    ),
    flagging_mode="never",
    # The endpoint is listed in the API only when HF_READ_TOKEN or HF_TOKEN is
    # set; the UI tab itself is always built.
    show_api=bool(os.getenv("HF_READ_TOKEN") or os.getenv("HF_TOKEN")),
)
|
|
# Tab registry. Each interface is written next to its tab label so the two can
# never drift out of step; zip() then transposes the pairs into the two
# parallel lists that gr.TabbedInterface takes.
_interfaces, _tab_names = map(
    list,
    zip(
        (fetch_interface, "Fetch Webpage"),
        (concise_interface, "DuckDuckGo Search"),
        (code_interface, "Python Code Executor"),
        (memory_interface, "Memory Manager"),
        (kokoro_interface, "Kokoro TTS"),
        (image_generation_interface, "Image Generation"),
        (video_generation_interface, "Video Generation"),
    ),
)


# The complete app: one tab per tool, with the shared theme and header CSS.
demo = gr.TabbedInterface(
    interface_list=_interfaces,
    tab_names=_tab_names,
    title="Tools MCP",
    css=CSS_STYLES,
    theme="Nymbo/Nymbo_Theme",
)


# Start the web UI together with the MCP server when run as a script.
if __name__ == "__main__":
    demo.launch(mcp_server=True)