#!/usr/bin/env python3
# github_issues_fetcher.py - Fetch real-world bug reports from GitHub

import json
import time
import re
from datetime import datetime, timedelta
from typing import List, Dict, Optional

import pandas as pd


class GitHubIssuesFetcher:
    """Collect closed bug reports from popular GitHub repositories and
    convert them into a flat, CSV-friendly training format.

    Uses the GitHub REST v3 issues API. Works unauthenticated (limited to
    60 requests/hour) or with a personal access token for higher limits.
    """

    def __init__(self, github_token: Optional[str] = None):
        # Function-scope import: only the network-facing methods need
        # `requests`, so the pure classification helpers (_is_bug_issue,
        # _infer_*, _clean_*) stay usable without it installed.
        import requests

        self.github_token = github_token
        self.session = requests.Session()
        if github_token:
            self.session.headers.update({
                'Authorization': f'token {github_token}',
                'Accept': 'application/vnd.github.v3+json'
            })

        # Popular repositories with diverse bug types
        self.target_repos = [
            # Web Development
            'facebook/react', 'vuejs/vue', 'angular/angular',
            'microsoft/TypeScript', 'nodejs/node',
            # Mobile Development
            'facebook/react-native', 'flutter/flutter', 'ionic-team/ionic',
            # Backend & APIs
            'expressjs/express', 'fastify/fastify', 'nestjs/nest',
            'prisma/prisma', 'sequelize/sequelize',
            # Security & Authentication
            'auth0/auth0.js', 'nextauthjs/next-auth', 'supabase/supabase',
            # UI Libraries
            'mui/material-ui', 'ant-design/ant-design', 'chakra-ui/chakra-ui',
            'tailwindlabs/tailwindcss',
            # Testing & Quality
            # NOTE(review): 'playwright/playwright' may need to be
            # 'microsoft/playwright' — verify; a 404 is handled but wastes a call.
            'jestjs/jest', 'cypress-io/cypress', 'playwright/playwright',
            # DevOps & Tools
            'docker/compose', 'kubernetes/kubernetes', 'hashicorp/terraform',
            # Data & ML
            'pandas-dev/pandas', 'numpy/numpy', 'scikit-learn/scikit-learn',
            'tensorflow/tensorflow'
        ]

    def fetch_issues_from_repo(self, repo: str, max_issues: int = 100) -> List[Dict]:
        """Fetch up to `max_issues` closed bug issues from one repository.

        Pages through the GitHub issues API, keeping only items that
        `_is_bug_issue` classifies as bugs. Returns the raw issue dicts.
        Network/HTTP failures are reported and terminate the loop early,
        returning whatever was collected so far.
        """
        print(f"šŸ” Fetching issues from {repo}...")

        issues = []
        page = 1
        per_page = min(100, max_issues)  # GitHub caps per_page at 100

        while len(issues) < max_issues:
            url = f"https://api.github.com/repos/{repo}/issues"
            params = {
                'state': 'closed',
                'sort': 'created',
                'direction': 'desc',
                'per_page': per_page,
                'page': page
            }

            try:
                response = self.session.get(url, params=params, timeout=30)

                # 403 is how the API signals a rate-limit block.
                if response.status_code == 403:
                    print(f"āš ļø Rate limit hit for {repo}, skipping...")
                    break
                elif response.status_code != 200:
                    print(f"āŒ Error {response.status_code} for {repo}")
                    break

                page_issues = response.json()
                if not page_issues:
                    break

                # Filter for actual bugs (not feature requests or PRs).
                bug_issues = [issue for issue in page_issues
                              if self._is_bug_issue(issue)]

                # Never exceed the requested cap.
                issues.extend(bug_issues[:max_issues - len(issues)])

                # A short page means we reached the end of the listing.
                if len(page_issues) < per_page:
                    break

                page += 1

                # Be gentle with the unauthenticated rate limit (60/hour).
                if not self.github_token:
                    time.sleep(1)  # Respect rate limits

            except Exception as e:
                print(f"āŒ Error fetching from {repo}: {e}")
                break

        print(f"āœ… Fetched {len(issues)} bug issues from {repo}")
        return issues

    def _is_bug_issue(self, issue: Dict) -> bool:
        """Return True if `issue` looks like a bug report.

        Heuristic: explicit labels win over keyword matches, and
        feature-request signals veto bug signals. Pull requests (which the
        GitHub issues endpoint also returns) are always rejected.
        """
        # The /issues endpoint includes PRs; they carry a 'pull_request' key.
        if 'pull_request' in issue:
            return False

        title = (issue.get('title') or '').lower()
        # 'body' is frequently null in the API payload — guard before .lower().
        body = (issue.get('body') or '').lower()
        labels = [label['name'].lower() for label in issue.get('labels', [])]

        # Bug indicators
        bug_keywords = [
            'bug', 'fix', 'crash', 'error', 'exception', 'fail', 'broken',
            'issue', 'problem', 'defect', 'vulnerability', 'security',
            'performance', 'slow', 'timeout', 'memory leak', 'race condition'
        ]

        # Feature request indicators
        feature_keywords = [
            'feature', 'enhancement', 'improvement', 'request', 'proposal',
            'suggestion', 'idea', 'wishlist', 'roadmap'
        ]

        has_bug_keywords = any(keyword in title or keyword in body
                               for keyword in bug_keywords)
        has_feature_keywords = any(keyword in title or keyword in body
                                   for keyword in feature_keywords)

        has_bug_labels = any(label in ['bug', 'defect', 'security', 'performance']
                             for label in labels)
        has_feature_labels = any(label in ['enhancement', 'feature', 'proposal']
                                 for label in labels)

        # Priority: labels > keywords
        if has_bug_labels and not has_feature_labels:
            return True
        elif has_feature_labels:
            return False
        elif has_bug_keywords and not has_feature_keywords:
            return True

        return False

    def convert_to_training_format(self, issues: List[Dict], repo: str = '') -> List[Dict]:
        """Convert raw GitHub issue dicts into flat training records.

        Skips issues with too little content (title < 10 chars or body < 20
        chars). `repo` is used as a fallback for the 'repo' column because
        the repository-scoped issues endpoint does not embed a 'repository'
        object in each issue.
        """
        print("šŸ”„ Converting GitHub issues to training format...")

        converted = []
        for issue in issues:
            try:
                # Extract basic info; 'body' may be null in the payload.
                title = issue.get('title') or ''
                body = issue.get('body') or ''
                labels = [label['name'] for label in issue.get('labels', [])]
                created_at = issue.get('created_at', '')

                # Skip if no meaningful content
                if len(title) < 10 or len(body) < 20:
                    continue

                # Infer classification from content and labels
                severity = self._infer_severity(title, body, labels)
                component = self._infer_component(title, body, labels)
                bug_type = self._infer_bug_type(title, body, labels)
                team = self._infer_team(component, bug_type)
                priority = self._map_severity_to_priority(severity)

                converted_issue = {
                    'title': self._clean_text(title),
                    'description': self._clean_description(body),
                    'severity': severity,
                    'component': component,
                    'bug_type': bug_type,
                    'team': team,
                    'priority': priority,
                    'source': 'github_issues',
                    'original_id': str(issue.get('id', '')),
                    # Prefer an embedded repository name if present,
                    # otherwise the caller-supplied repo slug.
                    'repo': (issue.get('repository') or {}).get('full_name', '') or repo,
                    'labels': ', '.join(labels),
                    'created_at': created_at
                }

                converted.append(converted_issue)

            except Exception:
                # Best-effort conversion: a malformed issue is skipped,
                # not fatal.
                continue

        print(f"āœ… Converted {len(converted)} issues to training format")
        return converted

    def _infer_severity(self, title: str, body: str, labels: List[str]) -> str:
        """Infer severity ('Critical'/'High'/'Medium'/'Low') from content.

        Checks keyword tiers in priority order; explicit severity labels
        ('critical'/'high'/'low') also trigger their tier. Defaults to
        'Medium'.
        """
        text = (title + ' ' + body).lower()
        labels_lower = [label.lower() for label in labels]

        # Critical indicators
        critical_keywords = [
            'crash', 'segfault', 'data loss', 'security breach', 'vulnerability',
            'remote code execution', 'sql injection', 'xss', 'authentication bypass'
        ]
        if any(keyword in text for keyword in critical_keywords) or 'critical' in labels_lower:
            return 'Critical'

        # High indicators
        high_keywords = [
            'major bug', 'broken', 'fail', 'error', 'exception',
            'performance issue', 'memory leak', 'race condition', 'deadlock'
        ]
        if any(keyword in text for keyword in high_keywords) or 'high' in labels_lower:
            return 'High'

        # Low indicators
        low_keywords = [
            'minor', 'cosmetic', 'typo', 'grammar', 'enhancement', 'improvement'
        ]
        if any(keyword in text for keyword in low_keywords) or 'low' in labels_lower:
            return 'Low'

        return 'Medium'  # Default

    def _infer_component(self, title: str, body: str, labels: List[str]) -> str:
        """Infer component ('Frontend'/'Mobile'/'Backend') from content.

        First keyword/label tier to match wins; defaults to 'Backend'.
        """
        text = (title + ' ' + body).lower()
        labels_lower = [label.lower() for label in labels]

        # Frontend indicators
        frontend_keywords = [
            'ui', 'ux', 'frontend', 'css', 'html', 'javascript', 'react', 'vue',
            'angular', 'button', 'form', 'layout', 'responsive',
            'mobile-friendly', 'browser'
        ]
        if any(keyword in text for keyword in frontend_keywords) or \
                any(label in ['ui', 'frontend'] for label in labels_lower):
            return 'Frontend'

        # Mobile indicators
        mobile_keywords = [
            'mobile', 'android', 'ios', 'react native', 'flutter', 'app', 'native'
        ]
        if any(keyword in text for keyword in mobile_keywords) or \
                any(label in ['mobile', 'android', 'ios'] for label in labels_lower):
            return 'Mobile'

        # Backend indicators
        backend_keywords = [
            'backend', 'api', 'server', 'database', 'sql', 'authentication',
            'authorization', 'middleware', 'service', 'endpoint'
        ]
        if any(keyword in text for keyword in backend_keywords) or \
                any(label in ['backend', 'api'] for label in labels_lower):
            return 'Backend'

        return 'Backend'  # Default for most issues

    def _infer_bug_type(self, title: str, body: str, labels: List[str]) -> str:
        """Infer bug type ('Security'/'Performance'/'UI/UX'/'Functional').

        First keyword/label tier to match wins; defaults to 'Functional'.
        """
        text = (title + ' ' + body).lower()
        labels_lower = [label.lower() for label in labels]

        # Security indicators
        security_keywords = [
            'security', 'vulnerability', 'xss', 'csrf', 'injection',
            'authentication', 'authorization', 'permission', 'access control'
        ]
        if any(keyword in text for keyword in security_keywords) or \
                any(label in ['security', 'vulnerability'] for label in labels_lower):
            return 'Security'

        # Performance indicators
        performance_keywords = [
            'performance', 'slow', 'timeout', 'memory', 'cpu', 'bottleneck',
            'optimization'
        ]
        if any(keyword in text for keyword in performance_keywords) or \
                any(label in ['performance'] for label in labels_lower):
            return 'Performance'

        # UI/UX indicators
        ui_keywords = [
            'ui', 'ux', 'interface', 'design', 'layout', 'visual',
            'appearance', 'user experience'
        ]
        if any(keyword in text for keyword in ui_keywords) or \
                any(label in ['ui', 'ux'] for label in labels_lower):
            return 'UI/UX'

        # Functional indicators
        functional_keywords = [
            'functionality', 'feature', 'logic', 'workflow', 'process',
            'business logic'
        ]
        if any(keyword in text for keyword in functional_keywords):
            return 'Functional'

        return 'Functional'  # Default

    def _infer_team(self, component: str, bug_type: str) -> str:
        """Map component/bug type to an owning team (component wins,
        except Security issues which always route to the Security Team
        when the component is neither Frontend nor Mobile)."""
        if component == 'Frontend':
            return 'Frontend Team'
        elif component == 'Mobile':
            return 'Mobile Team'
        elif bug_type == 'Security':
            return 'Security Team'
        else:
            return 'Dev Team'

    def _map_severity_to_priority(self, severity: str) -> str:
        """Map severity to priority (unknown severities default to P2)."""
        return {'Critical': 'P0', 'High': 'P1', 'Medium': 'P2', 'Low': 'P3'}.get(severity, 'P2')

    def _clean_text(self, text: str) -> str:
        """Normalize a title: mask URLs/emails/issue refs, collapse
        whitespace, and truncate to 100 characters."""
        if not text:
            return ""

        # Remove URLs, emails, etc.
        text = re.sub(r'http[s]?://\S+', '[URL]', text)
        text = re.sub(r'\S+@\S+', '[EMAIL]', text)
        text = re.sub(r'#\d+', '[ISSUE]', text)

        # Clean whitespace
        text = re.sub(r'\s+', ' ', text).strip()

        # Truncate if too long
        if len(text) > 100:
            text = text[:97] + "..."

        return text

    def _clean_description(self, text: str) -> str:
        """Normalize a description: strip markdown (code fences/inline code
        become '[CODE]', bold/italic markers are dropped), mask
        URLs/emails/issue refs, collapse whitespace, truncate to 500 chars."""
        if not text:
            return ""

        # Remove markdown formatting
        text = re.sub(r'```[\s\S]*?```', '[CODE]', text)
        text = re.sub(r'`[^`]+`', '[CODE]', text)
        text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text)
        text = re.sub(r'\*([^*]+)\*', r'\1', text)

        # Remove URLs, emails, etc.
        text = re.sub(r'http[s]?://\S+', '[URL]', text)
        text = re.sub(r'\S+@\S+', '[EMAIL]', text)
        text = re.sub(r'#\d+', '[ISSUE]', text)

        # Clean whitespace
        text = re.sub(r'\s+', ' ', text).strip()

        # Truncate if too long
        if len(text) > 500:
            text = text[:497] + "..."

        return text

    def fetch_comprehensive_issues(self, max_issues_per_repo: int = 50) -> List[Dict]:
        """Fetch and convert issues from every repository in
        `self.target_repos`; returns the combined training records.
        Per-repo failures are logged and skipped."""
        print("šŸš€ COMPREHENSIVE GITHUB ISSUES COLLECTION")
        print("=" * 60)
        print(f"Target: {len(self.target_repos)} repositories")
        print(f"Expected: ~{len(self.target_repos) * max_issues_per_repo} issues")
        print()

        all_issues = []

        for repo in self.target_repos:
            try:
                issues = self.fetch_issues_from_repo(repo, max_issues_per_repo)
                if issues:
                    # Pass the repo slug through so the 'repo' column is filled.
                    converted = self.convert_to_training_format(issues, repo=repo)
                    all_issues.extend(converted)
                    print(f"āœ… {repo}: {len(converted)} issues added")
                else:
                    print(f"āš ļø {repo}: No issues found")

                # Rate limiting
                if not self.github_token:
                    time.sleep(2)

            except Exception as e:
                print(f"āŒ {repo}: Error - {e}")
                continue

        print(f"\nšŸŽ‰ Total issues collected: {len(all_issues)}")
        return all_issues

    def save_to_csv(self, issues: List[Dict],
                    filename: str = 'github_issues_training_data.csv') -> Optional[str]:
        """Write the records to CSV; returns the filename, or None when
        there is nothing to save."""
        if not issues:
            print("āŒ No issues to save")
            return None

        df = pd.DataFrame(issues)
        df.to_csv(filename, index=False)
        print(f"šŸ’¾ Saved {len(issues)} issues to {filename}")
        return filename


