"""
NeuraPrompt Agent — GitHub MCP Connector (v1.1 - Fixed Auth & Error Handling)

OAuth + Per-User Token Storage + GitHub API Tools

The module has three layers:

* FastAPI routes under ``/github`` that run the GitHub OAuth web flow and
  persist the resulting token in the ``github_connections`` collection.
* Thin ``_github_api_*`` helpers that perform authenticated REST calls.
* ``github_*`` tool functions that take a ``user_id``, look up that user's
  token and return a human-readable string (errors are returned as text,
  never raised), plus ``register_github_tools`` to register them.
"""
import asyncio
import base64
import json
import logging
import os
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode

import gridfs
import requests
from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import JSONResponse, RedirectResponse
from pydantic import BaseModel
from pymongo import MongoClient
from pymongo.server_api import ServerApi

from .registry import get_tool, register_tool

log = logging.getLogger("agent.tools.github")

# ==================== CONFIG ====================

GITHUB_CLIENT_ID = os.getenv("GITHUB_CLIENT_ID", "")
GITHUB_CLIENT_SECRET = os.getenv("GITHUB_CLIENT_SECRET", "")
GITHUB_REDIRECT_URI = os.getenv("GITHUB_REDIRECT_URI", "")

# Shared by every outbound GitHub call in this module.
GITHUB_API_BASE = "https://api.github.com"
GITHUB_TIMEOUT_SECONDS = 30

# MongoDB (using your existing setup)
MONGO_URI = os.getenv("MONGO_URI", "")
mongo_client = MongoClient(
    MONGO_URI,
    ssl=True,
    tlsAllowInvalidCertificates=False,
    tlsCAFile="/etc/ssl/certs/ca-certificates.crt",
    server_api=ServerApi("1"),
)
neuraprompt_db = mongo_client["neuraprompt"]
github_connections_col = neuraprompt_db["github_connections"]  # NEW collection

# ==================== GITHUB OAUTH FLOW ====================
# NOTE(review): the handlers below are ``async def`` but call pymongo and
# ``requests`` synchronously, so each call blocks the event loop while it
# runs. Consider plain ``def`` handlers or a thread offload — confirm with
# the app's concurrency expectations before changing.

github_router = APIRouter(prefix="/github")


def _mark_connection_failed(user_id: str, reason: str) -> None:
    """Record a failed OAuth attempt for ``user_id`` with a short reason."""
    github_connections_col.update_one(
        {"user_id": user_id},
        {"$set": {"status": "failed", "error": reason}},
    )


@github_router.get("/connect")
async def github_connect(user_id: str = Query(...)):
    """
    Step 1: Redirect user to GitHub OAuth login.

    Frontend calls: GET /api/github/connect?user_id=

    A random ``state`` value is stored on the user's connection document
    (status ``pending``) and sent to GitHub; the callback uses it to find the
    user again and to reject forged callbacks.

    Raises:
        HTTPException: 400 when ``GITHUB_CLIENT_ID`` is not configured.
    """
    if not GITHUB_CLIENT_ID:
        raise HTTPException(400, "GitHub OAuth not configured")

    # Store state in DB to prevent CSRF
    state = os.urandom(16).hex()
    github_connections_col.update_one(
        {"user_id": user_id},
        {
            "$set": {
                "state": state,
                "status": "pending",
                "created_at": datetime.utcnow(),
            }
        },
        upsert=True,
    )

    # NOTE(review): the requested scope includes ``delete_repo`` and
    # ``workflow`` although no tool in this module deletes repositories or
    # edits workflows — confirm the scope is intentionally this broad.
    params = {
        "client_id": GITHUB_CLIENT_ID,
        "redirect_uri": GITHUB_REDIRECT_URI,
        "scope": "repo user read:org delete_repo workflow",
        "state": state,
        "allow_signup": "true",
    }
    auth_url = f"https://github.com/login/oauth/authorize?{urlencode(params)}"
    return RedirectResponse(auth_url)


