Medium-MCP / server.py
Nikhil Pravin Pise
feat: Migrate to google-genai SDK + fix remaining low-res images
545358b
"""
Medium MCP Server v3.0
A comprehensive MCP server for Medium article scraping with:
- Full MCP specification compliance (annotations, progress, logging)
- 12 Tools, 4 Resources, 9 Prompts
- ElevenLabs Creator API for premium audio
- HTTP transport support for remote deployment
"""
import sys
import os
import asyncio
import uuid
from typing import List, Dict, Any, Optional
from contextlib import asynccontextmanager
# No sys.path needed - src/ is in same project now
from mcp.server.fastmcp import FastMCP, Context, Image
import httpx
# Local imports
from src.config import MCPConfig, ELEVENLABS_CHAR_LIMITS, ELEVENLABS_OUTPUT_FORMATS
from elevenlabs_voices import ELEVENLABS_VOICES, get_voice_id, get_voices_info, VOICE_CATEGORIES
# Medium-Scraper imports
from src.service import ScraperService
from src.html_renderer import render_article_html, render_full_page
# LLM imports
from groq import Groq
# ============================================================================
# LIFESPAN MANAGEMENT
# ============================================================================
class AppContext:
    """Container for the shared resources that the lifespan hands to requests.

    The class only stores references; it has no behaviour of its own.
    """

    def __init__(self, scraper: ScraperService, config: MCPConfig, elevenlabs):
        # `elevenlabs` is stored as given; the lifespan in this file passes a
        # bool telling whether ELEVENLABS_API_KEY was set.
        self.scraper, self.config, self.elevenlabs = scraper, config, elevenlabs
@asynccontextmanager
async def app_lifespan(server: FastMCP):
    """Manage scraper and API clients lifecycle.

    Builds the configuration and scraper, publishes the resulting
    ``AppContext`` both as the lifespan value (for tools) and through the
    module-level ``_app_context`` (for resources), and always closes the
    scraper on shutdown.

    Args:
        server: The FastMCP instance this lifespan is attached to (unused).

    Yields:
        The ``AppContext`` shared by all requests.
    """
    global _app_context

    def _log(message: str) -> None:
        # Under the stdio transport stdout carries the JSON-RPC stream, so
        # human-readable status lines must go to stderr instead.
        print(message, file=sys.stderr)

    config = MCPConfig.from_env()
    scraper = ScraperService(max_workers=config.max_workers)

    # Check if ElevenLabs is available
    elevenlabs_available = bool(os.environ.get("ELEVENLABS_API_KEY"))
    if elevenlabs_available:
        _log("[INFO] ElevenLabs API key found")
    else:
        _log("[WARN] ELEVENLABS_API_KEY not set, TTS will use fallbacks")

    try:
        await scraper.ensure_initialized()
        app_ctx = AppContext(scraper=scraper, config=config, elevenlabs=elevenlabs_available)
        _app_context = app_ctx  # Set module-level reference for resources
        _log("[INFO] Medium MCP Server v3.0 initialized")
        yield app_ctx
    finally:
        # Clear the module-level reference first so resources stop using a
        # scraper that is about to be closed.
        _app_context = None
        await scraper.close()
        _log("[INFO] Medium MCP Server shutdown complete")
# Initialize FastMCP with lifespan and instructions.
# The server is named "Medium Scraper v3"; `app_lifespan` creates and tears
# down the shared scraper, and the `instructions` text below is passed to
# FastMCP verbatim.
mcp = FastMCP(
    "Medium Scraper v3",
    lifespan=app_lifespan,
    instructions="""This MCP server provides comprehensive access to Medium articles.
**Key Capabilities:**
- Scrape any Medium article (35+ domains supported including TowardsDataScience)
- Search and discover trending content by topic or tag
- Generate audio podcasts from articles using ElevenLabs TTS
- Synthesize research reports using AI (Gemini/OpenAI)
- Export to markdown, HTML, or JSON
**Recommended Workflow:**
1. Use `medium_search(topic)` or `medium_fresh(tag)` to find articles
2. Use `medium_scrape(url)` to get full article content
3. Use `medium_synthesize(topic)` for AI-powered topic analysis
4. Use `medium_cast(url)` to generate audio versions
**Resources available:** Trending articles, tag feeds, search results
**Prompts available:** Article summarization, social media posts, research reports
"""
)
# Module-level reference for resources (set during lifespan).
# `app_lifespan` assigns it after initialization and resets it to None on
# shutdown; the resource handlers read it because they take no Context.
_app_context: Optional[AppContext] = None
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================
def get_app_context(ctx: Context) -> AppContext:
    """Return the lifespan-provided application context for a request.

    Args:
        ctx: Request context passed to a tool.

    Returns:
        The object stored as ``ctx.request_context.lifespan_context``.
    """
    request = ctx.request_context
    return request.lifespan_context
def truncate_for_model(text: str, model: str) -> str:
    """Cut ``text`` down to the character budget configured for ``model``.

    Args:
        text: Text that will be sent to the TTS model.
        model: Model identifier looked up in ``ELEVENLABS_CHAR_LIMITS``;
            unknown models get a 10000-character budget.

    Returns:
        ``text`` unchanged when it fits; otherwise the first ``limit - 50``
        characters followed by an "End of audio preview" marker.
    """
    limit = ELEVENLABS_CHAR_LIMITS.get(model, 10000)
    if len(text) <= limit:
        return text
    # Leave 50 characters of headroom for the trailing marker.
    kept = text[:limit - 50]
    return kept + "\n\n... End of audio preview."