def main():
    """Main function to fetch GitHub issues"""
    print("šŸš€ GITHUB ISSUES DATA COLLECTOR")
    print("=" * 50)
    print("šŸ“‹ Purpose: Collect real-world bug reports from popular GitHub repositories")
    print("šŸŽÆ Goal: Maximum precision model training")
    print()

    # Check for GitHub token
    github_token = input("Enter GitHub token (optional, for higher rate limits): ").strip()
    if not github_token:
        print("āš ļø No token provided - using public API (limited to 60 requests/hour)")

    fetcher = GitHubIssuesFetcher(github_token if github_token else None)

    # Fetch issues
    print("\nšŸ”„ Starting collection...")
    issues = fetcher.fetch_comprehensive_issues(max_issues_per_repo=30)

    if issues:
        # Save to CSV
        filename = fetcher.save_to_csv(issues)
        print(f"\nšŸŽ‰ SUCCESS!")
        print(f"šŸ“ File: {filename}")
        print(f"šŸ“Š Total issues: {len(issues)}")

        # Show sample
        if issues:
            sample = issues[0]
            print(f"\nšŸ“‹ Sample issue:")
            print(f"   Title: {sample['title'][:60]}...")
            print(f"   Severity: {sample['severity']}")
            print(f"   Component: {sample['component']}")
            print(f"   Bug Type: {sample['bug_type']}")
            print(f"   Team: {sample['team']}")
            print(f"   Source: {sample['source']}")

        print(f"\nšŸŽÆ Next steps:")
        print("1. Upload this CSV via UI (Data Import → CSV Import)")
        print("2. Train your model with real GitHub bug data")
        print("3. Achieve maximum precision!")

        return filename
    else:
        print("āŒ No issues collected")
        return None


if __name__ == "__main__":
    main()