# Purpose: One Space that offers up to seven tools/tabs (all exposed as MCP tools):
#   1) Fetch — convert webpages to clean Markdown format
#   2) DuckDuckGo Search — numbered plain-text results with titles, URLs, and snippets
#   3) Python Code Executor — run Python code and capture stdout/errors
#   4) Kokoro TTS — synthesize speech from text using Kokoro-82M with 54 voice options
#   5) Memory Manager — lightweight JSON-based local memory store (requires HF_READ_TOKEN)
#   6) Image Generation — HF serverless inference providers (requires HF_READ_TOKEN)
#   7) Video Generation — HF serverless inference providers (requires HF_READ_TOKEN)
from __future__ import annotations

import asyncio
import re
import json
import sys
import os
import random
import time
import tempfile
import uuid
import threading
from io import StringIO
from typing import List, Dict, Tuple, Annotated, Literal, Optional
from datetime import datetime, timedelta
from urllib.parse import urlparse

import gradio as gr
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from readability import Document
from ddgs import DDGS
from PIL import Image
from huggingface_hub import InferenceClient

# Optional imports for Kokoro TTS (loaded lazily)
import numpy as np

try:
    import torch  # type: ignore
except Exception:  # pragma: no cover - optional dependency
    torch = None  # type: ignore
try:
    from kokoro import KModel, KPipeline  # type: ignore
except Exception:  # pragma: no cover - optional dependency
    KModel = None  # type: ignore
    KPipeline = None  # type: ignore
# ==============================
# Fetch: Enhanced HTTP + extraction utils
# ==============================


def _http_get_enhanced(url: str) -> requests.Response:
    """
    Download the page with enhanced headers, timeout handling, and better error recovery.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }
    # Apply rate limiting
    _fetch_rate_limiter.acquire()
    try:
        response = requests.get(
            url,
            headers=headers,
            timeout=30,  # generous timeout for slow pages
            allow_redirects=True,
            stream=False,
        )
        response.raise_for_status()
        return response
    except requests.exceptions.Timeout:
        raise requests.exceptions.RequestException("Request timed out. The webpage took too long to respond.")
    except requests.exceptions.ConnectionError:
        raise requests.exceptions.RequestException("Connection error. Please check the URL and your internet connection.")
    except requests.exceptions.HTTPError as e:
        if response.status_code == 403:
            raise requests.exceptions.RequestException("Access forbidden. The website may be blocking automated requests.")
        elif response.status_code == 404:
            raise requests.exceptions.RequestException("Page not found. Please check the URL.")
        elif response.status_code == 429:
            raise requests.exceptions.RequestException("Rate limited. Please try again in a few minutes.")
        else:
            raise requests.exceptions.RequestException(f"HTTP error {response.status_code}: {str(e)}")


def _normalize_whitespace(text: str) -> str:
    """
    Squeeze extra spaces and blank lines to keep things compact.
    (Layman's terms: tidy up the text so it's not full of weird spacing.)
    """
    text = re.sub(r"[ \t\u00A0]+", " ", text)
    text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text.strip())
    return text.strip()


def _truncate(text: str, max_chars: int) -> Tuple[str, bool]:
    """
    Cut text if it gets too long; return the text and whether we trimmed.
    (Layman's terms: shorten long text and tell us if we had to cut it.)
    """
    if max_chars is None or max_chars <= 0 or len(text) <= max_chars:
        return text, False
    return text[:max_chars].rstrip() + " …", True


def _shorten(text: str, limit: int) -> str:
    """
    Hard cap a string with an ellipsis to keep tokens small.
    (Layman's terms: force a string to a max length with an ellipsis.)
    """
    if limit <= 0 or len(text) <= limit:
        return text
    return text[: max(0, limit - 1)].rstrip() + "…"


def _domain_of(url: str) -> str:
    """
    Show a friendly site name like "example.com".
    (Layman's terms: pull the website's domain.)
    """
    try:
        return urlparse(url).netloc or ""
    except Exception:
        return ""


def _meta(soup: BeautifulSoup, name: str) -> str | None:
    tag = soup.find("meta", attrs={"name": name})
    return tag.get("content") if tag and tag.has_attr("content") else None


def _og(soup: BeautifulSoup, prop: str) -> str | None:
    tag = soup.find("meta", attrs={"property": prop})
    return tag.get("content") if tag and tag.has_attr("content") else None


def _extract_metadata(soup: BeautifulSoup, final_url: str) -> Dict[str, str]:
    """
    Pull the useful bits: title, description, site name, canonical URL, language, etc.
    (Layman's terms: gather page basics like title/description/address.)
    """
    meta: Dict[str, str] = {}
    # Title preference: <title> > og:title > twitter:title
    title_candidates = [
        (soup.title.string if soup.title and soup.title.string else None),
        _og(soup, "og:title"),
        _meta(soup, "twitter:title"),
    ]
    meta["title"] = next((t.strip() for t in title_candidates if t and t.strip()), "")
    # Description preference: description > og:description > twitter:description
    desc_candidates = [
        _meta(soup, "description"),
        _og(soup, "og:description"),
        _meta(soup, "twitter:description"),
    ]
    meta["description"] = next((d.strip() for d in desc_candidates if d and d.strip()), "")
    # Canonical link (helps dedupe)
    link_canonical = soup.find("link", rel=lambda v: v and "canonical" in v)
    meta["canonical"] = (link_canonical.get("href") or "").strip() if link_canonical else ""
    # Site name + language info if present
    meta["site_name"] = (_og(soup, "og:site_name") or "").strip()
    html_tag = soup.find("html")
    meta["lang"] = (html_tag.get("lang") or "").strip() if html_tag else ""
    # Final URL + domain
    meta["fetched_url"] = final_url
    meta["domain"] = _domain_of(final_url)
    return meta


def _extract_main_text(html: str) -> Tuple[str, BeautifulSoup]:
    """
    Use Readability to isolate the main article and turn it into clean text.
    Returns (clean_text, soup_of_readable_html).
    (Layman's terms: find the real article text and clean it.)
    """
    # Simplified article HTML from Readability
    doc = Document(html)
    readable_html = doc.summary(html_partial=True)
    # Parse the simplified HTML
    s = BeautifulSoup(readable_html, "lxml")
    # Remove noisy tags
    for sel in ["script", "style", "noscript", "iframe", "svg"]:
        for tag in s.select(sel):
            tag.decompose()
    # Keep paragraphs, list items, and subheadings for structure without bloat
    text_parts: List[str] = []
    for p in s.find_all(["p", "li", "h2", "h3", "h4", "blockquote"]):
        chunk = p.get_text(" ", strip=True)
        if chunk:
            text_parts.append(chunk)
    clean_text = _normalize_whitespace("\n\n".join(text_parts))
    return clean_text, s


def _fullpage_markdown_from_soup(full_soup: BeautifulSoup, base_url: str) -> str:
    # Remove unwanted elements globally first
    for element in full_soup.select("script, style, nav, footer, header, aside"):
        element.decompose()
    # Try common main-content containers, then fall back to <body>
    main = (
        full_soup.find("main")
        or full_soup.find("article")
        or full_soup.find("div", class_=re.compile(r"content|main|post|article", re.I))
        or full_soup.find("body")
    )
    if not main:
        return "No main content found on the webpage."
    # Convert the selected HTML to Markdown
    markdown_text = md(str(main), heading_style="ATX")
    # Clean up the markdown, similar to web-scraper behavior
    markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text)
    markdown_text = re.sub(r"\[\s*\]\([^)]*\)", "", markdown_text)  # empty links
    markdown_text = re.sub(r"[ \t]+", " ", markdown_text)
    markdown_text = markdown_text.strip()
    # Add the title if present
    title = full_soup.find("title")
    if title and title.get_text(strip=True):
        markdown_text = f"# {title.get_text(strip=True)}\n\n{markdown_text}"
    return markdown_text or "No content could be extracted."


def _truncate_markdown(markdown: str, max_chars: int) -> str:
    """
    Truncate markdown content to a maximum character count while preserving structure.
    Tries to break at paragraph boundaries when possible.
    """
    if len(markdown) <= max_chars:
        return markdown
    # Find a good break point near the limit
    truncated = markdown[:max_chars]
    # Try to break at the end of a paragraph (double newline)
    last_paragraph = truncated.rfind('\n\n')
    if last_paragraph > max_chars * 0.7:  # paragraph break found in the last 30%
        truncated = truncated[:last_paragraph]
    # Otherwise try to break at the end of a sentence
    elif '.' in truncated[-100:]:  # look for a period in the last 100 chars
        last_period = truncated.rfind('.')
        if last_period > max_chars * 0.8:  # period found in the last 20%
            truncated = truncated[:last_period + 1]
    return truncated.rstrip() + "\n\n> *[Content truncated for brevity]*"
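

# Worked example (illustrative, not executed at import): with max_chars=30 on the
# string below, the paragraph break at offset 23 falls in the last 30% of the
# budget, so truncation snaps to the paragraph boundary and appends the notice:
#   _truncate_markdown("Alpha beta gamma delta.\n\nNext paragraph here.", 30)
#   # -> "Alpha beta gamma delta.\n\n> *[Content truncated for brevity]*"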

def Fetch_Webpage(  # <-- MCP tool #1 (Fetch)
    url: Annotated[str, "The absolute URL to fetch (must return HTML)."],
    verbosity: Annotated[str, "Controls output length: 'Brief' (1000 chars), 'Standard' (3000 chars), or 'Full' (complete page)."] = "Standard",
) -> str:
    """
    Fetch a web page and return it converted to Markdown format with configurable length.

    This function retrieves a webpage and converts its main content to clean Markdown,
    preserving headings, formatting, and structure. It automatically removes navigation,
    footers, scripts, and other non-content elements to focus on the main article or
    content area.

    Args:
        url (str): The absolute URL to fetch (must return HTML).
        verbosity (str): Controls output length:
            - "Brief": Truncate to 1000 characters for quick summaries
            - "Standard": Truncate to 3000 characters for balanced content
            - "Full": Return complete page content with no length limit

    Returns:
        str: The webpage content converted to Markdown format with:
            - Page title as H1 header
            - Main content converted to clean Markdown
            - Preserved heading hierarchy
            - Clean formatting without navigation/sidebar elements
            - Length controlled by verbosity setting
    """
    _log_call_start("Fetch_Webpage", url=url, verbosity=verbosity)
    if not url or not url.strip():
        result = "Please enter a valid URL."
        _log_call_end("Fetch_Webpage", _truncate_for_log(result))
        return result
    try:
        resp = _http_get_enhanced(url)
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        result = f"An error occurred: {e}"
        _log_call_end("Fetch_Webpage", _truncate_for_log(result))
        return result
    final_url = str(resp.url)
    ctype = resp.headers.get("Content-Type", "")
    if "html" not in ctype.lower():
        result = f"Unsupported content type for extraction: {ctype or 'unknown'}"
        _log_call_end("Fetch_Webpage", _truncate_for_log(result))
        return result
    # Decode to text
    resp.encoding = resp.encoding or resp.apparent_encoding
    html = resp.text
    # Parse HTML and convert to full-page Markdown
    full_soup = BeautifulSoup(html, "lxml")
    markdown_content = _fullpage_markdown_from_soup(full_soup, final_url)
    # Apply verbosity-based truncation
    if verbosity == "Brief":
        result = _truncate_markdown(markdown_content, 1000)
    elif verbosity == "Standard":
        result = _truncate_markdown(markdown_content, 3000)
    else:  # "Full"
        result = markdown_content
    _log_call_end("Fetch_Webpage", f"markdown_chars={len(result)}")
    return result
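

# Minimal usage sketch (not executed at import; requires network access):
#   page_md = Fetch_Webpage("https://example.com", verbosity="Brief")
#   print(page_md[:200])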

# ============================================
# DuckDuckGo Search: Enhanced with error handling & rate limiting
# ============================================


class RateLimiter:
    def __init__(self, requests_per_minute: int = 30):
        self.requests_per_minute = requests_per_minute
        self.requests = []

    def acquire(self):
        """Synchronous rate limiting for non-async contexts."""
        now = datetime.now()
        # Remove requests older than 1 minute
        self.requests = [
            req for req in self.requests if now - req < timedelta(minutes=1)
        ]
        if len(self.requests) >= self.requests_per_minute:
            # Wait until we can make another request
            wait_time = 60 - (now - self.requests[0]).total_seconds()
            if wait_time > 0:
                time.sleep(max(1, wait_time))  # at least 1 second of wait
        self.requests.append(now)


# Global rate limiters
_search_rate_limiter = RateLimiter(requests_per_minute=20)
_fetch_rate_limiter = RateLimiter(requests_per_minute=25)
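
# Usage sketch: each tool calls .acquire() before its outbound request, so a burst
# beyond the per-minute budget simply sleeps until the oldest request ages out.
#   limiter = RateLimiter(requests_per_minute=2)
#   limiter.acquire(); limiter.acquire()
#   limiter.acquire()  # third call sleeps up to ~60 seconds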

# ==============================
# Logging Helpers (print I/O to terminal)
# ==============================


def _truncate_for_log(value: str, limit: int = 500) -> str:
    """Truncate long strings for concise terminal logging."""
    if len(value) <= limit:
        return value
    return value[:limit - 1] + "…"


def _serialize_input(val):  # type: ignore[return-any]
    """Best-effort compact serialization of arbitrary input values for logging."""
    try:
        if isinstance(val, (str, int, float, bool)) or val is None:
            return val
        if isinstance(val, (list, tuple)):
            return [_serialize_input(v) for v in list(val)[:10]] + (["…"] if len(val) > 10 else [])  # type: ignore[index]
        if isinstance(val, dict):
            out = {}
            for i, (k, v) in enumerate(val.items()):
                if i >= 12:
                    out["…"] = "…"
                    break
                out[str(k)] = _serialize_input(v)
            return out
        return repr(val)[:120]
    except Exception:
        return "<unserializable>"


def _log_call_start(func_name: str, **kwargs) -> None:
    try:
        compact = {k: _serialize_input(v) for k, v in kwargs.items()}
        print(f"[TOOL CALL] {func_name} inputs: {json.dumps(compact, ensure_ascii=False)[:800]}", flush=True)
    except Exception as e:  # pragma: no cover - logging safety
        print(f"[TOOL CALL] {func_name} (failed to log inputs: {e})", flush=True)


def _log_call_end(func_name: str, output_desc: str) -> None:
    try:
        print(f"[TOOL RESULT] {func_name} output: {output_desc}", flush=True)
    except Exception as e:  # pragma: no cover
        print(f"[TOOL RESULT] {func_name} (failed to log output: {e})", flush=True)

def Search_DuckDuckGo(  # <-- MCP tool #2 (DDG Search)
    query: Annotated[str, "The search query (supports operators like site:, quotes, OR)."],
    max_results: Annotated[int, "Number of results to return (1–20)."] = 5,
) -> str:
    """
    Run a DuckDuckGo search and return numbered results with URLs, titles, and summaries.

    Args:
        query (str): The search query string. Supports operators like site:, quotes for exact matching,
            OR for alternatives, and other DuckDuckGo search syntax.
            Examples:
                - Basic search: "Python programming"
                - Site search: "site:example.com"
                - Exact phrase: "artificial intelligence"
                - Exclude terms: "cats -dogs"
        max_results (int): Number of results to return (1–20). Default: 5.

    Returns:
        str: Search results in readable format with titles, URLs, and snippets as a numbered list.
    """
    _log_call_start("Search_DuckDuckGo", query=query, max_results=max_results)
    if not query or not query.strip():
        result = "No search query provided. Please enter a search term."
        _log_call_end("Search_DuckDuckGo", _truncate_for_log(result))
        return result
    # Validate max_results
    max_results = max(1, min(20, max_results))
    try:
        # Apply rate limiting to avoid being blocked
        _search_rate_limiter.acquire()
        # Perform the search
        with DDGS() as ddgs:
            raw = ddgs.text(query, max_results=max_results)
    except Exception as e:
        error_msg = f"Search failed: {str(e)[:200]}"
        if "blocked" in str(e).lower() or "rate" in str(e).lower():
            error_msg = "Search temporarily blocked due to rate limiting. Please try again in a few minutes."
        elif "timeout" in str(e).lower():
            error_msg = "Search timed out. Please try again with a simpler query."
        elif "network" in str(e).lower() or "connection" in str(e).lower():
            error_msg = "Network connection error. Please check your internet connection and try again."
        result = f"Error: {error_msg}"
        _log_call_end("Search_DuckDuckGo", _truncate_for_log(result))
        return result
    if not raw:
        result = f"No results found for query: {query}"
        _log_call_end("Search_DuckDuckGo", _truncate_for_log(result))
        return result
    results = []
    for r in raw or []:
        title = (r.get("title") or "").strip()
        url = (r.get("href") or r.get("link") or "").strip()
        body = (r.get("body") or r.get("snippet") or "").strip()
        if not url:
            continue
        results.append({
            "title": title or _domain_of(url),
            "url": url,
            "snippet": body,
        })
    if not results:
        result = f"No valid results found for query: {query}"
        _log_call_end("Search_DuckDuckGo", _truncate_for_log(result))
        return result
    # Format the output in readable form
    lines = [f"Found {len(results)} search results for: {query}\n"]
    for i, item in enumerate(results, 1):
        lines.append(f"{i}. {item['title']}")
        lines.append(f"   URL: {item['url']}")
        if item["snippet"]:
            lines.append(f"   Summary: {item['snippet']}")
        lines.append("")  # blank line between results
    result = "\n".join(lines)
    _log_call_end("Search_DuckDuckGo", f"results={len(results)} chars={len(result)}")
    return result
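

# Minimal usage sketch (not executed at import; live results vary):
#   print(Search_DuckDuckGo("site:python.org asyncio", max_results=3))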

# ======================================
# Code Execution: Python (MCP tool #3)
# ======================================


def Execute_Python(code: Annotated[str, "Python source code to run; stdout is captured and returned."]) -> str:
    """
    Execute arbitrary Python code and return captured stdout or an error message.

    Note: the code runs in this app's own process with no sandboxing; only
    stdout is captured, and exceptions are returned as text.

    Args:
        code (str): Python source code to run; stdout is captured and returned.

    Returns:
        str: Combined stdout produced by the code, or the exception text if
            execution failed.
    """
    _log_call_start("Execute_Python", code=_truncate_for_log(code or "", 300))
    if code is None:
        result = "No code provided."
        _log_call_end("Execute_Python", result)
        return result
    old_stdout = sys.stdout
    redirected_output = sys.stdout = StringIO()
    try:
        exec(code)
        result = redirected_output.getvalue()
    except Exception as e:
        result = str(e)
    finally:
        sys.stdout = old_stdout
    _log_call_end("Execute_Python", _truncate_for_log(result))
    return result
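

# Usage sketch: stdout is captured, so printed values come back as the return string.
#   Execute_Python("import math\nprint(math.sqrt(16))")  # -> "4.0\n"
# Note: there is no sandbox; only run code you trust.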

# ==========================
# Kokoro TTS (MCP tool #4)
# ==========================

_KOKORO_STATE = {
    "initialized": False,
    "device": "cpu",
    "model": None,
    "pipelines": {},
}


def get_kokoro_voices():
    """Get the comprehensive list of available Kokoro voice IDs (54 total)."""
    try:
        from huggingface_hub import list_repo_files

        # Get voice files from the Kokoro repository
        files = list_repo_files('hexgrad/Kokoro-82M')
        voice_files = [f for f in files if f.endswith('.pt') and f.startswith('voices/')]
        voices = [f.replace('voices/', '').replace('.pt', '') for f in voice_files]
        return sorted(voices) if voices else _get_fallback_voices()
    except Exception:
        return _get_fallback_voices()


def _get_fallback_voices():
    """Return the comprehensive fallback list of known Kokoro voices (54 total)."""
    return [
        # American Female (11 voices)
        "af_alloy", "af_aoede", "af_bella", "af_heart", "af_jessica",
        "af_kore", "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky",
        # American Male (9 voices)
        "am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam",
        "am_michael", "am_onyx", "am_puck", "am_santa",
        # British Female (4 voices)
        "bf_alice", "bf_emma", "bf_isabella", "bf_lily",
        # British Male (4 voices)
        "bm_daniel", "bm_fable", "bm_george", "bm_lewis",
        # European Female/Male (3 voices)
        "ef_dora", "em_alex", "em_santa",
        # French Female (1 voice)
        "ff_siwis",
        # Hindi Female/Male (4 voices)
        "hf_alpha", "hf_beta", "hm_omega", "hm_psi",
        # Italian Female/Male (2 voices)
        "if_sara", "im_nicola",
        # Japanese Female/Male (5 voices)
        "jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo",
        # Portuguese Female/Male (3 voices)
        "pf_dora", "pm_alex", "pm_santa",
        # Chinese Female/Male (8 voices)
        "zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi",
        "zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang",
    ]


def _init_kokoro() -> None:
    """Lazy-initialize the Kokoro model and pipelines on first use.

    Tries CUDA if torch is present and available; falls back to CPU. Keeps a
    minimal English pipeline and a custom lexicon tweak for the word "kokoro".
    """
    if _KOKORO_STATE["initialized"]:
        return
    if KModel is None or KPipeline is None:
        raise RuntimeError(
            "Kokoro is not installed. Please install the 'kokoro' package (>=0.9.4)."
        )
    device = "cpu"
    if torch is not None:
        try:
            if torch.cuda.is_available():  # type: ignore[attr-defined]
                device = "cuda"
        except Exception:
            device = "cpu"
    model = KModel().to(device).eval()
    pipelines = {"a": KPipeline(lang_code="a", model=False)}
    # Custom pronunciation
    try:
        pipelines["a"].g2p.lexicon.golds["kokoro"] = "kˈOkəɹO"
    except Exception:
        pass
    _KOKORO_STATE.update(
        {
            "initialized": True,
            "device": device,
            "model": model,
            "pipelines": pipelines,
        }
    )

def List_Kokoro_Voices() -> List[str]:
    """
    Get a list of all available Kokoro voice identifiers.

    This MCP tool helps clients discover the 54 available voice options
    for the Generate_Speech tool.

    Returns:
        List[str]: A list of voice identifiers (e.g., ["af_heart", "am_adam", "bf_alice", ...])

    Voice naming convention:
        - First 2 letters: Language/Region (af=American Female, am=American Male, bf=British Female, etc.)
        - Following letters: Voice name (heart, adam, alice, etc.)

    Available categories:
        - American Female/Male (20 voices)
        - British Female/Male (8 voices)
        - European Female/Male (3 voices)
        - French Female (1 voice)
        - Hindi Female/Male (4 voices)
        - Italian Female/Male (2 voices)
        - Japanese Female/Male (5 voices)
        - Portuguese Female/Male (3 voices)
        - Chinese Female/Male (8 voices)
    """
    return get_kokoro_voices()

def Generate_Speech(  # <-- MCP tool #4 (Generate Speech)
    text: Annotated[str, "The text to synthesize (English)."],
    speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.25,
    voice: Annotated[str, "Voice identifier from 54 available options."] = "af_heart",
) -> Tuple[int, np.ndarray]:
    """
    Synthesize speech from text using the Kokoro-82M TTS model.

    This function returns raw audio suitable for a Gradio Audio component and is
    also exposed as an MCP tool. It supports 54 different voices across multiple
    languages and accents including American, British, European, Hindi, Italian,
    Japanese, Portuguese, and Chinese speakers.

    Args:
        text (str): The text to synthesize. Works best with English but supports multiple languages.
        speed (float): Speech speed multiplier in 0.5–2.0; 1.0 = normal speed. Default: 1.25 (slightly brisk).
        voice (str): Voice identifier from 54 available options. Default: 'af_heart'.

    Returns:
        A tuple of (sample_rate_hz, audio_waveform) where:
            - sample_rate_hz: int sample rate in Hz (24_000)
            - audio_waveform: numpy.ndarray float32 mono waveform in range [-1, 1]
    """
    _log_call_start("Generate_Speech", text=_truncate_for_log(text, 200), speed=speed, voice=voice)
    if not text or not text.strip():
        _log_call_end("Generate_Speech", "error=empty text")
        raise gr.Error("Please provide non-empty text to synthesize.")
    _init_kokoro()
    model = _KOKORO_STATE["model"]
    pipelines = _KOKORO_STATE["pipelines"]
    pipeline = pipelines.get("a")
    if pipeline is None:
        raise gr.Error("Kokoro English pipeline not initialized.")
    # Process ALL segments for longer audio generation
    audio_segments = []
    pack = pipeline.load_voice(voice)
    try:
        # Get all segments first so progress can be reported for long text
        segments = list(pipeline(text, voice, speed))
        total_segments = len(segments)
        # Iterate through ALL segments instead of just the first one
        for segment_idx, (text_chunk, ps, _) in enumerate(segments):
            ref_s = pack[len(ps) - 1]
            try:
                audio = model(ps, ref_s, float(speed))
                audio_segments.append(audio.detach().cpu().numpy())
                # For very long text (>10 segments), show progress every few segments
                if total_segments > 10 and (segment_idx + 1) % 5 == 0:
                    print(f"Progress: Generated {segment_idx + 1}/{total_segments} segments...")
            except Exception as e:
                raise gr.Error(f"Error generating audio for segment {segment_idx + 1}: {str(e)}")
        if not audio_segments:
            raise gr.Error("No audio was generated (empty synthesis result).")
        # Concatenate all segments to create the complete audio
        if len(audio_segments) == 1:
            final_audio = audio_segments[0]
        else:
            final_audio = np.concatenate(audio_segments, axis=0)
        # For multi-segment audio, report completion info
        duration = len(final_audio) / 24_000
        if total_segments > 1:
            print(f"Completed: {total_segments} segments concatenated into {duration:.1f} seconds of audio")
        # Success logging & return
        _log_call_end("Generate_Speech", f"samples={final_audio.shape[0]} duration_sec={len(final_audio)/24_000:.2f}")
        return 24_000, final_audio
    except gr.Error as e:
        _log_call_end("Generate_Speech", f"gr_error={str(e)}")
        raise  # re-raise unchanged
    except Exception as e:
        _log_call_end("Generate_Speech", f"error={str(e)[:120]}")
        raise gr.Error(f"Error during speech generation: {str(e)}")

# ==========================
# JSON Memory System (consolidated into the single Memory_Manager MCP tool; exposed only when HF_READ_TOKEN is set)
# ==========================
# Implementation goals (aligned with Gradio MCP docs):
#   * Each function has a rich docstring (used for the tool description)
#   * Type hints + Annotated param docs become the schema
#   * Zero external dependencies (pure stdlib JSON file persistence)
#   * Safe concurrent access via a process-local lock
#   * Human-readable & recoverable even if the file becomes corrupted

MEMORY_FILE = os.path.join(os.path.dirname(__file__), "memories.json")
_MEMORY_LOCK = threading.RLock()
_MAX_MEMORIES = 10_000  # soft cap to avoid unbounded growth


def _now_iso() -> str:
    return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

def _load_memories() -> List[Dict[str, str]]:
    """Internal helper: load the memory list from disk.

    Returns an empty list if the file does not exist or is unreadable.
    If the JSON is corrupted, a *.corrupt backup is written once and a
    fresh empty list is returned (fail-open philosophy for tool usage).
    """
    if not os.path.exists(MEMORY_FILE):
        return []
    try:
        with open(MEMORY_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, list):
            # Keep only dict items containing the required keys
            cleaned: List[Dict[str, str]] = []
            for item in data:
                if isinstance(item, dict) and "id" in item and "text" in item:
                    cleaned.append(item)
            return cleaned
        return []
    except Exception:
        # Back up the corrupted file once
        try:
            backup = MEMORY_FILE + ".corrupt"
            if not os.path.exists(backup):
                os.replace(MEMORY_FILE, backup)
        except Exception:
            pass
        return []


def _save_memories(memories: List[Dict[str, str]]) -> None:
    """Persist the memory list atomically to disk (write a temp file, then replace)."""
    tmp_path = MEMORY_FILE + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(memories, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, MEMORY_FILE)

def _mem_save(
    text: Annotated[str, "Raw textual content to remember (will be stored verbatim)."],
    tags: Annotated[str, "Optional comma-separated tags for lightweight categorization (e.g. 'user, preference')."] = "",
) -> str:
    """(Internal) Persist a new memory record.

    Summary:
        Adds a memory object to the local JSON store (no external database).

    Stored Fields:
        - id (str, UUID4)
        - text (str, verbatim user content)
        - timestamp (UTC "YYYY-MM-DD HH:MM:SS")
        - tags (str, original comma-separated tag string)

    Behavior / Rules:
        1. Whitespace is trimmed; empty text is rejected.
        2. If the most recent existing memory has identical text, the new one is skipped (light dedupe heuristic).
        3. When total entries exceed _MAX_MEMORIES, the oldest entries are pruned (soft cap).
        4. The operation is protected by an in-process reentrant lock only (no cross-process locking).

    Returns:
        str: Human-readable confirmation containing the new memory's full UUID.

    Security / Privacy:
        Data is plaintext JSON on local disk; do NOT store secrets or regulated data.
    """
    text_clean = (text or "").strip()
    if not text_clean:
        return "Error: memory text is empty."
    with _MEMORY_LOCK:
        memories = _load_memories()
        if memories and memories[-1].get("text") == text_clean:
            return "Skipped: identical to last stored memory."
        mem_id = str(uuid.uuid4())
        entry = {
            "id": mem_id,
            "text": text_clean,
            "timestamp": _now_iso(),
            "tags": tags.strip(),
        }
        memories.append(entry)
        if len(memories) > _MAX_MEMORIES:
            # Drop the oldest overflow
            overflow = len(memories) - _MAX_MEMORIES
            memories = memories[overflow:]
        _save_memories(memories)
    return f"Memory saved: {mem_id}"

def _mem_list(
    limit: Annotated[int, "Maximum number of most recent memories to return (1–200)."] = 20,
    include_tags: Annotated[bool, "If true, include tags column in output."] = True,
) -> str:
    """(Internal) List the most recent memories.

    Parameters:
        limit (int): Max rows to return; clamped to [1, 200].
        include_tags (bool): Include the tags section when True.

    Output Format (one per line):
        <uuid_prefix> [YYYY-MM-DD HH:MM:SS] <text> | tags: <tag list>
        (Tag column omitted if empty or include_tags=False.)

    Returns:
        str: Joined newline string or a friendly "No memories stored." message.
    """
    limit = max(1, min(200, limit))
    with _MEMORY_LOCK:
        memories = _load_memories()
    if not memories:
        return "No memories stored yet."
    # Already chronological (append order); display newest first
    chosen = memories[-limit:][::-1]
    lines: List[str] = []
    for m in chosen:
        base = f"{m['id'][:8]} [{m.get('timestamp','?')}] {m.get('text','')}"
        if include_tags and m.get("tags"):
            base += f" | tags: {m['tags']}"
        lines.append(base)
    omitted = len(memories) - len(chosen)
    if omitted > 0:
        lines.append(f"… ({omitted} older {'memories' if omitted != 1 else 'memory'} omitted; total={len(memories)})")
    return "\n".join(lines)

def _mem_search(
    query: Annotated[str, "Case-insensitive substring search; space-separated terms are ANDed."],
    limit: Annotated[int, "Maximum number of matches (1–200)."] = 20,
) -> str:
    """(Internal) Full-text style AND search across text and tags.

    Search Semantics:
        - Split the query on whitespace into individual terms.
        - A memory matches only if EVERY term appears (case-insensitive) in the text OR tags field.
        - Results are ordered newest-first (descending timestamp).

    Parameters:
        query (str): Raw user query string; must contain at least one non-space character.
        limit (int): Max rows to return; clamped to [1, 200].

    Returns:
        str: Formatted lines identical to _mem_list output, or a "No matches" message.
    """
    q = (query or "").strip()
    if not q:
        return "Error: empty query."
    terms = [t.lower() for t in q.split() if t.strip()]
    if not terms:
        return "Error: no valid search terms."
    limit = max(1, min(200, limit))
    with _MEMORY_LOCK:
        memories = _load_memories()
    # Iterate newest-first so the cap keeps the most recent matches
    matches: List[Dict[str, str]] = []  # collected (capped at limit)
    total_matches = 0
    for m in reversed(memories):  # newest backward
        hay = (m.get("text", "") + " " + m.get("tags", "")).lower()
        if all(t in hay for t in terms):
            total_matches += 1
            if len(matches) < limit:
                matches.append(m)
    if not matches:
        return f"No matches for: {query}"
    lines = [
        f"{m['id'][:8]} [{m.get('timestamp','?')}] {m.get('text','')}" + (f" | tags: {m['tags']}" if m.get('tags') else "")
        for m in matches
    ]
    omitted = total_matches - len(matches)
    if omitted > 0:
        lines.append(f"… ({omitted} additional match{'es' if omitted != 1 else ''} omitted; total_matches={total_matches})")
    return "\n".join(lines)

def _mem_delete(
    memory_id: Annotated[str, "Full UUID or a unique prefix (>=4 chars) of the memory id to delete."],
) -> str:
    """(Internal) Delete one memory by UUID or unique prefix.

    Parameters:
        memory_id (str): Full UUID4 (preferred) OR a unique prefix (>=4 chars). If the prefix is ambiguous, no deletion occurs.

    Returns:
        str: One of: success message, ambiguity notice, or not-found message.

    Safety:
        Ambiguous prefixes are rejected to prevent accidental mass deletion.
    """
    key = (memory_id or "").strip().lower()
    if len(key) < 4:
        return "Error: supply at least 4 characters of the id."
    with _MEMORY_LOCK:
        memories = _load_memories()
        matched = [m for m in memories if m["id"].lower().startswith(key)]
        if not matched:
            return "Memory not found."
        if len(matched) > 1 and key != matched[0]["id"].lower():
            # Ambiguous prefix
            sample = ", ".join(m["id"][:8] for m in matched[:5])
            more = "…" if len(matched) > 5 else ""
            return f"Ambiguous prefix (matches {len(matched)} ids: {sample}{more}). Provide more characters."
        # Unique match
        target_id = matched[0]["id"]
        memories = [m for m in memories if m["id"] != target_id]
        _save_memories(memories)
    return f"Deleted memory: {target_id}"

# ======================
# UI: multi-tab interface
# ======================

# --- Fetch tab (compact controllable extraction) ---
fetch_interface = gr.Interface(
    fn=Fetch_Webpage,
    inputs=[
        gr.Textbox(label="URL", placeholder="https://example.com/article"),
        gr.Dropdown(
            label="Verbosity",
            choices=["Brief", "Standard", "Full"],
            value="Standard",
            info="Brief: 1000 chars, Standard: 3000 chars, Full: complete page",
        ),
    ],
    outputs=gr.Markdown(label="Extracted Markdown"),
    title="Fetch Webpage",
    description=(
        "<div style=\"text-align:center\">Convert any webpage to clean Markdown format with configurable length, preserving structure and formatting while removing navigation and clutter.</div>"
    ),
    api_description=(
        "Fetch a web page and return it converted to Markdown format with configurable length. "
        "Parameters: url (str - absolute URL), verbosity (str - Brief/Standard/Full controlling output length: Brief=1000 chars, Standard=3000 chars, Full=complete page)."
    ),
    flagging_mode="never",
)

# --- Simplified DDG tab (readable output only) ---
concise_interface = gr.Interface(
    fn=Search_DuckDuckGo,
    inputs=[
        gr.Textbox(label="Query", placeholder="topic OR site:example.com"),
        gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max results"),
    ],
    outputs=gr.Textbox(label="Search Results", interactive=False),
    title="DuckDuckGo Search",
    description=(
        "<div style=\"text-align:center\">Web search with readable output format. Supports advanced search operators.</div>"
    ),
    api_description=(
        "Run a DuckDuckGo search and return numbered results with URLs, titles, and summaries. "
        "Supports advanced search operators: site: for specific domains, quotes for exact phrases, "
        "OR for alternatives, and - to exclude terms. Examples: 'Python programming', 'site:example.com', "
        "'\"artificial intelligence\"', 'cats -dogs', 'Python OR JavaScript'."
    ),
    flagging_mode="never",
    submit_btn="Search",
)

# --- Execute Python tab (simple code interpreter) ---
code_interface = gr.Interface(
    fn=Execute_Python,
    inputs=gr.Code(label="Python Code", language="python"),
    outputs=gr.Textbox(label="Output"),
    title="Python Code Executor",
    description=(
        "<div style=\"text-align:center\">Execute Python code and see the output.</div>"
    ),
    api_description=(
        "Execute arbitrary Python code and return captured stdout or an error message. "
        "Supports any valid Python code including imports, variables, functions, loops, and calculations. "
        "Examples: 'print(2+2)', 'import math; print(math.sqrt(16))', 'for i in range(3): print(i)'. "
        "Parameters: code (str - Python source code to execute). "
        "Returns: Combined stdout output or exception text if execution fails."
    ),
    flagging_mode="never",
)

CSS_STYLES = """
.gradio-container h1 {
    text-align: center;
    /* Ensure the main title appears first, then our two subtitle lines */
    display: grid;
    justify-items: center;
}
/* Place the bold tools list on line 2 and the normal auth note on line 3 (below the title) */
.gradio-container h1::before {
    grid-row: 2;
    content: "Fetch Webpage | Search DuckDuckGo | Python Interpreter | Memory Manager | Kokoro TTS | Image Generation | Video Generation";
    display: block;
    font-size: 1rem;
    font-weight: 700;
    opacity: 0.9;
    margin-top: 6px;
    white-space: pre-wrap;
}
.gradio-container h1::after {
    grid-row: 3;
    content: "Authentication is optional. Image/Video generation require an HF token to function and may be hidden from MCP tools without one — but UI tabs remain visible. Memory is intended for local use and may be hidden from MCP tools.";
    display: block;
    font-size: 1rem;
    font-weight: 400;
    opacity: 0.9;
    margin-top: 2px;
    white-space: pre-wrap;
}
/* Remove inside tab panels so it doesn't duplicate under each tool title */
.gradio-container [role="tabpanel"] h1::before,
.gradio-container [role="tabpanel"] h1::after {
    content: none !important;
}
"""

# --- Kokoro TTS tab (text to speech) ---
available_voices = get_kokoro_voices()
kokoro_interface = gr.Interface(
    fn=Generate_Speech,
    inputs=[
        gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4),
        gr.Slider(minimum=0.5, maximum=2.0, value=1.25, step=0.1, label="Speed"),
        gr.Dropdown(
            label="Voice",
            choices=available_voices,
            value="af_heart",
            info="Select from 54 available voices across multiple languages and accents",
        ),
    ],
    outputs=gr.Audio(label="Audio", type="numpy", format="wav", show_download_button=True),
    title="Kokoro TTS",
    description=(
        "<div style=\"text-align:center\">Generate speech with Kokoro-82M. Supports multiple languages and accents. Runs on CPU or CUDA if available.</div>"
    ),
    api_description=(
        "Synthesize speech from text using the Kokoro-82M TTS model. Returns (sample_rate, waveform) suitable for playback. "
        "Supports unlimited text length by processing all segments. Voice examples: 'af_heart' (US female), 'am_onyx' (US male), "
        "'bf_emma' (British female), 'af_sky' (US female), 'af_nicole' (US female). "
        "Parameters: text (str), speed (float 0.5–2.0, default 1.25x), voice (str from 54 available options, default 'af_heart'). "
        "Return the generated media to the user in this format ``"
    ),
    flagging_mode="never",
)

def Memory_Manager(
    action: Annotated[Literal["save", "list", "search", "delete"], "Action to perform: save | list | search | delete"],
    text: Annotated[Optional[str], "Text content (save only)"] = None,
    tags: Annotated[Optional[str], "Comma-separated tags (save only)"] = None,
    query: Annotated[Optional[str], "Search query terms (search only)"] = None,
    limit: Annotated[int, "Max results (list/search only)"] = 20,
    memory_id: Annotated[Optional[str], "Full UUID or unique prefix (delete only)"] = None,
    include_tags: Annotated[bool, "Include tags (list only)"] = True,
) -> str:
    """Manage lightweight local JSON "memories" (save | list | search | delete) in one MCP tool.

    Overview:
        This tool provides simple, local, append-only style persistence for short text memories
        with optional tags. Data is stored in a plaintext JSON file ("memories.json") beside the
        application; no external database or network access is required.

    Supported Actions:
        - save   : Store a new memory (requires 'text'; optional 'tags').
        - list   : Return the most recent memories (respects 'limit' + 'include_tags').
        - search : AND-match space-separated terms across text and tags (uses 'query', 'limit').
        - delete : Remove one memory by full UUID or unique prefix (uses 'memory_id').

    Parameter Usage by Action:
        action=save   -> text (required), tags (optional)
        action=list   -> limit, include_tags
        action=search -> query (required), limit
        action=delete -> memory_id (required)

    Parameters:
        action (Literal[save|list|search|delete]): Operation selector (case-insensitive).
        text (str): Raw memory content; leading/trailing whitespace trimmed (save only).
        tags (str): Optional comma-separated tags; stored verbatim (save only).
        query (str): Space-separated terms (AND logic, case-insensitive) across text+tags (search only).
        limit (int): Maximum rows for list/search (clamped internally to 1–200).
        memory_id (str): Full UUID or unique prefix (>=4 chars) (delete only).
        include_tags (bool): When True, show the tag column in list output (search output always shows tags when present).

    Storage Format (per entry):
        {"id": "<uuid4>", "text": "<original text>", "timestamp": "YYYY-MM-DD HH:MM:SS", "tags": "tag1, tag2"}

    Lifecycle & Constraints:
        - A soft cap of 10,000 entries (_MAX_MEMORIES) is enforced by pruning the oldest records on save.
        - A light duplicate guard skips saving if the newest existing entry has identical text.
        - All operations are protected by a process-local reentrant lock (NOT multi-process safe).

    Returns:
        str: Human-readable status / result lines (never raw JSON) suitable for direct model consumption.

    Error Modes:
        - Invalid action -> error string.
        - Missing required field for the chosen action -> explanatory message.
        - Ambiguous or unknown memory_id on delete -> clarification message.

    Security & Privacy:
        Plaintext JSON; do not store secrets, credentials, or regulated personal data.
    """
    act = (action or "").lower().strip()
    # Normalize None -> "" for the internal helpers
    text = text or ""
    tags = tags or ""
    query = query or ""
    memory_id = memory_id or ""
    if act == "save":
        if not text.strip():
            return "Error: 'text' is required when action=save."
        return _mem_save(text=text, tags=tags)
    if act == "list":
        return _mem_list(limit=limit, include_tags=include_tags)
    if act == "search":
        if not query.strip():
            return "Error: 'query' is required when action=search."
        return _mem_search(query=query, limit=limit)
    if act == "delete":
        if not memory_id.strip():
            return "Error: 'memory_id' is required when action=delete."
        return _mem_delete(memory_id=memory_id)
    return "Error: invalid action (use save|list|search|delete)."

memory_interface = gr.Interface(
    fn=Memory_Manager,
    inputs=[
        gr.Dropdown(label="Action", choices=["save", "list", "search", "delete"], value="list"),
        gr.Textbox(label="Text", lines=3, placeholder="Memory text (save)"),
        gr.Textbox(label="Tags", placeholder="tag1, tag2"),
        gr.Textbox(label="Query", placeholder="Search terms (search)"),
        gr.Slider(1, 200, value=20, step=1, label="Limit"),
        gr.Textbox(label="Memory ID / Prefix", placeholder="UUID or prefix (delete)"),
        gr.Checkbox(value=True, label="Include Tags"),
    ],
    outputs=gr.Textbox(label="Result", lines=14),
    title="Memory Manager",
    description=(
        "<div style=\"text-align:center\">Lightweight local JSON memory store (no external DB). Choose an Action, fill only the relevant fields, and run.</div>"
    ),
    api_description=(
        "Manage short text memories with optional tags. Actions: save(text,tags), list(limit,include_tags), "
        "search(query,limit), delete(memory_id). Returns human-readable plaintext (never raw JSON). The action parameter is always required. "
        "Use Memory_Manager whenever you are given information worth remembering about the user, and search for memories when relevant."
    ),
    flagging_mode="never",
    # Always visible in the UI, but only exposed as an MCP tool when an HF token is present
    show_api=bool(os.getenv("HF_READ_TOKEN")),
)

# ==========================
# Image Generation (Serverless)
# ==========================

HF_API_TOKEN = os.getenv("HF_READ_TOKEN")


def Generate_Image(  # <-- MCP tool #5 (Generate Image)
    prompt: Annotated[str, "Text description of the image to generate."],
    model_id: Annotated[str, "Hugging Face model id in the form 'creator/model-name' (e.g., black-forest-labs/FLUX.1-Krea-dev)."] = "black-forest-labs/FLUX.1-Krea-dev",
    negative_prompt: Annotated[str, "What should NOT appear in the image."] = (
        "(deformed, distorted, disfigured), poorly drawn, bad anatomy, wrong anatomy, extra limb, "
        "missing limb, floating limbs, (mutated hands and fingers), disconnected limbs, mutation, "
        "mutated, ugly, disgusting, blurry, amputation, misspellings, typos"
    ),
    steps: Annotated[int, "Number of denoising steps (1–100). Higher = slower, potentially higher quality."] = 35,
    cfg_scale: Annotated[float, "Classifier-free guidance scale (1–20). Higher = follow the prompt more closely."] = 7.0,
    sampler: Annotated[str, "Sampling method label (UI only). Common options: 'DPM++ 2M Karras', 'DPM++ SDE Karras', 'Euler', 'Euler a', 'Heun', 'DDIM'."] = "DPM++ 2M Karras",
    seed: Annotated[int, "Random seed for reproducibility. Use -1 for a random seed per call."] = -1,
    width: Annotated[int, "Output width in pixels (64–1216, multiple of 32 recommended)."] = 1024,
    height: Annotated[int, "Output height in pixels (64–1216, multiple of 32 recommended)."] = 1024,
) -> Image.Image:
    """
    Generate a single image from a text prompt using a Hugging Face model via serverless inference.

    Args:
        prompt (str): Text description of the image to generate.
        model_id (str): The Hugging Face model id (creator/model-name). Defaults to "black-forest-labs/FLUX.1-Krea-dev".
        negative_prompt (str): What should NOT appear in the image.
        steps (int): Number of denoising steps (1–100). Higher can improve quality.
        cfg_scale (float): Guidance scale (1–20). Higher = follow the prompt more closely.
        sampler (str): Sampling method label for the UI; not all providers expose this control.
        seed (int): Random seed. Use -1 to randomize on each call.
        width (int): Output width in pixels (64–1216; multiples of 32 recommended).
        height (int): Output height in pixels (64–1216; multiples of 32 recommended).

    Returns:
        PIL.Image.Image: The generated image.

    Error modes:
        - Raises gr.Error with a user-friendly message on auth/model/load errors.
    """
    _log_call_start("Generate_Image", prompt=_truncate_for_log(prompt, 200), model_id=model_id, steps=steps, cfg_scale=cfg_scale, seed=seed, size=f"{width}x{height}")
    if not prompt or not prompt.strip():
        _log_call_end("Generate_Image", "error=empty prompt")
        raise gr.Error("Please provide a non-empty prompt.")
    # Lightly enhance the prompt for quality (kept consistent with the Serverless space)
    enhanced_prompt = f"{prompt} | ultra detail, ultra elaboration, ultra quality, perfect."
    # Try multiple providers for resilience
    providers = ["auto", "replicate", "fal-ai"]
    last_error: Exception | None = None
    for provider in providers:
        try:
            client = InferenceClient(api_key=HF_API_TOKEN, provider=provider)
            image = client.text_to_image(
                prompt=enhanced_prompt,
                negative_prompt=negative_prompt,
                model=model_id,
                width=width,
                height=height,
                num_inference_steps=steps,
                guidance_scale=cfg_scale,
                seed=seed if seed != -1 else random.randint(1, 1_000_000_000),
            )
            _log_call_end("Generate_Image", f"provider={provider} size={image.size}")
            return image
        except Exception as e:  # try the next provider; the last error becomes the friendly message
            last_error = e
            continue
    # If we reach here, all providers failed; log once, then raise a friendly error
    msg = str(last_error) if last_error else "Unknown error"
    _log_call_end("Generate_Image", f"error={_truncate_for_log(msg, 200)}")
    if "404" in msg:
        raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and your HF token access.")
    if "503" in msg:
        raise gr.Error("The model is warming up. Please try again shortly.")
    if "401" in msg or "403" in msg:
        raise gr.Error("Authentication failed. Set the HF_READ_TOKEN environment variable with access to the model.")
    raise gr.Error(f"Image generation failed: {msg}")

image_generation_interface = gr.Interface(
    fn=Generate_Image,
    inputs=[
        gr.Textbox(label="Prompt", placeholder="Enter a prompt", lines=2),
        gr.Textbox(label="Model", value="black-forest-labs/FLUX.1-Krea-dev", placeholder="creator/model-name"),
        gr.Textbox(
            label="Negative Prompt",
            value=(
                "(deformed, distorted, disfigured), poorly drawn, bad anatomy, wrong anatomy, extra limb, "
                "missing limb, floating limbs, (mutated hands and fingers), disconnected limbs, mutation, "
                "mutated, ugly, disgusting, blurry, amputation, misspellings, typos"
            ),
            lines=2,
        ),
        gr.Slider(minimum=1, maximum=100, value=35, step=1, label="Steps"),
        gr.Slider(minimum=1.0, maximum=20.0, value=7.0, step=0.1, label="CFG Scale"),
        gr.Radio(label="Sampler", value="DPM++ 2M Karras", choices=[
            "DPM++ 2M Karras", "DPM++ SDE Karras", "Euler", "Euler a", "Heun", "DDIM"
        ]),
        gr.Slider(minimum=-1, maximum=1_000_000_000, value=-1, step=1, label="Seed (-1 = random)"),
        gr.Slider(minimum=64, maximum=1216, value=1024, step=32, label="Width"),
        gr.Slider(minimum=64, maximum=1216, value=1024, step=32, label="Height"),
    ],
    outputs=gr.Image(label="Generated Image"),
    title="Image Generation",
    description=(
        "<div style=\"text-align:center\">Generate images via Hugging Face serverless inference. "
        "Default model is FLUX.1-Krea-dev.</div>"
    ),
    api_description=(
        "Generate a single image from a text prompt using a Hugging Face model via serverless inference. "
        "Supports creative prompts like 'a serene mountain landscape at sunset', 'portrait of a wise owl', "
        "'futuristic city with flying cars'. Default model: FLUX.1-Krea-dev. "
        "Parameters: prompt (str), model_id (str, creator/model-name), negative_prompt (str), steps (int, 1–100), "
        "cfg_scale (float, 1–20), sampler (str), seed (int, -1=random), width/height (int, 64–1216). "
        "Returns a PIL.Image. Return the generated media to the user in this format ``"
    ),
    flagging_mode="never",
    # Only expose to MCP when an HF token is provided; the UI tab is always visible
    show_api=bool(os.getenv("HF_READ_TOKEN")),
)

# ==========================
# Video Generation (Serverless)
# ==========================


def _write_video_tmp(data_iter_or_bytes: object, suffix: str = ".mp4") -> str:
    """Write video bytes (or an iterable of byte chunks) to a system temporary file and return its path.

    This avoids polluting the project directory. The file is created in the OS temp
    location; Gradio handles serving it and offering the download button.
    """
    fd, fname = tempfile.mkstemp(suffix=suffix)
    try:
        with os.fdopen(fd, "wb") as f:
            if isinstance(data_iter_or_bytes, (bytes, bytearray)):
                f.write(data_iter_or_bytes)  # type: ignore[arg-type]
            elif hasattr(data_iter_or_bytes, "read"):
                f.write(data_iter_or_bytes.read())  # type: ignore[call-arg]
            elif hasattr(data_iter_or_bytes, "content"):
                f.write(data_iter_or_bytes.content)  # type: ignore[attr-defined]
            elif hasattr(data_iter_or_bytes, "__iter__") and not isinstance(data_iter_or_bytes, (str, dict)):
                for chunk in data_iter_or_bytes:  # type: ignore[assignment]
                    if chunk:
                        f.write(chunk)
            else:
                raise gr.Error("Unsupported video data type returned by provider.")
    except Exception:
        # Clean up if writing failed
        try:
            os.remove(fname)
        except Exception:
            pass
        raise
    return fname
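

# Sketch: bytes in, temp-file path out (caller/Gradio handles serving and cleanup):
#   path = _write_video_tmp(b"placeholder-bytes", suffix=".mp4")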
HF_VIDEO_TOKEN = os.getenv("HF_READ_TOKEN") or os.getenv("HF_TOKEN")

def Generate_Video(  # <-- MCP tool #7 (Generate Video)
    prompt: Annotated[str, "Text description of the video to generate (e.g., 'a red fox running through a snowy forest at sunrise')."],
    model_id: Annotated[str, "Hugging Face model id in the form 'creator/model-name'. Defaults to Wan-AI/Wan2.2-T2V-A14B."] = "Wan-AI/Wan2.2-T2V-A14B",
    negative_prompt: Annotated[str, "What should NOT appear in the video."] = "",
    steps: Annotated[int, "Number of denoising steps (1–100). Higher can improve quality but is slower."] = 25,
    cfg_scale: Annotated[float, "Guidance scale (1–20). Higher = follow the prompt more closely, lower = more creative."] = 3.5,
    seed: Annotated[int, "Random seed for reproducibility. Use -1 for a random seed per call."] = -1,
    width: Annotated[int, "Output width in pixels (multiples of 8 recommended)."] = 768,
    height: Annotated[int, "Output height in pixels (multiples of 8 recommended)."] = 768,
    fps: Annotated[int, "Frames per second of the output video (e.g., 24)."] = 24,
    duration: Annotated[float, "Target duration in seconds (provider/model dependent, commonly 2–6s)."] = 4.0,
) -> str:
    """
    Generate a short video from a text prompt using a Hugging Face model via serverless inference.

    Args:
        prompt (str): Text description of the video to generate.
        model_id (str): The Hugging Face model id (creator/model-name). Defaults to "Wan-AI/Wan2.2-T2V-A14B".
        negative_prompt (str): What should NOT appear in the video.
        steps (int): Number of denoising steps (1–100). Higher can improve quality but is slower.
        cfg_scale (float): Guidance scale (1–20). Higher = follow the prompt more closely.
        seed (int): Random seed. Use -1 to randomize on each call.
        width (int): Output width in pixels.
        height (int): Output height in pixels.
        fps (int): Frames per second.
        duration (float): Target duration in seconds.

    Returns:
        str: Path to an MP4 file on disk (Gradio serves this file; MCP converts it to a file URL).

    Error modes:
        - Raises gr.Error with a user-friendly message on auth/model/load errors or unsupported parameters.
    """
    _log_call_start("Generate_Video", prompt=_truncate_for_log(prompt, 160), model_id=model_id, steps=steps, cfg_scale=cfg_scale, fps=fps, duration=duration, size=f"{width}x{height}")
    if not prompt or not prompt.strip():
        _log_call_end("Generate_Video", "error=empty prompt")
        raise gr.Error("Please provide a non-empty prompt.")
    if not HF_VIDEO_TOKEN:
        # Proceed without a token: public models may still work unauthenticated,
        # but requests are rate-limited and any auth failure surfaces below.
        pass
    providers = ["auto", "replicate", "fal-ai"]
    last_error: Exception | None = None
    # Common parameters dict. It feeds the legacy POST fallback below (plus the seed
    # for the primary path); providers may ignore keys they don't support.
    parameters = {
        "negative_prompt": negative_prompt or None,
        "num_inference_steps": steps,
        "guidance_scale": cfg_scale,
        "seed": seed if seed != -1 else random.randint(1, 1_000_000_000),
        "width": width,
        "height": height,
        "fps": fps,
        # Some providers/models expect num_frames instead of duration; duration is
        # passed for backends that support it and ignored by the others.
        "duration": duration,
    }
    for provider in providers:
        try:
            client = InferenceClient(api_key=HF_VIDEO_TOKEN, provider=provider)
            # Use the documented text_to_video API with correct parameters
            if hasattr(client, "text_to_video"):
                # Derive num_frames from duration and fps when both are provided
                # (e.g. duration=4.0s at fps=24 -> num_frames=96)
                num_frames = int(duration * fps) if duration and fps else None
                # Build extra_body for provider-specific parameters not covered by
                # the method signature; it is forwarded in the request payload
                extra_body = {}
                if width:
                    extra_body["width"] = width
                if height:
                    extra_body["height"] = height
                if fps:
                    extra_body["fps"] = fps
                if duration:
                    extra_body["duration"] = duration
                result = client.text_to_video(
                    prompt=prompt,
                    model=model_id,
                    guidance_scale=cfg_scale,
                    negative_prompt=[negative_prompt] if negative_prompt else None,
                    num_frames=num_frames,
                    num_inference_steps=steps,
                    seed=parameters["seed"],
                    extra_body=extra_body if extra_body else None,
                )
            else:
                # Generic POST fallback for older huggingface_hub versions
                result = client.post(
                    model=model_id,
                    json={
                        "inputs": prompt,
                        "parameters": {k: v for k, v in parameters.items() if v is not None},
                    },
                )
            # Save output to an .mp4
            path = _write_video_tmp(result, suffix=".mp4")
            try:
                size = os.path.getsize(path)
            except Exception:
                size = -1
            _log_call_end("Generate_Video", f"provider={provider} path={os.path.basename(path)} bytes={size}")
            return path
        except Exception as e:
            last_error = e
            continue
    # All providers failed: log the error once, then map common HTTP status codes
    # to user-friendly messages.
    msg = str(last_error) if last_error else "Unknown error"
    _log_call_end("Generate_Video", f"error={_truncate_for_log(msg, 200)}")
    if "404" in msg:
        raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and HF token access.")
    if "503" in msg:
        raise gr.Error("The model is warming up. Please try again shortly.")
    if "401" in msg or "403" in msg:
        raise gr.Error("Authentication failed or not permitted. Set HF_READ_TOKEN/HF_TOKEN with inference access.")
    raise gr.Error(f"Video generation failed: {msg}")
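# Usage sketch (assumes a token with inference-provider access; the printed path
# is illustrative, the real temp filename varies by OS):
#   path = Generate_Video("a red fox running through a snowy forest at sunrise")
#   print(path)  # e.g. /tmp/tmpa1b2c3d4.mp4, roughly 4s at 24fps with the defaults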
video_generation_interface = gr.Interface(
    fn=Generate_Video,
    inputs=[
        gr.Textbox(label="Prompt", placeholder="Enter a prompt for the video", lines=2),
        gr.Textbox(label="Model", value="Wan-AI/Wan2.2-T2V-A14B", placeholder="creator/model-name"),
        gr.Textbox(label="Negative Prompt", value="", lines=2),
        gr.Slider(minimum=1, maximum=100, value=25, step=1, label="Steps"),
        gr.Slider(minimum=1.0, maximum=20.0, value=3.5, step=0.1, label="CFG Scale"),
        gr.Slider(minimum=-1, maximum=1_000_000_000, value=-1, step=1, label="Seed (-1 = random)"),
        gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Width"),
        gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Height"),
        gr.Slider(minimum=4, maximum=60, value=24, step=1, label="FPS"),
        gr.Slider(minimum=1.0, maximum=10.0, value=4.0, step=0.5, label="Duration (s)"),
    ],
    outputs=gr.Video(label="Generated Video", show_download_button=True, format="mp4"),
    title="Video Generation",
    description=(
        "<div style=\"text-align:center\">Generate short videos via Hugging Face serverless inference. "
        "Default model is Wan2.2-T2V-A14B.</div>"
    ),
    api_description=(
        "Generate a short video from a text prompt using a Hugging Face model via serverless inference. "
        "Create dynamic scenes like 'a red fox running through a snowy forest at sunrise', 'waves crashing on a rocky shore', "
        "'time-lapse of clouds moving across a blue sky'. Default model: Wan2.2-T2V-A14B (2-6 second videos). "
        "Parameters: prompt (str), model_id (str), negative_prompt (str), steps (int), cfg_scale (float), seed (int), "
        "width/height (int), fps (int), duration (float in seconds). Returns MP4 file path. "
        "Return the generated media to the user in this format ``"
    ),
    flagging_mode="never",
    # Only expose to MCP when an HF token is provided; the UI tab is always visible
    show_api=bool(os.getenv("HF_READ_TOKEN") or os.getenv("HF_TOKEN")),
)
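# Note: the image tool gates MCP exposure on HF_READ_TOKEN only, while the video
# tool accepts HF_READ_TOKEN or HF_TOKEN (matching HF_VIDEO_TOKEN above); both
# tabs remain visible in the UI either way.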
_interfaces = [
    fetch_interface,
    concise_interface,
    code_interface,
    memory_interface,  # Always visible in UI
    kokoro_interface,
    image_generation_interface,  # Always visible in UI
    video_generation_interface,  # Always visible in UI
]

_tab_names = [
    "Fetch Webpage",
    "DuckDuckGo Search",
    "Python Code Executor",
    "Memory Manager",
    "Kokoro TTS",
    "Image Generation",
    "Video Generation",
]

demo = gr.TabbedInterface(
    interface_list=_interfaces,
    tab_names=_tab_names,
    title="Tools MCP",
    theme="Nymbo/Nymbo_Theme",
    css=CSS_STYLES,
)
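# Assumption about current Gradio behavior, for reference: launching with
# mcp_server=True serves the UI and an MCP endpoint (typically at
# /gradio_api/mcp/sse) from one server; each function with show_api=True is
# exposed as an MCP tool whose description comes from its api_description.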
# Launch the UI and expose all functions as MCP tools in one server
if __name__ == "__main__":
    demo.launch(mcp_server=True)