import requests
import pandas as pd
import json
import time
import re
import html
from datetime import datetime, timedelta
from typing import List, Dict, Optional


class StackOverflowFetcher:
    """Fetch bug-related questions from the Stack Exchange API and convert
    them into a flat training-data format (one dict per question).

    The fetcher filters questions down to "real bug" reports via keyword
    heuristics, then infers severity/component/bug-type/team labels from the
    question text, tags, and score.
    """

    def __init__(self, api_key: Optional[str] = None):
        """
        Args:
            api_key: Optional Stack Exchange API key. Raises the daily
                request quota; the public (keyless) quota is much lower.
        """
        self.api_key = api_key
        self.base_url = "https://api.stackexchange.com/2.3"
        self.session = requests.Session()
        # Programming tags that represent real bugs
        self.bug_tags = [
            # Web Development Bugs
            'javascript', 'reactjs', 'vue.js', 'angular', 'html', 'css',
            'node.js', 'express', 'php', 'python', 'django', 'flask',
            # Mobile Development Bugs
            'react-native', 'flutter', 'android', 'ios', 'swift',
            # Backend & Database Bugs
            'sql', 'mysql', 'postgresql', 'mongodb', 'redis',
            'java', 'spring', 'c#', '.net', 'ruby', 'rails',
            # System & Performance Bugs
            'performance', 'memory-leaks', 'multithreading', 'concurrency',
            'algorithm', 'data-structures', 'optimization',
            # Security Issues
            'security', 'authentication', 'authorization', 'encryption',
            'sql-injection', 'xss', 'csrf',
            # DevOps & Infrastructure
            'docker', 'kubernetes', 'aws', 'azure', 'git', 'ci-cd'
        ]

    def fetch_questions_with_tag(self, tag: str, max_questions: int = 50) -> List[Dict]:
        """Fetch up to ``max_questions`` bug-like questions for one tag.

        Pages through /questions sorted by votes, restricted to the last
        365 days, retrying after a 60s sleep on HTTP 429 (rate limit).
        Returns the raw API question dicts that pass ``_is_bug_question``.
        """
        print(f"šŸ” Fetching {tag} questions from Stack Overflow...")

        questions = []
        page = 1
        page_size = min(100, max_questions)  # API caps pagesize at 100

        while len(questions) < max_questions:
            url = f"{self.base_url}/questions"
            params = {
                'tagged': tag,
                'site': 'stackoverflow',
                'sort': 'votes',
                'order': 'desc',
                'pagesize': page_size,
                'page': page,
                'filter': 'withbody',  # Include question body
                'fromdate': int((datetime.now() - timedelta(days=365)).timestamp()),  # Last year
                'todate': int(datetime.now().timestamp())
            }
            if self.api_key:
                params['key'] = self.api_key

            try:
                response = self.session.get(url, params=params, timeout=30)

                if response.status_code == 429:
                    # Back off and retry the same page after the quota window.
                    print(f"āš ļø Rate limit hit for {tag}, waiting...")
                    time.sleep(60)
                    continue
                elif response.status_code != 200:
                    print(f"āŒ Error {response.status_code} for {tag}")
                    break

                data = response.json()
                if 'items' not in data or not data['items']:
                    break

                # Filter for bug-related questions
                bug_questions = [q for q in data['items'] if self._is_bug_question(q)]
                questions.extend(bug_questions[:max_questions - len(questions)])

                # A short page means the API has no more results.
                if len(data['items']) < page_size:
                    break

                page += 1
                # Rate limiting: be polite between pages.
                time.sleep(1)

            except Exception as e:
                print(f"āŒ Error fetching {tag}: {e}")
                break

        print(f"āœ… Fetched {len(questions)} bug questions for {tag}")
        return questions

    def _is_bug_question(self, question: Dict) -> bool:
        """Heuristically decide whether a question describes a bug/problem.

        Requires at least one bug keyword in title or body, no
        feature-request keyword, and a non-negative score.
        """
        title = question.get('title', '').lower()
        body = question.get('body', '').lower()
        tags = question.get('tags', [])

        # Bug/problem indicators
        bug_keywords = [
            'bug', 'error', 'exception', 'crash', 'fail', 'broken',
            'not working', 'issue', 'problem', 'fix', 'solution', 'help',
            'why', 'how to fix', 'doesn\'t work', 'won\'t work', 'can\'t',
            'unable to', 'failed to'
        ]

        # Feature request indicators (exclude these)
        feature_keywords = [
            'how to implement', 'best way to', 'recommendation',
            'suggestion', 'feature request', 'enhancement', 'improvement'
        ]

        # Check for bug keywords
        has_bug_keywords = any(keyword in title or keyword in body for keyword in bug_keywords)
        # Check for feature keywords
        has_feature_keywords = any(keyword in title or keyword in body for keyword in feature_keywords)
        # Check score (higher score = more relevant)
        score = question.get('score', 0)

        # Must have bug keywords, not feature keywords, and reasonable score
        return has_bug_keywords and not has_feature_keywords and score >= 0

    def convert_to_training_format(self, questions: List[Dict]) -> List[Dict]:
        """Convert raw Stack Overflow question dicts to training rows.

        Skips questions with trivially short titles/bodies; any question
        that raises during conversion is skipped (best-effort batch).
        """
        print("šŸ”„ Converting Stack Overflow questions to training format...")

        converted = []
        for question in questions:
            try:
                # Extract basic info
                title = question.get('title', '')
                body = self._extract_text_from_html(question.get('body', ''))
                tags = question.get('tags', [])
                score = question.get('score', 0)
                answer_count = question.get('answer_count', 0)
                created_date = question.get('creation_date', 0)

                # Skip if no meaningful content
                if len(title) < 10 or len(body) < 20:
                    continue

                # Infer classification from content and tags
                severity = self._infer_severity(title, body, tags, score)
                component = self._infer_component(title, body, tags)
                bug_type = self._infer_bug_type(title, body, tags)
                team = self._infer_team(component, bug_type)
                priority = self._map_severity_to_priority(severity)

                converted_question = {
                    'title': self._clean_text(title),
                    'description': self._clean_description(body),
                    'severity': severity,
                    'component': component,
                    'bug_type': bug_type,
                    'team': team,
                    'priority': priority,
                    'source': 'stackoverflow',
                    'original_id': str(question.get('question_id', '')),
                    'tags': ', '.join(tags),
                    'score': score,
                    'answer_count': answer_count,
                    'created_date': created_date
                }
                converted.append(converted_question)

            except Exception:
                # Best-effort: a malformed question should not abort the batch.
                continue

        print(f"āœ… Converted {len(converted)} questions to training format")
        return converted

    def _extract_text_from_html(self, html_content: str) -> str:
        """Extract plain text from an HTML fragment.

        Strips tags, decodes HTML entities, and collapses whitespace.
        """
        if not html_content:
            return ""

        # Remove HTML tags
        text = re.sub(r'<[^>]+>', ' ', html_content)

        # BUG FIX: the previous code replaced each entity with itself
        # (e.g. '&' -> '&'), a no-op; html.unescape decodes all named
        # and numeric entities ( &amp; &lt; &gt; &quot; &#39; ... ).
        text = html.unescape(text)

        # Clean whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    def _infer_severity(self, title: str, body: str, tags: List[str], score: int) -> str:
        """Infer a severity label from text keywords and the vote score.

        Precedence: Critical > High > Low > Medium (default).
        """
        text = (title + ' ' + body).lower()

        # Critical indicators
        critical_keywords = [
            'crash', 'segfault', 'data loss', 'security breach',
            'vulnerability', 'remote code execution', 'sql injection',
            'xss', 'authentication bypass', 'system down',
            'production down', 'database corruption'
        ]
        if any(keyword in text for keyword in critical_keywords) or score >= 20:
            return 'Critical'

        # High indicators
        high_keywords = [
            'major bug', 'broken', 'fail', 'error', 'exception',
            'performance issue', 'memory leak', 'race condition',
            'deadlock', 'not working'
        ]
        if any(keyword in text for keyword in high_keywords) or score >= 10:
            return 'High'

        # Low indicators
        low_keywords = [
            'minor', 'cosmetic', 'typo', 'grammar', 'enhancement',
            'improvement', 'suggestion', 'recommendation'
        ]
        if any(keyword in text for keyword in low_keywords) or score <= 2:
            return 'Low'

        return 'Medium'  # Default

    def _infer_component(self, title: str, body: str, tags: List[str]) -> str:
        """Infer the affected component (Frontend/Mobile/Backend) from
        keywords in the text and the question's tags."""
        text = (title + ' ' + body).lower()
        tags_lower = [tag.lower() for tag in tags]

        # Frontend indicators
        frontend_keywords = [
            'ui', 'ux', 'frontend', 'css', 'html', 'javascript', 'react',
            'vue', 'angular', 'button', 'form', 'layout', 'responsive',
            'mobile-friendly', 'browser'
        ]
        frontend_tags = ['javascript', 'reactjs', 'vue.js', 'angular', 'html', 'css']
        if any(keyword in text for keyword in frontend_keywords) or any(tag in frontend_tags for tag in tags_lower):
            return 'Frontend'

        # Mobile indicators
        mobile_keywords = [
            'mobile', 'android', 'ios', 'react-native', 'flutter', 'app', 'native'
        ]
        mobile_tags = ['react-native', 'flutter', 'android', 'ios', 'swift']
        if any(keyword in text for keyword in mobile_keywords) or any(tag in mobile_tags for tag in tags_lower):
            return 'Mobile'

        # Backend indicators
        backend_keywords = [
            'backend', 'api', 'server', 'database', 'sql', 'authentication',
            'authorization', 'middleware', 'service', 'endpoint'
        ]
        backend_tags = ['node.js', 'express', 'php', 'python', 'django',
                        'flask', 'java', 'spring', 'c#', '.net']
        if any(keyword in text for keyword in backend_keywords) or any(tag in backend_tags for tag in tags_lower):
            return 'Backend'

        return 'Backend'  # Default for most programming questions

    def _infer_bug_type(self, title: str, body: str, tags: List[str]) -> str:
        """Infer a bug-type label (Security/Performance/UI-UX/Functional)
        from keywords in the text and the question's tags."""
        text = (title + ' ' + body).lower()
        tags_lower = [tag.lower() for tag in tags]

        # Security indicators
        security_keywords = [
            'security', 'vulnerability', 'xss', 'csrf', 'injection',
            'authentication', 'authorization', 'permission', 'access control'
        ]
        security_tags = ['security', 'authentication', 'authorization',
                         'sql-injection', 'xss', 'csrf']
        if any(keyword in text for keyword in security_keywords) or any(tag in security_tags for tag in tags_lower):
            return 'Security'

        # Performance indicators
        performance_keywords = [
            'performance', 'slow', 'timeout', 'memory', 'cpu',
            'bottleneck', 'optimization'
        ]
        performance_tags = ['performance', 'optimization', 'memory-leaks']
        if any(keyword in text for keyword in performance_keywords) or any(tag in performance_tags for tag in tags_lower):
            return 'Performance'

        # UI/UX indicators
        ui_keywords = [
            'ui', 'ux', 'interface', 'design', 'layout', 'visual',
            'appearance', 'user experience'
        ]
        if any(keyword in text for keyword in ui_keywords):
            return 'UI/UX'

        # Functional indicators
        functional_keywords = [
            'functionality', 'feature', 'logic', 'workflow', 'process',
            'business logic'
        ]
        if any(keyword in text for keyword in functional_keywords):
            return 'Functional'

        return 'Functional'  # Default for most programming questions

    def _infer_team(self, component: str, bug_type: str) -> str:
        """Route to an owning team based on component, then bug type."""
        if component == 'Frontend':
            return 'Frontend Team'
        elif component == 'Mobile':
            return 'Mobile Team'
        elif bug_type == 'Security':
            return 'Security Team'
        else:
            return 'Dev Team'

    def _map_severity_to_priority(self, severity: str) -> str:
        """Map a severity label to a priority bucket (unknown -> P2)."""
        return {'Critical': 'P0', 'High': 'P1', 'Medium': 'P2', 'Low': 'P3'}.get(severity, 'P2')

    def _clean_text(self, text: str) -> str:
        """Clean a title: mask code/URLs, collapse whitespace, cap at 100 chars."""
        if not text:
            return ""

        # BUG FIX: strip fenced ``` blocks BEFORE inline `...` spans,
        # otherwise the inline pattern shreds fences into fragments
        # (matches the order already used in _clean_description).
        text = re.sub(r'```[\s\S]*?```', '[CODE]', text)
        text = re.sub(r'`[^`]+`', '[CODE]', text)
        text = re.sub(r'http[s]?://\S+', '[URL]', text)

        # Clean whitespace
        text = re.sub(r'\s+', ' ', text).strip()

        # Truncate if too long
        if len(text) > 100:
            text = text[:97] + "..."
        return text

    def _clean_description(self, text: str) -> str:
        """Clean a body: mask code/URLs, collapse whitespace, cap at 500 chars."""
        if not text:
            return ""

        # Remove code blocks, URLs, etc. (fenced blocks first)
        text = re.sub(r'```[\s\S]*?```', '[CODE]', text)
        text = re.sub(r'`[^`]+`', '[CODE]', text)
        text = re.sub(r'http[s]?://\S+', '[URL]', text)

        # Clean whitespace
        text = re.sub(r'\s+', ' ', text).strip()

        # Truncate if too long
        if len(text) > 500:
            text = text[:497] + "..."
        return text

    def fetch_comprehensive_questions(self, max_questions_per_tag: int = 30) -> List[Dict]:
        """Fetch and convert questions across all ``bug_tags``.

        Per-tag failures are logged and skipped so one bad tag does not
        abort the whole run.
        """
        print("šŸš€ COMPREHENSIVE STACK OVERFLOW DATA COLLECTION")
        print("=" * 60)
        print(f"Target: {len(self.bug_tags)} programming tags")
        print(f"Expected: ~{len(self.bug_tags) * max_questions_per_tag} questions")
        print()

        all_questions = []
        for tag in self.bug_tags:
            try:
                questions = self.fetch_questions_with_tag(tag, max_questions_per_tag)
                if questions:
                    converted = self.convert_to_training_format(questions)
                    all_questions.extend(converted)
                    print(f"āœ… {tag}: {len(converted)} questions added")
                else:
                    print(f"āš ļø {tag}: No questions found")

                # Rate limiting between tags
                time.sleep(2)

            except Exception as e:
                print(f"āŒ {tag}: Error - {e}")
                continue

        print(f"\nšŸŽ‰ Total questions collected: {len(all_questions)}")
        return all_questions

    def save_to_csv(self, questions: List[Dict],
                    filename: str = 'stackoverflow_training_data.csv') -> Optional[str]:
        """Write the question rows to CSV; returns the filename, or None
        when there is nothing to save."""
        if not questions:
            print("āŒ No questions to save")
            return None

        df = pd.DataFrame(questions)
        df.to_csv(filename, index=False)
        # BUG FIX: message previously printed a literal placeholder
        # instead of the actual filename.
        print(f"šŸ’¾ Saved {len(questions)} questions to {filename}")
        return filename