@github_router.get("/callback")
async def github_callback(code: str = Query(...), state: str = Query(...)):
    """
    Step 2: GitHub redirects back here after user authorizes.

    Exchanges ``code`` for an access token, fetches the GitHub profile, saves
    both on the user's connection document and redirects to the frontend
    settings page.

    Raises:
        HTTPException: 400 for an unknown ``state``, an OAuth error reported
            by GitHub, or a response without an access token; 401 when the
            new token is rejected by the API; 502 when GitHub cannot be
            reached or answers with something unusable.
    """
    # Find user by state
    conn = github_connections_col.find_one({"state": state})
    if not conn:
        raise HTTPException(400, "Invalid or expired state")

    user_id = conn["user_id"]

    # Exchange code for token. Network failures and non-JSON bodies are
    # recorded and reported as 502 instead of surfacing as a bare 500.
    try:
        token_response = requests.post(
            "https://github.com/login/oauth/access_token",
            headers={"Accept": "application/json"},
            data={
                "client_id": GITHUB_CLIENT_ID,
                "client_secret": GITHUB_CLIENT_SECRET,
                "code": code,
                "redirect_uri": GITHUB_REDIRECT_URI,
                "state": state,
            },
            timeout=GITHUB_TIMEOUT_SECONDS,
        )
        token_data = token_response.json()
    except (requests.RequestException, ValueError) as exc:
        log.exception("GitHub token exchange failed for user %s", user_id)
        _mark_connection_failed(user_id, "Token exchange request failed")
        raise HTTPException(502, "GitHub OAuth error: token exchange failed") from exc

    if "error" in token_data:
        _mark_connection_failed(
            user_id, token_data.get("error_description", "Unknown error")
        )
        raise HTTPException(400, f"GitHub OAuth error: {token_data['error']}")

    access_token = token_data.get("access_token")
    if not access_token:
        # Without this guard the profile lookup would run with "Bearer None"
        # and a "connected" record could be written with no usable token.
        _mark_connection_failed(user_id, "No access token in GitHub response")
        raise HTTPException(400, "GitHub OAuth error: no access token returned")

    token_type = token_data.get("token_type", "bearer")
    scope = token_data.get("scope", "")
    refresh_token = token_data.get("refresh_token")  # FIX: Store refresh token for future use

    # Get user info from GitHub
    try:
        user_info = _github_api_get("/user", access_token)
    except requests.RequestException as exc:
        log.exception("GitHub profile lookup failed for user %s", user_id)
        _mark_connection_failed(user_id, "Could not fetch GitHub profile")
        raise HTTPException(502, "GitHub OAuth error: profile lookup failed") from exc

    # Save everything
    now = datetime.utcnow()
    github_connections_col.update_one(
        {"user_id": user_id},
        {
            "$set": {
                "status": "connected",
                "access_token": access_token,
                "token_type": token_type,
                "scope": scope,
                "refresh_token": refresh_token,  # FIX: Store refresh token
                "github_login": user_info.get("login"),
                "github_id": user_info.get("id"),
                "github_avatar": user_info.get("avatar_url"),
                "connected_at": now,
                "last_used": now,
                "state": None,  # Clear state
            }
        },
        upsert=True,
    )

    # Redirect back to frontend with success. The query string is built with
    # urlencode so the login value is always safely escaped.
    frontend_url = os.getenv("FRONTEND_URL", "https://neura-prompt-ai.vercel.app")
    query = urlencode({"github": "connected", "login": user_info.get("login")})
    return RedirectResponse(f"{frontend_url}/settings?{query}")


@github_router.get("/status")
async def github_status(user_id: str = Query(...)):
    """Check if user has connected GitHub.

    Returns ``{"connected": False}`` unless a document with status
    ``connected`` exists; otherwise the stored login, avatar, scope and
    connection time.
    """
    conn = github_connections_col.find_one(
        {"user_id": user_id},
        # Don't load secrets that this endpoint never needs.
        {"access_token": 0, "refresh_token": 0},
    )
    if not conn or conn.get("status") != "connected":
        return {"connected": False}

    return {
        "connected": True,
        "login": conn.get("github_login"),
        "avatar": conn.get("github_avatar"),
        "scope": conn.get("scope"),
        "connected_at": conn.get("connected_at"),
    }


@github_router.post("/disconnect")
async def github_disconnect(user_id: str = Query(...)):
    """Remove GitHub connection for user."""
    github_connections_col.delete_one({"user_id": user_id})
    return {"disconnected": True}


# ==================== GITHUB API HELPER ====================


def _get_user_token(user_id: str) -> str:
    """Get GitHub access token for user.

    Raises:
        HTTPException: 401 when the user has no ``connected`` record or the
            record carries no token.
    """
    conn = github_connections_col.find_one({"user_id": user_id, "status": "connected"})
    token = conn.get("access_token") if conn else None
    if not token:
        raise HTTPException(
            401, "GitHub not connected. Please connect your GitHub account first."
        )
    return token


