# File: self-trained2/agent/tools/github_tools.py
# Uploaded by DeepImagix ("Upload github_tools.py"), commit 16af15f (verified), 19.1 kB
"""
NeuraPrompt Agent — GitHub MCP Connector (v1.1 - Fixed Auth & Error Handling)
OAuth + Per-User Token Storage + GitHub API Tools
"""
import os
import json
import time
import logging
import asyncio
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
from urllib.parse import urlencode
import requests
from fastapi import APIRouter, HTTPException, Request, Query
from fastapi.responses import RedirectResponse, JSONResponse
from pydantic import BaseModel
from pymongo import MongoClient
from pymongo.server_api import ServerApi
import gridfs
from .registry import register_tool, get_tool
# Module-level logger for everything in this file.
log = logging.getLogger("agent.tools.github")

# ---------------------------------------------------------------- config --
# GitHub OAuth application credentials; each falls back to "" when the
# environment variable is not set.
GITHUB_CLIENT_ID = os.environ.get("GITHUB_CLIENT_ID", "")
GITHUB_CLIENT_SECRET = os.environ.get("GITHUB_CLIENT_SECRET", "")
GITHUB_REDIRECT_URI = os.environ.get("GITHUB_REDIRECT_URI", "")

# MongoDB connection string, also read from the environment.
MONGO_URI = os.environ.get("MONGO_URI", "")

# The client is created at import time with TLS enabled, certificate
# validation on, and the CA bundle at the path given below.
mongo_client = MongoClient(
    MONGO_URI,
    ssl=True,
    tlsAllowInvalidCertificates=False,
    tlsCAFile="/etc/ssl/certs/ca-certificates.crt",
    server_api=ServerApi("1"),
)
neuraprompt_db = mongo_client["neuraprompt"]
# One document per user_id: pending OAuth state and, once connected, the token.
github_connections_col = neuraprompt_db["github_connections"]

# ------------------------------------------------------------ OAuth flow --
# All routes defined on this router are served under the "/github" prefix.
github_router = APIRouter(prefix="/github")
@github_router.get("/connect")
async def github_connect(user_id: str = Query(...)):
    """
    Step 1 of the OAuth flow: send the user to GitHub's authorize page.

    A random ``state`` value is generated and upserted into the user's
    connection document (with status "pending") so the callback can map the
    returning request back to ``user_id``.

    Raises:
        HTTPException: 400 when GITHUB_CLIENT_ID is empty.
    """
    if not GITHUB_CLIENT_ID:
        raise HTTPException(400, "GitHub OAuth not configured")

    # Anti-CSRF token: 16 random bytes, hex encoded.
    csrf_state = os.urandom(16).hex()
    pending_fields = {
        "state": csrf_state,
        "status": "pending",
        "created_at": datetime.utcnow(),
    }
    github_connections_col.update_one(
        {"user_id": user_id},
        {"$set": pending_fields},
        upsert=True,
    )

    query_string = urlencode({
        "client_id": GITHUB_CLIENT_ID,
        "redirect_uri": GITHUB_REDIRECT_URI,
        "scope": "repo user read:org delete_repo workflow",
        "state": csrf_state,
        "allow_signup": "true",
    })
    return RedirectResponse("https://github.com/login/oauth/authorize?" + query_string)
