# intellibug / stackoverflow_fetcher.py
# Author: ahmadw
# Commit 47d7700 — "Deploy IntelliBug AI bug classifier with 75.6% accuracy"
import requests
import pandas as pd
import json
import time
import re
from datetime import datetime, timedelta
from typing import List, Dict, Optional
class StackOverflowFetcher:
    """Collect bug-related Stack Overflow questions via the Stack Exchange API.

    Questions are filtered with keyword heuristics (bug words in, feature
    requests out), then converted into IntelliBug's training format with
    severity / component / bug type / team / priority inferred from the
    question text, tags, and score.
    """

    def __init__(self, api_key: Optional[str] = None):
        """api_key: optional Stack Exchange key for higher request quotas."""
        self.api_key = api_key
        self.base_url = "https://api.stackexchange.com/2.3"
        self.session = requests.Session()
        # Programming tags that represent real bugs
        self.bug_tags = [
            # Web Development Bugs
            'javascript', 'reactjs', 'vue.js', 'angular', 'html', 'css',
            'node.js', 'express', 'php', 'python', 'django', 'flask',
            # Mobile Development Bugs
            'react-native', 'flutter', 'android', 'ios', 'swift',
            # Backend & Database Bugs
            'sql', 'mysql', 'postgresql', 'mongodb', 'redis',
            'java', 'spring', 'c#', '.net', 'ruby', 'rails',
            # System & Performance Bugs
            'performance', 'memory-leaks', 'multithreading', 'concurrency',
            'algorithm', 'data-structures', 'optimization',
            # Security Issues
            'security', 'authentication', 'authorization', 'encryption',
            'sql-injection', 'xss', 'csrf',
            # DevOps & Infrastructure
            'docker', 'kubernetes', 'aws', 'azure', 'git', 'ci-cd'
        ]

    def fetch_questions_with_tag(self, tag: str, max_questions: int = 50) -> List[Dict]:
        """Fetch up to ``max_questions`` bug-like questions for one tag.

        Pages through ``/questions`` (votes desc, last 365 days), keeps only
        questions that pass :meth:`_is_bug_question`, sleeps between pages,
        and waits out HTTP 429 throttling. Returns raw API question dicts.
        """
        print(f"πŸ” Fetching {tag} questions from Stack Overflow...")
        questions = []
        page = 1
        page_size = min(100, max_questions)  # the API caps pagesize at 100
        while len(questions) < max_questions:
            url = f"{self.base_url}/questions"
            params = {
                'tagged': tag,
                'site': 'stackoverflow',
                'sort': 'votes',
                'order': 'desc',
                'pagesize': page_size,
                'page': page,
                'filter': 'withbody',  # Include question body
                'fromdate': int((datetime.now() - timedelta(days=365)).timestamp()),  # Last year
                'todate': int(datetime.now().timestamp())
            }
            if self.api_key:
                params['key'] = self.api_key
            try:
                response = self.session.get(url, params=params, timeout=30)
                if response.status_code == 429:
                    # Throttled: wait out the penalty window, then retry this page.
                    print(f"⚠️ Rate limit hit for {tag}, waiting...")
                    time.sleep(60)
                    continue
                elif response.status_code != 200:
                    print(f"❌ Error {response.status_code} for {tag}")
                    break
                data = response.json()
                if 'items' not in data or not data['items']:
                    break
                # Filter for bug-related questions; never take more than requested.
                bug_questions = [q for q in data['items'] if self._is_bug_question(q)]
                questions.extend(bug_questions[:max_questions - len(questions)])
                if len(data['items']) < page_size:
                    break  # short page => no further results
                page += 1
                # Rate limiting between pages
                time.sleep(1)
            except Exception as e:
                print(f"❌ Error fetching {tag}: {e}")
                break
        print(f"βœ… Fetched {len(questions)} bug questions for {tag}")
        return questions

    def _is_bug_question(self, question: Dict) -> bool:
        """Heuristically decide whether a question describes a bug/problem.

        True when title/body contains a bug keyword, contains no
        feature-request keyword, and the score is non-negative.
        """
        title = question.get('title', '').lower()
        body = question.get('body', '').lower()
        # Bug/problem indicators
        bug_keywords = [
            'bug', 'error', 'exception', 'crash', 'fail', 'broken', 'not working',
            'issue', 'problem', 'fix', 'solution', 'help', 'why', 'how to fix',
            'doesn\'t work', 'won\'t work', 'can\'t', 'unable to', 'failed to'
        ]
        # Feature request indicators (exclude these)
        feature_keywords = [
            'how to implement', 'best way to', 'recommendation', 'suggestion',
            'feature request', 'enhancement', 'improvement'
        ]
        # Check for bug keywords
        has_bug_keywords = any(keyword in title or keyword in body for keyword in bug_keywords)
        # Check for feature keywords
        has_feature_keywords = any(keyword in title or keyword in body for keyword in feature_keywords)
        # Check score (higher score = more relevant)
        score = question.get('score', 0)
        # Must have bug keywords, not feature keywords, and reasonable score
        return has_bug_keywords and not has_feature_keywords and score >= 0

    def convert_to_training_format(self, questions: List[Dict]) -> List[Dict]:
        """Convert raw Stack Overflow question dicts to training rows.

        Skips questions with too little content; any question that raises
        during conversion is skipped (best-effort batch processing).
        """
        print("πŸ”„ Converting Stack Overflow questions to training format...")
        converted = []
        for question in questions:
            try:
                # Extract basic info
                title = question.get('title', '')
                body = self._extract_text_from_html(question.get('body', ''))
                tags = question.get('tags', [])
                score = question.get('score', 0)
                answer_count = question.get('answer_count', 0)
                created_date = question.get('creation_date', 0)
                # Skip if no meaningful content
                if len(title) < 10 or len(body) < 20:
                    continue
                # Infer classification from content and tags
                severity = self._infer_severity(title, body, tags, score)
                component = self._infer_component(title, body, tags)
                bug_type = self._infer_bug_type(title, body, tags)
                team = self._infer_team(component, bug_type)
                priority = self._map_severity_to_priority(severity)
                converted.append({
                    'title': self._clean_text(title),
                    'description': self._clean_description(body),
                    'severity': severity,
                    'component': component,
                    'bug_type': bug_type,
                    'team': team,
                    'priority': priority,
                    'source': 'stackoverflow',
                    'original_id': str(question.get('question_id', '')),
                    'tags': ', '.join(tags),
                    'score': score,
                    'answer_count': answer_count,
                    'created_date': created_date
                })
            except Exception:
                # Best-effort: drop the malformed question, keep the batch going.
                continue
        print(f"βœ… Converted {len(converted)} questions to training format")
        return converted

    def _extract_text_from_html(self, html: str) -> str:
        """Strip HTML tags, decode common entities, and collapse whitespace."""
        if not html:
            return ""
        # Remove HTML tags
        text = re.sub(r'<[^>]+>', ' ', html)
        # Decode common HTML entities. '&amp;' must be decoded LAST so that
        # escaped entities like '&amp;lt;' become '&lt;', not double-decoded to '<'.
        text = text.replace('&lt;', '<')
        text = text.replace('&gt;', '>')
        text = text.replace('&quot;', '"')
        text = text.replace('&#39;', "'")
        text = text.replace('&amp;', '&')
        # Clean whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    def _infer_severity(self, title: str, body: str, tags: List[str], score: int) -> str:
        """Infer severity from question content and score.

        Precedence: Critical (or score >= 20) > High (or score >= 10)
        > Low (or score <= 2) > Medium default.
        """
        text = (title + ' ' + body).lower()
        # Critical indicators
        critical_keywords = [
            'crash', 'segfault', 'data loss', 'security breach', 'vulnerability',
            'remote code execution', 'sql injection', 'xss', 'authentication bypass',
            'system down', 'production down', 'database corruption'
        ]
        if any(keyword in text for keyword in critical_keywords) or score >= 20:
            return 'Critical'
        # High indicators
        high_keywords = [
            'major bug', 'broken', 'fail', 'error', 'exception', 'performance issue',
            'memory leak', 'race condition', 'deadlock', 'not working'
        ]
        if any(keyword in text for keyword in high_keywords) or score >= 10:
            return 'High'
        # Low indicators
        low_keywords = [
            'minor', 'cosmetic', 'typo', 'grammar', 'enhancement', 'improvement',
            'suggestion', 'recommendation'
        ]
        if any(keyword in text for keyword in low_keywords) or score <= 2:
            return 'Low'
        return 'Medium'  # Default

    def _infer_component(self, title: str, body: str, tags: List[str]) -> str:
        """Infer component (Frontend/Mobile/Backend) from content and tags."""
        text = (title + ' ' + body).lower()
        tags_lower = [tag.lower() for tag in tags]
        # Frontend indicators
        frontend_keywords = [
            'ui', 'ux', 'frontend', 'css', 'html', 'javascript', 'react', 'vue', 'angular',
            'button', 'form', 'layout', 'responsive', 'mobile-friendly', 'browser'
        ]
        frontend_tags = ['javascript', 'reactjs', 'vue.js', 'angular', 'html', 'css']
        if any(keyword in text for keyword in frontend_keywords) or any(tag in frontend_tags for tag in tags_lower):
            return 'Frontend'
        # Mobile indicators
        mobile_keywords = [
            'mobile', 'android', 'ios', 'react-native', 'flutter', 'app', 'native'
        ]
        mobile_tags = ['react-native', 'flutter', 'android', 'ios', 'swift']
        if any(keyword in text for keyword in mobile_keywords) or any(tag in mobile_tags for tag in tags_lower):
            return 'Mobile'
        # Backend indicators
        backend_keywords = [
            'backend', 'api', 'server', 'database', 'sql', 'authentication', 'authorization',
            'middleware', 'service', 'endpoint'
        ]
        backend_tags = ['node.js', 'express', 'php', 'python', 'django', 'flask', 'java', 'spring', 'c#', '.net']
        if any(keyword in text for keyword in backend_keywords) or any(tag in backend_tags for tag in tags_lower):
            return 'Backend'
        return 'Backend'  # Default for most programming questions

    def _infer_bug_type(self, title: str, body: str, tags: List[str]) -> str:
        """Infer bug type (Security/Performance/UI-UX/Functional) from content and tags."""
        text = (title + ' ' + body).lower()
        tags_lower = [tag.lower() for tag in tags]
        # Security indicators
        security_keywords = [
            'security', 'vulnerability', 'xss', 'csrf', 'injection', 'authentication',
            'authorization', 'permission', 'access control'
        ]
        security_tags = ['security', 'authentication', 'authorization', 'sql-injection', 'xss', 'csrf']
        if any(keyword in text for keyword in security_keywords) or any(tag in security_tags for tag in tags_lower):
            return 'Security'
        # Performance indicators
        performance_keywords = [
            'performance', 'slow', 'timeout', 'memory', 'cpu', 'bottleneck', 'optimization'
        ]
        performance_tags = ['performance', 'optimization', 'memory-leaks']
        if any(keyword in text for keyword in performance_keywords) or any(tag in performance_tags for tag in tags_lower):
            return 'Performance'
        # UI/UX indicators
        ui_keywords = [
            'ui', 'ux', 'interface', 'design', 'layout', 'visual', 'appearance', 'user experience'
        ]
        if any(keyword in text for keyword in ui_keywords):
            return 'UI/UX'
        # Functional indicators
        functional_keywords = [
            'functionality', 'feature', 'logic', 'workflow', 'process', 'business logic'
        ]
        if any(keyword in text for keyword in functional_keywords):
            return 'Functional'
        return 'Functional'  # Default for most programming questions

    def _infer_team(self, component: str, bug_type: str) -> str:
        """Infer owning team; component wins over bug type (Security is last)."""
        if component == 'Frontend':
            return 'Frontend Team'
        elif component == 'Mobile':
            return 'Mobile Team'
        elif bug_type == 'Security':
            return 'Security Team'
        else:
            return 'Dev Team'

    def _map_severity_to_priority(self, severity: str) -> str:
        """Map severity to priority (P0..P3); unknown severities default to P2."""
        return {'Critical': 'P0', 'High': 'P1', 'Medium': 'P2', 'Low': 'P3'}.get(severity, 'P2')

    def _clean_text(self, text: str) -> str:
        """Clean and truncate a title to at most 100 characters."""
        if not text:
            return ""
        # Remove code blocks and URLs. Fenced blocks must be replaced before
        # inline backticks, or the inline pattern eats the fence delimiters
        # (this also matches _clean_description's ordering).
        text = re.sub(r'```[\s\S]*?```', '[CODE]', text)
        text = re.sub(r'`[^`]+`', '[CODE]', text)
        text = re.sub(r'http[s]?://\S+', '[URL]', text)
        # Clean whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        # Truncate if too long
        if len(text) > 100:
            text = text[:97] + "..."
        return text

    def _clean_description(self, text: str) -> str:
        """Clean and truncate a description to at most 500 characters."""
        if not text:
            return ""
        # Remove code blocks, URLs, etc. (fenced blocks before inline backticks)
        text = re.sub(r'```[\s\S]*?```', '[CODE]', text)
        text = re.sub(r'`[^`]+`', '[CODE]', text)
        text = re.sub(r'http[s]?://\S+', '[URL]', text)
        # Clean whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        # Truncate if too long
        if len(text) > 500:
            text = text[:497] + "..."
        return text

    def fetch_comprehensive_questions(self, max_questions_per_tag: int = 30) -> List[Dict]:
        """Fetch and convert questions for every tag in ``self.bug_tags``.

        Per-tag failures are reported and skipped so one bad tag cannot
        abort the whole collection run.
        """
        print("πŸš€ COMPREHENSIVE STACK OVERFLOW DATA COLLECTION")
        print("=" * 60)
        print(f"Target: {len(self.bug_tags)} programming tags")
        print(f"Expected: ~{len(self.bug_tags) * max_questions_per_tag} questions")
        print()
        all_questions = []
        for tag in self.bug_tags:
            try:
                questions = self.fetch_questions_with_tag(tag, max_questions_per_tag)
                if questions:
                    converted = self.convert_to_training_format(questions)
                    all_questions.extend(converted)
                    print(f"βœ… {tag}: {len(converted)} questions added")
                else:
                    print(f"⚠️ {tag}: No questions found")
                # Rate limiting between tags
                time.sleep(2)
            except Exception as e:
                print(f"❌ {tag}: Error - {e}")
                continue
        print(f"\nπŸŽ‰ Total questions collected: {len(all_questions)}")
        return all_questions

    def save_to_csv(self, questions: List[Dict], filename: str = 'stackoverflow_training_data.csv') -> Optional[str]:
        """Save converted questions to CSV; returns the filename, or None if empty."""
        if not questions:
            print("❌ No questions to save")
            return None
        df = pd.DataFrame(questions)
        df.to_csv(filename, index=False)
        # Fix: report the actual output path (was a garbled placeholder).
        print(f"πŸ’Ύ Saved {len(questions)} questions to {filename}")
        return filename
