import os
import asyncio
import time
import json
from typing import Optional, List, Dict, Any
from datetime import datetime

import httpx
import trafilatura
import gradio as gr
from dateutil import parser as dateparser
from limits import parse
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter

from analytics import record_request, last_n_days_df, last_n_days_avg_time_df

SERPER_API_KEY_ENV = os.getenv("SERPER_API_KEY")
SERPER_API_KEY_OVERRIDE: Optional[str] = None
SERPER_SEARCH_ENDPOINT = "https://google.serper.dev/search"
SERPER_NEWS_ENDPOINT = "https://google.serper.dev/news"


def _get_serper_api_key() -> Optional[str]:
    """Return the currently active Serper API key (override wins, else env)."""
    return (SERPER_API_KEY_OVERRIDE or SERPER_API_KEY_ENV or None)


def _get_headers() -> Dict[str, str]:
    api_key = _get_serper_api_key()
    return {"X-API-KEY": api_key or "", "Content-Type": "application/json"}


# Shared in-memory rate limiting: 360 requests per hour across all users.
storage = MemoryStorage()
limiter = MovingWindowRateLimiter(storage)
rate_limit = parse("360/hour")


async def search_web(
    query: str, search_type: str = "search", num_results: Optional[int] = 4
) -> str:
    """
    Search the web for information or fresh news, returning extracted content.

    This tool can perform two types of searches:
    - "search" (default): General web search for diverse, relevant content from various sources
    - "news": Specifically searches for fresh news articles and breaking stories

    Use "news" mode when looking for:
    - Breaking news or very recent events
    - Time-sensitive information
    - Current affairs and latest developments
    - Today's/this week's happenings

    Use "search" mode (default) for:
    - General information and research
    - Technical documentation or guides
    - Historical information
    - Diverse perspectives from various sources

    Args:
        query (str): The search query. This is REQUIRED. Examples: "apple inc earnings",
            "climate change 2024", "AI developments"
        search_type (str): Type of search. This is OPTIONAL. Default is "search".
            Options: "search" (general web search) or "news" (fresh news articles).
            Use "news" for time-sensitive, breaking news content.
        num_results (int): Number of results to fetch. This is OPTIONAL. Default is 4.
            Range: 1-20. More results = more context but longer response time.

    Returns:
        str: Formatted text containing extracted content with metadata (title,
            source, date, URL, and main text) for each result, separated by dividers.
            Returns an error message if the API key is missing or the search fails.

    Examples:
        - search_web("OpenAI GPT-5", "news") - Get fresh news articles about OpenAI (default count of 4)
        - search_web("python tutorial", "search") - Get 4 general results about Python (default count)
        - search_web("stock market today", "news", 10) - Get 10 news articles about today's market
        - search_web("machine learning basics") - Get 4 general search results (all defaults)
    """
    start_time = time.time()

    if not _get_serper_api_key():
        await record_request(None, num_results)
        return "Error: SERPER_API_KEY is not set. Please set the environment variable or paste a key in the UI to use this tool."

    # Clamp the number of results to the supported range (1-20).
    if num_results is None:
        num_results = 4
    num_results = max(1, min(20, num_results))

    # Fall back to a general web search for unrecognized search types.
    if search_type not in ["search", "news"]:
        search_type = "search"

    try:
        # Enforce the shared rate limit before calling the Serper API.
        if not await limiter.hit(rate_limit, "global"):
            print(f"[{datetime.now().isoformat()}] Rate limit exceeded")
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return "Error: Rate limit exceeded. Please try again later (limit: 360 requests per hour)."

        endpoint = (
            SERPER_NEWS_ENDPOINT if search_type == "news" else SERPER_SEARCH_ENDPOINT
        )

        # Build the Serper request payload.
        payload = {"q": query, "num": num_results}
        if search_type == "news":
            payload["type"] = "news"
            payload["page"] = 1

        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(endpoint, headers=_get_headers(), json=payload)

        if resp.status_code != 200:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return f"Error: Search API returned status {resp.status_code}. Please check your API key and try again."

        # News results are returned under "news"; general results under "organic".
        if search_type == "news":
            results = resp.json().get("news", [])
        else:
            results = resp.json().get("organic", [])

        if not results:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return f"No {search_type} results found for query: '{query}'. Try a different search term or search type."

        # Fetch all result pages concurrently.
        urls = [r["link"] for r in results]
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            tasks = [client.get(u) for u in urls]
            responses = await asyncio.gather(*tasks, return_exceptions=True)

        # Extract the main content from each page and format it as markdown.
        chunks = []
        successful_extractions = 0

        for meta, response in zip(results, responses):
            if isinstance(response, Exception):
                continue

            body = trafilatura.extract(
                response.text, include_formatting=True, include_comments=False
            )

            if not body:
                continue

            successful_extractions += 1
            print(
                f"[{datetime.now().isoformat()}] Successfully extracted content from {meta['link']}"
            )

            if search_type == "news":
                # Normalize the article date to YYYY-MM-DD when possible.
                try:
                    date_str = meta.get("date", "")
                    if date_str:
                        date_iso = dateparser.parse(date_str, fuzzy=True).strftime(
                            "%Y-%m-%d"
                        )
                    else:
                        date_iso = "Unknown"
                except Exception:
                    date_iso = "Unknown"

                chunk = (
                    f"## {meta['title']}\n"
                    f"**Source:** {meta.get('source', 'Unknown')} "
                    f"**Date:** {date_iso}\n"
                    f"**URL:** {meta['link']}\n\n"
                    f"{body.strip()}\n"
                )
            else:
                # Derive a display domain from the result URL.
                domain = meta["link"].split("/")[2].replace("www.", "")

                chunk = (
                    f"## {meta['title']}\n"
                    f"**Domain:** {domain}\n"
                    f"**URL:** {meta['link']}\n\n"
                    f"{body.strip()}\n"
                )

            chunks.append(chunk)

        if not chunks:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return f"Found {len(results)} {search_type} results for '{query}', but couldn't extract readable content from any of them. The websites might be blocking automated access."

        result = "\n---\n".join(chunks)
        summary = f"Successfully extracted content from {successful_extractions} out of {len(results)} {search_type} results for query: '{query}'\n\n---\n\n"

        print(
            f"[{datetime.now().isoformat()}] Extraction complete: {successful_extractions}/{len(results)} successful for query '{query}'"
        )

        duration = time.time() - start_time
        await record_request(duration, num_results)

        return summary + result

    except Exception as e:
        # Record the failed request as well, then surface the error to the caller.
        duration = time.time() - start_time
        await record_request(duration, num_results)
        return f"Error occurred while searching: {str(e)}. Please try again or check your query."
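

# Hedged usage sketch (not wired into the app): how a standalone script might call
# the search_web tool directly. Assumes a valid SERPER_API_KEY is available; the
# helper name below is hypothetical and exists only for illustration.
#
#   import asyncio
#
#   async def _demo_search_web() -> None:
#       text = await search_web("python tutorial", search_type="search", num_results=3)
#       print(text[:500])
#
#   asyncio.run(_demo_search_web())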


async def search_and_chunk(
    query: str,
    search_type: str,
    num_results: Optional[int],
    tokenizer_or_token_counter: str,
    chunk_size: int,
    chunk_overlap: int,
    heading_level: int,
    min_characters_per_chunk: int,
    max_characters_per_section: int,
    clean_text: bool,
) -> str:
    """
    Complete flow: search -> fetch -> extract with trafilatura -> chunk with MarkdownChunker/Parser.
    Returns a JSON string of a list[dict] where each dict is a chunk enriched with source metadata.
    """
    start_time = time.time()

    if not _get_serper_api_key():
        await record_request(None, num_results)
        return json.dumps([
            {"error": "SERPER_API_KEY not set", "hint": "Set env or paste in the UI"}
        ])

    # Normalize and validate the inputs.
    if num_results is None:
        num_results = 4
    num_results = max(1, min(20, int(num_results)))
    if search_type not in ["search", "news"]:
        search_type = "search"

    try:
        # Enforce the shared rate limit before calling the Serper API.
        if not await limiter.hit(rate_limit, "global"):
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return json.dumps([
                {"error": "rate_limited", "limit": "360/hour"}
            ])

        endpoint = (
            SERPER_NEWS_ENDPOINT if search_type == "news" else SERPER_SEARCH_ENDPOINT
        )
        payload = {"q": query, "num": num_results}
        if search_type == "news":
            payload["type"] = "news"
            payload["page"] = 1

        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(endpoint, headers=_get_headers(), json=payload)

        if resp.status_code != 200:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return json.dumps([
                {"error": "bad_status", "status": resp.status_code}
            ])

        results = resp.json().get("news" if search_type == "news" else "organic", [])
        if not results:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return json.dumps([])

        # Fetch all result pages concurrently.
        urls = [r.get("link") for r in results]
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            responses = await asyncio.gather(*[client.get(u) for u in urls], return_exceptions=True)

        all_chunks: List[Dict[str, Any]] = []

        for meta, response in zip(results, responses):
            if isinstance(response, Exception):
                continue

            extracted = trafilatura.extract(
                response.text, include_formatting=True, include_comments=False
            )
            if not extracted:
                continue

            # Build one markdown document per result so the chunker can use its headings.
            if search_type == "news":
                # Normalize the article date to YYYY-MM-DD when possible.
                try:
                    date_str = meta.get("date", "")
                    date_iso = (
                        dateparser.parse(date_str, fuzzy=True).strftime("%Y-%m-%d") if date_str else "Unknown"
                    )
                except Exception:
                    date_iso = "Unknown"
                markdown_doc = (
                    f"# {meta.get('title', 'Untitled')}\n\n"
                    f"**Source:** {meta.get('source', 'Unknown')} **Date:** {date_iso}\n\n"
                    f"**URL:** {meta.get('link', '')}\n\n"
                    f"{extracted.strip()}\n"
                )
            else:
                domain = (meta.get("link", "").split("/")[2].replace("www.", "") if meta.get("link") else "")
                markdown_doc = (
                    f"# {meta.get('title', 'Untitled')}\n\n"
                    f"**Domain:** {domain}\n\n"
                    f"**URL:** {meta.get('link', '')}\n\n"
                    f"{extracted.strip()}\n"
                )

            # Chunk the markdown document with chonkie.
            chunks = _run_markdown_chunker(
                markdown_doc,
                tokenizer_or_token_counter=tokenizer_or_token_counter,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                heading_level=heading_level,
                min_characters_per_chunk=min_characters_per_chunk,
                max_characters_per_section=max_characters_per_section,
                clean_text=clean_text,
            )

            # Enrich each chunk with source metadata before collecting it.
            for c in chunks:
                c.setdefault("source_title", meta.get("title"))
                c.setdefault("url", meta.get("link"))
                if search_type == "news":
                    c.setdefault("source", meta.get("source"))
                    c.setdefault("date", meta.get("date"))
                else:
                    c.setdefault("domain", domain)
                all_chunks.append(c)

        duration = time.time() - start_time
        await record_request(duration, num_results)
        return json.dumps(all_chunks, ensure_ascii=False)

    except Exception as e:
        duration = time.time() - start_time
        await record_request(duration, num_results)
        return json.dumps([{"error": str(e)}])
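

# Hedged usage sketch (not part of the app flow): search_and_chunk returns a JSON
# string, so callers are expected to json.loads it back into a list of chunk dicts.
# The snippet below is illustrative only; parameter values mirror the UI defaults.
#
#   import asyncio
#
#   async def _demo_search_and_chunk() -> None:
#       raw = await search_and_chunk(
#           "React hooks useState", "search", 4,
#           tokenizer_or_token_counter="character", chunk_size=1000, chunk_overlap=0,
#           heading_level=3, min_characters_per_chunk=50,
#           max_characters_per_section=4000, clean_text=True,
#       )
#       for chunk in json.loads(raw):
#           print(chunk.get("source_title"), "->", len(chunk.get("text", "")), "chars")
#
#   asyncio.run(_demo_search_and_chunk())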


with gr.Blocks(title="Web Search MCP Server") as demo:
    gr.HTML(
        """
        <div style="background-color: rgba(59, 130, 246, 0.1); border: 1px solid rgba(59, 130, 246, 0.3); border-radius: 8px; padding: 12px; margin-bottom: 16px; text-align: center;">
            <p style="color: rgb(59, 130, 246); margin: 0; font-size: 14px; font-weight: 500;">
                🤝 Community resource — please use responsibly to keep this service available for everyone
            </p>
        </div>
        """
    )

    gr.Markdown("# 🔍 Web Search MCP Server")

    with gr.Tabs():
        with gr.Tab("App"):
            gr.Markdown(
                """
                This MCP server provides web search capabilities to LLMs. It can perform general web searches
                or specifically search for fresh news articles, extracting the main content from results.

                **⚡ Speed-Focused:** Optimized to complete the entire search process - from query to
                fully extracted web content - in under 2 seconds. Check out the Analytics tab
                to see real-time performance metrics.

                **Search Types:**
                - **General Search**: Diverse results from various sources (blogs, docs, articles, etc.)
                - **News Search**: Fresh news articles and breaking stories from news sources

                **Note:** This interface is primarily designed for MCP tool usage by LLMs, but you can
                also test it manually below.
                """
            )

            gr.HTML(
                """
                <div style="margin-bottom: 24px;">
                    <a href="https://huggingface.co/spaces/victor/websearch?view=api">
                        <img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/use-with-mcp-lg-dark.svg"
                             alt="Use with MCP"
                             style="height: 36px;">
                    </a>
                </div>
                """,
                padding=0,
            )

            with gr.Row():
                with gr.Column(scale=3):
                    query_input = gr.Textbox(
                        label="Search Query",
                        placeholder='e.g. "OpenAI news", "climate change 2024", "AI developments"',
                        info="Required: Enter your search query",
                    )
                with gr.Column(scale=1):
                    search_type_input = gr.Radio(
                        choices=["search", "news"],
                        value="search",
                        label="Search Type",
                        info="Choose search type",
                    )

            with gr.Row():
                with gr.Column(scale=3):
                    serper_key_input = gr.Textbox(
                        label="Serper API Key",
                        placeholder="Enter your Serper API key or set SERPER_API_KEY env var",
                        type="password",
                    )
                with gr.Column(scale=1):
                    set_key_btn = gr.Button("Save API Key")

            with gr.Accordion("Chunking Parameters", open=False):
                with gr.Row():
                    num_results_input = gr.Slider(
                        minimum=1,
                        maximum=20,
                        value=4,
                        step=1,
                        label="Number of Results",
                        info="Results to fetch (1-20)",
                    )
                    chunk_size_input = gr.Slider(100, 4000, value=1000, step=50, label="Chunk Size (characters)")
                    heading_level_input = gr.Slider(1, 6, value=3, step=1, label="Max Heading Level")
                with gr.Row():
                    min_chars_input = gr.Slider(0, 1000, value=50, step=10, label="Min characters per chunk")
                    max_chars_input = gr.Slider(500, 10000, value=4000, step=100, label="Max characters per section")
                with gr.Row():
                    tokenizer_input = gr.Dropdown(choices=["character"], value="character", label="Tokenizer")
                    overlap_input = gr.Slider(0, 400, value=0, step=10, label="Chunk overlap (reserved)")
                    clean_text_input = gr.Checkbox(value=True, label="Clean text (strip inline markdown/URLs)")

            search_button = gr.Button("Search + Chunk", variant="primary")

            output = gr.Textbox(
                label="Chunks (JSON List[Dict])",
                lines=25,
                max_lines=50,
                info="Output is a JSON string list of chunk dicts",
            )

            # Each example row supplies a value for every input component wired below.
            gr.Examples(
                examples=[
                    ["OpenAI GPT-5 latest developments", "news", 5, "character", 1000, 0, 3, 50, 4000, True],
                    ["React hooks useState", "search", 4, "character", 1000, 0, 3, 50, 4000, True],
                    ["Tesla stock price today", "news", 6, "character", 1000, 0, 3, 50, 4000, True],
                    ["Apple Vision Pro reviews", "search", 4, "character", 1000, 0, 3, 50, 4000, True],
                    ["best Italian restaurants NYC", "search", 4, "character", 1000, 0, 3, 50, 4000, True],
                ],
                inputs=[
                    query_input,
                    search_type_input,
                    num_results_input,
                    tokenizer_input,
                    chunk_size_input,
                    overlap_input,
                    heading_level_input,
                    min_chars_input,
                    max_chars_input,
                    clean_text_input,
                ],
                outputs=output,
                fn=search_and_chunk,
                cache_examples=False,
            )

            def _set_serper_key(key: str) -> str:
                global SERPER_API_KEY_OVERRIDE
                SERPER_API_KEY_OVERRIDE = (key or "").strip() or None
                # Submitting an empty value clears the in-session override.
                if SERPER_API_KEY_OVERRIDE:
                    return "Serper API key saved in-session."
                return "Cleared in-session API key. Using environment if set."

            set_key_btn.click(fn=_set_serper_key, inputs=serper_key_input, outputs=output)

        with gr.Tab("Analytics"):
            gr.Markdown("## Community Usage Analytics")
            gr.Markdown(
                "Track daily request counts and average response times from all community users."
            )

            with gr.Row():
                with gr.Column():
                    requests_plot = gr.BarPlot(
                        value=last_n_days_df(14),
                        x="date",
                        y="count",
                        title="Daily Request Count",
                        tooltip=["date", "count"],
                        height=350,
                        x_label_angle=-45,
                        container=False,
                    )

                with gr.Column():
                    avg_time_plot = gr.BarPlot(
                        value=last_n_days_avg_time_df(14),
                        x="date",
                        y="avg_time",
                        title="Average Request Time (seconds)",
                        tooltip=["date", "avg_time", "request_count"],
                        height=350,
                        x_label_angle=-45,
                        container=False,
                    )

    search_button.click(
        fn=search_and_chunk,
        inputs=[
            query_input,
            search_type_input,
            num_results_input,
            tokenizer_input,
            chunk_size_input,
            overlap_input,
            heading_level_input,
            min_chars_input,
            max_chars_input,
            clean_text_input,
        ],
        outputs=output,
        api_name=False,
    )

    # Refresh the analytics plots whenever the page is loaded.
    demo.load(
        fn=lambda: (last_n_days_df(14), last_n_days_avg_time_df(14)),
        outputs=[requests_plot, avg_time_plot],
        api_name=False,
    )

    # Expose search_and_chunk as a named API endpoint (and MCP tool).
    gr.api(search_and_chunk, api_name="search_and_chunk")
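
    # Hedged client-side sketch: with the endpoint registered above, a remote caller
    # could reach it via gradio_client roughly like this (illustrative only, not
    # executed here; assumes this Space is reachable as "victor/websearch"):
    #
    #   from gradio_client import Client
    #
    #   client = Client("victor/websearch")
    #   raw = client.predict(
    #       "Tesla stock price today", "news", 6,
    #       "character", 1000, 0, 3, 50, 4000, True,
    #       api_name="/search_and_chunk",
    #   )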


def _run_markdown_chunker(
    markdown_text: str,
    tokenizer_or_token_counter: str = "character",
    chunk_size: int = 1000,
    chunk_overlap: int = 0,
    heading_level: int = 3,
    min_characters_per_chunk: int = 50,
    max_characters_per_section: int = 4000,
    clean_text: bool = True,
) -> List[Dict[str, Any]]:
    """
    Use chonkie's MarkdownChunker or MarkdownParser to chunk markdown text and
    return a List[Dict] with useful fields.

    This follows the documentation in the chonkie commit introducing MarkdownChunker
    and its parameters.
    """
    markdown_text = markdown_text or ""
    if not markdown_text.strip():
        return []

    # chonkie is an optional dependency; try the public import path first, then the module path.
    try:
        try:
            from chonkie import MarkdownParser
        except Exception:
            try:
                from chonkie.chunker.markdown import MarkdownParser
            except Exception:
                MarkdownParser = None
        try:
            from chonkie import MarkdownChunker
        except Exception:
            from chonkie.chunker.markdown import MarkdownChunker
    except Exception as exc:
        return [{
            "error": "chonkie not installed",
            "detail": "Install chonkie from the feat/markdown-chunker branch",
            "exception": str(exc),
        }]

    # Prefer MarkdownParser when it imported successfully. The imports above always
    # bind the local name, so a plain None check is sufficient here.
    if MarkdownParser is not None:
        try:
            parser = MarkdownParser(
                tokenizer_or_token_counter=tokenizer_or_token_counter,
                chunk_size=int(chunk_size),
                chunk_overlap=int(chunk_overlap),
                heading_level=int(heading_level),
                min_characters_per_chunk=int(min_characters_per_chunk),
                max_characters_per_section=int(max_characters_per_section),
                clean_text=bool(clean_text),
            )
            result = parser.parse(markdown_text) if hasattr(parser, 'parse') else parser(markdown_text)
            # If the parser already returns a list of dicts, pass it through unchanged.
            if isinstance(result, list) and (not result or isinstance(result[0], dict)):
                return result
            # Otherwise normalize the chunk objects below.
            chunks = result
        except Exception:
            # Fall back to MarkdownChunker if the parser path fails.
            chunks = None
    else:
        chunks = None

    # Fall back to MarkdownChunker when the parser is unavailable or failed.
    if chunks is None:
        chunker = MarkdownChunker(
            tokenizer_or_token_counter=tokenizer_or_token_counter,
            chunk_size=int(chunk_size),
            chunk_overlap=int(chunk_overlap),
            heading_level=int(heading_level),
            min_characters_per_chunk=int(min_characters_per_chunk),
            max_characters_per_section=int(max_characters_per_section),
            clean_text=bool(clean_text),
        )
        if hasattr(chunker, 'chunk'):
            chunks = chunker.chunk(markdown_text)
        elif hasattr(chunker, 'split_text'):
            chunks = chunker.split_text(markdown_text)
        elif callable(chunker):
            chunks = chunker(markdown_text)
        else:
            return [{"error": "Unknown MarkdownChunker interface"}]

    # Normalize chunk objects into plain dicts with the most useful fields.
    normalized: List[Dict[str, Any]] = []
    for c in (chunks or []):
        if isinstance(c, dict):
            normalized.append(c)
            continue
        item: Dict[str, Any] = {}
        for field in ("text", "start_index", "end_index", "token_count", "heading", "metadata"):
            if hasattr(c, field):
                try:
                    item[field] = getattr(c, field)
                except Exception:
                    pass
        if not item:
            # Last resort: keep at least a string representation of the chunk.
            item = {"text": str(c)}
        normalized.append(item)
    return normalized
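

# Hedged usage sketch: calling the chunker helper directly on a small markdown
# document. The exact chunk fields depend on the installed chonkie version, so this
# is illustrative only.
#
#   sample_md = "# Title\n\nSome introduction text.\n\n## Section\n\nMore details here."
#   for chunk in _run_markdown_chunker(sample_md, chunk_size=200):
#       print(chunk.get("text", "")[:80])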


if __name__ == "__main__":
    # Launch the Gradio app with the MCP server enabled and the API page visible.
    demo.launch(mcp_server=True, show_api=True)