@github_router.get("/callback")
async def github_callback(code: str = Query(...), state: str = Query(...)):
    """
    Step 2 of the OAuth flow: GitHub redirects here after the user authorizes.

    Looks up the pending connection by ``state``, exchanges ``code`` for an
    access token, fetches the GitHub profile, stores everything on the user's
    connection document, and redirects to the frontend settings page.

    Raises:
        HTTPException: 400 when ``state`` matches no stored connection, when
            GitHub reports an OAuth error, or when no access token is
            returned; 502 when the token exchange request itself fails or
            does not return JSON.
    """
    conn = github_connections_col.find_one({"state": state})
    if not conn:
        raise HTTPException(400, "Invalid or expired state")
    user_id = conn["user_id"]

    def _mark_failed(reason: str) -> None:
        # Record why the attempt failed on the user's connection document.
        github_connections_col.update_one(
            {"user_id": user_id},
            {"$set": {"status": "failed", "error": reason}},
        )

    def _exchange_code():
        return requests.post(
            "https://github.com/login/oauth/access_token",
            headers={"Accept": "application/json"},
            data={
                "client_id": GITHUB_CLIENT_ID,
                "client_secret": GITHUB_CLIENT_SECRET,
                "code": code,
                "redirect_uri": GITHUB_REDIRECT_URI,
                "state": state,
            },
            timeout=30,
        )

    # `requests` is blocking; run it in the default executor so a slow GitHub
    # response does not stall the event loop for every other request.
    loop = asyncio.get_running_loop()
    try:
        token_response = await loop.run_in_executor(None, _exchange_code)
        token_data = token_response.json()
    except (requests.RequestException, ValueError) as exc:
        # Network failure, or a body that is not JSON (e.g. an HTML error page).
        _mark_failed("Token exchange with GitHub failed")
        raise HTTPException(502, "GitHub OAuth error: token exchange failed") from exc
    if not isinstance(token_data, dict):
        token_data = {}

    if "error" in token_data:
        _mark_failed(token_data.get("error_description", "Unknown error"))
        raise HTTPException(400, f"GitHub OAuth error: {token_data['error']}")

    access_token = token_data.get("access_token")
    if not access_token:
        # Neither an error nor a token: never store a "connected" row without one.
        _mark_failed("No access token in GitHub response")
        raise HTTPException(400, "GitHub OAuth error: no access token returned")

    token_type = token_data.get("token_type", "bearer")
    scope = token_data.get("scope", "")
    refresh_token = token_data.get("refresh_token")  # may be absent from the response

    # Fetch the GitHub profile for this token (also blocking, so off-loop).
    user_info = await loop.run_in_executor(None, _github_api_get, "/user", access_token)

    now = datetime.utcnow()
    github_connections_col.update_one(
        {"user_id": user_id},
        {"$set": {
            "status": "connected",
            "access_token": access_token,
            "token_type": token_type,
            "scope": scope,
            "refresh_token": refresh_token,
            "github_login": user_info.get("login"),
            "github_id": user_info.get("id"),
            "github_avatar": user_info.get("avatar_url"),
            "connected_at": now,
            "last_used": now,
            "state": None,  # one-shot: the state cannot be replayed
        }},
        upsert=True,
    )

    # Redirect back to the frontend; the query string is URL-encoded.
    frontend_url = os.getenv("FRONTEND_URL", "https://neura-prompt-ai.vercel.app")
    query = urlencode({"github": "connected", "login": user_info.get("login") or ""})
    return RedirectResponse(f"{frontend_url}/settings?{query}")
@github_router.get("/status")
async def github_status(user_id: str = Query(...)):
    """
    Report whether ``user_id`` has a connected GitHub account.

    Returns ``{"connected": False}`` when no document exists or its status is
    not "connected"; otherwise the login, avatar, scope and connection time.
    """
    # The projection excludes the access token from the fetched document.
    record = github_connections_col.find_one(
        {"user_id": user_id},
        {"access_token": 0},
    )
    is_connected = bool(record) and record.get("status") == "connected"
    if not is_connected:
        return {"connected": False}

    exposed_fields = (
        ("login", "github_login"),
        ("avatar", "github_avatar"),
        ("scope", "scope"),
        ("connected_at", "connected_at"),
    )
    summary = {"connected": True}
    for public_key, stored_key in exposed_fields:
        summary[public_key] = record.get(stored_key)
    return summary
@github_router.post("/disconnect")
async def github_disconnect(user_id: str = Query(...)):
    """Delete the stored GitHub connection document for ``user_id``."""
    selector = {"user_id": user_id}
    github_connections_col.delete_one(selector)
    # The same payload is returned whether or not a document was deleted.
    return {"disconnected": True}