def handle_paywall(article: Dict) -> Dict:
    """Flag an article whose content looks paywalled or truncated.

    Args:
        article: Scraped article dict; mutated in place when flagged.

    Returns:
        ``{"error": "No article data"}`` for a falsy input, otherwise the same
        ``article`` object, with ``_paywall_warning`` added when it is marked
        locked or its non-empty markdown is shorter than 500 characters.
    """
    if not article:
        return {"error": "No article data"}

    body = article.get("markdownContent", "")
    locked = article.get("isLocked", False)
    # Empty/missing content is deliberately not treated as "short".
    suspiciously_short = bool(body) and len(body) < 500

    if locked or suspiciously_short:
        article["_paywall_warning"] = "Content may be behind a paywall"
    return article
# ============================================================================
# RESOURCES (Structured JSON responses)
# Note: MCP resources have different signature requirements than tools.
# We use a module-level reference that gets set during lifespan.
# ============================================================================
import json
@mcp.resource(
    "medium://trending",
    name="Trending Articles",
    description="Top trending Medium articles updated hourly",
    mime_type="application/json"
)
async def get_trending() -> str:
    """Serialize up to ten articles from the "trending" tag as a JSON array.

    Returns:
        JSON text: a list of ``{title, url, author, readingTime}`` objects, or
        an error object when the lifespan has not populated ``_app_context``.
    """
    if not _app_context:
        return '{"error": "Server not initialized"}'

    articles = await _app_context.scraper.scrape_tag("trending", max_articles=10)
    payload = []
    for item in articles:
        # `author` may be a nested dict or a plain value; expose only the name.
        author = item.get("author")
        if isinstance(author, dict):
            author = author.get("name")
        payload.append({
            "title": item.get("title"),
            "url": item.get("url"),
            "author": author,
            "readingTime": item.get("readingTime"),
        })
    return json.dumps(payload, ensure_ascii=False)
@mcp.resource(
    "medium://tag/{tag}",
    name="Tag Feed",
    description="Latest articles for a specific topic tag",
    mime_type="application/json"
)
async def get_tag_feed(tag: str) -> str:
    """Serialize up to ten articles for ``tag`` as a JSON array.

    Args:
        tag: Tag taken from the resource URI.

    Returns:
        JSON text: a list of ``{title, url, author, readingTime}`` objects, or
        an error object when the lifespan has not populated ``_app_context``.
    """
    if not _app_context:
        return '{"error": "Server not initialized"}'

    articles = await _app_context.scraper.scrape_tag(tag, max_articles=10)
    payload = []
    for item in articles:
        # `author` may be a nested dict or a plain value; expose only the name.
        author = item.get("author")
        if isinstance(author, dict):
            author = author.get("name")
        payload.append({
            "title": item.get("title"),
            "url": item.get("url"),
            "author": author,
            "readingTime": item.get("readingTime"),
        })
    return json.dumps(payload, ensure_ascii=False)
@mcp.resource(
    "medium://search/{query}",
    name="Search Results",
    description="Search Medium for articles matching a query",
    mime_type="application/json"
)
async def get_search_results(query: str) -> str:
    """Return search results as a JSON string.

    Args:
        query: Search text taken from the resource URI.

    Returns:
        JSON text: a list of ``{title, url, author, preview}`` objects (preview
        capped at 200 characters), or an error object when the lifespan has
        not populated ``_app_context``.
    """
    if not _app_context:
        return '{"error": "Server not initialized"}'
    results = await _app_context.scraper.scrape_search(query, max_articles=10)
    return json.dumps([
        {
            "title": r.get("title"),
            "url": r.get("url"),
            "author": r.get("author", {}).get("name") if isinstance(r.get("author"), dict) else r.get("author"),
            # Both fields may be present but None; fall back to "" so the
            # slice never runs on None.
            "preview": (r.get("subtitle") or r.get("description") or "")[:200],
        }
        for r in results
    ], ensure_ascii=False)
@mcp.resource(
    "medium://stats",
    name="Server Statistics",
    description="Current server stats and capabilities",
    mime_type="application/json"
)
async def get_server_stats() -> str:
    """Return static server statistics and capabilities as a JSON string.

    The counts are hard-coded and must be kept in sync with the number of
    ``@mcp.tool`` / ``@mcp.resource`` / ``@mcp.prompt`` registrations in this
    module (currently 12 / 4 / 9).
    """
    return json.dumps({
        "version": "3.0",
        "capabilities": {
            "tools": 12,
            "resources": 4,
            "prompts": 9,
            "features": [
                "article_scraping",
                "batch_processing",
                "audio_generation",
                "ai_synthesis",
                "progress_notifications",
                "mcp_logging"
            ]
        },
        "supported_domains": [
            "medium.com",
            "towardsdatascience.com",
            "levelup.gitconnected.com",
            "betterprogramming.pub",
            "javascript.plainenglish.io",
            "35+ total domains"
        ],
        "tts_providers": ["elevenlabs", "edge-tts", "openai"],
        "ai_providers": ["groq", "gemini", "openai"]
    }, ensure_ascii=False)
