intellibug / github_issues_fetcher.py
ahmadw's picture
Deploy IntelliBug AI bug classifier with 75.6% accuracy
47d7700
#!/usr/bin/env python3
# github_issues_fetcher.py - Fetch real-world bug reports from GitHub
import requests
import pandas as pd
import json
import time
import re
from datetime import datetime, timedelta
from typing import List, Dict, Optional
class GitHubIssuesFetcher:
    """Collect closed GitHub issues that look like bugs and label them heuristically.

    The fetcher pages through the GitHub REST ``/repos/{repo}/issues`` endpoint
    for every entry in ``target_repos``, keeps the issues that
    ``_is_bug_issue`` accepts, and converts them into flat dictionaries
    (title, description, severity, component, bug type, team, priority, ...)
    that can be written to CSV with ``save_to_csv``.

    All labels (severity, component, bug type) are keyword heuristics over the
    issue text and its GitHub labels; they are not ground truth.
    """

    _API_ROOT = 'https://api.github.com'
    # GitHub caps ``per_page`` at 100 for list endpoints.
    _PAGE_SIZE = 100

    # Scrubbing patterns shared by _clean_text and _clean_description.
    _URL_RE = re.compile(r'http[s]?://\S+')
    _EMAIL_RE = re.compile(r'\S+@\S+')
    _ISSUE_REF_RE = re.compile(r'#\d+')
    _WHITESPACE_RE = re.compile(r'\s+')
    # Extracts "owner/name" from an issue's ``repository_url`` field.
    _REPO_URL_RE = re.compile(r'/repos/([^/]+/[^/]+)/?$')

    def __init__(self, github_token: Optional[str] = None):
        """Create an HTTP session, optionally authenticated with *github_token*.

        Args:
            github_token: Personal access token. When falsy, requests are sent
                unauthenticated and the fetch methods sleep between calls.
        """
        self.github_token = github_token
        self.session = requests.Session()
        # Ask for the v3 JSON representation whether or not we are authenticated.
        self.session.headers.update({'Accept': 'application/vnd.github.v3+json'})
        if github_token:
            self.session.headers.update({'Authorization': f'token {github_token}'})

        # Popular repositories with diverse bug types
        self.target_repos = [
            # Web Development
            'facebook/react',
            'vuejs/vue',
            'angular/angular',
            'microsoft/TypeScript',
            'nodejs/node',
            # Mobile Development
            'facebook/react-native',
            'flutter/flutter',
            'ionic-team/ionic',
            # Backend & APIs
            'expressjs/express',
            'fastify/fastify',
            'nestjs/nest',
            'prisma/prisma',
            'sequelize/sequelize',
            # Security & Authentication
            'auth0/auth0.js',
            'nextauthjs/next-auth',
            'supabase/supabase',
            # UI Libraries
            'mui/material-ui',
            'ant-design/ant-design',
            'chakra-ui/chakra-ui',
            'tailwindlabs/tailwindcss',
            # Testing & Quality
            'jestjs/jest',
            'cypress-io/cypress',
            # NOTE(review): the canonical repository is probably
            # 'microsoft/playwright' - confirm before relying on this entry.
            'playwright/playwright',
            # DevOps & Tools
            'docker/compose',
            'kubernetes/kubernetes',
            'hashicorp/terraform',
            # Data & ML
            'pandas-dev/pandas',
            'numpy/numpy',
            'scikit-learn/scikit-learn',
            'tensorflow/tensorflow'
        ]

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _has_keyword(text: str, keywords: List[str]) -> bool:
        """Return True when *text* contains one of *keywords* as a word.

        Plain substring tests misfire badly on short keywords ('ui' is inside
        "build", 'ios' inside "axios", 'bug' inside "debug"). Every keyword
        must therefore start at a word boundary; keywords of three characters
        or fewer must additionally end the word, allowing only the simple
        inflections s/es/ed/ing. Longer keywords still match as prefixes so
        that 'fail' keeps matching "failure" and 'crash' matches "crashes".

        *text* is expected to be lower-cased already, like the keywords.
        """
        if not keywords:
            return False
        alternatives = []
        for keyword in keywords:
            alternative = re.escape(keyword)
            if len(keyword) <= 3:
                alternative += r'(?:s|es|ed|ing)?\b'
            alternatives.append(alternative)
        # The re module caches compiled patterns, so rebuilding the same
        # pattern string on every call stays cheap.
        pattern = r'\b(?:' + '|'.join(alternatives) + ')'
        return re.search(pattern, text) is not None

    @staticmethod
    def _label_names(issue: Dict) -> List[str]:
        """Return the label names of *issue*, tolerating missing or odd entries."""
        names = []
        for label in issue.get('labels') or []:
            name = label.get('name') if isinstance(label, dict) else label
            if name:
                names.append(str(name))
        return names

    @classmethod
    def _repo_full_name(cls, issue: Dict) -> str:
        """Return "owner/name" for *issue*, or '' when it cannot be determined.

        Prefers the ``repository_url`` field; falls back to an embedded
        ``repository`` object when one is present.
        """
        match = cls._REPO_URL_RE.search(issue.get('repository_url') or '')
        if match:
            return match.group(1)
        return (issue.get('repository') or {}).get('full_name') or ''

    def _scrub(self, text: str, max_length: int) -> str:
        """Mask URLs, e-mails and issue refs, collapse whitespace, truncate."""
        text = self._URL_RE.sub('[URL]', text)
        text = self._EMAIL_RE.sub('[EMAIL]', text)
        text = self._ISSUE_REF_RE.sub('[ISSUE]', text)
        text = self._WHITESPACE_RE.sub(' ', text).strip()
        if len(text) > max_length:
            # Reserve three characters for the ellipsis.
            text = text[:max_length - 3] + "..."
        return text

    # ------------------------------------------------------------------
    # Fetching
    # ------------------------------------------------------------------
    def fetch_issues_from_repo(self, repo: str, max_issues: int = 100) -> List[Dict]:
        """Fetch up to *max_issues* closed bug issues from *repo*.

        Pages through the closed issues of ``repo`` (newest first) and keeps
        those accepted by ``_is_bug_issue``. Network errors, non-200 responses
        and undecodable bodies end the loop; whatever was collected so far is
        returned.

        Args:
            repo: Repository in "owner/name" form.
            max_issues: Upper bound on the number of issues returned.

        Returns:
            The raw issue dictionaries as returned by the API.
        """
        print(f"🔍 Fetching issues from {repo}...")
        issues: List[Dict] = []
        url = f"{self._API_ROOT}/repos/{repo}/issues"
        page = 1
        while len(issues) < max_issues:
            params = {
                'state': 'closed',
                'sort': 'created',
                'direction': 'desc',
                # Always request full pages: filtering happens client-side, so
                # smaller pages only cost extra (rate-limited) requests.
                'per_page': self._PAGE_SIZE,
                'page': page
            }
            try:
                response = self.session.get(url, params=params, timeout=30)
            except requests.RequestException as e:
                print(f"❌ Error fetching from {repo}: {e}")
                break

            if response.status_code in (403, 429):
                print(f"⚠️ Rate limit hit for {repo}, skipping...")
                break
            if response.status_code != 200:
                print(f"❌ Error {response.status_code} for {repo}")
                break

            try:
                page_issues = response.json()
            except ValueError as e:  # body was not valid JSON
                print(f"❌ Error fetching from {repo}: {e}")
                break
            if not page_issues:
                break

            # Filter for actual bugs (not feature requests or pull requests)
            bug_issues = [issue for issue in page_issues if self._is_bug_issue(issue)]
            issues.extend(bug_issues[:max_issues - len(issues)])

            if len(page_issues) < self._PAGE_SIZE:
                break  # short page: nothing left on the server
            page += 1

            # Respect the unauthenticated rate limit, but only if we go again.
            if not self.github_token and len(issues) < max_issues:
                time.sleep(1)

        print(f"✅ Fetched {len(issues)} bug issues from {repo}")
        return issues

    def _is_bug_issue(self, issue: Dict) -> bool:
        """Determine if an issue is a bug (not a feature request or pull request).

        Labels take priority over keywords: a bug label without a feature
        label accepts the issue, any feature label rejects it, and otherwise
        the issue is accepted only when its text has bug keywords and no
        feature keywords.
        """
        # The issues endpoint also lists pull requests; they carry this key.
        if 'pull_request' in issue:
            return False

        # The API sends null for an empty title/body, so guard before lower().
        title = (issue.get('title') or '').lower()
        body = (issue.get('body') or '').lower()
        labels = [name.lower() for name in self._label_names(issue)]

        # Bug indicators
        bug_keywords = [
            'bug', 'fix', 'crash', 'error', 'exception', 'fail', 'broken',
            'issue', 'problem', 'defect', 'vulnerability', 'security',
            'performance', 'slow', 'timeout', 'memory leak', 'race condition'
        ]
        # Feature request indicators
        feature_keywords = [
            'feature', 'enhancement', 'improvement', 'request', 'proposal',
            'suggestion', 'idea', 'wishlist', 'roadmap'
        ]

        has_bug_labels = any(label in ('bug', 'defect', 'security', 'performance') for label in labels)
        has_feature_labels = any(label in ('enhancement', 'feature', 'proposal') for label in labels)

        # Priority: labels > keywords
        if has_feature_labels:
            return False
        if has_bug_labels:
            return True

        has_bug_keywords = (self._has_keyword(title, bug_keywords)
                            or self._has_keyword(body, bug_keywords))
        has_feature_keywords = (self._has_keyword(title, feature_keywords)
                                or self._has_keyword(body, feature_keywords))
        return has_bug_keywords and not has_feature_keywords

    # ------------------------------------------------------------------
    # Conversion
    # ------------------------------------------------------------------
    def convert_to_training_format(self, issues: List[Dict]) -> List[Dict]:
        """Convert GitHub issues to our training format.

        Issues whose title is shorter than 10 characters or whose body is
        shorter than 20 characters (including missing bodies) are dropped.
        Malformed entries are skipped and counted instead of aborting the run.

        Returns:
            One flat dictionary per kept issue with the keys ``title``,
            ``description``, ``severity``, ``component``, ``bug_type``,
            ``team``, ``priority``, ``source``, ``original_id``, ``repo``,
            ``labels`` and ``created_at``.
        """
        print("🔄 Converting GitHub issues to training format...")
        converted = []
        skipped = 0
        for issue in issues:
            try:
                # Extract basic info (the API may send null instead of '')
                title = issue.get('title') or ''
                body = issue.get('body') or ''
                labels = self._label_names(issue)
                created_at = issue.get('created_at') or ''

                # Skip if no meaningful content
                if len(title) < 10 or len(body) < 20:
                    continue

                # Infer classification from content and labels
                severity = self._infer_severity(title, body, labels)
                component = self._infer_component(title, body, labels)
                bug_type = self._infer_bug_type(title, body, labels)

                converted.append({
                    'title': self._clean_text(title),
                    'description': self._clean_description(body),
                    'severity': severity,
                    'component': component,
                    'bug_type': bug_type,
                    'team': self._infer_team(component, bug_type),
                    'priority': self._map_severity_to_priority(severity),
                    'source': 'github_issues',
                    'original_id': str(issue.get('id', '')),
                    'repo': self._repo_full_name(issue),
                    'labels': ', '.join(labels),
                    'created_at': created_at
                })
            except (AttributeError, KeyError, TypeError):
                # Entry does not have the expected shape; keep going.
                skipped += 1

        if skipped:
            print(f"⚠️ Skipped {skipped} malformed issues")
        print(f"✅ Converted {len(converted)} issues to training format")
        return converted

    def _infer_severity(self, title: str, body: str, labels: List[str]) -> str:
        """Infer 'Critical', 'High', 'Low' or (default) 'Medium' from the issue."""
        text = (title + ' ' + body).lower()
        labels_lower = [label.lower() for label in labels]

        # Critical indicators
        critical_keywords = [
            'crash', 'segfault', 'data loss', 'security breach', 'vulnerability',
            'remote code execution', 'sql injection', 'xss', 'authentication bypass'
        ]
        if self._has_keyword(text, critical_keywords) or 'critical' in labels_lower:
            return 'Critical'

        # High indicators
        high_keywords = [
            'major bug', 'broken', 'fail', 'error', 'exception', 'performance issue',
            'memory leak', 'race condition', 'deadlock'
        ]
        if self._has_keyword(text, high_keywords) or 'high' in labels_lower:
            return 'High'

        # Low indicators
        low_keywords = [
            'minor', 'cosmetic', 'typo', 'grammar', 'enhancement', 'improvement'
        ]
        if self._has_keyword(text, low_keywords) or 'low' in labels_lower:
            return 'Low'

        return 'Medium'  # Default

    def _infer_component(self, title: str, body: str, labels: List[str]) -> str:
        """Infer 'Frontend', 'Mobile' or (default) 'Backend' from the issue.

        Frontend indicators are checked before mobile ones, so an issue that
        matches both is reported as 'Frontend'.
        """
        text = (title + ' ' + body).lower()
        labels_lower = [label.lower() for label in labels]

        # Frontend indicators
        frontend_keywords = [
            'ui', 'ux', 'frontend', 'css', 'html', 'javascript', 'react', 'vue', 'angular',
            'button', 'form', 'layout', 'responsive', 'mobile-friendly', 'browser'
        ]
        if self._has_keyword(text, frontend_keywords) or any(
                label in ('ui', 'frontend') for label in labels_lower):
            return 'Frontend'

        # Mobile indicators
        mobile_keywords = [
            'mobile', 'android', 'ios', 'react native', 'flutter', 'app', 'native'
        ]
        if self._has_keyword(text, mobile_keywords) or any(
                label in ('mobile', 'android', 'ios') for label in labels_lower):
            return 'Mobile'

        # Everything else, explicit backend indicators included, is 'Backend'.
        return 'Backend'

    def _infer_bug_type(self, title: str, body: str, labels: List[str]) -> str:
        """Infer 'Security', 'Performance', 'UI/UX' or (default) 'Functional'.

        The categories are tried in that order; the first match wins.
        """
        text = (title + ' ' + body).lower()
        labels_lower = [label.lower() for label in labels]

        # Security indicators
        security_keywords = [
            'security', 'vulnerability', 'xss', 'csrf', 'injection', 'authentication',
            'authorization', 'permission', 'access control'
        ]
        if self._has_keyword(text, security_keywords) or any(
                label in ('security', 'vulnerability') for label in labels_lower):
            return 'Security'

        # Performance indicators
        performance_keywords = [
            'performance', 'slow', 'timeout', 'memory', 'cpu', 'bottleneck', 'optimization'
        ]
        if self._has_keyword(text, performance_keywords) or 'performance' in labels_lower:
            return 'Performance'

        # UI/UX indicators
        ui_keywords = [
            'ui', 'ux', 'interface', 'design', 'layout', 'visual', 'appearance', 'user experience'
        ]
        if self._has_keyword(text, ui_keywords) or any(
                label in ('ui', 'ux') for label in labels_lower):
            return 'UI/UX'

        return 'Functional'  # Default

    def _infer_team(self, component: str, bug_type: str) -> str:
        """Infer team based on component and bug type.

        The component decides first; 'Security Team' is only chosen for
        security bugs outside the Frontend and Mobile components.
        """
        if component == 'Frontend':
            return 'Frontend Team'
        if component == 'Mobile':
            return 'Mobile Team'
        if bug_type == 'Security':
            return 'Security Team'
        return 'Dev Team'

    def _map_severity_to_priority(self, severity: str) -> str:
        """Map severity to priority; unknown severities map to 'P2'."""
        return {'Critical': 'P0', 'High': 'P1', 'Medium': 'P2', 'Low': 'P3'}.get(severity, 'P2')

    def _clean_text(self, text: str) -> str:
        """Mask URLs/e-mails/issue refs, collapse whitespace, cap at 100 chars."""
        if not text:
            return ""
        return self._scrub(text, 100)

    def _clean_description(self, text: str) -> str:
        """Strip markdown, mask URLs/e-mails/issue refs, cap at 500 chars."""
        if not text:
            return ""
        # Remove markdown formatting. Fenced blocks go first so their content
        # is not picked apart by the inline-code pattern.
        text = re.sub(r'```[\s\S]*?```', '[CODE]', text)
        text = re.sub(r'`[^`]+`', '[CODE]', text)
        text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text)
        text = re.sub(r'\*([^*]+)\*', r'\1', text)
        return self._scrub(text, 500)

    # ------------------------------------------------------------------
    # Orchestration and output
    # ------------------------------------------------------------------
    def fetch_comprehensive_issues(self, max_issues_per_repo: int = 50) -> List[Dict]:
        """Fetch and convert issues from every repository in ``target_repos``.

        A failure in one repository is reported and does not stop the others.

        Returns:
            The converted issues of all repositories, concatenated.
        """
        print("🚀 COMPREHENSIVE GITHUB ISSUES COLLECTION")
        print("=" * 60)
        print(f"Target: {len(self.target_repos)} repositories")
        print(f"Expected: ~{len(self.target_repos) * max_issues_per_repo} issues")
        print()

        all_issues = []
        for repo in self.target_repos:
            try:
                issues = self.fetch_issues_from_repo(repo, max_issues_per_repo)
                if issues:
                    converted = self.convert_to_training_format(issues)
                    all_issues.extend(converted)
                    print(f"✅ {repo}: {len(converted)} issues added")
                else:
                    print(f"⚠️ {repo}: No issues found")
                # Rate limiting
                if not self.github_token:
                    time.sleep(2)
            except Exception as e:
                # Best-effort boundary: report and move on to the next repo.
                print(f"❌ {repo}: Error - {e}")
                continue

        print(f"\n🎉 Total issues collected: {len(all_issues)}")
        return all_issues

    def save_to_csv(self, issues: List[Dict], filename: str = 'github_issues_training_data.csv') -> Optional[str]:
        """Write *issues* to *filename* as CSV.

        Returns:
            *filename*, or None when *issues* is empty and nothing was written.
        """
        if not issues:
            print("❌ No issues to save")
            return None
        df = pd.DataFrame(issues)
        df.to_csv(filename, index=False)
        print(f"💾 Saved {len(issues)} issues to {filename}")
        return filename