def _github_request(
    method: str,
    endpoint: str,
    token: str,
    *,
    params: Optional[dict] = None,
    data: Optional[dict] = None,
) -> requests.Response:
    """Send one authenticated request to the GitHub REST API.

    Shared by all ``_github_api_*`` helpers so that headers, timeout and the
    401 translation live in one place.

    Args:
        method: HTTP verb, e.g. ``"GET"``.
        endpoint: Path beginning with ``/`` that is appended to the API base.
        token: OAuth access token sent as a ``Bearer`` credential.
        params: Optional query-string parameters.
        data: Optional JSON body.

    Returns:
        The raw response; callers decide whether to ``raise_for_status``.

    Raises:
        HTTPException: 401 when GitHub rejects the token.
    """
    headers = {
        # FIX: Changed from "token" to "Bearer" for modern GitHub API
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github.v3+json",
        "User-Agent": "NeuraPrompt-Agent",
    }
    response = requests.request(
        method,
        f"{GITHUB_API_BASE}{endpoint}",
        headers=headers,
        params=params,
        json=data,
        timeout=GITHUB_TIMEOUT_SECONDS,
    )
    if response.status_code == 401:
        raise HTTPException(401, "GitHub token expired or invalid. Please reconnect.")
    return response


def _github_api_get(endpoint: str, token: str, params: Optional[dict] = None) -> dict:
    """Make authenticated GET request to GitHub API.

    Returns the decoded JSON body (a list for collection endpoints).
    Raises ``HTTPException`` (401) or ``requests.HTTPError`` (other 4xx/5xx).
    """
    response = _github_request("GET", endpoint, token, params=params)
    response.raise_for_status()
    return response.json()


def _github_api_post(endpoint: str, token: str, data: Optional[dict] = None) -> dict:
    """Make authenticated POST request to GitHub API.

    Returns the decoded JSON body.
    Raises ``HTTPException`` (401) or ``requests.HTTPError`` (other 4xx/5xx).
    """
    response = _github_request("POST", endpoint, token, data=data)
    response.raise_for_status()
    return response.json()


def _github_api_patch(endpoint: str, token: str, data: Optional[dict] = None) -> dict:
    """Make authenticated PATCH request to GitHub API.

    Returns the decoded JSON body.
    Raises ``HTTPException`` (401) or ``requests.HTTPError`` (other 4xx/5xx).
    """
    response = _github_request("PATCH", endpoint, token, data=data)
    response.raise_for_status()
    return response.json()


def _github_api_delete(endpoint: str, token: str) -> bool:
    """Make authenticated DELETE request to GitHub API.

    Returns ``True`` when GitHub answers 204 or 200 and ``False`` for any
    other status; only a 401 is raised (as ``HTTPException``).
    """
    response = _github_request("DELETE", endpoint, token)
    return response.status_code in (204, 200)


def _github_api_put(endpoint: str, token: str, data: Optional[dict] = None) -> dict:
    """PUT request helper.

    Returns the decoded JSON body.
    Raises ``HTTPException`` (401) or ``requests.HTTPError`` (other 4xx/5xx).
    """
    response = _github_request("PUT", endpoint, token, data=data)
    response.raise_for_status()
    return response.json()


# ==================== GITHUB TOOLS (AI Agent Uses These) ====================
# These tools accept user_id and automatically use that user's token.
# Every tool returns a string; failures are reported as "GitHub error: ..."
# (HTTPException) or "Error: ..." (anything else) rather than raised.


def github_list_repos(
    user_id: str,
    type_filter: str = "owner",
    sort: str = "updated",
    per_page: int = 10,
) -> str:
    """List user's GitHub repositories.

    Args:
        user_id: NeuraPrompt user whose stored token is used.
        type_filter: Sent as the ``type`` query parameter of ``/user/repos``.
        sort: Sent as the ``sort`` query parameter.
        per_page: Sent as the ``per_page`` query parameter.

    Returns:
        A header line followed by one line per repository.
    """
    try:
        token = _get_user_token(user_id)
        repos = _github_api_get(
            "/user/repos",
            token,
            {"type": type_filter, "sort": sort, "per_page": per_page},
        )

        lines = [f"📁 Found {len(repos)} repositories:"]
        for repo in repos:
            visibility = "🔒" if repo.get("private") else "🌐"
            # The key is present with a null value when no description is
            # set, so a .get() default alone would print "None".
            description = repo.get("description") or "No description"
            lines.append(
                f"{visibility} {repo['full_name']} — "
                f"⭐{repo.get('stargazers_count', 0)} — {description}"
            )
        return "\n".join(lines)
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"