# ============================================================================
# TOOLS - Core Scraping
# ============================================================================
@mcp.tool(annotations={
    "title": "Scrape Medium Article",
    "readOnlyHint": True,
    "openWorldHint": True,
    "idempotentHint": True
})
async def medium_scrape(
    url: str,
    output_format: str = "both",
    force_refresh: bool = False,
    enable_enhancements: bool = False,
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Fetch one Medium article and return it in the requested representation.

    Args:
        url: Medium article URL (supports 35+ domains including towardsdatascience.com)
        output_format: "markdown", "html", or "both" (default: both)
        force_refresh: Bypass cache and re-scrape (default: false)
        enable_enhancements: Enable KG extraction, embeddings (adds ~15s, default: false)

    Returns:
        The article dict (optionally with ``htmlContent``, and with a paywall
        warning when applicable), or an error dict when scraping failed.
    """
    service = get_app_context(ctx).scraper
    article = await service.scrape_article(
        url,
        force_refresh=force_refresh,
        enable_enhancements=enable_enhancements,
    )

    if not article:
        return {"error": "Failed to scrape article", "url": url}
    if article.get("error"):
        # Pass the scraper's own error payload through untouched.
        return article

    wants_html = output_format == "html" or output_format == "both"
    if wants_html:
        try:
            rendered = render_article_html(article)
        except Exception as exc:
            # A rendering failure must not discard the scraped article.
            rendered = f"<p>Error rendering HTML: {exc}</p>"
        article["htmlContent"] = rendered

    if output_format == "html":
        # HTML-only callers do not get the markdown body.
        article.pop("markdownContent", None)

    return handle_paywall(article)
@mcp.tool(annotations={
    "title": "Batch Scrape Articles",
    "readOnlyHint": True,
    "openWorldHint": True,
    "idempotentHint": True
})
async def medium_batch(
    urls: List[str],
    max_concurrency: int = 5,
    output_format: str = "both",
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Scrape multiple Medium articles in parallel.

    Args:
        urls: List of Medium article URLs; the limit is the configured
            ``max_batch_size``.
        max_concurrency: Number of parallel workers (clamped to 1-10, default: 5)
        output_format: Output format for all articles (default: both)

    Returns:
        {success: [...], failed: [...], stats: {total, success, failed}}.
        Results are appended in completion order, not input order.
    """
    app = get_app_context(ctx)
    if len(urls) > app.config.max_batch_size:
        return {"error": f"Maximum batch size is {app.config.max_batch_size} URLs"}

    max_concurrency = min(max(1, max_concurrency), 10)
    semaphore = asyncio.Semaphore(max_concurrency)
    success = []
    failed = []

    async def scrape_one(url: str) -> None:
        async with semaphore:
            try:
                article = await app.scraper.scrape_article(url)
                if not article:
                    # An empty/None result carries no error payload of its own.
                    failed.append({"url": url, "error": "Unknown error"})
                elif article.get("error"):
                    failed.append({"url": url, "error": article["error"]})
                else:
                    if output_format in ("html", "both"):
                        article["htmlContent"] = render_article_html(article)
                    if output_format == "html":
                        article.pop("markdownContent", None)
                    success.append(article)
            except Exception as e:
                # Best effort: one bad URL must not abort the whole batch.
                failed.append({"url": url, "error": str(e)})
            finally:
                # Report progress after each URL is processed
                if ctx:
                    await ctx.report_progress(
                        progress=len(success) + len(failed),
                        total=len(urls)
                    )

    await asyncio.gather(*(scrape_one(url) for url in urls))

    return {
        "success": success,
        "failed": failed,
        "stats": {
            "total": len(urls),
            "success": len(success),
            "failed": len(failed)
        }
    }
@mcp.tool(annotations={
    "title": "Search Medium",
    "readOnlyHint": True,
    "openWorldHint": True,
    "idempotentHint": False
})
async def medium_search(query: str, max_articles: int = 10, ctx: Context = None) -> List[Dict[str, Any]]:
    """
    Run a Medium search and hand back the scraper's results unchanged.

    Args:
        query: Search query (e.g., "AI Agents", "Python Asyncio")
        max_articles: Maximum articles to return (default: 10)

    Returns:
        Whatever ``scrape_search`` returns for the query (a list of article
        previews).
    """
    scraper = get_app_context(ctx).scraper
    return await scraper.scrape_search(query, max_articles=max_articles)
@mcp.tool(annotations={
    "title": "Get Fresh Articles by Tag",
    "readOnlyHint": True,
    "openWorldHint": True,
    "idempotentHint": False
})
async def medium_fresh(tag: str, max_articles: int = 10, ctx: Context = None) -> List[Dict[str, Any]]:
    """
    Fetch the tag feed for ``tag`` and hand back the scraper's results unchanged.

    Args:
        tag: Topic tag (e.g., "artificial-intelligence", "python")
        max_articles: Maximum articles to return (default: 10)

    Returns:
        Whatever ``scrape_tag`` returns for the tag (a list of article previews).
    """
    scraper = get_app_context(ctx).scraper
    return await scraper.scrape_tag(tag, max_articles=max_articles)
@mcp.tool(annotations={
    "title": "Render Article as HTML",
    "readOnlyHint": True,
    "openWorldHint": True,
    "idempotentHint": True
})
async def medium_render_html(url: str, standalone: bool = False, ctx: Context = None) -> str:
    """
    Render a Medium article as HTML.

    Args:
        url: Medium article URL
        standalone: If True, returns the output of ``render_full_page``
            (a complete page); otherwise the ``render_article_html`` fragment.

    Returns:
        HTML string, or a ``<div class='error'>`` fragment when scraping failed.
    """
    from html import escape

    app = get_app_context(ctx)
    article = await app.scraper.scrape_article(url)

    if not article or article.get("error"):
        # `article` may be None here, so never call .get() on it directly.
        reason = (article or {}).get("error", "Unknown")
        # The reason can echo remote/user-controlled text; escape it before
        # embedding it in markup.
        return f"<div class='error'>Failed to scrape: {escape(str(reason))}</div>"

    if standalone:
        return render_full_page(article)
    return render_article_html(article)
@mcp.tool(annotations={
    "title": "Export Article",
    "readOnlyHint": True,
    "openWorldHint": True,
    "idempotentHint": True
})
async def medium_export(
    url: str,
    format: str = "markdown",
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Export a Medium article to various formats.

    Args:
        url: Medium article URL
        format: Export format - "markdown", "html", "json"

    Returns:
        {content: ..., format: ..., title: ...}, or {error: ...} when scraping
        failed or the format is not supported.
    """
    app = get_app_context(ctx)
    article = await app.scraper.scrape_article(url)

    if not article or article.get("error"):
        # `article` may be None; guard before reading its error field.
        return {"error": (article or {}).get("error", "Failed to scrape")}

    title = article.get("title", "article")

    if format == "markdown":
        content = article.get("markdownContent", "")
    elif format == "html":
        content = render_full_page(article)
    elif format == "json":
        # The JSON export is the raw article dict itself.
        content = article
    else:
        return {"error": f"Unsupported format: {format}. Use: markdown, html, json"}

    return {"content": content, "format": format, "title": title}
# ============================================================================
# TOOLS - Audio (ElevenLabs)
# ============================================================================
@mcp.tool(annotations={
    "title": "Generate Audio Podcast",
    "readOnlyHint": False,
    "destructiveHint": False,
    "openWorldHint": True,
    "idempotentHint": False
})
async def medium_cast(
    url: str,
    voice: str = "george",
    model: str = "eleven_multilingual_v2",
    quality: str = "premium",
    summarize: str = "auto",
    max_chars: int = 250,
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Convert a Medium article into an audio file.

    The text is optionally summarized (Groq, then Gemini, then plain
    truncation) and then synthesized with ElevenLabs, falling back to
    Edge-TTS and finally OpenAI TTS.

    Args:
        url: Medium article URL
        voice: Voice name or ID. Popular: "george" (British), "adam" (American),
            "rachel" (calm female), "brian" (narrator), "alice" (British female)
        model: TTS model - "eleven_multilingual_v2" (10k chars, recommended),
            "eleven_flash_v2_5" (40k, fastest), "eleven_turbo_v2_5" (40k, balanced)
        quality: "standard", "high", or "premium" (Creator tier)
        summarize: "auto" (summarize if > max_chars), "always", or "none"
        max_chars: Target character limit for summarization (default: 250)

    Returns:
        {audio_path, title, voice, duration_estimate, provider, ...} on
        success, or {error, url} when scraping or every TTS provider failed.
    """
    app = get_app_context(ctx)

    async def _info(message: str) -> None:
        if ctx:
            await ctx.info(message)

    async def _warn(message: str) -> None:
        if ctx:
            await ctx.warning(message)

    # Scrape article (Phase 1)
    if ctx:
        await ctx.report_progress(progress=1, total=3)
    article = await app.scraper.scrape_article(url)
    if not article or not article.get("markdownContent"):
        return {"error": "Failed to scrape article or no content", "url": url}

    text = article["markdownContent"]
    # A present-but-None title would break the filename construction below.
    title = article.get("title") or "article"
    original_length = len(text)

    # Summarization logic ("none" never matches either condition).
    should_summarize = (
        summarize == "always" or
        (summarize == "auto" and len(text) > max_chars)
    )
    if should_summarize:
        groq_key = os.environ.get("GROQ_API_KEY")
        gemini_key = os.environ.get("GEMINI_API_KEY")
        summary = ""
        prompt = f"""You are creating a quick audio summary for busy professionals. In EXACTLY {max_chars} characters or less, give the ONE most valuable insight or actionable takeaway from this article.
Format: Start with the key insight, then briefly explain why it matters.
Style: Conversational, engaging, like a smart friend sharing a tip.
Goal: The listener should feel they learned something useful in 15 seconds.
Article Title: "{title}"
Article Content:
{text[:8000]}
Your {max_chars}-character summary (make every word count):"""

        # Try Groq first (PRIMARY - fastest)
        if groq_key:
            def _ask_groq() -> str:
                client = Groq(api_key=groq_key)
                response = client.chat.completions.create(
                    model="llama-3.1-8b-instant",  # Fast model for summarization
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=500,
                    temperature=0.7
                )
                return response.choices[0].message.content or ""

            try:
                # The Groq client is synchronous; run it off the event loop.
                candidate = (await asyncio.to_thread(_ask_groq)).strip()[:max_chars]
                if not candidate:
                    raise ValueError("empty summary")
            except Exception as e:
                await _warn(f"Groq failed: {e}, trying Gemini...")
            else:
                summary = candidate
                await _info(f"Summarized with Groq: {original_length} -> {len(summary)} chars")

        # Fallback to Gemini (BACKUP) - Using new google.genai SDK
        if gemini_key and not summary:
            def _ask_gemini() -> str:
                from google import genai
                client = genai.Client(api_key=gemini_key)
                response = client.models.generate_content(
                    model='gemini-2.0-flash-exp',
                    contents=prompt
                )
                return response.text or ""

            try:
                candidate = (await asyncio.to_thread(_ask_gemini)).strip()[:max_chars]
                if not candidate:
                    raise ValueError("empty summary")
            except Exception as e:
                await _warn(f"Gemini also failed: {e}, using truncation")
            else:
                summary = candidate
                await _info(f"Summarized with Gemini: {original_length} -> {len(summary)} chars")

        # Final fallback: truncation
        text = summary if summary else text[:max_chars]

    # Enforce the model's character limit in every case; a large `max_chars`
    # could otherwise let a summary exceed what the model accepts.
    text = truncate_for_model(text, model)

    # Resolve voice ID
    voice_id = get_voice_id(voice)
    # Output format
    output_format = ELEVENLABS_OUTPUT_FORMATS.get(quality, "mp3_44100_192")

    # Output path. Both title and voice come from outside this process, so
    # both are reduced to [alnum_] before being used in a filename.
    outputs_dir = app.config.audio_output_dir
    os.makedirs(outputs_dir, exist_ok=True)
    safe_title = "".join(c if c.isalnum() else "_" for c in title)[:40]
    safe_voice = "".join(c if c.isalnum() else "_" for c in str(voice))[:40]
    output_path = os.path.join(outputs_dir, f"{safe_title}_{safe_voice}_{uuid.uuid4().hex[:6]}.mp3")

    # NOTE(review): the duration estimates below divide characters by 150,
    # which looks like a words-per-minute figure - confirm the intended unit.

    # Try ElevenLabs (PRIMARY)
    elevenlabs_key = os.environ.get("ELEVENLABS_API_KEY")
    if elevenlabs_key:
        try:
            from elevenlabs.client import ElevenLabs

            def _generate_audio():
                client = ElevenLabs(api_key=elevenlabs_key)
                audio = client.text_to_speech.convert(
                    text=text,
                    voice_id=voice_id,
                    model_id=model,
                    output_format=output_format,
                )
                with open(output_path, "wb") as f:
                    for chunk in audio:
                        f.write(chunk)

            await asyncio.to_thread(_generate_audio)
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                return {
                    "audio_path": os.path.abspath(output_path),
                    "title": title,
                    "voice": voice,
                    "voice_id": voice_id,
                    "model": model,
                    "quality": quality,
                    "duration_estimate": f"{len(text) // 150} min",
                    "characters_used": len(text),
                    "provider": "elevenlabs"
                }
        except Exception as e:
            await _warn(f"ElevenLabs failed: {e}, trying fallback...")

    # Fallback: Edge-TTS (Free)
    try:
        import edge_tts
        text_truncated = text[:4000]  # Edge-TTS limit
        communicate = edge_tts.Communicate(text_truncated, "en-US-ChristopherNeural")
        await communicate.save(output_path)
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            return {
                "audio_path": os.path.abspath(output_path),
                "title": title,
                "voice": "en-US-ChristopherNeural",
                "duration_estimate": f"{len(text_truncated) // 150} min",
                "characters_used": len(text_truncated),
                "provider": "edge-tts",
                "note": "Free fallback, limited to 4000 chars"
            }
    except Exception as e:
        await _warn(f"Edge-TTS failed: {e}")

    # Fallback: OpenAI TTS
    openai_key = os.environ.get("OPENAI_API_KEY")
    if openai_key:
        try:
            from openai import AsyncOpenAI
            client = AsyncOpenAI(api_key=openai_key)
            response = await client.audio.speech.create(
                model="tts-1-hd" if quality == "premium" else "tts-1",
                voice="onyx",
                input=text[:4096]
            )
            response.stream_to_file(output_path)
            return {
                "audio_path": os.path.abspath(output_path),
                "title": title,
                "voice": "onyx",
                "duration_estimate": f"{min(len(text), 4096) // 150} min",
                "provider": "openai"
            }
        except Exception as e:
            # Use MCP logging, not print(): stdout is the protocol channel
            # under the stdio transport.
            await _warn(f"OpenAI TTS failed: {e}")

    return {"error": "All TTS providers failed", "url": url}
@mcp.tool(annotations={
    "title": "List Available Voices",
    "readOnlyHint": True,
    "openWorldHint": True,
    "idempotentHint": True
})
async def medium_voices(ctx: Context = None) -> Dict[str, Any]:
    """
    List available ElevenLabs voices for medium_cast.

    Returns:
        Recommended voices, voice categories, model notes, quality options,
        and (when the ElevenLabs API is reachable) up to 20 live voices.
        ``live_voices`` is an empty list when the lookup is unavailable.
    """
    app = get_app_context(ctx)

    # Try to fetch live voices
    live_voices = []
    if app.elevenlabs:
        try:
            client = app.elevenlabs
            if not hasattr(client, "voices"):
                # The lifespan stores only an availability flag (a bool), so
                # build a real async client from the API key here.
                from elevenlabs.client import AsyncElevenLabs
                client = AsyncElevenLabs(api_key=os.environ.get("ELEVENLABS_API_KEY"))
            result = await client.voices.search()
            live_voices = [
                {"name": v.name, "id": v.voice_id, "category": getattr(v, 'category', 'unknown')}
                for v in result.voices[:20]
            ]
        except Exception as e:
            # Live lookup is best effort; report instead of silently hiding it.
            if ctx:
                await ctx.warning(f"Could not fetch live ElevenLabs voices: {e}")

    return {
        "recommended": {
            "george": {"id": "JBFqnCBsd6RMkjVDRZzb", "desc": "British, warm narrator (DEFAULT)"},
            "adam": {"id": "pNInz6obpgDQGcFmaJgB", "desc": "American, deep narrator"},
            "rachel": {"id": "21m00Tcm4TlvDq8ikWAM", "desc": "American, calm female"},
            "brian": {"id": "nPczCjzI2devNBz1zQrb", "desc": "American, narrator"},
            "alice": {"id": "Xb7hH8MSUJpSbSDYk0k2", "desc": "British, confident female"},
        },
        "categories": VOICE_CATEGORIES,
        "models": {
            "eleven_multilingual_v2": "Recommended, 10k chars, 29 languages",
            "eleven_flash_v2_5": "Fastest (~75ms), 40k chars, 32 languages",
            "eleven_turbo_v2_5": "Balanced, 40k chars, 32 languages",
            "eleven_v3": "Most expressive, 5k chars, 70+ languages",
        },
        "quality_options": ELEVENLABS_OUTPUT_FORMATS,
        "live_voices": live_voices,
        "total_premade_voices": len(ELEVENLABS_VOICES),
    }
# ============================================================================
# TOOLS - Synthesis
# ============================================================================
@mcp.tool(annotations={
    "title": "Synthesize Research Report",
    "readOnlyHint": True,
    "openWorldHint": True,
    "idempotentHint": False
})
async def medium_synthesize(topic: str, max_articles: int = 5, ctx: Context = None) -> str:
    """
    Synthesize a 'State of the Union' report on a topic using top Medium articles.

    Searches for articles, scrapes each one (first 2000 characters of
    markdown), and asks the first working provider among Groq, Gemini and
    OpenAI to write the report.

    Args:
        topic: Topic to analyze (e.g., "Generative AI", "Web Development 2024")
        max_articles: Number of articles to analyze (default: 5)

    Returns:
        Synthesized research report, or a string starting with "Error:" /
        "No articles found" when nothing could be produced.
    """
    app = get_app_context(ctx)
    groq_key = os.environ.get("GROQ_API_KEY")
    gemini_key = os.environ.get("GEMINI_API_KEY")
    openai_key = os.environ.get("OPENAI_API_KEY")

    if not groq_key and not gemini_key and not openai_key:
        return "Error: No AI API keys set (GROQ_API_KEY, GEMINI_API_KEY, or OPENAI_API_KEY)."

    # Scrape articles
    if ctx:
        await ctx.report_progress(progress=1, total=3)  # Phase 1: Search
    articles = await app.scraper.scrape_search(topic, max_articles=max_articles)
    if not articles:
        return "No articles found to synthesize."

    # Prepare context
    async def get_article_content(art):
        url = art.get('url')
        title = art.get('title', 'Untitled')
        author_field = art.get('author')
        author = author_field.get('name') if isinstance(author_field, dict) else art.get('author', 'Unknown')
        try:
            full_art = await app.scraper.scrape_article(url)
            content = (full_art or {}).get("markdownContent") or ""
        except Exception:
            # `Exception`, not a bare except, so task cancellation still
            # propagates out of the gather below.
            content = ""
        content = content[:2000] if content else "(Content unavailable)"
        return f"\nTitle: {title}\nAuthor: {author}\nURL: {url}\nContent:\n{content}\n"

    results = await asyncio.gather(*[get_article_content(art) for art in articles])
    context_text = "".join(results)
    if ctx:
        await ctx.report_progress(progress=2, total=3)  # Phase 2: Scraped articles

    prompt = f"""You are a tech analyst. Synthesize the following Medium articles into a 'State of the Union' report.
Topic: {topic}
Structure your report:
1. Executive Summary (2-3 sentences)
2. Key Trends
3. Notable Insights
4. Contrarian Views (if any)
5. Recommended Reading
Articles:
{context_text}
"""

    last_error = None

    # Try Groq first (PRIMARY - fastest)
    if groq_key:
        def _ask_groq():
            client = Groq(api_key=groq_key)
            response = client.chat.completions.create(
                model="llama-3.3-70b-versatile",  # Best model for synthesis
                messages=[{"role": "user", "content": prompt}],
                max_tokens=2000,
                temperature=0.7
            )
            return response.choices[0].message.content

        try:
            # Synchronous SDK call: run it in a worker thread so the event
            # loop keeps serving other requests.
            return await asyncio.to_thread(_ask_groq)
        except Exception as e:
            last_error = e
            if ctx:
                await ctx.warning(f"Groq failed: {e}")

    # Fallback to Gemini - Using new google.genai SDK
    if gemini_key:
        def _ask_gemini():
            from google import genai
            client = genai.Client(api_key=gemini_key)
            response = client.models.generate_content(
                model='gemini-2.0-flash-exp',
                contents=prompt
            )
            return response.text

        try:
            return await asyncio.to_thread(_ask_gemini)
        except Exception as e:
            last_error = e
            if ctx:
                await ctx.warning(f"Gemini failed: {e}")

    # Fallback to OpenAI
    if openai_key:
        try:
            from openai import AsyncOpenAI
            client = AsyncOpenAI(api_key=openai_key)
            response = await client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}]
            )
            return response.choices[0].message.content
        except Exception as e:
            last_error = e

    if last_error is not None:
        # At least one provider was tried; report the real failure rather
        # than claiming that no service is configured.
        return f"Error: All providers failed. Last error: {last_error}"
    return "Error: No AI service available."
