# NOTE: stray capture residue removed; the module begins with the docstring below.
| """ | |
| Medium MCP Server v3.0 | |
| A comprehensive MCP server for Medium article scraping with: | |
| - Full MCP specification compliance (annotations, progress, logging) | |
| - 12 Tools, 4 Resources, 9 Prompts | |
| - ElevenLabs Creator API for premium audio | |
| - HTTP transport support for remote deployment | |
| """ | |
| import sys | |
| import os | |
| import asyncio | |
| import uuid | |
| from typing import List, Dict, Any, Optional | |
| from contextlib import asynccontextmanager | |
| # No sys.path needed - src/ is in same project now | |
| from mcp.server.fastmcp import FastMCP, Context, Image | |
| import httpx | |
| # Local imports | |
| from src.config import MCPConfig, ELEVENLABS_CHAR_LIMITS, ELEVENLABS_OUTPUT_FORMATS | |
| from elevenlabs_voices import ELEVENLABS_VOICES, get_voice_id, get_voices_info, VOICE_CATEGORIES | |
| # Medium-Scraper imports | |
| from src.service import ScraperService | |
| from src.html_renderer import render_article_html, render_full_page | |
| # LLM imports | |
| from groq import Groq | |
| # ============================================================================ | |
| # LIFESPAN MANAGEMENT | |
| # ============================================================================ | |
class AppContext:
    """Container for process-wide resources created by the lifespan hook.

    Attributes set here are reached by tools through
    ``ctx.request_context.lifespan_context``:
      - scraper: shared ScraperService instance
      - config: loaded MCPConfig
      - elevenlabs: ElevenLabs availability flag/handle
    """

    def __init__(self, scraper: ScraperService, config: MCPConfig, elevenlabs):
        self.config = config
        self.elevenlabs = elevenlabs
        self.scraper = scraper
@asynccontextmanager
async def app_lifespan(server: FastMCP):
    """Create and tear down shared resources for the server.

    BUG FIX: FastMCP's ``lifespan`` parameter expects a callable returning an
    async context manager; the bare async generator was never decorated with
    ``@asynccontextmanager`` (imported above but unused). Also aligns the
    startup log with the module's v3.0 versioning.

    Yields:
        AppContext exposed to tools via ``ctx.request_context.lifespan_context``
        and mirrored into the module-level ``_app_context`` for resources.
    """
    global _app_context
    config = MCPConfig.from_env()
    scraper = ScraperService(max_workers=config.max_workers)
    # TTS availability is tracked as a boolean; clients are built on demand.
    elevenlabs_available = bool(os.environ.get("ELEVENLABS_API_KEY"))
    if elevenlabs_available:
        print("[INFO] ElevenLabs API key found")
    else:
        print("[WARN] ELEVENLABS_API_KEY not set, TTS will use fallbacks")
    try:
        await scraper.ensure_initialized()
        app_ctx = AppContext(scraper=scraper, config=config, elevenlabs=elevenlabs_available)
        _app_context = app_ctx  # Set module-level reference for resources
        print("[INFO] Medium MCP Server v3.0 initialized")
        yield app_ctx
    finally:
        _app_context = None
        await scraper.close()
        print("[INFO] Medium MCP Server shutdown complete")
# Initialize FastMCP with lifespan and instructions.
# NOTE: the `instructions` text is surfaced to MCP clients to guide
# tool/prompt selection; keep it in sync with the registered capabilities.
mcp = FastMCP(
    "Medium Scraper v3",
    lifespan=app_lifespan,
    instructions="""This MCP server provides comprehensive access to Medium articles.
**Key Capabilities:**
- Scrape any Medium article (35+ domains supported including TowardsDataScience)
- Search and discover trending content by topic or tag
- Generate audio podcasts from articles using ElevenLabs TTS
- Synthesize research reports using AI (Gemini/OpenAI)
- Export to markdown, HTML, or JSON
**Recommended Workflow:**
1. Use `medium_search(topic)` or `medium_fresh(tag)` to find articles
2. Use `medium_scrape(url)` to get full article content
3. Use `medium_synthesize(topic)` for AI-powered topic analysis
4. Use `medium_cast(url)` to generate audio versions
**Resources available:** Trending articles, tag feeds, search results
**Prompts available:** Article summarization, social media posts, research reports
"""
)
# Module-level reference for resources (set during lifespan).
# Resource handlers cannot receive a request Context, so they read this
# global, which app_lifespan sets on startup and clears on shutdown.
_app_context: Optional[AppContext] = None
| # ============================================================================ | |
| # HELPER FUNCTIONS | |
| # ============================================================================ | |
def get_app_context(ctx: Context) -> AppContext:
    """Return the AppContext attached to the current request's lifespan."""
    lifespan_ctx = ctx.request_context.lifespan_context
    return lifespan_ctx
