Spaces:
Sleeping
Sleeping
| """ | |
| GitHub Webhooks Handler | |
| Processes real-time GitHub events and updates Supabase automatically | |
| """ | |
| import hmac | |
| import hashlib | |
| import logging | |
| from typing import Optional, Dict, Any | |
| from datetime import datetime | |
| from supabase import Client | |
| logger = logging.getLogger(__name__) | |
| class GitHubWebhookHandler: | |
| """Handle GitHub webhook events and update Supabase""" | |
| def __init__(self, supabase: Client, webhook_secret: str): | |
| """ | |
| Initialize webhook handler | |
| Args: | |
| supabase: Supabase client instance | |
| webhook_secret: GitHub webhook secret for signature verification | |
| """ | |
| self.supabase = supabase | |
| self.webhook_secret = webhook_secret | |
| def verify_signature(self, payload: bytes, signature: str) -> bool: | |
| """ | |
| Verify GitHub webhook signature | |
| Args: | |
| payload: Raw webhook payload bytes | |
| signature: X-Hub-Signature-256 header value | |
| Returns: | |
| True if signature is valid, False otherwise | |
| """ | |
| expected_signature = "sha256=" + hmac.new( | |
| self.webhook_secret.encode(), | |
| payload, | |
| hashlib.sha256 | |
| ).hexdigest() | |
| return hmac.compare_digest(signature, expected_signature) | |
| def get_firebase_id_from_installation(self, installation_id: int) -> Optional[str]: | |
| """Get firebase_id associated with installation_id""" | |
| try: | |
| result = self.supabase.table("Users").select("firebase_id").eq("installation_id", installation_id).limit(1).execute() | |
| if result.data: | |
| return result.data[0]["firebase_id"] | |
| except Exception as e: | |
| logger.error(f"Failed to get firebase_id for installation {installation_id}: {str(e)}") | |
| return None | |
| def get_github_id(self, firebase_id: str, full_name: str) -> Optional[int]: | |
| """Get github table id for a repo""" | |
| try: | |
| result = self.supabase.table("github").select("id").eq("firebase_id", firebase_id).eq("full_name", full_name).limit(1).execute() | |
| if result.data: | |
| return result.data[0]["id"] | |
| except Exception as e: | |
| logger.error(f"Failed to get github_id for {full_name}: {str(e)}") | |
| return None | |
| # ======================================================================== | |
| # PUSH EVENTS (Commits) | |
| # ======================================================================== | |
| def handle_push(self, payload: Dict[str, Any]) -> bool: | |
| """ | |
| Handle push event - new commits pushed to repo | |
| Args: | |
| payload: GitHub webhook payload | |
| Returns: | |
| True if handled successfully, False otherwise | |
| """ | |
| try: | |
| installation_id = payload.get("installation", {}).get("id") | |
| repo_full_name = payload.get("repository", {}).get("full_name") | |
| commits = payload.get("commits", []) | |
| if not installation_id or not repo_full_name or not commits: | |
| logger.warning("Incomplete push payload data") | |
| return False | |
| firebase_id = self.get_firebase_id_from_installation(installation_id) | |
| if not firebase_id: | |
| logger.warning(f"No firebase_id found for installation {installation_id}") | |
| return False | |
| github_id = self.get_github_id(firebase_id, repo_full_name) | |
| if not github_id: | |
| logger.warning(f"No github_id found for {repo_full_name}") | |
| return False | |
| # Process each commit | |
| commit_rows = [] | |
| for commit in commits: | |
| commit_rows.append({ | |
| "github_id": github_id, | |
| "firebase_id": firebase_id, | |
| "sha": commit.get("id"), | |
| "message": commit.get("message"), | |
| "author": commit.get("author", {}).get("name"), | |
| "author_email": commit.get("author", {}).get("email"), | |
| "committed_date": commit.get("timestamp"), | |
| }) | |
| if commit_rows: | |
| self.supabase.table("commits").upsert(commit_rows, on_conflict="firebase_id,sha").execute() | |
| logger.info(f"✅ Stored {len(commit_rows)} commits from push event") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to handle push event: {str(e)}", exc_info=True) | |
| return False | |
| # ======================================================================== | |
| # PULL REQUEST EVENTS | |
| # ======================================================================== | |
| def handle_pull_request(self, payload: Dict[str, Any]) -> bool: | |
| """ | |
| Handle pull request events (opened, synchronize, closed, merged) | |
| Args: | |
| payload: GitHub webhook payload | |
| Returns: | |
| True if handled successfully, False otherwise | |
| """ | |
| try: | |
| action = payload.get("action") # opened, synchronize, closed, reopened, edited | |
| installation_id = payload.get("installation", {}).get("id") | |
| repo_full_name = payload.get("repository", {}).get("full_name") | |
| pr = payload.get("pull_request", {}) | |
| if not installation_id or not repo_full_name or not pr: | |
| logger.warning("Incomplete pull_request payload data") | |
| return False | |
| firebase_id = self.get_firebase_id_from_installation(installation_id) | |
| if not firebase_id: | |
| logger.warning(f"No firebase_id found for installation {installation_id}") | |
| return False | |
| github_id = self.get_github_id(firebase_id, repo_full_name) | |
| if not github_id: | |
| logger.warning(f"No github_id found for {repo_full_name}") | |
| return False | |
| pr_data = { | |
| "github_id": github_id, | |
| "firebase_id": firebase_id, | |
| "pr_id": pr.get("id"), | |
| "pr_number": pr.get("number"), | |
| "title": pr.get("title"), | |
| "body": pr.get("body"), | |
| "state": pr.get("state"), | |
| "author": pr.get("user", {}).get("login"), | |
| "created_at": pr.get("created_at"), | |
| "updated_at": pr.get("updated_at"), | |
| "merged_at": pr.get("merged_at"), | |
| "closed_at": pr.get("closed_at"), | |
| "url": pr.get("html_url"), | |
| "head_branch": pr.get("head", {}).get("ref"), | |
| "base_branch": pr.get("base", {}).get("ref"), | |
| "draft": pr.get("draft", False), | |
| "mergeable": pr.get("mergeable"), | |
| "comments": pr.get("comments", 0), | |
| "commits": pr.get("commits", 0), | |
| "additions": pr.get("additions", 0), | |
| "deletions": pr.get("deletions", 0), | |
| "changed_files": pr.get("changed_files", 0), | |
| } | |
| self.supabase.table("pull_requests").upsert([pr_data], on_conflict="firebase_id,pr_id").execute() | |
| logger.info(f"✅ Updated PR #{pr.get('number')} ({action}) in {repo_full_name}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to handle pull_request event: {str(e)}", exc_info=True) | |
| return False | |
| # ======================================================================== | |
| # ISSUES EVENTS | |
| # ======================================================================== | |
| def handle_issues(self, payload: Dict[str, Any]) -> bool: | |
| """ | |
| Handle issues events (opened, edited, closed, reopened, etc.) | |
| Args: | |
| payload: GitHub webhook payload | |
| Returns: | |
| True if handled successfully, False otherwise | |
| """ | |
| try: | |
| action = payload.get("action") # opened, edited, closed, reopened | |
| installation_id = payload.get("installation", {}).get("id") | |
| repo_full_name = payload.get("repository", {}).get("full_name") | |
| issue = payload.get("issue", {}) | |
| if not installation_id or not repo_full_name or not issue: | |
| logger.warning("Incomplete issues payload data") | |
| return False | |
| # Skip if this is actually a pull request (pulled_request key exists) | |
| if "pull_request" in issue: | |
| logger.debug("Skipping issue event - this is actually a PR") | |
| return False | |
| firebase_id = self.get_firebase_id_from_installation(installation_id) | |
| if not firebase_id: | |
| logger.warning(f"No firebase_id found for installation {installation_id}") | |
| return False | |
| github_id = self.get_github_id(firebase_id, repo_full_name) | |
| if not github_id: | |
| logger.warning(f"No github_id found for {repo_full_name}") | |
| return False | |
| issue_data = { | |
| "github_id": github_id, | |
| "firebase_id": firebase_id, | |
| "issue_id": issue.get("id"), | |
| "issue_number": issue.get("number"), | |
| "title": issue.get("title"), | |
| "body": issue.get("body"), | |
| "state": issue.get("state"), | |
| "author": issue.get("user", {}).get("login"), | |
| "created_at": issue.get("created_at"), | |
| "updated_at": issue.get("updated_at"), | |
| "url": issue.get("html_url"), | |
| "labels": [label.get("name") for label in issue.get("labels", [])], | |
| "comments": issue.get("comments", 0), | |
| "reactions": issue.get("reactions", {}).get("total_count", 0) if issue.get("reactions") else 0, | |
| } | |
| self.supabase.table("issues").upsert([issue_data], on_conflict="firebase_id,issue_id").execute() | |
| logger.info(f"✅ Updated issue #{issue.get('number')} ({action}) in {repo_full_name}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to handle issues event: {str(e)}", exc_info=True) | |
| return False | |
| # ======================================================================== | |
| # REPOSITORY EVENTS | |
| # ======================================================================== | |
| def handle_repository(self, payload: Dict[str, Any]) -> bool: | |
| """ | |
| Handle repository events (created, deleted, renamed, etc.) | |
| Args: | |
| payload: GitHub webhook payload | |
| Returns: | |
| True if handled successfully, False otherwise | |
| """ | |
| try: | |
| action = payload.get("action") | |
| installation_id = payload.get("installation", {}).get("id") | |
| repo = payload.get("repository", {}) | |
| if not installation_id or not repo: | |
| logger.warning("Incomplete repository payload data") | |
| return False | |
| firebase_id = self.get_firebase_id_from_installation(installation_id) | |
| if not firebase_id: | |
| logger.warning(f"No firebase_id found for installation {installation_id}") | |
| return False | |
| logger.info(f"✅ Repository event ({action}): {repo.get('full_name')}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to handle repository event: {str(e)}", exc_info=True) | |
| return False | |
| # ======================================================================== | |
| # MAIN DISPATCHER | |
| # ======================================================================== | |
| def handle_webhook(self, event_type: str, payload: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Main webhook dispatcher - routes to appropriate handler | |
| Args: | |
| event_type: GitHub event type (from X-GitHub-Event header) | |
| payload: GitHub webhook payload | |
| Returns: | |
| Response dict with status and details | |
| """ | |
| logger.info(f"Processing webhook event: {event_type}") | |
| handlers = { | |
| "push": self.handle_push, | |
| "pull_request": self.handle_pull_request, | |
| "issues": self.handle_issues, | |
| "repository": self.handle_repository, | |
| } | |
| handler = handlers.get(event_type) | |
| if not handler: | |
| logger.warning(f"No handler for event type: {event_type}") | |
| return { | |
| "success": False, | |
| "message": f"No handler for event type: {event_type}", | |
| "event": event_type | |
| } | |
| try: | |
| success = handler(payload) | |
| return { | |
| "success": success, | |
| "message": f"Event {event_type} processed", | |
| "event": event_type | |
| } | |
| except Exception as e: | |
| logger.error(f"Error handling {event_type}: {str(e)}", exc_info=True) | |
| return { | |
| "success": False, | |
| "message": f"Error processing {event_type}: {str(e)}", | |
| "event": event_type | |
| } | |