Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # github_issues_fetcher.py - Fetch real-world bug reports from GitHub | |
| import requests | |
| import pandas as pd | |
| import json | |
| import time | |
| import re | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Optional | |
class GitHubIssuesFetcher:
    """Fetch real-world bug reports from popular GitHub repositories.

    Closed issues are pulled from the GitHub REST v3 API, heuristically
    filtered down to likely bug reports (as opposed to feature requests),
    and converted into flat dict rows suitable for CSV export / model
    training.
    """

    def __init__(self, github_token: Optional[str] = None):
        # A token is optional; without one the public rate limit
        # (60 requests/hour) applies and we sleep between requests.
        self.github_token = github_token
        self.session = requests.Session()
        if github_token:
            self.session.headers.update({
                'Authorization': f'token {github_token}',
                'Accept': 'application/vnd.github.v3+json'
            })
        # Popular repositories with diverse bug types
        self.target_repos = [
            # Web Development
            'facebook/react',
            'vuejs/vue',
            'angular/angular',
            'microsoft/TypeScript',
            'nodejs/node',
            # Mobile Development
            'facebook/react-native',
            'flutter/flutter',
            'ionic-team/ionic',
            # Backend & APIs
            'expressjs/express',
            'fastify/fastify',
            'nestjs/nest',
            'prisma/prisma',
            'sequelize/sequelize',
            # Security & Authentication
            'auth0/auth0.js',
            'nextauthjs/next-auth',
            'supabase/supabase',
            # UI Libraries
            'mui/material-ui',
            'ant-design/ant-design',
            'chakra-ui/chakra-ui',
            'tailwindlabs/tailwindcss',
            # Testing & Quality
            'jestjs/jest',
            'cypress-io/cypress',
            # BUGFIX: 'playwright/playwright' does not exist; the project
            # lives under the microsoft org, so the old entry always 404'd.
            'microsoft/playwright',
            # DevOps & Tools
            'docker/compose',
            'kubernetes/kubernetes',
            'hashicorp/terraform',
            # Data & ML
            'pandas-dev/pandas',
            'numpy/numpy',
            'scikit-learn/scikit-learn',
            'tensorflow/tensorflow'
        ]

    def fetch_issues_from_repo(self, repo: str, max_issues: int = 100) -> List[Dict]:
        """Fetch up to *max_issues* closed bug issues from ``repo``.

        Paginates the GitHub issues endpoint, keeping only issues that
        pass the ``_is_bug_issue`` heuristic. Returns the raw issue
        dicts exactly as the API delivered them; stops early on rate
        limiting or any HTTP/network error.
        """
        print(f"π Fetching issues from {repo}...")
        issues = []
        page = 1
        per_page = min(100, max_issues)
        while len(issues) < max_issues:
            url = f"https://api.github.com/repos/{repo}/issues"
            params = {
                'state': 'closed',
                'sort': 'created',
                'direction': 'desc',
                'per_page': per_page,
                'page': page
            }
            try:
                response = self.session.get(url, params=params, timeout=30)
                # 403 from this endpoint almost always means rate limiting.
                if response.status_code == 403:
                    print(f"β οΈ Rate limit hit for {repo}, skipping...")
                    break
                elif response.status_code != 200:
                    print(f"β Error {response.status_code} for {repo}")
                    break
                page_issues = response.json()
                if not page_issues:
                    break
                # Filter for actual bugs (not feature requests / PRs)
                bug_issues = [issue for issue in page_issues
                              if self._is_bug_issue(issue)]
                # Never exceed the caller's requested cap.
                issues.extend(bug_issues[:max_issues - len(issues)])
                # A short page means we've reached the end of the results.
                if len(page_issues) < per_page:
                    break
                page += 1
                # Unauthenticated requests get only 60/hour; pace ourselves.
                if not self.github_token:
                    time.sleep(1)  # Respect rate limits
            except Exception as e:
                # Best-effort: a network failure on one repo is not fatal.
                print(f"β Error fetching from {repo}: {e}")
                break
        print(f"β Fetched {len(issues)} bug issues from {repo}")
        return issues

    def _is_bug_issue(self, issue: Dict) -> bool:
        """Heuristically decide whether *issue* is a bug report.

        Label classification takes priority over keyword matching;
        pull requests are always rejected.
        """
        # BUGFIX: the /issues endpoint also returns pull requests, which
        # carry a 'pull_request' key — they are not bug reports.
        if 'pull_request' in issue:
            return False
        title = (issue.get('title') or '').lower()
        # BUGFIX: 'body' is often present-but-None in the API payload, so
        # .get's default doesn't apply; guard before calling .lower().
        body = (issue.get('body') or '').lower()
        labels = [label['name'].lower() for label in issue.get('labels', [])]
        # Bug indicators
        bug_keywords = [
            'bug', 'fix', 'crash', 'error', 'exception', 'fail', 'broken',
            'issue', 'problem', 'defect', 'vulnerability', 'security',
            'performance', 'slow', 'timeout', 'memory leak', 'race condition'
        ]
        # Feature request indicators
        feature_keywords = [
            'feature', 'enhancement', 'improvement', 'request', 'proposal',
            'suggestion', 'idea', 'wishlist', 'roadmap'
        ]
        has_bug_keywords = any(keyword in title or keyword in body
                               for keyword in bug_keywords)
        has_feature_keywords = any(keyword in title or keyword in body
                                   for keyword in feature_keywords)
        has_bug_labels = any(label in ['bug', 'defect', 'security', 'performance']
                             for label in labels)
        has_feature_labels = any(label in ['enhancement', 'feature', 'proposal']
                                 for label in labels)
        # Priority: labels > keywords
        if has_bug_labels and not has_feature_labels:
            return True
        elif has_feature_labels:
            return False
        elif has_bug_keywords and not has_feature_keywords:
            return True
        return False

    def convert_to_training_format(self, issues: List[Dict], repo: str = '') -> List[Dict]:
        """Convert raw GitHub issues to the flat training-row format.

        *repo* (``owner/name``) is recorded on each output row: the list
        endpoint does not embed repository info on individual issues, so
        the caller supplies it. Issues with too little content, or that
        raise during conversion, are skipped (best-effort).
        """
        print("π Converting GitHub issues to training format...")
        converted = []
        for issue in issues:
            try:
                # Extract basic info
                title = issue.get('title') or ''
                # BUGFIX: body may be None; normalize before len()/cleaning.
                body = issue.get('body') or ''
                labels = [label['name'] for label in issue.get('labels', [])]
                created_at = issue.get('created_at', '')
                # Skip if no meaningful content
                if len(title) < 10 or len(body) < 20:
                    continue
                # Infer classification from content and labels
                severity = self._infer_severity(title, body, labels)
                component = self._infer_component(title, body, labels)
                bug_type = self._infer_bug_type(title, body, labels)
                team = self._infer_team(component, bug_type)
                priority = self._map_severity_to_priority(severity)
                converted.append({
                    'title': self._clean_text(title),
                    'description': self._clean_description(body),
                    'severity': severity,
                    'component': component,
                    'bug_type': bug_type,
                    'team': team,
                    'priority': priority,
                    'source': 'github_issues',
                    'original_id': str(issue.get('id', '')),
                    # BUGFIX: list-endpoint issues carry no 'repository'
                    # key, so the old code always produced ''; fall back
                    # to the caller-supplied repo name.
                    'repo': issue.get('repository', {}).get('full_name', '') or repo,
                    'labels': ', '.join(labels),
                    'created_at': created_at
                })
            except Exception:
                # Malformed issue: skip it rather than abort the batch.
                continue
        print(f"β Converted {len(converted)} issues to training format")
        return converted

    def _infer_severity(self, title: str, body: str, labels: List[str]) -> str:
        """Infer Critical/High/Medium/Low severity from content and labels."""
        text = (title + ' ' + body).lower()
        labels_lower = [label.lower() for label in labels]
        # Critical indicators
        critical_keywords = [
            'crash', 'segfault', 'data loss', 'security breach', 'vulnerability',
            'remote code execution', 'sql injection', 'xss', 'authentication bypass'
        ]
        if any(keyword in text for keyword in critical_keywords) or 'critical' in labels_lower:
            return 'Critical'
        # High indicators
        high_keywords = [
            'major bug', 'broken', 'fail', 'error', 'exception', 'performance issue',
            'memory leak', 'race condition', 'deadlock'
        ]
        if any(keyword in text for keyword in high_keywords) or 'high' in labels_lower:
            return 'High'
        # Low indicators
        low_keywords = [
            'minor', 'cosmetic', 'typo', 'grammar', 'enhancement', 'improvement'
        ]
        if any(keyword in text for keyword in low_keywords) or 'low' in labels_lower:
            return 'Low'
        return 'Medium'  # Default

    def _infer_component(self, title: str, body: str, labels: List[str]) -> str:
        """Infer Frontend/Mobile/Backend component from content and labels."""
        text = (title + ' ' + body).lower()
        labels_lower = [label.lower() for label in labels]
        # Frontend indicators
        frontend_keywords = [
            'ui', 'ux', 'frontend', 'css', 'html', 'javascript', 'react', 'vue', 'angular',
            'button', 'form', 'layout', 'responsive', 'mobile-friendly', 'browser'
        ]
        if any(keyword in text for keyword in frontend_keywords) or any(label in ['ui', 'frontend'] for label in labels_lower):
            return 'Frontend'
        # Mobile indicators
        mobile_keywords = [
            'mobile', 'android', 'ios', 'react native', 'flutter', 'app', 'native'
        ]
        if any(keyword in text for keyword in mobile_keywords) or any(label in ['mobile', 'android', 'ios'] for label in labels_lower):
            return 'Mobile'
        # Backend indicators
        backend_keywords = [
            'backend', 'api', 'server', 'database', 'sql', 'authentication', 'authorization',
            'middleware', 'service', 'endpoint'
        ]
        if any(keyword in text for keyword in backend_keywords) or any(label in ['backend', 'api'] for label in labels_lower):
            return 'Backend'
        return 'Backend'  # Default for most issues

    def _infer_bug_type(self, title: str, body: str, labels: List[str]) -> str:
        """Infer Security/Performance/UI-UX/Functional type from content."""
        text = (title + ' ' + body).lower()
        labels_lower = [label.lower() for label in labels]
        # Security indicators
        security_keywords = [
            'security', 'vulnerability', 'xss', 'csrf', 'injection', 'authentication',
            'authorization', 'permission', 'access control'
        ]
        if any(keyword in text for keyword in security_keywords) or any(label in ['security', 'vulnerability'] for label in labels_lower):
            return 'Security'
        # Performance indicators
        performance_keywords = [
            'performance', 'slow', 'timeout', 'memory', 'cpu', 'bottleneck', 'optimization'
        ]
        if any(keyword in text for keyword in performance_keywords) or any(label in ['performance'] for label in labels_lower):
            return 'Performance'
        # UI/UX indicators
        ui_keywords = [
            'ui', 'ux', 'interface', 'design', 'layout', 'visual', 'appearance', 'user experience'
        ]
        if any(keyword in text for keyword in ui_keywords) or any(label in ['ui', 'ux'] for label in labels_lower):
            return 'UI/UX'
        # Functional indicators
        functional_keywords = [
            'functionality', 'feature', 'logic', 'workflow', 'process', 'business logic'
        ]
        if any(keyword in text for keyword in functional_keywords):
            return 'Functional'
        return 'Functional'  # Default

    def _infer_team(self, component: str, bug_type: str) -> str:
        """Route to a team: component wins, then Security, else Dev Team."""
        if component == 'Frontend':
            return 'Frontend Team'
        elif component == 'Mobile':
            return 'Mobile Team'
        elif bug_type == 'Security':
            return 'Security Team'
        else:
            return 'Dev Team'

    def _map_severity_to_priority(self, severity: str) -> str:
        """Map severity label to a P0-P3 priority (unknown -> P2)."""
        return {'Critical': 'P0', 'High': 'P1', 'Medium': 'P2', 'Low': 'P3'}.get(severity, 'P2')

    def _clean_text(self, text: str) -> str:
        """Strip URLs/emails/issue refs, collapse whitespace, cap at 100 chars."""
        if not text:
            return ""
        # Remove URLs, emails, issue references
        text = re.sub(r'http[s]?://\S+', '[URL]', text)
        text = re.sub(r'\S+@\S+', '[EMAIL]', text)
        text = re.sub(r'#\d+', '[ISSUE]', text)
        # Clean whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        # Truncate if too long
        if len(text) > 100:
            text = text[:97] + "..."
        return text

    def _clean_description(self, text: str) -> str:
        """Strip markdown/URLs/emails, collapse whitespace, cap at 500 chars."""
        if not text:
            return ""
        # Remove markdown formatting (fenced blocks first, then inline code)
        text = re.sub(r'```[\s\S]*?```', '[CODE]', text)
        text = re.sub(r'`[^`]+`', '[CODE]', text)
        text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text)
        text = re.sub(r'\*([^*]+)\*', r'\1', text)
        # Remove URLs, emails, issue references
        text = re.sub(r'http[s]?://\S+', '[URL]', text)
        text = re.sub(r'\S+@\S+', '[EMAIL]', text)
        text = re.sub(r'#\d+', '[ISSUE]', text)
        # Clean whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        # Truncate if too long
        if len(text) > 500:
            text = text[:497] + "..."
        return text

    def fetch_comprehensive_issues(self, max_issues_per_repo: int = 50) -> List[Dict]:
        """Fetch and convert issues from every repo in ``self.target_repos``."""
        print("π COMPREHENSIVE GITHUB ISSUES COLLECTION")
        print("=" * 60)
        print(f"Target: {len(self.target_repos)} repositories")
        print(f"Expected: ~{len(self.target_repos) * max_issues_per_repo} issues")
        print()
        all_issues = []
        for repo in self.target_repos:
            try:
                issues = self.fetch_issues_from_repo(repo, max_issues_per_repo)
                if issues:
                    # Pass the repo name through so rows record their origin.
                    converted = self.convert_to_training_format(issues, repo)
                    all_issues.extend(converted)
                    print(f"β {repo}: {len(converted)} issues added")
                else:
                    print(f"β οΈ {repo}: No issues found")
                # Extra pause between repos on the unauthenticated API.
                if not self.github_token:
                    time.sleep(2)
            except Exception as e:
                print(f"β {repo}: Error - {e}")
                continue
        print(f"\nπ Total issues collected: {len(all_issues)}")
        return all_issues

    def save_to_csv(self, issues: List[Dict], filename: str = 'github_issues_training_data.csv') -> Optional[str]:
        """Write *issues* to CSV; return the filename, or None if empty."""
        if not issues:
            print("β No issues to save")
            return None
        df = pd.DataFrame(issues)
        df.to_csv(filename, index=False)
        # BUGFIX: report the actual output filename (the old message
        # contained a hard-coded placeholder instead of {filename}).
        print(f"πΎ Saved {len(issues)} issues to {filename}")
        return filename