def truncate_for_model(text: str, model: str) -> str:
    """Clamp *text* to the character budget of the given ElevenLabs model.

    Unknown models fall back to a 10,000-character budget; truncated text
    gets a short end-of-preview marker appended.
    """
    limit = ELEVENLABS_CHAR_LIMITS.get(model, 10000)
    if len(text) <= limit:
        return text
    return text[:limit - 50] + "\n\n... End of audio preview."
def handle_paywall(article: Dict) -> Dict:
    """Annotate an article dict with a warning when it looks paywalled.

    A missing/empty payload yields an error dict. A paywall is assumed when
    the article is flagged locked, or its markdown body is non-empty but
    suspiciously short (under 500 characters).
    """
    if not article:
        return {"error": "No article data"}
    body = article.get("markdownContent", "")
    looks_truncated = bool(body) and len(body) < 500
    if article.get("isLocked", False) or looks_truncated:
        article["_paywall_warning"] = "Content may be behind a paywall"
    return article
| # ============================================================================ | |
| # RESOURCES (Structured JSON responses) | |
| # Note: MCP resources have different signature requirements than tools. | |
| # We use a module-level reference that gets set during lifespan. | |
| # ============================================================================ | |
| import json | |
async def get_trending() -> str:
    """Return trending articles as a JSON string (title/url/author/readingTime)."""
    if not _app_context:
        return '{"error": "Server not initialized"}'
    articles = await _app_context.scraper.scrape_tag("trending", max_articles=10)
    payload = []
    for item in articles:
        author = item.get("author")
        payload.append({
            "title": item.get("title"),
            "url": item.get("url"),
            "author": author.get("name") if isinstance(author, dict) else author,
            "readingTime": item.get("readingTime"),
        })
    return json.dumps(payload, ensure_ascii=False)
async def get_tag_feed(tag: str) -> str:
    """Return articles for a specific tag as a JSON string."""
    if not _app_context:
        return '{"error": "Server not initialized"}'
    articles = await _app_context.scraper.scrape_tag(tag, max_articles=10)
    payload = []
    for item in articles:
        author = item.get("author")
        payload.append({
            "title": item.get("title"),
            "url": item.get("url"),
            "author": author.get("name") if isinstance(author, dict) else author,
            "readingTime": item.get("readingTime"),
        })
    return json.dumps(payload, ensure_ascii=False)
async def get_search_results(query: str) -> str:
    """Return search results as a JSON string (title/url/author/preview)."""
    if not _app_context:
        return '{"error": "Server not initialized"}'
    articles = await _app_context.scraper.scrape_search(query, max_articles=10)
    payload = []
    for item in articles:
        author = item.get("author")
        payload.append({
            "title": item.get("title"),
            "url": item.get("url"),
            "author": author.get("name") if isinstance(author, dict) else author,
            # Prefer subtitle; fall back to description; cap preview length.
            "preview": (item.get("subtitle") or item.get("description", ""))[:200],
        })
    return json.dumps(payload, ensure_ascii=False)
async def get_server_stats() -> str:
    """Return server statistics and capabilities as a JSON string.

    CONSISTENCY FIX: the module header advertises "12 Tools, 4 Resources,
    9 Prompts" but this payload reported 10 tools / 8 prompts; the counts
    are now aligned with the header.
    """
    return json.dumps({
        "version": "3.0",
        "capabilities": {
            "tools": 12,
            "resources": 4,
            "prompts": 9,
            "features": [
                "article_scraping",
                "batch_processing",
                "audio_generation",
                "ai_synthesis",
                "progress_notifications",
                "mcp_logging"
            ]
        },
        "supported_domains": [
            "medium.com",
            "towardsdatascience.com",
            "levelup.gitconnected.com",
            "betterprogramming.pub",
            "javascript.plainenglish.io",
            "35+ total domains"
        ],
        "tts_providers": ["elevenlabs", "edge-tts", "openai"],
        "ai_providers": ["groq", "gemini", "openai"]
    }, ensure_ascii=False)
