# Medium-MCP/src/main.py
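"""Entry point for the Medium scraper Apify Actor.

Builds start URLs from the Actor input (a tag, a search query, and/or explicit
Medium URLs), then runs a PlaywrightCrawler whose default handler routes each
page either to article extraction or to listing-page extraction, with optional
URL deduplication via StateManager.
"""
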
from bs4 import BeautifulSoup
from crawlee.crawlers import PlaywrightCrawler, PlaywrightCrawlingContext
from crawlee import Request
from apify import Actor
import asyncio
import re
from urllib.parse import quote_plus
from datetime import timedelta

from src.config import ActorInput
from src.parser import extract_search_results, extract_article_content
from src.utils import block_resources
from src.state import StateManager


async def main():
    await Actor.init()
    # Defined before the try block so the finally clause can reference it
    # even if input loading fails.
    state_manager = None
    try:
        # Load and validate input
        actor_input = await ActorInput.load()
        Actor.log.info(f"Loaded Input: {actor_input}")

        # Initialize State Manager
        if actor_input.enable_deduplication:
            state_manager = StateManager()
            await state_manager.load_state()
        else:
            Actor.log.info("Deduplication disabled. Scraping all articles.")

        # Build start URLs
        start_urls = []
        if actor_input.tag:
            # Tag-based scraping (Freshness)
            tag_slug = actor_input.tag.lower().replace(" ", "-")
            start_urls.append(f"https://medium.com/tag/{tag_slug}/latest")
            Actor.log.info(f"Targeting Tag: {actor_input.tag} (Latest)")
        elif actor_input.search_query:
            # Search-based scraping (quote_plus also handles special characters)
            q = quote_plus(actor_input.search_query)
            start_urls.append(f"https://medium.com/search?q={q}")

        # Add explicit start URLs if provided
        for u in actor_input.start_urls:
            if u.get("url") and "medium.com" in u["url"]:
                start_urls.append(u["url"])

        if not start_urls:
            Actor.log.info("No search query or valid start URLs provided. Exiting.")
            await Actor.exit()
            return

        # Create proxy configuration
        proxy_config = None
        if actor_input.proxy_configuration:
            proxy_config = await Actor.create_proxy_configuration(
                actor_proxy_input=actor_input.proxy_configuration
            )

        crawler = PlaywrightCrawler(
            proxy_configuration=proxy_config,
            max_requests_per_crawl=actor_input.max_requests_per_crawl,
            max_request_retries=actor_input.max_request_retries,
            request_handler_timeout=timedelta(seconds=60),
        )

        @crawler.router.default_handler
        async def handler(context: PlaywrightCrawlingContext):
            url = context.request.url
            Actor.log.info(f"Processing: {url}")

            # Enable resource blocking
            await context.page.route("**/*", block_resources)

            # Wait for content
            try:
                Actor.log.info("Waiting for selectors...")
                await context.page.wait_for_load_state("domcontentloaded")
                await context.page.wait_for_selector("article, .postArticle, .js-block", timeout=10000)
                Actor.log.info("Selectors found.")
            except Exception as e:
                Actor.log.warning(f"Timeout waiting for selectors on {url}: {e}")

            # Parse Content
            html = await context.page.content()
            soup = BeautifulSoup(html, "html.parser")

            # --- Router Logic ---
            # 1. Article Page (Deep Scraping)
            # Articles are either labelled explicitly when enqueued, or detected
            # heuristically: author-namespaced paths ("/@user/...") or URLs ending
            # in Medium's 12-character hex post ID.
            is_article_url = "/@" in url or re.search(r"-[0-9a-f]{12}([/?#]|$)", url) is not None
            if context.request.label == "ARTICLE" or is_article_url:
                Actor.log.info(f"Scraping Article Content: {url}")
                user_data = context.request.user_data
                if not isinstance(user_data, dict):
                    user_data = {}
                try:
                    loop = asyncio.get_running_loop()
                    content_data = await loop.run_in_executor(None, extract_article_content, soup)
                    Actor.log.info(f"Extracted content keys: {list(content_data.keys())}")
                    if content_data.get("markdownContent"):
                        Actor.log.info(f"Markdown length: {len(content_data['markdownContent'])}")
                    else:
                        Actor.log.warning("No markdown content extracted.")
                except Exception as e:
                    Actor.log.error(f"Error extracting content: {e}")
                    content_data = {}

                # Merge metadata
                final_data = user_data.copy()
                final_data.update({
                    "url": url,
                    "title": final_data.get("title") or (soup.title.string if soup.title else None),
                    **content_data
                })
                await context.push_data(final_data)

            # 2. Search Page or Tag Page
            elif "medium.com/search" in url or "/tag/" in url:
                Actor.log.info(f"Scraping Listing Page: {url}")
                loop = asyncio.get_running_loop()
                results = await loop.run_in_executor(None, extract_search_results, soup, url)
                Actor.log.info(f"Found {len(results)} articles.")

                pushed = 0
                for rec in results:
                    if pushed >= actor_input.max_articles:
                        break
                    full_url = rec["url"]

                    # Deduplication Check
                    if state_manager and state_manager.is_seen(full_url):
                        Actor.log.info(f"Skipping seen URL: {full_url}")
                        continue

                    # Add to state
                    if state_manager:
                        state_manager.add_seen(full_url)

                    if actor_input.scrape_full_content:
                        # Enqueue for deep scraping
                        await context.add_requests([Request.from_url(
                            url=full_url,
                            label="ARTICLE",
                            user_data={
                                "title": rec.get("title"),
                                "author": rec.get("author"),
                                "publishingDate": rec.get("publishingDate"),
                                "readingTime": rec.get("readingTime"),
                                "search_query": actor_input.search_query
                            }
                        )])
                    else:
                        # Fast mode
                        await context.push_data(rec)

                    # Count both enqueued and directly pushed articles toward max_articles
                    pushed += 1

                # Push search page summary
                await context.push_data({
                    "type": "search_page",
                    "url": url,
                    "enqueued": pushed
                })
Actor.log.info(f"Starting crawler with URLs: {start_urls}")
await crawler.run(start_urls)
except Exception as e:
Actor.log.error(f"Crawler failed: {e}")
raise
finally:
if state_manager:
await state_manager.save_state()
await Actor.exit()


if __name__ == "__main__":
    asyncio.run(main())