def main():
    """Main function to fetch Stack Overflow questions."""
    print("šŸš€ STACK OVERFLOW DATA COLLECTOR")
    print("=" * 50)
    print("šŸ“‹ Purpose: Collect real-world programming problems from Stack Overflow")
    print("šŸŽÆ Goal: Maximum precision model training")
    print()

    # Check for API key
    api_key = input("Enter Stack Exchange API key (optional, for higher rate limits): ").strip()
    if not api_key:
        print("āš ļø No API key provided - using public API (limited to 10,000 requests/day)")

    fetcher = StackOverflowFetcher(api_key if api_key else None)

    # Fetch questions
    print("\nšŸ”„ Starting collection...")
    questions = fetcher.fetch_comprehensive_questions(max_questions_per_tag=20)

    if questions:
        # Save to CSV
        filename = fetcher.save_to_csv(questions)
        print(f"\nšŸŽ‰ SUCCESS!")
        # BUG FIX: message previously printed a literal placeholder
        # instead of the actual filename.
        print(f"šŸ“ File: {filename}")
        print(f"šŸ“Š Total questions: {len(questions)}")

        # Show sample
        if questions:
            sample = questions[0]
            print(f"\nšŸ“‹ Sample question:")
            print(f"   Title: {sample['title'][:60]}...")
            print(f"   Severity: {sample['severity']}")
            print(f"   Component: {sample['component']}")
            print(f"   Bug Type: {sample['bug_type']}")
            print(f"   Team: {sample['team']}")
            print(f"   Score: {sample['score']}")
            print(f"   Source: {sample['source']}")

        print(f"\nšŸŽÆ Next steps:")
        print("1. Upload this CSV via UI (Data Import → CSV Import)")
        print("2. Train your model with real Stack Overflow data")
        print("3. Achieve maximum precision!")
        return filename
    else:
        print("āŒ No questions collected")
        return None


if __name__ == "__main__":
    main()