| # ============================================================================ | |
| # TOOLS - Core Scraping | |
| # ============================================================================ | |
async def medium_scrape(
    url: str,
    output_format: str = "both",
    force_refresh: bool = False,
    enable_enhancements: bool = False,
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Scrape a Medium article with full v3.0 capabilities.

    Args:
        url: Medium article URL (supports 35+ domains including towardsdatascience.com)
        output_format: "markdown", "html", or "both" (default: both)
        force_refresh: Bypass cache and re-scrape (default: false)
        enable_enhancements: Enable KG extraction, embeddings (adds ~15s, default: false)

    Returns:
        Article with title, author, content, tags, and metadata
        (annotated with a paywall warning when the content looks truncated).
    """
    app = get_app_context(ctx)
    article = await app.scraper.scrape_article(
        url,
        force_refresh=force_refresh,
        enable_enhancements=enable_enhancements
    )
    if not article or article.get("error"):
        return article or {"error": "Failed to scrape article", "url": url}
    # Attach rendered HTML when the caller asked for it.
    if output_format in ("html", "both"):
        try:
            article["htmlContent"] = render_article_html(article)
        except Exception as exc:
            article["htmlContent"] = f"<p>Error rendering HTML: {exc}</p>"
    # HTML-only output drops the markdown body.
    if output_format == "html":
        article.pop("markdownContent", None)
    return handle_paywall(article)
async def medium_batch(
    urls: List[str],
    max_concurrency: int = 5,
    output_format: str = "both",
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Scrape multiple Medium articles in parallel.

    Args:
        urls: List of Medium article URLs (max 20)
        max_concurrency: Number of parallel workers (1-10, default: 5)
        output_format: Output format for all articles (default: both)

    Returns:
        {success: [...], failed: [...], stats: {total, success, failed}}

    Fixes: removed the worker's unused ``index`` parameter, and a ``None``
    article no longer surfaces as an AttributeError-shaped error string.
    """
    app = get_app_context(ctx)
    if len(urls) > app.config.max_batch_size:
        return {"error": f"Maximum batch size is {app.config.max_batch_size} URLs"}
    # Clamp concurrency to the documented 1..10 range.
    max_concurrency = min(max(1, max_concurrency), 10)
    semaphore = asyncio.Semaphore(max_concurrency)
    success: List[Dict[str, Any]] = []
    failed: List[Dict[str, Any]] = []

    async def scrape_one(url: str):
        # One worker: scrape, post-process the format, record the outcome.
        async with semaphore:
            try:
                article = await app.scraper.scrape_article(url)
                if article and not article.get("error"):
                    if output_format in ["html", "both"]:
                        article["htmlContent"] = render_article_html(article)
                    if output_format == "html":
                        article.pop("markdownContent", None)
                    success.append(article)
                else:
                    reason = article.get("error", "Unknown error") if article else "Unknown error"
                    failed.append({"url": url, "error": reason})
            except Exception as e:
                failed.append({"url": url, "error": str(e)})
            finally:
                # Report progress after each URL is processed.
                if ctx:
                    await ctx.report_progress(
                        progress=len(success) + len(failed),
                        total=len(urls)
                    )

    await asyncio.gather(*(scrape_one(url) for url in urls))
    return {
        "success": success,
        "failed": failed,
        "stats": {
            "total": len(urls),
            "success": len(success),
            "failed": len(failed)
        }
    }
async def medium_search(query: str, max_articles: int = 10, ctx: Context = None) -> List[Dict[str, Any]]:
    """
    Search Medium for articles.

    Args:
        query: Search query (e.g., "AI Agents", "Python Asyncio")
        max_articles: Maximum articles to return (default: 10)

    Returns:
        List of article previews with title, url, author
    """
    app = get_app_context(ctx)
    return await app.scraper.scrape_search(query, max_articles=max_articles)
async def medium_fresh(tag: str, max_articles: int = 10, ctx: Context = None) -> List[Dict[str, Any]]:
    """
    Get the latest articles for a specific tag.

    Args:
        tag: Topic tag (e.g., "artificial-intelligence", "python")
        max_articles: Maximum articles to return (default: 10)

    Returns:
        List of article previews
    """
    app = get_app_context(ctx)
    return await app.scraper.scrape_tag(tag, max_articles=max_articles)
async def medium_render_html(url: str, standalone: bool = False, ctx: Context = None) -> str:
    """
    Render a Medium article as beautiful HTML.

    Args:
        url: Medium article URL
        standalone: If True, returns complete HTML page with <html>, <head>, etc.

    Returns:
        HTML string with Tailwind CSS styling (or an error <div> on failure)

    BUG FIX: when the scraper returned ``None``, the error path called
    ``article.get(...)`` and raised AttributeError instead of returning
    the error markup.
    """
    app = get_app_context(ctx)
    article = await app.scraper.scrape_article(url)
    if not article or article.get("error"):
        reason = article.get("error", "Unknown") if article else "Unknown"
        return f"<div class='error'>Failed to scrape: {reason}</div>"
    return render_full_page(article) if standalone else render_article_html(article)
async def medium_export(
    url: str,
    format: str = "markdown",
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Export a Medium article to various formats.

    Args:
        url: Medium article URL
        format: Export format - "markdown", "html", "json"

    Returns:
        {content: ..., format: ..., title: ...} or {error: ...}

    BUG FIX: when the scraper returned ``None``, the error path called
    ``article.get(...)`` and raised AttributeError instead of returning
    an error dict.
    """
    app = get_app_context(ctx)
    article = await app.scraper.scrape_article(url)
    if not article or article.get("error"):
        reason = article.get("error", "Failed to scrape") if article else "Failed to scrape"
        return {"error": reason}
    title = article.get("title", "article")
    if format == "markdown":
        return {
            "content": article.get("markdownContent", ""),
            "format": "markdown",
            "title": title
        }
    if format == "html":
        return {
            "content": render_full_page(article),
            "format": "html",
            "title": title
        }
    if format == "json":
        return {
            "content": article,
            "format": "json",
            "title": title
        }
    return {"error": f"Unsupported format: {format}. Use: markdown, html, json"}
| # ============================================================================ | |
| # TOOLS - Audio (ElevenLabs) | |
| # ============================================================================ | |
async def medium_cast(
    url: str,
    voice: str = "george",
    model: str = "eleven_multilingual_v2",
    quality: str = "premium",
    summarize: str = "auto",
    max_chars: int = 250,
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Convert a Medium article into premium audio podcast using ElevenLabs.

    Provider cascade: ElevenLabs -> edge-tts (free, 4k chars) -> OpenAI TTS;
    returns an error dict if all three fail.

    Args:
        url: Medium article URL
        voice: Voice name or ID. Popular: "george" (British), "adam" (American),
               "rachel" (calm female), "brian" (narrator), "alice" (British female)
        model: TTS model - "eleven_multilingual_v2" (10k chars, recommended),
               "eleven_flash_v2_5" (40k, fastest), "eleven_turbo_v2_5" (40k, balanced)
        quality: "standard", "high", or "premium" (Creator tier)
        summarize: "auto" (summarize if > max_chars), "always", or "none"
        max_chars: Target character limit for summarization (default: 250)
    Returns:
        {audio_path, title, voice, model, duration_estimate, provider}
    """
    app = get_app_context(ctx)
    # Scrape article (Phase 1)
    # NOTE(review): only phase 1/3 is ever reported; phases 2-3 never emit.
    if ctx:
        await ctx.report_progress(progress=1, total=3)
    article = await app.scraper.scrape_article(url)
    if not article or not article.get("markdownContent"):
        return {"error": "Failed to scrape article or no content", "url": url}
    text = article["markdownContent"]
    title = article.get("title", "article")
    original_length = len(text)
    # Summarization gate: "always" forces it; "auto" triggers only when the
    # body exceeds max_chars; "none" never matches either condition.
    should_summarize = (
        summarize == "always" or
        (summarize == "auto" and len(text) > max_chars)
    )
    if should_summarize and summarize != "none":
        groq_key = os.environ.get("GROQ_API_KEY")
        gemini_key = os.environ.get("GEMINI_API_KEY")
        summarize_success = False
        # Shared prompt for both LLM providers; article body capped at 8k chars.
        prompt = f"""You are creating a quick audio summary for busy professionals. In EXACTLY {max_chars} characters or less, give the ONE most valuable insight or actionable takeaway from this article.
Format: Start with the key insight, then briefly explain why it matters.
Style: Conversational, engaging, like a smart friend sharing a tip.
Goal: The listener should feel they learned something useful in 15 seconds.
Article Title: "{title}"
Article Content:
{text[:8000]}
Your {max_chars}-character summary (make every word count):"""
        # Try Groq first (PRIMARY - fastest)
        if groq_key and not summarize_success:
            try:
                client = Groq(api_key=groq_key)
                response = client.chat.completions.create(
                    model="llama-3.1-8b-instant",  # Fast model for summarization
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=500,
                    temperature=0.7
                )
                # Hard-clip to max_chars in case the model overshoots.
                text = response.choices[0].message.content.strip()[:max_chars]
                summarize_success = True
                if ctx:
                    await ctx.info(f"Summarized with Groq: {original_length} -> {len(text)} chars")
            except Exception as e:
                if ctx:
                    await ctx.warning(f"Groq failed: {e}, trying Gemini...")
        # Fallback to Gemini (BACKUP) - Using new google.genai SDK
        if gemini_key and not summarize_success:
            try:
                from google import genai
                client = genai.Client(api_key=gemini_key)
                response = client.models.generate_content(
                    model='gemini-2.0-flash-exp',
                    contents=prompt
                )
                text = response.text.strip()[:max_chars]
                summarize_success = True
                if ctx:
                    await ctx.info(f"Summarized with Gemini: {original_length} -> {len(text)} chars")
            except Exception as e:
                if ctx:
                    await ctx.warning(f"Gemini also failed: {e}, using truncation")
        # Final fallback: truncation
        if not summarize_success:
            text = text[:max_chars]
    else:
        # Just truncate to model limit
        text = truncate_for_model(text, model)
    # Resolve voice ID (friendly name -> ElevenLabs voice id)
    voice_id = get_voice_id(voice)
    # Output format (quality tier -> ElevenLabs output_format string)
    output_format = ELEVENLABS_OUTPUT_FORMATS.get(quality, "mp3_44100_192")
    # Output path: sanitized title + voice + short random suffix for uniqueness
    outputs_dir = app.config.audio_output_dir
    os.makedirs(outputs_dir, exist_ok=True)
    safe_title = "".join(c if c.isalnum() else "_" for c in title)[:40]
    output_path = os.path.join(outputs_dir, f"{safe_title}_{voice}_{uuid.uuid4().hex[:6]}.mp3")
    # Try ElevenLabs (PRIMARY)
    elevenlabs_key = os.environ.get("ELEVENLABS_API_KEY")
    if elevenlabs_key:
        try:
            from elevenlabs.client import ElevenLabs
            def _generate_audio():
                # Runs in a worker thread: the ElevenLabs SDK is synchronous.
                client = ElevenLabs(api_key=elevenlabs_key)
                audio = client.text_to_speech.convert(
                    text=text,
                    voice_id=voice_id,
                    model_id=model,
                    output_format=output_format,
                )
                with open(output_path, "wb") as f:
                    for chunk in audio:
                        f.write(chunk)
            await asyncio.to_thread(_generate_audio)
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                return {
                    "audio_path": os.path.abspath(output_path),
                    "title": title,
                    "voice": voice,
                    "voice_id": voice_id,
                    "model": model,
                    "quality": quality,
                    # Rough estimate: ~150 characters per spoken minute.
                    "duration_estimate": f"{len(text) // 150} min",
                    "characters_used": len(text),
                    "provider": "elevenlabs"
                }
        except Exception as e:
            if ctx:
                await ctx.warning(f"ElevenLabs failed: {e}, trying fallback...")
    # Fallback: Edge-TTS (Free)
    try:
        import edge_tts
        text_truncated = text[:4000]  # Edge-TTS limit
        communicate = edge_tts.Communicate(text_truncated, "en-US-ChristopherNeural")
        await communicate.save(output_path)
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            return {
                "audio_path": os.path.abspath(output_path),
                "title": title,
                "voice": "en-US-ChristopherNeural",
                "duration_estimate": f"{len(text_truncated) // 150} min",
                "characters_used": len(text_truncated),
                "provider": "edge-tts",
                "note": "Free fallback, limited to 4000 chars"
            }
    except Exception as e:
        if ctx:
            await ctx.warning(f"Edge-TTS failed: {e}")
    # Fallback: OpenAI TTS
    openai_key = os.environ.get("OPENAI_API_KEY")
    if openai_key:
        try:
            from openai import AsyncOpenAI
            client = AsyncOpenAI(api_key=openai_key)
            response = await client.audio.speech.create(
                model="tts-1-hd" if quality == "premium" else "tts-1",
                voice="onyx",
                input=text[:4096]  # OpenAI TTS input limit
            )
            response.stream_to_file(output_path)
            return {
                "audio_path": os.path.abspath(output_path),
                "title": title,
                "voice": "onyx",
                "duration_estimate": f"{min(len(text), 4096) // 150} min",
                "provider": "openai"
            }
        except Exception as e:
            print(f"[WARN] OpenAI TTS failed: {e}")
    return {"error": "All TTS providers failed", "url": url}
async def medium_voices(ctx: Context = None) -> Dict[str, Any]:
    """
    List available ElevenLabs voices for medium_cast.

    Returns:
        Voice categories, recommendations, and model info

    BUG FIX: ``app.elevenlabs`` is an availability *flag* (bool set in the
    lifespan), not a client object, so the old ``app.elevenlabs.voices.search()``
    always raised and was silently swallowed — ``live_voices`` was always
    empty. A real client is now built from the env key (same pattern as
    medium_cast), and failures still degrade to the static catalogue.
    """
    app = get_app_context(ctx)
    live_voices = []
    if app.elevenlabs:
        try:
            from elevenlabs.client import ElevenLabs
            client = ElevenLabs(api_key=os.environ.get("ELEVENLABS_API_KEY"))
            # SDK is synchronous; run the search off the event loop.
            result = await asyncio.to_thread(client.voices.search)
            live_voices = [
                {"name": v.name, "id": v.voice_id, "category": getattr(v, 'category', 'unknown')}
                for v in result.voices[:20]
            ]
        except Exception:
            # Best-effort: fall back to the static catalogue below.
            pass
    return {
        "recommended": {
            "george": {"id": "JBFqnCBsd6RMkjVDRZzb", "desc": "British, warm narrator (DEFAULT)"},
            "adam": {"id": "pNInz6obpgDQGcFmaJgB", "desc": "American, deep narrator"},
            "rachel": {"id": "21m00Tcm4TlvDq8ikWAM", "desc": "American, calm female"},
            "brian": {"id": "nPczCjzI2devNBz1zQrb", "desc": "American, narrator"},
            "alice": {"id": "Xb7hH8MSUJpSbSDYk0k2", "desc": "British, confident female"},
        },
        "categories": VOICE_CATEGORIES,
        "models": {
            "eleven_multilingual_v2": "Recommended, 10k chars, 29 languages",
            "eleven_flash_v2_5": "Fastest (~75ms), 40k chars, 32 languages",
            "eleven_turbo_v2_5": "Balanced, 40k chars, 32 languages",
            "eleven_v3": "Most expressive, 5k chars, 70+ languages",
        },
        "quality_options": ELEVENLABS_OUTPUT_FORMATS,
        "live_voices": live_voices,
        "total_premade_voices": len(ELEVENLABS_VOICES),
    }
| # ============================================================================ | |
| # TOOLS - Synthesis | |
| # ============================================================================ | |
async def medium_synthesize(topic: str, max_articles: int = 5, ctx: Context = None) -> str:
    """
    Synthesize a 'State of the Union' report on a topic using top Medium articles.

    Args:
        topic: Topic to analyze (e.g., "Generative AI", "Web Development 2024")
        max_articles: Number of articles to analyze (default: 5)

    Returns:
        Synthesized research report (or an error string if every provider fails)

    Fixes: the article-fetch helper used a bare ``except:`` (which also
    swallows task cancellation) and a placeholder-free f-string.
    """
    app = get_app_context(ctx)
    groq_key = os.environ.get("GROQ_API_KEY")
    gemini_key = os.environ.get("GEMINI_API_KEY")
    openai_key = os.environ.get("OPENAI_API_KEY")
    if not (groq_key or gemini_key or openai_key):
        return "Error: No AI API keys set (GROQ_API_KEY, GEMINI_API_KEY, or OPENAI_API_KEY)."
    # Phase 1: find candidate articles.
    if ctx:
        await ctx.report_progress(progress=1, total=3)
    articles = await app.scraper.scrape_search(topic, max_articles=max_articles)
    if not articles:
        return "No articles found to synthesize."

    async def get_article_content(art):
        # Fetch the full body (capped at 2000 chars) and format a prompt entry.
        url = art.get('url')
        title = art.get('title', 'Untitled')
        author = art.get('author', {}).get('name') if isinstance(art.get('author'), dict) else art.get('author', 'Unknown')
        try:
            full_art = await app.scraper.scrape_article(url)
            content = full_art.get("markdownContent", "")[:2000]
        except Exception:
            # Narrowed from a bare `except:` so CancelledError still propagates.
            content = "(Content unavailable)"
        return f"\nTitle: {title}\nAuthor: {author}\nURL: {url}\nContent:\n{content}\n"

    results = await asyncio.gather(*[get_article_content(art) for art in articles])
    context_text = "".join(results)
    # Phase 2: all article bodies collected.
    if ctx:
        await ctx.report_progress(progress=2, total=3)
    prompt = f"""You are a tech analyst. Synthesize the following Medium articles into a 'State of the Union' report.
Topic: {topic}
Structure your report:
1. Executive Summary (2-3 sentences)
2. Key Trends
3. Notable Insights
4. Contrarian Views (if any)
5. Recommended Reading
Articles:
{context_text}
"""
    # Provider cascade: Groq (fastest) -> Gemini -> OpenAI.
    if groq_key:
        try:
            client = Groq(api_key=groq_key)
            response = client.chat.completions.create(
                model="llama-3.3-70b-versatile",  # Best model for synthesis
                messages=[{"role": "user", "content": prompt}],
                max_tokens=2000,
                temperature=0.7
            )
            return response.choices[0].message.content
        except Exception as e:
            if ctx:
                await ctx.warning(f"Groq failed: {e}")
    # Fallback to Gemini - Using new google.genai SDK
    if gemini_key:
        try:
            from google import genai
            client = genai.Client(api_key=gemini_key)
            response = client.models.generate_content(
                model='gemini-2.0-flash-exp',
                contents=prompt
            )
            return response.text
        except Exception as e:
            if ctx:
                await ctx.warning(f"Gemini failed: {e}")
    # Fallback to OpenAI
    if openai_key:
        try:
            from openai import AsyncOpenAI
            client = AsyncOpenAI(api_key=openai_key)
            response = await client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}]
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Error: All providers failed. Last error: {e}"
    return "Error: No AI service available."