def github_create_repo(
    user_id: str,
    name: str,
    description: str = "",
    private: bool = False,
    auto_init: bool = True,
) -> str:
    """Create a new GitHub repository.

    Returns a success line containing the new repository's ``html_url``.
    """
    try:
        token = _get_user_token(user_id)
        result = _github_api_post(
            "/user/repos",
            token,
            {
                "name": name,
                "description": description,
                "private": private,
                "auto_init": auto_init,
            },
        )
        return f"✅ Created repository: {result['html_url']}"
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"


def github_get_repo(user_id: str, owner: str, repo: str) -> str:
    """Get repository details.

    Returns a pretty-printed JSON object with a fixed subset of the
    repository's fields.
    """
    try:
        token = _get_user_token(user_id)
        data = _github_api_get(f"/repos/{owner}/{repo}", token)
        return json.dumps(
            {
                "name": data["name"],
                "full_name": data["full_name"],
                "description": data.get("description"),
                "stars": data["stargazers_count"],
                "forks": data["forks_count"],
                "language": data.get("language"),
                "url": data["html_url"],
                "default_branch": data["default_branch"],
                "created_at": data["created_at"],
                "updated_at": data["updated_at"],
            },
            indent=2,
        )
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"


def github_list_issues(
    user_id: str,
    owner: str,
    repo: str,
    state: str = "open",
    per_page: int = 10,
) -> str:
    """List issues in a repository.

    Returns one line per issue (number, state, title) plus an optional
    labels line, or ``"No issues found."`` for an empty result.
    """
    try:
        token = _get_user_token(user_id)
        issues = _github_api_get(
            f"/repos/{owner}/{repo}/issues",
            token,
            {"state": state, "per_page": per_page},
        )

        if not issues:
            return "No issues found."

        lines = [f"🐛 Issues in {owner}/{repo}:"]
        for issue in issues:
            labels = ", ".join(label["name"] for label in issue.get("labels", []))
            lines.append(f"#{issue['number']} [{issue['state']}] {issue['title']}")
            if labels:
                lines.append(f" Labels: {labels}")
        return "\n".join(lines)
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"


def github_create_issue(
    user_id: str,
    owner: str,
    repo: str,
    title: str,
    body: str = "",
    labels: Optional[list] = None,
) -> str:
    """Create a new issue.

    ``labels`` is only sent when it is non-empty. Returns a success line
    with the issue number and URL.
    """
    try:
        token = _get_user_token(user_id)
        data = {"title": title, "body": body}
        if labels:
            data["labels"] = labels

        result = _github_api_post(f"/repos/{owner}/{repo}/issues", token, data)
        return f"✅ Created issue #{result['number']}: {result['html_url']}"
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"


def github_read_file(
    user_id: str, owner: str, repo: str, path: str, ref: str = "main"
) -> str:
    """Read file contents from a repository.

    Returns the first 5000 characters of the UTF-8 decoded file inside a
    fenced block, or a "Path is a directory" message when ``path`` is not a
    regular file.
    """
    try:
        token = _get_user_token(user_id)
        data = _github_api_get(
            f"/repos/{owner}/{repo}/contents/{path}", token, {"ref": ref}
        )

        # The contents endpoint returns a JSON array for directories, so the
        # payload must be checked for being an object before reading "type".
        if isinstance(data, dict) and data.get("type") == "file":
            content = base64.b64decode(data["content"]).decode("utf-8")
            return f"📄 {path}:\n```\n{content[:5000]}\n```"
        return f"Path is a directory, not a file: {path}"
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"