# ============================================================================
# TOOLS - Utility
# ============================================================================
@mcp.tool(annotations={
    "title": "Fetch Image Thumbnail",
    "readOnlyHint": True,
    "openWorldHint": True,
    "idempotentHint": True
})
async def get_thumbnail(image_url: str) -> Image:
    """
    Fetch an image from a URL and return it as an MCP Image.

    Args:
        image_url: The URL of the image to fetch

    Returns:
        Image object for display. The format is taken from the response's
        ``Content-Type`` (e.g. ``image/jpeg`` -> ``jpeg``) and falls back to
        ``png`` when the header is missing or not an image type.

    Raises:
        httpx.HTTPStatusError: If the final response is not a success status.
    """
    # NOTE(review): image_url is fetched as given; if this tool is exposed to
    # untrusted callers, consider restricting the allowed hosts.
    # httpx does not follow redirects by default, and raise_for_status()
    # treats a 3xx as an error, so redirects are enabled explicitly.
    async with httpx.AsyncClient(follow_redirects=True) as client:
        response = await client.get(image_url)
        response.raise_for_status()

    media_type = response.headers.get("content-type", "").split(";", 1)[0].strip().lower()
    main_type, _, sub_type = media_type.partition("/")
    image_format = sub_type if main_type == "image" and sub_type else "png"
    return Image(data=response.content, format=image_format)