def main():
    """Interactively collect Stack Overflow questions and save them to CSV.

    Prompts for an optional Stack Exchange API key, fetches bug questions
    across all configured tags, prints a sample row, and writes the output
    CSV. Returns the CSV filename, or None when nothing was collected.
    """
    print("πŸš€ STACK OVERFLOW DATA COLLECTOR")
    print("=" * 50)
    print("πŸ“‹ Purpose: Collect real-world programming problems from Stack Overflow")
    print("🎯 Goal: Maximum precision model training")
    print()
    # Check for API key (blank input falls back to the public, quota-limited API)
    api_key = input("Enter Stack Exchange API key (optional, for higher rate limits): ").strip()
    if not api_key:
        print("⚠️ No API key provided - using public API (limited to 10,000 requests/day)")
    fetcher = StackOverflowFetcher(api_key if api_key else None)
    # Fetch questions
    print("\nπŸ”„ Starting collection...")
    questions = fetcher.fetch_comprehensive_questions(max_questions_per_tag=20)
    if questions:
        # Save to CSV
        filename = fetcher.save_to_csv(questions)
        print(f"\nπŸŽ‰ SUCCESS!")
        # Fix: report the actual output filename (was a garbled placeholder).
        print(f"πŸ“ File: {filename}")
        print(f"πŸ“Š Total questions: {len(questions)}")
        # Show sample
        sample = questions[0]
        print(f"\nπŸ“‹ Sample question:")
        print(f"  Title: {sample['title'][:60]}...")
        print(f"  Severity: {sample['severity']}")
        print(f"  Component: {sample['component']}")
        print(f"  Bug Type: {sample['bug_type']}")
        print(f"  Team: {sample['team']}")
        print(f"  Score: {sample['score']}")
        print(f"  Source: {sample['source']}")
        print(f"\n🎯 Next steps:")
        print("1. Upload this CSV via UI (Data Import β†’ CSV Import)")
        print("2. Train your model with real Stack Overflow data")
        print("3. Achieve maximum precision!")
        return filename
    else:
        print("❌ No questions collected")
        return None
# Run the interactive collector only when executed as a script, not on import.
if __name__ == "__main__":
    main()