| # ============================================================================ | |
| # TOOLS - Utility | |
| # ============================================================================ | |
async def get_thumbnail(image_url: str) -> Image:
    """
    Fetch an image from a URL and return it as an MCP Image.

    Args:
        image_url: The URL of the image to fetch

    Returns:
        Image object for display

    BUG FIX: the format was hard-coded to "png" even for JPEG/WebP sources;
    it is now derived from the response Content-Type, defaulting to "png"
    to preserve the previous behavior when the header is absent.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(image_url)
        response.raise_for_status()
    content_type = response.headers.get("content-type", "")
    fmt = "png"
    if content_type.startswith("image/"):
        # "image/jpeg; charset=..." -> "jpeg"
        fmt = content_type.split("/", 1)[1].split(";")[0].strip() or "png"
    return Image(data=response.content, format=fmt)
async def medium_related(
    url: str,
    max_articles: int = 5,
    ctx: Context = None
) -> List[Dict[str, Any]]:
    """
    Find articles related to a given Medium article.

    Strategy: scrape the source article, take its first tag (falling back to
    the first word of the title, then "technology"), pull that tag's feed,
    and drop the source article itself from the results.

    Args:
        url: URL of the source article
        max_articles: Maximum related articles to return (default: 5)

    Returns:
        List of related articles with similarity scores
    """
    app = get_app_context(ctx)
    source = await app.scraper.scrape_article(url)
    if not source or source.get("error"):
        return [{"error": "Failed to scrape source article"}]
    tags = source.get("tags", [])
    if not tags:
        # No tags: derive a pseudo-tag from the title.
        title = source.get("title", "")
        tags = [title.split()[0]] if title else ["technology"]
    primary_tag = tags[0] if isinstance(tags, list) and tags else "technology"
    # Over-fetch slightly so filtering out the source still leaves enough.
    candidates = await app.scraper.scrape_tag(primary_tag, max_articles=max_articles + 2)
    source_url = url.rstrip("/")
    kept = [c for c in candidates if c.get("url", "").rstrip("/") != source_url][:max_articles]
    results = []
    for item in kept:
        author = item.get("author")
        results.append({
            "title": item.get("title"),
            "url": item.get("url"),
            "author": author.get("name") if isinstance(author, dict) else author,
            "readingTime": item.get("readingTime"),
            "relevance": "tag_match"
        })
    return results
async def medium_recommend(
    interests: List[str],
    reading_time: int = 30,
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Get personalized article recommendations based on interests.

    Args:
        interests: List of topics you're interested in (e.g., ["AI", "Python", "startups"])
        reading_time: Target total reading time in minutes (default: 30)

    Returns:
        Curated reading list with estimated total time

    BUG FIX: readingTime values of ``None`` crashed the budget comparison
    (TypeError) and empty strings crashed ``split()[0]`` (IndexError);
    both now fall back to the 5-minute default.
    """
    app = get_app_context(ctx)

    def _minutes(value) -> int:
        # Coerce int/float/"7 min read"/None/"" to whole minutes; default 5.
        if isinstance(value, (int, float)):
            return int(value)
        if isinstance(value, str):
            parts = value.split()
            if parts and parts[0].isdigit():
                return int(parts[0])
        return 5

    all_articles = []
    for interest in interests[:3]:  # Limit to 3 interests to bound scrape time
        articles = await app.scraper.scrape_search(interest, max_articles=5)
        for art in articles:
            art["interest"] = interest
        all_articles.extend(articles)
    # Deduplicate by URL, keeping the first occurrence.
    seen_urls = set()
    unique_articles = []
    for art in all_articles:
        url = art.get("url", "")
        if url not in seen_urls:
            seen_urls.add(url)
            unique_articles.append(art)
    # Greedily fill the reading-time budget.
    reading_list = []
    total_time = 0
    for art in unique_articles:
        est_time = _minutes(art.get("readingTime", 5))
        if total_time + est_time <= reading_time:
            reading_list.append({
                "title": art.get("title"),
                "url": art.get("url"),
                "author": art.get("author", {}).get("name") if isinstance(art.get("author"), dict) else art.get("author"),
                "readingTime": est_time,
                "interest": art.get("interest")
            })
            total_time += est_time
    return {
        "reading_list": reading_list,
        "total_articles": len(reading_list),
        "total_reading_time": total_time,
        "interests_covered": list(set(a.get("interest") for a in reading_list))
    }