# ==================== GITHUB API HELPER ====================
def _get_user_token(user_id: str) -> str:
    """
    Return the stored GitHub access token for ``user_id``.

    Raises:
        HTTPException: 401 when the user has no document with status
            "connected".
    """
    record = github_connections_col.find_one(
        {"user_id": user_id, "status": "connected"}
    )
    if record:
        return record["access_token"]
    raise HTTPException(401, "GitHub not connected. Please connect your GitHub account first.")
def _github_headers(token: str) -> Dict[str, str]:
    """Build the headers shared by every GitHub API helper below."""
    return {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github.v3+json",
        "User-Agent": "NeuraPrompt-Agent",
    }


def _github_send(sender, endpoint: str, token: str, **kwargs):
    """
    Issue one request with ``sender`` (``requests.get``, ``requests.post``, ...).

    ``endpoint`` is appended to ``https://api.github.com``; extra keyword
    arguments (``params``, ``json``) are passed through to ``sender``.

    Raises:
        HTTPException: 401 when GitHub answers 401.
    """
    response = sender(
        f"https://api.github.com{endpoint}",
        headers=_github_headers(token),
        timeout=30,
        **kwargs,
    )
    if response.status_code == 401:
        raise HTTPException(401, "GitHub token expired or invalid. Please reconnect.")
    return response


def _github_json(response) -> Any:
    """
    Raise for 4xx/5xx, then return the decoded JSON body.

    An empty body (e.g. 204 No Content) yields ``{}`` instead of a JSON
    decode error.
    """
    response.raise_for_status()
    if not response.content:
        return {}
    return response.json()


def _github_api_get(endpoint: str, token: str, params: Optional[dict] = None) -> dict:
    """Make an authenticated GET request to the GitHub API and return the JSON body."""
    return _github_json(_github_send(requests.get, endpoint, token, params=params))


def _github_api_post(endpoint: str, token: str, data: Optional[dict] = None) -> dict:
    """Make an authenticated POST request (JSON payload) and return the JSON body."""
    return _github_json(_github_send(requests.post, endpoint, token, json=data))


def _github_api_patch(endpoint: str, token: str, data: Optional[dict] = None) -> dict:
    """Make an authenticated PATCH request (JSON payload) and return the JSON body."""
    return _github_json(_github_send(requests.patch, endpoint, token, json=data))


def _github_api_delete(endpoint: str, token: str) -> bool:
    """
    Make an authenticated DELETE request.

    Returns True when GitHub answers 204 or 200 and False for any other
    status; only a 401 raises (HTTPException).
    """
    response = _github_send(requests.delete, endpoint, token)
    return response.status_code in (204, 200)


def _github_api_put(endpoint: str, token: str, data: Optional[dict] = None) -> dict:
    """Make an authenticated PUT request (JSON payload) and return the JSON body."""
    return _github_json(_github_send(requests.put, endpoint, token, json=data))
# ==================== GITHUB TOOLS (AI Agent Uses These) ====================
# These tools accept user_id and automatically use that user's token
def github_list_repos(user_id: str, type_filter: str = "owner", sort: str = "updated", per_page: int = 10) -> str:
    """
    List the connected user's GitHub repositories as a text summary.

    Args:
        user_id: Application user whose stored token is used.
        type_filter: Sent as the ``type`` query parameter.
        sort: Sent as the ``sort`` query parameter.
        per_page: Sent as the ``per_page`` query parameter.

    Returns:
        One header line plus one line per repository, or an error string
        ("GitHub error: ..." / "Error: ...") when the call fails.
    """
    try:
        token = _get_user_token(user_id)
        repos = _github_api_get("/user/repos", token, {
            "type": type_filter,
            "sort": sort,
            "per_page": per_page
        })
        lines = [f"πŸ“ Found {len(repos)} repositories:"]
        for repo in repos:
            visibility = "πŸ”’" if repo.get("private") else "🌐"
            stars = repo.get("stargazers_count", 0)
            # GitHub sends "description": null for repos without one, so a
            # .get() default is never used; fall back on any falsy value.
            description = repo.get("description") or "No description"
            lines.append(f"{visibility} {repo['full_name']} β€” ⭐{stars} β€” {description}")
        return "\n".join(lines)
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"
def github_create_repo(user_id: str, name: str, description: str = "", private: bool = False, auto_init: bool = True) -> str:
    """
    Create a repository under the connected user's account.

    Returns a confirmation line containing the new repository's URL, or an
    error string ("GitHub error: ..." / "Error: ...") when the call fails.
    """
    payload = {
        "name": name,
        "description": description,
        "private": private,
        "auto_init": auto_init,
    }
    try:
        created = _github_api_post("/user/repos", _get_user_token(user_id), payload)
        return f"βœ… Created repository: {created['html_url']}"
    except HTTPException as http_err:
        return f"GitHub error: {http_err.detail}"
    except Exception as err:
        return f"Error: {str(err)}"