@mcp.tool(annotations={
    "title": "Find Related Articles",
    "readOnlyHint": True,
    "openWorldHint": True,
    "idempotentHint": False
})
async def medium_related(
    url: str,
    max_articles: int = 5,
    ctx: Context = None
) -> List[Dict[str, Any]]:
    """
    Find articles related to a given Medium article.

    Relatedness is tag based: the source article's first tag (or, without
    tags, the first word of its title) is used to fetch a tag feed, and the
    source article itself is removed from the results.

    Args:
        url: URL of the source article
        max_articles: Maximum related articles to return (default: 5)

    Returns:
        List of {title, url, author, readingTime, relevance} dicts, where
        ``relevance`` is always "tag_match"; or a single-element list holding
        an error dict when the source article could not be scraped.
    """
    app = get_app_context(ctx)

    # Scrape the source article to get its tags and topic
    article = await app.scraper.scrape_article(url)
    if not article or article.get("error"):
        return [{"error": "Failed to scrape source article"}]

    # Get tags from the article
    tags = article.get("tags", [])
    if not tags:
        # Try to infer from title; a blank title yields no words at all.
        title_words = (article.get("title") or "").split()
        tags = [title_words[0]] if title_words else ["technology"]

    # Search for related articles using the first tag
    primary_tag = tags[0] if isinstance(tags, list) and tags else "technology"
    # Ask for a couple of extras so dropping the source still leaves enough.
    related = await app.scraper.scrape_tag(primary_tag, max_articles=max_articles + 2)

    # Filter out the source article (trailing slashes ignored; a missing or
    # None url is treated as an empty string).
    source_url = url.rstrip("/")
    related = [
        r for r in related
        if (r.get("url") or "").rstrip("/") != source_url
    ][:max_articles]

    return [{
        "title": r.get("title"),
        "url": r.get("url"),
        "author": r.get("author", {}).get("name") if isinstance(r.get("author"), dict) else r.get("author"),
        "readingTime": r.get("readingTime"),
        "relevance": "tag_match"
    } for r in related]