| # ============================================================================ | |
| # PROMPTS | |
| # ============================================================================ | |
def summarize_article(url: str) -> str:
    """Build a prompt asking for a structured summary of a Medium article."""
    prompt = f"""Read and summarize this Medium article: {url}
Structure your summary:
1. **Main Thesis**: One sentence summary
2. **Key Points**: 3-5 bullet points
3. **Novel Insights**: What's new or surprising
4. **Actionable Takeaways**: What can the reader do"""
    return prompt
def tweet_thread(url: str) -> str:
    """Build a prompt that converts a Medium article into a short Twitter thread."""
    prompt = f"""Convert this article into a viral Twitter thread: {url}
Guidelines:
- 5-7 tweets maximum
- First tweet must be a hook
- Use emojis strategically (not excessively)
- End with a call to action
- Include relevant hashtags in final tweet"""
    return prompt
def linkedin_post(url: str) -> str:
    """Build a prompt that turns a Medium article into a LinkedIn post."""
    prompt = f"""Transform this article into an engaging LinkedIn post: {url}
Guidelines:
- Start with a hook (question or bold statement)
- Keep it under 1300 characters
- Use line breaks for readability
- Include 3-5 relevant hashtags at the end
- End with a question to drive engagement"""
    return prompt
def newsletter(topic: str, article_count: int = 5) -> str:
    """Build a prompt for a newsletter digest covering *article_count* articles."""
    prompt = f"""Create a newsletter digest on "{topic}" using the top {article_count} Medium articles.
Format:
- Catchy subject line
- Brief intro paragraph (2-3 sentences)
- For each article:
  • Title with brief summary
  • Why it matters
- Closing with call to action"""
    return prompt
