Spaces:
Sleeping
Sleeping
| from mcp.server.fastmcp import FastMCP | |
| import praw | |
| import re | |
| import os | |
| from typing import Optional | |
| mcp = FastMCP("Reddit") | |
| # Initialize Reddit client once with env vars | |
| def get_reddit_client(): | |
| client_id = os.getenv('REDDIT_CLIENT_ID') | |
| client_secret = os.getenv('REDDIT_CLIENT_SECRET') | |
| user_agent = os.getenv('REDDIT_USER_AGENT', 'MCP Reddit Tool v1.0') | |
| print(client_id,client_secret,user_agent) | |
| if not client_id or not client_secret: | |
| raise ValueError("Missing REDDIT_CLIENT_ID or REDDIT_CLIENT_SECRET environment variables") | |
| return praw.Reddit( | |
| client_id=client_id, | |
| client_secret=client_secret, | |
| user_agent=user_agent, | |
| ) | |
| def get_reddit_thread( | |
| url: str, | |
| max_comment_depth: Optional[int] = None | |
| ) -> str: | |
| """Get full Reddit thread as formatted text. | |
| Args: | |
| url: Reddit post URL | |
| max_comment_depth: Max depth of comments to include (None=all, 0=no comments) | |
| Returns: | |
| Formatted text of Reddit post and comments | |
| """ | |
| try: | |
| reddit = get_reddit_client() | |
| # Get the submission | |
| submission = reddit.submission(url=url) | |
| # Build post header | |
| author = submission.author.name if submission.author else "[deleted]" | |
| selftext = re.sub(r'\n+', ' ', submission.selftext) if submission.selftext else "" | |
| output = [ | |
| f"Title: {submission.title}", | |
| f"Author: {author}", | |
| f"Score: {submission.score} points", | |
| f"Comments: {submission.num_comments}", | |
| ] | |
| if selftext: | |
| output.append(f"\nPost text: {selftext}") | |
| # Add comments if requested | |
| if max_comment_depth != 0: | |
| output.append("\n\n--- COMMENTS ---") | |
| # Fetch all comments | |
| submission.comments.replace_more(limit=None) | |
| # Process comments recursively | |
| def process_comments(comments, depth=0): | |
| result = [] | |
| if max_comment_depth is not None and depth >= max_comment_depth: | |
| return result | |
| for comment in comments: | |
| if isinstance(comment, praw.models.MoreComments): | |
| continue | |
| # Format comment with pipe indentation | |
| indent = "| " * depth | |
| author = comment.author.name if comment.author else "[deleted]" | |
| body = re.sub(r'\n+', ' ', comment.body) | |
| score = comment.score | |
| result.append(f"{indent}{author} ({score} points): {body}") | |
| # Add replies | |
| if comment.replies: | |
| result.extend(process_comments(comment.replies, depth + 1)) | |
| return result | |
| comment_lines = process_comments(submission.comments) | |
| output.extend(comment_lines) | |
| return "\n".join(output) | |
| except ValueError as e: | |
| return f"β Configuration error: {str(e)}" | |
| except praw.exceptions.InvalidURL: | |
| return "β Invalid Reddit URL" | |
| except praw.exceptions.NotFound: | |
| return "β Post not found" | |
| except praw.exceptions.Forbidden: | |
| return "β Access forbidden - check your credentials" | |
| except Exception as e: | |
| return f"β Error: {str(e)}" | |
| if __name__ == "__main__": | |
| mcp.run() |