@mcp.tool(annotations={
    "title": "Get Personalized Recommendations",
    "readOnlyHint": True,
    "openWorldHint": True,
    "idempotentHint": False
})
async def medium_recommend(
    interests: List[str],
    reading_time: int = 30,
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Get personalized article recommendations based on interests.

    Searches the first three interests (five articles each), removes
    duplicate URLs, and greedily adds articles in search order while the
    running total stays within ``reading_time``.

    Args:
        interests: List of topics you're interested in (e.g., ["AI", "Python", "startups"])
        reading_time: Target total reading time in minutes (default: 30)

    Returns:
        {reading_list, total_articles, total_reading_time, interests_covered}
    """
    app = get_app_context(ctx)

    def _minutes(raw):
        """Turn a scraped readingTime value into minutes, defaulting to 5."""
        if isinstance(raw, str):
            words = raw.split()
            return int(words[0]) if words and words[0].isdigit() else 5
        if isinstance(raw, (int, float)) and not isinstance(raw, bool):
            return raw
        # None or any other unexpected type.
        return 5

    # Collect (article, interest) pairs, deduplicated by URL. The interest is
    # kept beside the article rather than written into it, so scraper-owned
    # dicts are not mutated and a shared dict cannot be re-tagged later.
    seen_urls = set()
    candidates = []
    for interest in interests[:3]:  # Limit to 3 interests
        articles = await app.scraper.scrape_search(interest, max_articles=5)
        for art in articles or []:
            url = art.get("url", "")
            if url in seen_urls:
                continue
            seen_urls.add(url)
            candidates.append((art, interest))

    # Estimate reading times and filter
    reading_list = []
    total_time = 0
    for art, interest in candidates:
        est_time = _minutes(art.get("readingTime", 5))
        if total_time + est_time <= reading_time:
            reading_list.append({
                "title": art.get("title"),
                "url": art.get("url"),
                "author": art.get("author", {}).get("name") if isinstance(art.get("author"), dict) else art.get("author"),
                "readingTime": est_time,
                "interest": interest
            })
            total_time += est_time

    return {
        "reading_list": reading_list,
        "total_articles": len(reading_list),
        "total_reading_time": total_time,
        # dict.fromkeys keeps first-seen order, unlike a set.
        "interests_covered": list(dict.fromkeys(item["interest"] for item in reading_list))
    }
# ============================================================================
# PROMPTS
# ============================================================================
@mcp.prompt()
def summarize_article(url: str) -> str:
    """Build the prompt text asking for a four-part summary of the article at ``url``."""
    prompt_text = f"""Read and summarize this Medium article: {url}
Structure your summary:
1. **Main Thesis**: One sentence summary
2. **Key Points**: 3-5 bullet points
3. **Novel Insights**: What's new or surprising
4. **Actionable Takeaways**: What can the reader do"""
    return prompt_text
@mcp.prompt()
def tweet_thread(url: str) -> str:
    """Build the prompt text asking for a Twitter thread based on the article at ``url``."""
    prompt_text = f"""Convert this article into a viral Twitter thread: {url}
Guidelines:
- 5-7 tweets maximum
- First tweet must be a hook
- Use emojis strategically (not excessively)
- End with a call to action
- Include relevant hashtags in final tweet"""
    return prompt_text
@mcp.prompt()
def linkedin_post(url: str) -> str:
    """Build the prompt text asking for a LinkedIn post based on the article at ``url``."""
    prompt_text = f"""Transform this article into an engaging LinkedIn post: {url}
Guidelines:
- Start with a hook (question or bold statement)
- Keep it under 1300 characters
- Use line breaks for readability
- Include 3-5 relevant hashtags at the end
- End with a question to drive engagement"""
    return prompt_text
@mcp.prompt()
def newsletter(topic: str, article_count: int = 5) -> str:
    """Build the prompt text for a newsletter digest on ``topic`` covering ``article_count`` articles."""
    prompt_text = f"""Create a newsletter digest on "{topic}" using the top {article_count} Medium articles.
Format:
- Catchy subject line
- Brief intro paragraph (2-3 sentences)
- For each article:
• Title with brief summary
• Why it matters
- Closing with call to action"""
    return prompt_text
@mcp.prompt()
def research_report(topic: str) -> str:
    """Build the prompt text for a six-section research report on ``topic``."""
    prompt_text = f"""Create a comprehensive research report on "{topic}" using Medium articles.
Structure:
1. **Executive Summary** (2-3 sentences)
2. **Current Trends** (What's hot in this space)
3. **Key Players** (Who's writing about this)
4. **Diverse Perspectives** (Different viewpoints)
5. **Future Outlook** (Predictions)
6. **Recommended Reading** (Top 3 articles with links)"""
    return prompt_text
@mcp.prompt()
def code_tutorial(url: str) -> str:
    """Build the prompt text asking for a structured code tutorial from the article at ``url``."""
    prompt_text = f"""Extract and structure the code tutorial from this article: {url}
Format:
1. **Prerequisites**: What you need installed/configured
2. **Step-by-Step**:
- Step 1: [Description + Code]
- Step 2: [Description + Code]
- ...
3. **Complete Code**: Full working example
4. **Common Issues**: Troubleshooting tips"""
    return prompt_text
@mcp.prompt()
def analyze_trending(focus: str = "technology") -> str:
    """Build the prompt text for analysing trending articles through the lens of ``focus``."""
    prompt_text = f"""Analyze the current trending articles on Medium with a focus on "{focus}".
WORKFLOW:
1. First, use the `medium://trending` resource to get current trending articles
2. Select 3-5 articles most relevant to "{focus}"
3. Use `medium_scrape()` on each selected article
ANALYSIS STRUCTURE:
1. **Trend Overview**: What themes are dominating?
2. **Key Insights**: Most valuable takeaways from each article
3. **Emerging Patterns**: What's changing in this space?
4. **Contrarian Views**: Any articles going against the grain?
5. **Recommendations**: Top 2-3 must-reads with reasons
Focus area: {focus}
Be specific and cite the articles you analyze."""
    return prompt_text
@mcp.prompt()
def deep_research(topic: str, depth: str = "comprehensive") -> str:
    """Build the three-phase research workflow prompt for ``topic`` at the given ``depth``."""
    # The tag passed to medium_fresh is the topic with spaces turned into hyphens.
    tag_slug = topic.replace(' ', '-')
    prompt_text = f"""Conduct a {depth} research analysis on "{topic}" using Medium articles.
PHASE 1 - DISCOVERY:
1. Use `medium_search("{topic}")` to find relevant articles
2. Use `medium_fresh("{tag_slug}")` for latest content
3. Note the top 5 most relevant articles
PHASE 2 - DEEP ANALYSIS:
4. Use `medium_scrape()` on each selected article
5. Extract: main arguments, evidence, unique perspectives
6. Note any contradictions between articles
PHASE 3 - SYNTHESIS:
7. Use `medium_synthesize("{topic}")` for AI-powered summary
8. Cross-reference with your own analysis
9. Identify gaps in coverage
OUTPUT FORMAT:
# Research Report: {topic}
## Executive Summary
[2-3 sentences]
## Key Findings
[Bullet points with citations]
## Diverse Perspectives
[Different viewpoints from articles]
## Emerging Trends
[What's changing?]
## Knowledge Gaps
[What's missing from the discourse?]
## Recommended Reading
[Top 3 articles with reasons]
## Sources
[Full list of analyzed articles]
Research depth: {depth}
Topic: {topic}"""
    return prompt_text
@mcp.prompt()
def content_repurpose(url: str, platforms: str = "all") -> str:
    """Build the prompt text for repurposing the article at ``url`` for the given ``platforms``."""
    prompt_text = f"""Repurpose this Medium article for multiple content platforms: {url}
TARGET PLATFORMS: {platforms}
First, scrape the article using `medium_scrape("{url}")`.
Then create content for each platform:
## Twitter/X Thread
- 5-7 tweets
- Hook first, value in middle, CTA at end
- Include relevant emojis
## LinkedIn Post
- Professional tone
- 1000-1300 characters
- Include a question for engagement
## Newsletter Blurb
- 2-3 paragraphs
- Highlight key insights
- Clear call-to-action
## YouTube Script Outline
- Hook (30 sec)
- Main points (3-5 min)
- Conclusion + CTA (1 min)
## Instagram Carousel
- 7-10 slides
- One key point per slide
- Visual descriptions
Ensure each format maintains the core message while optimizing for the platform's unique characteristics."""
    return prompt_text
# ============================================================================
# MAIN
# ============================================================================
if __name__ == "__main__":
    import sys

    # Check for HTTP transport flag
    # NOTE(review): "-h" selects HTTP mode here, although it conventionally
    # means "help"; kept for backward compatibility.
    if "--http" in sys.argv or "-h" in sys.argv:
        # Get port from args or use default
        port = 8000
        for i, arg in enumerate(sys.argv):
            if arg in ("--port", "-p") and i + 1 < len(sys.argv):
                port = int(sys.argv[i + 1])

        # FastMCP.run() takes no host/port keywords; the bind address is read
        # from the server settings, so configure it there.
        mcp.settings.host = "127.0.0.1"
        mcp.settings.port = port

        print(f"[INFO] Starting Medium MCP Server v3 in HTTP mode on port {port}")
        # The SSE transport is served under /sse by default.
        print(f"[INFO] Connect via: http://127.0.0.1:{port}/sse")
        # Run with HTTP (SSE) transport
        mcp.run(transport="sse")
    else:
        # Default: stdio transport for Claude Desktop
        mcp.run()