def research_report(topic: str) -> str:
    """Build a prompt for a comprehensive Medium-sourced research report."""
    prompt = f"""Create a comprehensive research report on "{topic}" using Medium articles.
Structure:
1. **Executive Summary** (2-3 sentences)
2. **Current Trends** (What's hot in this space)
3. **Key Players** (Who's writing about this)
4. **Diverse Perspectives** (Different viewpoints)
5. **Future Outlook** (Predictions)
6. **Recommended Reading** (Top 3 articles with links)"""
    return prompt
def code_tutorial(url: str) -> str:
    """Build an LLM prompt extracting a step-by-step code tutorial from the article at *url*."""
    tutorial_prompt = f"""Extract and structure the code tutorial from this article: {url}
Format:
1. **Prerequisites**: What you need installed/configured
2. **Step-by-Step**:
- Step 1: [Description + Code]
- Step 2: [Description + Code]
- ...
3. **Complete Code**: Full working example
4. **Common Issues**: Troubleshooting tips"""
    return tutorial_prompt
def analyze_trending(focus: str = "technology") -> str:
    """Build a workflow prompt that analyzes current trending Medium articles for *focus*."""
    workflow = f"""Analyze the current trending articles on Medium with a focus on "{focus}".
WORKFLOW:
1. First, use the `medium://trending` resource to get current trending articles
2. Select 3-5 articles most relevant to "{focus}"
3. Use `medium_scrape()` on each selected article
ANALYSIS STRUCTURE:
1. **Trend Overview**: What themes are dominating?
2. **Key Insights**: Most valuable takeaways from each article
3. **Emerging Patterns**: What's changing in this space?
4. **Contrarian Views**: Any articles going against the grain?
5. **Recommendations**: Top 2-3 must-reads with reasons
Focus area: {focus}
Be specific and cite the articles you analyze.""";
    return workflow