def github_get_repo(user_id: str, owner: str, repo: str) -> str:
    """
    Fetch details for ``owner/repo`` and return them as indented JSON text.

    On failure an error string ("GitHub error: ..." / "Error: ...") is
    returned instead.
    """
    # (output key, key in the GitHub response, must be present?)
    field_map = (
        ("name", "name", True),
        ("full_name", "full_name", True),
        ("description", "description", False),
        ("stars", "stargazers_count", True),
        ("forks", "forks_count", True),
        ("language", "language", False),
        ("url", "html_url", True),
        ("default_branch", "default_branch", True),
        ("created_at", "created_at", True),
        ("updated_at", "updated_at", True),
    )
    try:
        info = _github_api_get(f"/repos/{owner}/{repo}", _get_user_token(user_id))
        summary = {}
        for out_key, src_key, required in field_map:
            # Required keys use indexing so a missing one raises KeyError.
            summary[out_key] = info[src_key] if required else info.get(src_key)
        return json.dumps(summary, indent=2)
    except HTTPException as http_err:
        return f"GitHub error: {http_err.detail}"
    except Exception as err:
        return f"Error: {str(err)}"
def github_list_issues(user_id: str, owner: str, repo: str, state: str = "open", per_page: int = 10) -> str:
    """
    List issues in ``owner/repo`` as a text summary.

    Args:
        user_id: Application user whose stored token is used.
        owner: Repository owner.
        repo: Repository name.
        state: Sent as the ``state`` query parameter.
        per_page: Sent as the ``per_page`` query parameter.

    Returns:
        "No issues found." when nothing remains after filtering, otherwise a
        header plus one line per issue (and a labels line when it has any),
        or an error string ("GitHub error: ..." / "Error: ...") on failure.
    """
    try:
        token = _get_user_token(user_id)
        fetched = _github_api_get(f"/repos/{owner}/{repo}/issues", token, {
            "state": state,
            "per_page": per_page
        })
        # GitHub's issues endpoint also returns pull requests; those carry a
        # "pull_request" key, so drop them to list real issues only.
        issues = [item for item in fetched if "pull_request" not in item]
        if not issues:
            return "No issues found."
        lines = [f"πŸ› Issues in {owner}/{repo}:"]
        for issue in issues:
            labels = ", ".join(label["name"] for label in issue.get("labels", []))
            lines.append(f"#{issue['number']} [{issue['state']}] {issue['title']}")
            if labels:
                lines.append(f" Labels: {labels}")
        return "\n".join(lines)
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"
def github_create_issue(user_id: str, owner: str, repo: str, title: str, body: str = "", labels: list = None) -> str:
    """
    Open a new issue in ``owner/repo``.

    ``labels`` is only included in the request when it is non-empty.
    Returns a confirmation line with the issue number and URL, or an error
    string ("GitHub error: ..." / "Error: ...") when the call fails.
    """
    try:
        access_token = _get_user_token(user_id)
        payload = {"title": title, "body": body}
        if labels:
            payload.update(labels=labels)
        created = _github_api_post(f"/repos/{owner}/{repo}/issues", access_token, payload)
        return f"βœ… Created issue #{created['number']}: {created['html_url']}"
    except HTTPException as http_err:
        return f"GitHub error: {http_err.detail}"
    except Exception as err:
        return f"Error: {str(err)}"