def github_write_file(
    user_id: str,
    owner: str,
    repo: str,
    path: str,
    content: str,
    message: str,
    branch: str = "main",
) -> str:
    """Create or update a file in a repository.

    Looks the file up on ``branch`` first: when it exists its blob SHA is
    sent along so GitHub updates it, otherwise the file is created.

    Returns a success line saying whether the file was created or updated.
    """
    try:
        token = _get_user_token(user_id)

        # Check if file exists to get SHA. Only a 404 means "does not exist";
        # auth errors (HTTPException) and any other failure propagate to the
        # outer handlers instead of being mistaken for a missing file.
        try:
            existing = _github_api_get(
                f"/repos/{owner}/{repo}/contents/{path}", token, {"ref": branch}
            )
        except requests.HTTPError as exc:
            if exc.response is None or exc.response.status_code != 404:
                raise
            existing = None

        # A directory yields a list, which carries no single SHA.
        sha = existing.get("sha") if isinstance(existing, dict) else None

        encoded = base64.b64encode(content.encode("utf-8")).decode()
        data = {"message": message, "content": encoded, "branch": branch}
        if sha:
            data["sha"] = sha

        result = _github_api_put(f"/repos/{owner}/{repo}/contents/{path}", token, data)
        return f"✅ {'Updated' if sha else 'Created'} {path}: {result['content']['html_url']}"
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"


def github_create_branch(
    user_id: str, owner: str, repo: str, branch: str, from_branch: str = "main"
) -> str:
    """Create a new branch from an existing branch.

    Resolves the head commit SHA of ``from_branch`` and creates
    ``refs/heads/<branch>`` pointing at it.
    """
    try:
        token = _get_user_token(user_id)

        # Get SHA of from_branch
        ref_data = _github_api_get(
            f"/repos/{owner}/{repo}/git/refs/heads/{from_branch}", token
        )
        sha = ref_data["object"]["sha"]

        # Create new branch
        _github_api_post(
            f"/repos/{owner}/{repo}/git/refs",
            token,
            {"ref": f"refs/heads/{branch}", "sha": sha},
        )
        return f"✅ Created branch '{branch}' from '{from_branch}'"
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"


def github_create_pull_request(
    user_id: str,
    owner: str,
    repo: str,
    title: str,
    head: str,
    base: str,
    body: str = "",
) -> str:
    """Create a pull request.

    Returns a success line with the pull request number and URL.
    """
    try:
        token = _get_user_token(user_id)
        result = _github_api_post(
            f"/repos/{owner}/{repo}/pulls",
            token,
            {"title": title, "head": head, "base": base, "body": body},
        )
        return f"✅ Created PR #{result['number']}: {result['html_url']}"
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"


def github_search_code(user_id: str, query: str, per_page: int = 10) -> str:
    """Search code across GitHub.

    Returns the total hit count followed by repository, path and URL for
    each returned item.
    """
    try:
        token = _get_user_token(user_id)
        results = _github_api_get(
            "/search/code", token, {"q": query, "per_page": per_page}
        )

        lines = [f"🔍 Found {results.get('total_count', 0)} code results:"]
        for item in results.get("items", []):
            lines.append(f"📄 {item['repository']['full_name']}: {item['path']}")
            lines.append(f" {item['html_url']}")
        return "\n".join(lines)
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"


def github_get_user_profile(user_id: str) -> str:
    """Get connected GitHub user's profile.

    Returns a pretty-printed JSON object with a fixed subset of the
    ``/user`` response.
    """
    try:
        token = _get_user_token(user_id)
        data = _github_api_get("/user", token)
        return json.dumps(
            {
                "login": data["login"],
                "name": data.get("name"),
                "bio": data.get("bio"),
                "public_repos": data["public_repos"],
                "followers": data["followers"],
                "following": data["following"],
                "url": data["html_url"],
                "avatar": data.get("avatar_url"),
            },
            indent=2,
        )
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"


# ==================== REGISTER TOOLS ====================


def register_github_tools():
    """Register all GitHub tools with the agent."""
    tools = (
        ("github_list_repos", github_list_repos, "List user's GitHub repositories"),
        ("github_create_repo", github_create_repo, "Create a new GitHub repository"),
        ("github_get_repo", github_get_repo, "Get repository details"),
        ("github_list_issues", github_list_issues, "List repository issues"),
        ("github_create_issue", github_create_issue, "Create a new issue"),
        ("github_read_file", github_read_file, "Read file from repository"),
        ("github_write_file", github_write_file, "Create or update file in repository"),
        ("github_create_branch", github_create_branch, "Create a new branch"),
        ("github_create_pull_request", github_create_pull_request, "Create a pull request"),
        ("github_search_code", github_search_code, "Search code across GitHub"),
        ("github_get_user_profile", github_get_user_profile, "Get user's GitHub profile"),
    )
    for tool_name, tool_func, tool_description in tools:
        register_tool(tool_name, tool_func, tool_description)

    log.info("✅ GitHub tools registered")


# Call this in your main app startup
# register_github_tools()