def deep_research(topic: str, depth: str = "comprehensive") -> str:
    """Build a three-phase (discover/analyze/synthesize) research workflow prompt for *topic*."""
    # The hyphenated topic feeds medium_fresh(), which expects tag-style slugs.
    research_prompt = f"""Conduct a {depth} research analysis on "{topic}" using Medium articles.
PHASE 1 - DISCOVERY:
1. Use `medium_search("{topic}")` to find relevant articles
2. Use `medium_fresh("{topic.replace(' ', '-')}")` for latest content
3. Note the top 5 most relevant articles
PHASE 2 - DEEP ANALYSIS:
4. Use `medium_scrape()` on each selected article
5. Extract: main arguments, evidence, unique perspectives
6. Note any contradictions between articles
PHASE 3 - SYNTHESIS:
7. Use `medium_synthesize("{topic}")` for AI-powered summary
8. Cross-reference with your own analysis
9. Identify gaps in coverage
OUTPUT FORMAT:
# Research Report: {topic}
## Executive Summary
[2-3 sentences]
## Key Findings
[Bullet points with citations]
## Diverse Perspectives
[Different viewpoints from articles]
## Emerging Trends
[What's changing?]
## Knowledge Gaps
[What's missing from the discourse?]
## Recommended Reading
[Top 3 articles with reasons]
## Sources
[Full list of analyzed articles]
Research depth: {depth}
Topic: {topic}"""
    return research_prompt