def github_read_file(user_id: str, owner: str, repo: str, path: str, ref: str = "main") -> str:
    """
    Read a file from ``owner/repo`` at ``ref`` and return it as fenced text.

    Args:
        user_id: Application user whose stored token is used.
        owner: Repository owner.
        repo: Repository name.
        path: Path of the file inside the repository.
        ref: Sent as the ``ref`` query parameter.

    Returns:
        The first 5000 characters of the decoded file wrapped in a code
        fence, a "Path is a directory" message when ``path`` is not a file,
        or an error string ("GitHub error: ..." / "Error: ...") on failure.
    """
    try:
        token = _get_user_token(user_id)
        data = _github_api_get(f"/repos/{owner}/{repo}/contents/{path}", token, {"ref": ref})
        # For a directory the contents API returns a JSON array, not an
        # object, so check the shape before calling dict methods on it.
        if isinstance(data, dict) and data.get("type") == "file":
            import base64
            content = base64.b64decode(data["content"]).decode("utf-8")
            return f"πŸ“„ {path}:\n```\n{content[:5000]}\n```"
        return f"Path is a directory, not a file: {path}"
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"
def github_write_file(user_id: str, owner: str, repo: str, path: str, content: str, message: str, branch: str = "main") -> str:
    """
    Create or update ``path`` in ``owner/repo`` on ``branch``.

    The current blob SHA is looked up first: when the file exists the SHA is
    sent so GitHub updates it; a 404 on the lookup means the file is created.

    Args:
        user_id: Application user whose stored token is used.
        owner: Repository owner.
        repo: Repository name.
        path: Path of the file inside the repository.
        content: New file content (text; encoded as UTF-8 then base64).
        message: Commit message.
        branch: Branch to commit to.

    Returns:
        A confirmation line with the file URL, a "Path is a directory"
        message, or an error string ("GitHub error: ..." / "Error: ...").
    """
    try:
        token = _get_user_token(user_id)
        sha = None
        try:
            existing = _github_api_get(f"/repos/{owner}/{repo}/contents/{path}", token, {"ref": branch})
        except requests.HTTPError as lookup_error:
            # Only a 404 means "file does not exist yet". Any other failure
            # (403, 5xx, ...) is re-raised rather than silently turning an
            # update into a create attempt without a SHA.
            response = lookup_error.response
            if response is None or response.status_code != 404:
                raise
        else:
            if not isinstance(existing, dict):
                # A JSON array means the path is a directory listing.
                return f"Path is a directory, not a file: {path}"
            sha = existing.get("sha")
        import base64
        encoded = base64.b64encode(content.encode("utf-8")).decode()
        data = {
            "message": message,
            "content": encoded,
            "branch": branch
        }
        if sha:
            data["sha"] = sha
        result = _github_api_put(f"/repos/{owner}/{repo}/contents/{path}", token, data)
        return f"βœ… {'Updated' if sha else 'Created'} {path}: {result['content']['html_url']}"
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"
def github_create_branch(user_id: str, owner: str, repo: str, branch: str, from_branch: str = "main") -> str:
    """
    Create ``branch`` in ``owner/repo`` pointing at the tip of ``from_branch``.

    Args:
        user_id: Application user whose stored token is used.
        owner: Repository owner.
        repo: Repository name.
        branch: Name of the branch to create.
        from_branch: Existing branch whose head commit is used.

    Returns:
        A confirmation line, or an error string ("GitHub error: ..." /
        "Error: ...") when either API call fails.
    """
    try:
        token = _get_user_token(user_id)
        # Use the single-reference endpoint (git/ref/...). The plural
        # git/refs/... form is a legacy lookup that may answer with an array
        # of prefix matches, which would break the ["object"] access below.
        ref_data = _github_api_get(f"/repos/{owner}/{repo}/git/ref/heads/{from_branch}", token)
        sha = ref_data["object"]["sha"]
        _github_api_post(f"/repos/{owner}/{repo}/git/refs", token, {
            "ref": f"refs/heads/{branch}",
            "sha": sha
        })
        return f"βœ… Created branch '{branch}' from '{from_branch}'"
    except HTTPException as e:
        return f"GitHub error: {e.detail}"
    except Exception as e:
        return f"Error: {str(e)}"
