Spaces:
Running
Running
| """ | |
| NeuraPrompt Agent β GitHub MCP Connector (v1.1 - Fixed Auth & Error Handling) | |
| OAuth + Per-User Token Storage + GitHub API Tools | |
| """ | |
| import os | |
| import json | |
| import time | |
| import logging | |
| import asyncio | |
| from typing import Optional, Dict, Any, List | |
| from datetime import datetime, timedelta | |
| from urllib.parse import urlencode | |
| import requests | |
| from fastapi import APIRouter, HTTPException, Request, Query | |
| from fastapi.responses import RedirectResponse, JSONResponse | |
| from pydantic import BaseModel | |
| from pymongo import MongoClient | |
| from pymongo.server_api import ServerApi | |
| import gridfs | |
| from .registry import register_tool, get_tool | |
| log = logging.getLogger("agent.tools.github") | |
| # ==================== CONFIG ==================== | |
| GITHUB_CLIENT_ID = os.getenv("GITHUB_CLIENT_ID", "") | |
| GITHUB_CLIENT_SECRET = os.getenv("GITHUB_CLIENT_SECRET", "") | |
| GITHUB_REDIRECT_URI = os.getenv("GITHUB_REDIRECT_URI", "") | |
| # MongoDB (using your existing setup) | |
| MONGO_URI = os.getenv("MONGO_URI", "") | |
| mongo_client = MongoClient( | |
| MONGO_URI, ssl=True, | |
| tlsAllowInvalidCertificates=False, | |
| tlsCAFile="/etc/ssl/certs/ca-certificates.crt", | |
| server_api=ServerApi("1"), | |
| ) | |
| neuraprompt_db = mongo_client["neuraprompt"] | |
| github_connections_col = neuraprompt_db["github_connections"] # NEW collection | |
| # ==================== GITHUB OAUTH FLOW ==================== | |
| github_router = APIRouter(prefix="/github") | |
| async def github_connect(user_id: str = Query(...)): | |
| """ | |
| Step 1: Redirect user to GitHub OAuth login. | |
| Frontend calls: GET /api/github/connect?user_id=<uid> | |
| """ | |
| if not GITHUB_CLIENT_ID: | |
| raise HTTPException(400, "GitHub OAuth not configured") | |
| # Store state in DB to prevent CSRF | |
| state = os.urandom(16).hex() | |
| github_connections_col.update_one( | |
| {"user_id": user_id}, | |
| {"$set": { | |
| "state": state, | |
| "status": "pending", | |
| "created_at": datetime.utcnow() | |
| }}, | |
| upsert=True | |
| ) | |
| params = { | |
| "client_id": GITHUB_CLIENT_ID, | |
| "redirect_uri": GITHUB_REDIRECT_URI, | |
| "scope": "repo user read:org delete_repo workflow", | |
| "state": state, | |
| "allow_signup": "true" | |
| } | |
| auth_url = f"https://github.com/login/oauth/authorize?{urlencode(params)}" | |
| return RedirectResponse(auth_url) | |
| async def github_callback(code: str = Query(...), state: str = Query(...)): | |
| """ | |
| Step 2: GitHub redirects back here after user authorizes. | |
| Exchanges code for access token, saves to DB. | |
| """ | |
| # Find user by state | |
| conn = github_connections_col.find_one({"state": state}) | |
| if not conn: | |
| raise HTTPException(400, "Invalid or expired state") | |
| user_id = conn["user_id"] | |
| # Exchange code for token | |
| token_response = requests.post( | |
| "https://github.com/login/oauth/access_token", | |
| headers={"Accept": "application/json"}, | |
| data={ | |
| "client_id": GITHUB_CLIENT_ID, | |
| "client_secret": GITHUB_CLIENT_SECRET, | |
| "code": code, | |
| "redirect_uri": GITHUB_REDIRECT_URI, | |
| "state": state | |
| }, | |
| timeout=30 | |
| ) | |
| token_data = token_response.json() | |
| if "error" in token_data: | |
| github_connections_col.update_one( | |
| {"user_id": user_id}, | |
| {"$set": {"status": "failed", "error": token_data.get("error_description", "Unknown error")}} | |
| ) | |
| raise HTTPException(400, f"GitHub OAuth error: {token_data['error']}") | |
| access_token = token_data.get("access_token") | |
| token_type = token_data.get("token_type", "bearer") | |
| scope = token_data.get("scope", "") | |
| refresh_token = token_data.get("refresh_token") # FIX: Store refresh token for future use | |
| # Get user info from GitHub | |
| user_info = _github_api_get("/user", access_token) | |
| # Save everything | |
| github_connections_col.update_one( | |
| {"user_id": user_id}, | |
| {"$set": { | |
| "status": "connected", | |
| "access_token": access_token, | |
| "token_type": token_type, | |
| "scope": scope, | |
| "refresh_token": refresh_token, # FIX: Store refresh token | |
| "github_login": user_info.get("login"), | |
| "github_id": user_info.get("id"), | |
| "github_avatar": user_info.get("avatar_url"), | |
| "connected_at": datetime.utcnow(), | |
| "last_used": datetime.utcnow(), | |
| "state": None # Clear state | |
| }}, | |
| upsert=True | |
| ) | |
| # Redirect back to frontend with success | |
| frontend_url = os.getenv("FRONTEND_URL", "https://neura-prompt-ai.vercel.app") | |
| return RedirectResponse(f"{frontend_url}/settings?github=connected&login={user_info.get('login')}") | |
| async def github_status(user_id: str = Query(...)): | |
| """Check if user has connected GitHub.""" | |
| conn = github_connections_col.find_one( | |
| {"user_id": user_id}, | |
| {"access_token": 0} # Don't return token | |
| ) | |
| if not conn or conn.get("status") != "connected": | |
| return {"connected": False} | |
| return { | |
| "connected": True, | |
| "login": conn.get("github_login"), | |
| "avatar": conn.get("github_avatar"), | |
| "scope": conn.get("scope"), | |
| "connected_at": conn.get("connected_at") | |
| } | |
| async def github_disconnect(user_id: str = Query(...)): | |
| """Remove GitHub connection for user.""" | |
| github_connections_col.delete_one({"user_id": user_id}) | |
| return {"disconnected": True} | |
| # ==================== GITHUB API HELPER ==================== | |
| def _get_user_token(user_id: str) -> str: | |
| """Get GitHub access token for user.""" | |
| conn = github_connections_col.find_one({"user_id": user_id, "status": "connected"}) | |
| if not conn: | |
| raise HTTPException(401, "GitHub not connected. Please connect your GitHub account first.") | |
| return conn["access_token"] | |
| def _github_api_get(endpoint: str, token: str, params: dict = None) -> dict: | |
| """Make authenticated GET request to GitHub API.""" | |
| url = f"https://api.github.com{endpoint}" | |
| headers = { | |
| "Authorization": f"Bearer {token}", # FIX: Changed from "token" to "Bearer" for modern GitHub API | |
| "Accept": "application/vnd.github.v3+json", | |
| "User-Agent": "NeuraPrompt-Agent" | |
| } | |
| r = requests.get(url, headers=headers, params=params, timeout=30) | |
| if r.status_code == 401: | |
| raise HTTPException(401, "GitHub token expired or invalid. Please reconnect.") | |
| r.raise_for_status() | |
| return r.json() | |
| def _github_api_post(endpoint: str, token: str, data: dict = None) -> dict: | |
| """Make authenticated POST request to GitHub API.""" | |
| url = f"https://api.github.com{endpoint}" | |
| headers = { | |
| "Authorization": f"Bearer {token}", # FIX: Changed from "token" to "Bearer" | |
| "Accept": "application/vnd.github.v3+json", | |
| "User-Agent": "NeuraPrompt-Agent" | |
| } | |
| r = requests.post(url, headers=headers, json=data, timeout=30) | |
| if r.status_code == 401: | |
| raise HTTPException(401, "GitHub token expired or invalid. Please reconnect.") | |
| r.raise_for_status() | |
| return r.json() | |
| def _github_api_patch(endpoint: str, token: str, data: dict = None) -> dict: | |
| """Make authenticated PATCH request to GitHub API.""" | |
| url = f"https://api.github.com{endpoint}" | |
| headers = { | |
| "Authorization": f"Bearer {token}", # FIX: Changed from "token" to "Bearer" | |
| "Accept": "application/vnd.github.v3+json", | |
| "User-Agent": "NeuraPrompt-Agent" | |
| } | |
| r = requests.patch(url, headers=headers, json=data, timeout=30) | |
| if r.status_code == 401: | |
| raise HTTPException(401, "GitHub token expired or invalid. Please reconnect.") | |
| r.raise_for_status() | |
| return r.json() | |
| def _github_api_delete(endpoint: str, token: str) -> bool: | |
| """Make authenticated DELETE request to GitHub API.""" | |
| url = f"https://api.github.com{endpoint}" | |
| headers = { | |
| "Authorization": f"Bearer {token}", # FIX: Changed from "token" to "Bearer" | |
| "Accept": "application/vnd.github.v3+json", | |
| "User-Agent": "NeuraPrompt-Agent" | |
| } | |
| r = requests.delete(url, headers=headers, timeout=30) | |
| if r.status_code == 401: | |
| raise HTTPException(401, "GitHub token expired or invalid. Please reconnect.") | |
| return r.status_code in (204, 200) | |
| def _github_api_put(endpoint: str, token: str, data: dict = None) -> dict: | |
| """PUT request helper.""" | |
| url = f"https://api.github.com{endpoint}" | |
| headers = { | |
| "Authorization": f"Bearer {token}", # FIX: Changed from "token" to "Bearer" | |
| "Accept": "application/vnd.github.v3+json", | |
| "User-Agent": "NeuraPrompt-Agent" | |
| } | |
| r = requests.put(url, headers=headers, json=data, timeout=30) | |
| # FIX: Added 401 handling that was missing | |
| if r.status_code == 401: | |
| raise HTTPException(401, "GitHub token expired or invalid. Please reconnect.") | |
| r.raise_for_status() | |
| return r.json() | |
| # ==================== GITHUB TOOLS (AI Agent Uses These) ==================== | |
| # These tools accept user_id and automatically use that user's token | |
| def github_list_repos(user_id: str, type_filter: str = "owner", sort: str = "updated", per_page: int = 10) -> str: | |
| """List user's GitHub repositories.""" | |
| try: | |
| token = _get_user_token(user_id) | |
| repos = _github_api_get("/user/repos", token, { | |
| "type": type_filter, | |
| "sort": sort, | |
| "per_page": per_page | |
| }) | |
| lines = [f"π Found {len(repos)} repositories:"] | |
| for repo in repos: | |
| visibility = "π" if repo.get("private") else "π" | |
| lines.append(f"{visibility} {repo['full_name']} β β{repo.get('stargazers_count', 0)} β {repo.get('description', 'No description')}") | |
| return "\n".join(lines) | |
| except HTTPException as e: | |
| return f"GitHub error: {e.detail}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def github_create_repo(user_id: str, name: str, description: str = "", private: bool = False, auto_init: bool = True) -> str: | |
| """Create a new GitHub repository.""" | |
| try: | |
| token = _get_user_token(user_id) | |
| result = _github_api_post("/user/repos", token, { | |
| "name": name, | |
| "description": description, | |
| "private": private, | |
| "auto_init": auto_init | |
| }) | |
| return f"β Created repository: {result['html_url']}" | |
| except HTTPException as e: | |
| return f"GitHub error: {e.detail}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def github_get_repo(user_id: str, owner: str, repo: str) -> str: | |
| """Get repository details.""" | |
| try: | |
| token = _get_user_token(user_id) | |
| data = _github_api_get(f"/repos/{owner}/{repo}", token) | |
| return json.dumps({ | |
| "name": data["name"], | |
| "full_name": data["full_name"], | |
| "description": data.get("description"), | |
| "stars": data["stargazers_count"], | |
| "forks": data["forks_count"], | |
| "language": data.get("language"), | |
| "url": data["html_url"], | |
| "default_branch": data["default_branch"], | |
| "created_at": data["created_at"], | |
| "updated_at": data["updated_at"] | |
| }, indent=2) | |
| except HTTPException as e: | |
| return f"GitHub error: {e.detail}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def github_list_issues(user_id: str, owner: str, repo: str, state: str = "open", per_page: int = 10) -> str: | |
| """List issues in a repository.""" | |
| try: | |
| token = _get_user_token(user_id) | |
| issues = _github_api_get(f"/repos/{owner}/{repo}/issues", token, { | |
| "state": state, | |
| "per_page": per_page | |
| }) | |
| if not issues: | |
| return "No issues found." | |
| lines = [f"π Issues in {owner}/{repo}:"] | |
| for issue in issues: | |
| labels = ", ".join([l["name"] for l in issue.get("labels", [])]) | |
| lines.append(f"#{issue['number']} [{issue['state']}] {issue['title']}") | |
| if labels: | |
| lines.append(f" Labels: {labels}") | |
| return "\n".join(lines) | |
| except HTTPException as e: | |
| return f"GitHub error: {e.detail}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def github_create_issue(user_id: str, owner: str, repo: str, title: str, body: str = "", labels: list = None) -> str: | |
| """Create a new issue.""" | |
| try: | |
| token = _get_user_token(user_id) | |
| data = {"title": title, "body": body} | |
| if labels: | |
| data["labels"] = labels | |
| result = _github_api_post(f"/repos/{owner}/{repo}/issues", token, data) | |
| return f"β Created issue #{result['number']}: {result['html_url']}" | |
| except HTTPException as e: | |
| return f"GitHub error: {e.detail}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def github_read_file(user_id: str, owner: str, repo: str, path: str, ref: str = "main") -> str: | |
| """Read file contents from a repository.""" | |
| try: | |
| token = _get_user_token(user_id) | |
| data = _github_api_get(f"/repos/{owner}/{repo}/contents/{path}", token, {"ref": ref}) | |
| if data.get("type") == "file": | |
| import base64 | |
| content = base64.b64decode(data["content"]).decode("utf-8") | |
| return f"π {path}:\n```\n{content[:5000]}\n```" | |
| else: | |
| return f"Path is a directory, not a file: {path}" | |
| except HTTPException as e: | |
| return f"GitHub error: {e.detail}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def github_write_file(user_id: str, owner: str, repo: str, path: str, content: str, message: str, branch: str = "main") -> str: | |
| """Create or update a file in a repository.""" | |
| try: | |
| token = _get_user_token(user_id) | |
| # Check if file exists to get SHA | |
| sha = None | |
| try: | |
| existing = _github_api_get(f"/repos/{owner}/{repo}/contents/{path}", token, {"ref": branch}) | |
| sha = existing.get("sha") | |
| except HTTPException: | |
| # FIX: Don't swallow auth errors β re-raise them | |
| raise | |
| except Exception: | |
| # File doesn't exist, that's fine β we'll create it | |
| sha = None | |
| import base64 | |
| encoded = base64.b64encode(content.encode("utf-8")).decode() | |
| data = { | |
| "message": message, | |
| "content": encoded, | |
| "branch": branch | |
| } | |
| if sha: | |
| data["sha"] = sha | |
| result = _github_api_put(f"/repos/{owner}/{repo}/contents/{path}", token, data) | |
| return f"β {'Updated' if sha else 'Created'} {path}: {result['content']['html_url']}" | |
| except HTTPException as e: | |
| return f"GitHub error: {e.detail}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def github_create_branch(user_id: str, owner: str, repo: str, branch: str, from_branch: str = "main") -> str: | |
| """Create a new branch from an existing branch.""" | |
| try: | |
| token = _get_user_token(user_id) | |
| # Get SHA of from_branch | |
| ref_data = _github_api_get(f"/repos/{owner}/{repo}/git/refs/heads/{from_branch}", token) | |
| sha = ref_data["object"]["sha"] | |
| # Create new branch | |
| result = _github_api_post(f"/repos/{owner}/{repo}/git/refs", token, { | |
| "ref": f"refs/heads/{branch}", | |
| "sha": sha | |
| }) | |
| return f"β Created branch '{branch}' from '{from_branch}'" | |
| except HTTPException as e: | |
| return f"GitHub error: {e.detail}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def github_create_pull_request(user_id: str, owner: str, repo: str, title: str, head: str, base: str, body: str = "") -> str: | |
| """Create a pull request.""" | |
| try: | |
| token = _get_user_token(user_id) | |
| result = _github_api_post(f"/repos/{owner}/{repo}/pulls", token, { | |
| "title": title, | |
| "head": head, | |
| "base": base, | |
| "body": body | |
| }) | |
| return f"β Created PR #{result['number']}: {result['html_url']}" | |
| except HTTPException as e: | |
| return f"GitHub error: {e.detail}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def github_search_code(user_id: str, query: str, per_page: int = 10) -> str: | |
| """Search code across GitHub.""" | |
| try: | |
| token = _get_user_token(user_id) | |
| results = _github_api_get("/search/code", token, { | |
| "q": query, | |
| "per_page": per_page | |
| }) | |
| lines = [f"π Found {results.get('total_count', 0)} code results:"] | |
| for item in results.get("items", []): | |
| lines.append(f"π {item['repository']['full_name']}: {item['path']}") | |
| lines.append(f" {item['html_url']}") | |
| return "\n".join(lines) | |
| except HTTPException as e: | |
| return f"GitHub error: {e.detail}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def github_get_user_profile(user_id: str) -> str: | |
| """Get connected GitHub user's profile.""" | |
| try: | |
| token = _get_user_token(user_id) | |
| data = _github_api_get("/user", token) | |
| return json.dumps({ | |
| "login": data["login"], | |
| "name": data.get("name"), | |
| "bio": data.get("bio"), | |
| "public_repos": data["public_repos"], | |
| "followers": data["followers"], | |
| "following": data["following"], | |
| "url": data["html_url"], | |
| "avatar": data.get("avatar_url") | |
| }, indent=2) | |
| except HTTPException as e: | |
| return f"GitHub error: {e.detail}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| # ==================== REGISTER TOOLS ==================== | |
| def register_github_tools(): | |
| """Register all GitHub tools with the agent.""" | |
| register_tool("github_list_repos", github_list_repos, "List user's GitHub repositories") | |
| register_tool("github_create_repo", github_create_repo, "Create a new GitHub repository") | |
| register_tool("github_get_repo", github_get_repo, "Get repository details") | |
| register_tool("github_list_issues", github_list_issues, "List repository issues") | |
| register_tool("github_create_issue", github_create_issue, "Create a new issue") | |
| register_tool("github_read_file", github_read_file, "Read file from repository") | |
| register_tool("github_write_file", github_write_file, "Create or update file in repository") | |
| register_tool("github_create_branch", github_create_branch, "Create a new branch") | |
| register_tool("github_create_pull_request", github_create_pull_request, "Create a pull request") | |
| register_tool("github_search_code", github_search_code, "Search code across GitHub") | |
| register_tool("github_get_user_profile", github_get_user_profile, "Get user's GitHub profile") | |
| log.info("β GitHub tools registered") | |
| # Call this in your main app startup | |
| # register_github_tools() | |