def content_repurpose(url: str, platforms: str = "all") -> str:
    """Build an LLM prompt repurposing the article at *url* for the given *platforms*."""
    repurpose_prompt = f"""Repurpose this Medium article for multiple content platforms: {url}
TARGET PLATFORMS: {platforms}
First, scrape the article using `medium_scrape("{url}")`.
Then create content for each platform:
## Twitter/X Thread
- 5-7 tweets
- Hook first, value in middle, CTA at end
- Include relevant emojis
## LinkedIn Post
- Professional tone
- 1000-1300 characters
- Include a question for engagement
## Newsletter Blurb
- 2-3 paragraphs
- Highlight key insights
- Clear call-to-action
## YouTube Script Outline
- Hook (30 sec)
- Main points (3-5 min)
- Conclusion + CTA (1 min)
## Instagram Carousel
- 7-10 slides
- One key point per slide
- Visual descriptions
Ensure each format maintains the core message while optimizing for the platform's unique characteristics."""
    return repurpose_prompt
| # ============================================================================ | |
| # MAIN | |
| # ============================================================================ | |
| if __name__ == "__main__": | |
| import sys | |
| # Check for HTTP transport flag | |
| if "--http" in sys.argv or "-h" in sys.argv: | |
| # Get port from args or use default | |
| port = 8000 | |
| for i, arg in enumerate(sys.argv): | |
| if arg in ("--port", "-p") and i + 1 < len(sys.argv): | |
| port = int(sys.argv[i + 1]) | |
| print(f"[INFO] Starting Medium MCP Server v3 in HTTP mode on port {port}") | |
| print(f"[INFO] Connect via: http://127.0.0.1:{port}/mcp") | |
| # Run with HTTP transport | |
| mcp.run(transport="sse", host="127.0.0.1", port=port) | |
| else: | |
| # Default: stdio transport for Claude Desktop | |
| mcp.run() | |