def main():
    """Interactively collect GitHub bug issues and write them to a CSV file.

    Prompts for an optional GitHub token, fetches up to 30 bug issues per
    target repository, saves the converted issues to CSV and prints a short
    summary with one sample row.

    Returns:
        The name of the CSV file that was written, or None when no issues
        were collected.
    """
    # Imported here so the prompt does not echo the token to the terminal.
    import getpass

    print("🚀 GITHUB ISSUES DATA COLLECTOR")
    print("=" * 50)
    print("📋 Purpose: Collect real-world bug reports from popular GitHub repositories")
    print("🎯 Goal: Maximum precision model training")
    print()

    # Check for GitHub token
    try:
        github_token = getpass.getpass(
            "Enter GitHub token (optional, for higher rate limits): ").strip()
    except EOFError:
        # stdin is closed (non-interactive run): continue unauthenticated.
        github_token = ""
    if not github_token:
        print("⚠️ No token provided - using public API (limited to 60 requests/hour)")

    fetcher = GitHubIssuesFetcher(github_token if github_token else None)

    # Fetch issues
    print("\n🔄 Starting collection...")
    issues = fetcher.fetch_comprehensive_issues(max_issues_per_repo=30)
    if not issues:
        print("❌ No issues collected")
        return None

    # Save to CSV
    filename = fetcher.save_to_csv(issues)
    print("\n🎉 SUCCESS!")
    print(f"📁 File: {filename}")
    print(f"📊 Total issues: {len(issues)}")

    # Show sample
    sample = issues[0]
    print("\n📋 Sample issue:")
    print(f" Title: {sample['title'][:60]}...")
    print(f" Severity: {sample['severity']}")
    print(f" Component: {sample['component']}")
    print(f" Bug Type: {sample['bug_type']}")
    print(f" Team: {sample['team']}")
    print(f" Source: {sample['source']}")

    print("\n🎯 Next steps:")
    print("1. Upload this CSV via UI (Data Import → CSV Import)")
    print("2. Train your model with real GitHub bug data")
    print("3. Achieve maximum precision!")
    return filename


# Run the collector only when executed as a script, not on import.
if __name__ == "__main__":
    main()