def github_create_pull_request(user_id: str, owner: str, repo: str, title: str, head: str, base: str, body: str = "") -> str:
    """
    Open a pull request in ``owner/repo`` from ``head`` into ``base``.

    Returns a confirmation line with the PR number and URL, or an error
    string ("GitHub error: ..." / "Error: ...") when the call fails.
    """
    pr_payload = {"title": title, "head": head, "base": base, "body": body}
    pulls_endpoint = f"/repos/{owner}/{repo}/pulls"
    try:
        opened = _github_api_post(pulls_endpoint, _get_user_token(user_id), pr_payload)
        return f"βœ… Created PR #{opened['number']}: {opened['html_url']}"
    except HTTPException as http_err:
        return f"GitHub error: {http_err.detail}"
    except Exception as err:
        return f"Error: {str(err)}"
def github_search_code(user_id: str, query: str, per_page: int = 10) -> str:
    """
    Run a GitHub code search and return the hits as text.

    The output is a header with the total count followed by two lines per
    hit (repository and path, then URL). On failure an error string
    ("GitHub error: ..." / "Error: ...") is returned instead.
    """
    try:
        access_token = _get_user_token(user_id)
        search_params = {"q": query, "per_page": per_page}
        found = _github_api_get("/search/code", access_token, search_params)
        report = [f"πŸ” Found {found.get('total_count', 0)} code results:"]
        for hit in found.get("items", []):
            report.extend((
                f"πŸ“„ {hit['repository']['full_name']}: {hit['path']}",
                f" {hit['html_url']}",
            ))
        return "\n".join(report)
    except HTTPException as http_err:
        return f"GitHub error: {http_err.detail}"
    except Exception as err:
        return f"Error: {str(err)}"
def github_get_user_profile(user_id: str) -> str:
    """
    Return the connected GitHub user's profile as indented JSON text.

    On failure an error string ("GitHub error: ..." / "Error: ...") is
    returned instead.
    """
    # (output key, key in the GitHub response, must be present?)
    field_map = (
        ("login", "login", True),
        ("name", "name", False),
        ("bio", "bio", False),
        ("public_repos", "public_repos", True),
        ("followers", "followers", True),
        ("following", "following", True),
        ("url", "html_url", True),
        ("avatar", "avatar_url", False),
    )
    try:
        profile = _github_api_get("/user", _get_user_token(user_id))
        summary = {}
        for out_key, src_key, required in field_map:
            # Required keys use indexing so a missing one raises KeyError.
            summary[out_key] = profile[src_key] if required else profile.get(src_key)
        return json.dumps(summary, indent=2)
    except HTTPException as http_err:
        return f"GitHub error: {http_err.detail}"
    except Exception as err:
        return f"Error: {str(err)}"
# ==================== REGISTER TOOLS ====================
def register_github_tools():
    """Register every GitHub tool function with the agent's tool registry."""
    # (tool name, callable, description), registered in this order.
    tool_table = (
        ("github_list_repos", github_list_repos, "List user's GitHub repositories"),
        ("github_create_repo", github_create_repo, "Create a new GitHub repository"),
        ("github_get_repo", github_get_repo, "Get repository details"),
        ("github_list_issues", github_list_issues, "List repository issues"),
        ("github_create_issue", github_create_issue, "Create a new issue"),
        ("github_read_file", github_read_file, "Read file from repository"),
        ("github_write_file", github_write_file, "Create or update file in repository"),
        ("github_create_branch", github_create_branch, "Create a new branch"),
        ("github_create_pull_request", github_create_pull_request, "Create a pull request"),
        ("github_search_code", github_search_code, "Search code across GitHub"),
        ("github_get_user_profile", github_get_user_profile, "Get user's GitHub profile"),
    )
    for tool_name, tool_fn, tool_description in tool_table:
        register_tool(tool_name, tool_fn, tool_description)
    log.info("βœ… GitHub tools registered")
# Call this in your main app startup
# register_github_tools()