def main():
    """Interactive entry point: collect GitHub bug issues and save to CSV.

    Prompts for an optional GitHub token (higher rate limits), fetches
    issues from all target repositories, writes them to CSV, and prints
    a summary with one sample row. Returns the CSV filename, or None if
    nothing was collected.
    """
    print("π GITHUB ISSUES DATA COLLECTOR")
    print("=" * 50)
    print("π Purpose: Collect real-world bug reports from popular GitHub repositories")
    print("π― Goal: Maximum precision model training")
    print()
    # Check for GitHub token
    github_token = input("Enter GitHub token (optional, for higher rate limits): ").strip()
    if not github_token:
        print("β οΈ No token provided - using public API (limited to 60 requests/hour)")
    fetcher = GitHubIssuesFetcher(github_token if github_token else None)
    # Fetch issues
    print("\nπ Starting collection...")
    issues = fetcher.fetch_comprehensive_issues(max_issues_per_repo=30)
    if issues:
        # Save to CSV
        filename = fetcher.save_to_csv(issues)
        print(f"\nπ SUCCESS!")
        # BUGFIX: show the real filename (the old message printed a
        # hard-coded placeholder instead of {filename}).
        print(f"π File: {filename}")
        print(f"π Total issues: {len(issues)}")
        # Show sample (issues is known non-empty here; the old nested
        # `if issues:` re-check was redundant)
        sample = issues[0]
        print(f"\nπ Sample issue:")
        print(f"  Title: {sample['title'][:60]}...")
        print(f"  Severity: {sample['severity']}")
        print(f"  Component: {sample['component']}")
        print(f"  Bug Type: {sample['bug_type']}")
        print(f"  Team: {sample['team']}")
        print(f"  Source: {sample['source']}")
        print(f"\nπ― Next steps:")
        print("1. Upload this CSV via UI (Data Import β CSV Import)")
        print("2. Train your model with real GitHub bug data")
        print("3. Achieve maximum precision!")
        return filename
    else:
        print("β No issues collected")
        return None


if __name__ == "__main__":
    main()