Spaces:
Sleeping
Sleeping
import json
import re
import time
from datetime import datetime, timedelta
from html import unescape
from typing import List, Dict, Optional

import pandas as pd
import requests
class StackOverflowFetcher:
    """Fetch bug-like Stack Overflow questions and label them for training.

    Questions are pulled from the Stack Exchange API one tag at a time,
    filtered with keyword heuristics, and converted into flat dictionaries
    (title, description, severity, component, bug type, team, priority)
    that can be written to CSV with :meth:`save_to_csv`.
    """

    # Give up on a tag after this many consecutive HTTP 429 responses so a
    # persistent throttle cannot stall the whole run.
    MAX_RATE_LIMIT_RETRIES = 3
    # Seconds to wait after an HTTP 429 before retrying the same page.
    RATE_LIMIT_WAIT_SECONDS = 60

    # Phrases suggesting that a question describes something going wrong.
    _BUG_KEYWORDS = [
        'bug', 'error', 'exception', 'crash', 'fail', 'broken', 'not working',
        'issue', 'problem', 'fix', 'solution', 'help', 'why', 'how to fix',
        'doesn\'t work', 'won\'t work', 'can\'t', 'unable to', 'failed to',
    ]
    # Phrases suggesting a feature/advice request; these veto a bug match.
    _FEATURE_KEYWORDS = [
        'how to implement', 'best way to', 'recommendation', 'suggestion',
        'feature request', 'enhancement', 'improvement',
    ]

    _CRITICAL_KEYWORDS = [
        'crash', 'segfault', 'data loss', 'security breach', 'vulnerability',
        'remote code execution', 'sql injection', 'xss', 'authentication bypass',
        'system down', 'production down', 'database corruption',
    ]
    _HIGH_KEYWORDS = [
        'major bug', 'broken', 'fail', 'error', 'exception', 'performance issue',
        'memory leak', 'race condition', 'deadlock', 'not working',
    ]
    _LOW_KEYWORDS = [
        'minor', 'cosmetic', 'typo', 'grammar', 'enhancement', 'improvement',
        'suggestion', 'recommendation',
    ]

    _FRONTEND_KEYWORDS = [
        'ui', 'ux', 'frontend', 'css', 'html', 'javascript', 'react', 'vue', 'angular',
        'button', 'form', 'layout', 'responsive', 'mobile-friendly', 'browser',
    ]
    _FRONTEND_TAGS = ['javascript', 'reactjs', 'vue.js', 'angular', 'html', 'css']
    _MOBILE_KEYWORDS = [
        'mobile', 'android', 'ios', 'react-native', 'flutter', 'app', 'native',
    ]
    _MOBILE_TAGS = ['react-native', 'flutter', 'android', 'ios', 'swift']

    _SECURITY_KEYWORDS = [
        'security', 'vulnerability', 'xss', 'csrf', 'injection', 'authentication',
        'authorization', 'permission', 'access control',
    ]
    _SECURITY_TAGS = ['security', 'authentication', 'authorization', 'sql-injection', 'xss', 'csrf']
    _PERFORMANCE_KEYWORDS = [
        'performance', 'slow', 'timeout', 'memory', 'cpu', 'bottleneck', 'optimization',
    ]
    _PERFORMANCE_TAGS = ['performance', 'optimization', 'memory-leaks']
    _UI_KEYWORDS = [
        'ui', 'ux', 'interface', 'design', 'layout', 'visual', 'appearance', 'user experience',
    ]

    def __init__(self, api_key: Optional[str] = None):
        """Create a fetcher.

        Args:
            api_key: Optional Stack Exchange API key; when set it is sent as
                the ``key`` parameter on every request.
        """
        self.api_key = api_key
        self.base_url = "https://api.stackexchange.com/2.3"
        self.session = requests.Session()
        # Programming tags that represent real bugs
        self.bug_tags = [
            # Web Development Bugs
            'javascript', 'reactjs', 'vue.js', 'angular', 'html', 'css',
            'node.js', 'express', 'php', 'python', 'django', 'flask',
            # Mobile Development Bugs
            'react-native', 'flutter', 'android', 'ios', 'swift',
            # Backend & Database Bugs
            'sql', 'mysql', 'postgresql', 'mongodb', 'redis',
            'java', 'spring', 'c#', '.net', 'ruby', 'rails',
            # System & Performance Bugs
            'performance', 'memory-leaks', 'multithreading', 'concurrency',
            'algorithm', 'data-structures', 'optimization',
            # Security Issues
            'security', 'authentication', 'authorization', 'encryption',
            'sql-injection', 'xss', 'csrf',
            # DevOps & Infrastructure
            'docker', 'kubernetes', 'aws', 'azure', 'git', 'ci-cd'
        ]

    @staticmethod
    def _mentions_any(text: str, keywords: List[str]) -> bool:
        """Return True if *text* contains any keyword as a whole word/phrase.

        Plain substring tests misfire badly on short keywords ('ui' is inside
        "build", 'ux' inside "linux", 'bug' inside "debug"), so a keyword only
        counts when it is not glued to surrounding letters or digits. A small
        set of inflection suffixes (s, es, ed, ing) is tolerated so that
        "crash" still matches "crashes", "crashed" and "crashing".
        """
        if not text or not keywords:
            return False
        alternation = '|'.join(re.escape(keyword) for keyword in keywords)
        pattern = rf'(?<![a-z0-9])(?:{alternation})(?:s|es|ed|ing)?(?![a-z0-9])'
        return re.search(pattern, text.lower()) is not None

    def fetch_questions_with_tag(self, tag: str, max_questions: int = 50) -> List[Dict]:
        """Fetch up to *max_questions* bug-like questions carrying *tag*.

        Pages through ``/questions`` (highest-voted first, created within the
        last 365 days) and keeps only items accepted by
        :meth:`_is_bug_question`. Network, HTTP and JSON errors end the loop
        early; whatever was collected so far is returned.

        Args:
            tag: Stack Overflow tag to query.
            max_questions: Upper bound on the number of questions returned.

        Returns:
            Raw API question dictionaries (possibly an empty list).
        """
        print(f"🔍 Fetching {tag} questions from Stack Overflow...")
        questions: List[Dict] = []
        page = 1
        page_size = min(100, max_questions)
        rate_limit_retries = 0
        url = f"{self.base_url}/questions"
        # Compute the window once so every page queries the same time range.
        now = datetime.now()
        from_ts = int((now - timedelta(days=365)).timestamp())  # Last year
        to_ts = int(now.timestamp())

        while len(questions) < max_questions:
            params = {
                'tagged': tag,
                'site': 'stackoverflow',
                'sort': 'votes',
                'order': 'desc',
                'pagesize': page_size,
                'page': page,
                'filter': 'withbody',  # Include question body
                'fromdate': from_ts,
                'todate': to_ts,
            }
            if self.api_key:
                params['key'] = self.api_key

            try:
                response = self.session.get(url, params=params, timeout=30)
                if response.status_code == 429:
                    rate_limit_retries += 1
                    if rate_limit_retries > self.MAX_RATE_LIMIT_RETRIES:
                        print(f"❌ Rate limit persists for {tag}, giving up")
                        break
                    print(f"⚠️ Rate limit hit for {tag}, waiting...")
                    time.sleep(self.RATE_LIMIT_WAIT_SECONDS)
                    continue
                if response.status_code != 200:
                    print(f"❌ Error {response.status_code} for {tag}")
                    break
                data = response.json()
            except (requests.RequestException, ValueError) as e:
                # ValueError covers a body that is not valid JSON.
                print(f"❌ Error fetching {tag}: {e}")
                break

            rate_limit_retries = 0
            items = data.get('items') if isinstance(data, dict) else None
            if not items:
                break

            # Filter for bug-related questions
            remaining = max_questions - len(questions)
            bug_questions = [item for item in items if self._is_bug_question(item)]
            questions.extend(bug_questions[:remaining])

            # Prefer the API's own end-of-results flag; fall back to the
            # short-page heuristic when the flag is absent.
            if not data.get('has_more', len(items) >= page_size):
                break
            page += 1

            # Rate limiting: wait at least a second, longer if the API sent
            # a `backoff` value for this method.
            time.sleep(max(1, data.get('backoff') or 0))

        print(f"✅ Fetched {len(questions)} bug questions for {tag}")
        return questions

    def _is_bug_question(self, question: Dict) -> bool:
        """Determine if a question is about a bug/problem.

        The title and body arrive HTML-encoded, so both are decoded to plain
        text before matching. A question qualifies when it mentions a bug
        keyword, mentions no feature-request keyword, and has a non-negative
        score.
        """
        title = unescape(question.get('title') or '')
        body = self._extract_text_from_html(question.get('body') or '')
        text = f"{title} {body}"

        if not self._mentions_any(text, self._BUG_KEYWORDS):
            return False
        if self._mentions_any(text, self._FEATURE_KEYWORDS):
            return False
        return (question.get('score') or 0) >= 0

    def convert_to_training_format(self, questions: List[Dict]) -> List[Dict]:
        """Convert Stack Overflow questions to our training format.

        Questions whose cleaned title is shorter than 10 characters or whose
        body text is shorter than 20 characters are skipped. A question that
        fails to convert is reported and skipped rather than aborting the
        batch.

        Args:
            questions: Raw API question dictionaries.

        Returns:
            One flat dictionary per converted question.
        """
        print("🔄 Converting Stack Overflow questions to training format...")
        converted = []
        for question in questions:
            try:
                # Extract basic info (titles are entity-encoded by the API)
                title = unescape(question.get('title') or '')
                body = self._extract_text_from_html(question.get('body') or '')
                tags = question.get('tags') or []
                score = question.get('score', 0)
                answer_count = question.get('answer_count', 0)
                created_date = question.get('creation_date', 0)

                # Skip if no meaningful content
                if len(title) < 10 or len(body) < 20:
                    continue

                # Infer classification from content and tags
                severity = self._infer_severity(title, body, tags, score)
                component = self._infer_component(title, body, tags)
                bug_type = self._infer_bug_type(title, body, tags)

                converted.append({
                    'title': self._clean_text(title),
                    'description': self._clean_description(body),
                    'severity': severity,
                    'component': component,
                    'bug_type': bug_type,
                    'team': self._infer_team(component, bug_type),
                    'priority': self._map_severity_to_priority(severity),
                    'source': 'stackoverflow',
                    'original_id': str(question.get('question_id', '')),
                    'tags': ', '.join(tags),
                    'score': score,
                    'answer_count': answer_count,
                    'created_date': created_date,
                })
            except Exception as e:
                # Best effort per record: report the failure and keep going.
                print(f"⚠️ Skipping question that could not be converted: {e}")
                continue
        print(f"✅ Converted {len(converted)} questions to training format")
        return converted

    def _extract_text_from_html(self, html: str) -> str:
        """Extract plain text from HTML content.

        Tags are stripped first and entities decoded afterwards, so an
        encoded ``&lt;div&gt;`` in the text survives as the literal ``<div>``.
        """
        if not html:
            return ""
        # Remove HTML tags
        without_tags = re.sub(r'<[^>]+>', ' ', html)
        # Decode HTML entities (named and numeric)
        decoded = unescape(without_tags)
        # Clean whitespace
        return re.sub(r'\s+', ' ', decoded).strip()

    def _infer_severity(self, title: str, body: str, tags: List[str], score: int) -> str:
        """Infer severity from question content and score.

        Checked in order: Critical (keyword or score >= 20), High (keyword or
        score >= 10), Low (keyword or score <= 2), otherwise Medium. *tags* is
        accepted for interface compatibility but not consulted.
        """
        text = f"{title} {body}"
        if score >= 20 or self._mentions_any(text, self._CRITICAL_KEYWORDS):
            return 'Critical'
        if score >= 10 or self._mentions_any(text, self._HIGH_KEYWORDS):
            return 'High'
        if score <= 2 or self._mentions_any(text, self._LOW_KEYWORDS):
            return 'Low'
        return 'Medium'  # Default

    def _infer_component(self, title: str, body: str, tags: List[str]) -> str:
        """Infer component from question content and tags.

        Frontend is tested before Mobile; anything matching neither is
        reported as Backend.
        """
        text = f"{title} {body}"
        tags_lower = [tag.lower() for tag in tags]

        if (self._mentions_any(text, self._FRONTEND_KEYWORDS)
                or any(tag in self._FRONTEND_TAGS for tag in tags_lower)):
            return 'Frontend'
        if (self._mentions_any(text, self._MOBILE_KEYWORDS)
                or any(tag in self._MOBILE_TAGS for tag in tags_lower)):
            return 'Mobile'
        return 'Backend'  # Default for most programming questions

    def _infer_bug_type(self, title: str, body: str, tags: List[str]) -> str:
        """Infer bug type from question content and tags.

        Checked in order: Security, Performance, UI/UX; anything else is
        reported as Functional.
        """
        text = f"{title} {body}"
        tags_lower = [tag.lower() for tag in tags]

        if (self._mentions_any(text, self._SECURITY_KEYWORDS)
                or any(tag in self._SECURITY_TAGS for tag in tags_lower)):
            return 'Security'
        if (self._mentions_any(text, self._PERFORMANCE_KEYWORDS)
                or any(tag in self._PERFORMANCE_TAGS for tag in tags_lower)):
            return 'Performance'
        if self._mentions_any(text, self._UI_KEYWORDS):
            return 'UI/UX'
        return 'Functional'  # Default for most programming questions

    def _infer_team(self, component: str, bug_type: str) -> str:
        """Infer team based on component and bug type.

        The component wins for Frontend and Mobile; Security bugs elsewhere go
        to the Security Team; everything else goes to the Dev Team.
        """
        if component == 'Frontend':
            return 'Frontend Team'
        if component == 'Mobile':
            return 'Mobile Team'
        if bug_type == 'Security':
            return 'Security Team'
        return 'Dev Team'

    def _map_severity_to_priority(self, severity: str) -> str:
        """Map severity to priority (unknown severities map to P2)."""
        return {'Critical': 'P0', 'High': 'P1', 'Medium': 'P2', 'Low': 'P3'}.get(severity, 'P2')

    @staticmethod
    def _scrub(text: str, max_length: int) -> str:
        """Replace code and URLs with placeholders, tidy whitespace, truncate.

        Fenced blocks are replaced before inline code; doing it the other way
        round lets the inline pattern eat the inside of a fence and leaves
        stray backticks behind.
        """
        if not text:
            return ""
        text = re.sub(r'```[\s\S]*?```', '[CODE]', text)
        text = re.sub(r'`[^`]+`', '[CODE]', text)
        text = re.sub(r'http[s]?://\S+', '[URL]', text)
        # Clean whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        # Truncate if too long, keeping the result within max_length
        if len(text) > max_length:
            text = text[:max_length - 3] + "..."
        return text

    def _clean_text(self, text: str) -> str:
        """Clean a title and truncate it to at most 100 characters."""
        return self._scrub(text, 100)

    def _clean_description(self, text: str) -> str:
        """Clean a description and truncate it to at most 500 characters."""
        return self._scrub(text, 500)

    def fetch_comprehensive_questions(self, max_questions_per_tag: int = 30) -> List[Dict]:
        """Fetch questions from all target tags.

        A question tagged with several target tags is returned by more than
        one query; it is kept only the first time it is seen so the training
        set has no duplicate rows.

        Args:
            max_questions_per_tag: Upper bound on questions fetched per tag.

        Returns:
            Converted question dictionaries across all tags.
        """
        print("🚀 COMPREHENSIVE STACK OVERFLOW DATA COLLECTION")
        print("=" * 60)
        print(f"Target: {len(self.bug_tags)} programming tags")
        print(f"Expected: ~{len(self.bug_tags) * max_questions_per_tag} questions")
        print()

        all_questions = []
        seen_ids = set()
        for tag in self.bug_tags:
            try:
                questions = self.fetch_questions_with_tag(tag, max_questions_per_tag)
                if questions:
                    fresh = []
                    for row in self.convert_to_training_format(questions):
                        question_id = row['original_id']
                        # Rows without an id cannot be compared; keep them.
                        if question_id and question_id in seen_ids:
                            continue
                        seen_ids.add(question_id)
                        fresh.append(row)
                    all_questions.extend(fresh)
                    print(f"✅ {tag}: {len(fresh)} questions added")
                else:
                    print(f"⚠️ {tag}: No questions found")
                # Rate limiting
                time.sleep(2)
            except Exception as e:
                # One failing tag must not abort the whole collection run.
                print(f"❌ {tag}: Error - {e}")
                continue

        print(f"\n📊 Total questions collected: {len(all_questions)}")
        return all_questions

    def save_to_csv(self, questions: List[Dict], filename: str = 'stackoverflow_training_data.csv') -> Optional[str]:
        """Save questions to CSV.

        Args:
            questions: Converted question dictionaries.
            filename: Destination path.

        Returns:
            *filename* on success, or None when there is nothing to save.
        """
        if not questions:
            print("❌ No questions to save")
            return None
        df = pd.DataFrame(questions)
        df.to_csv(filename, index=False)
        print(f"💾 Saved {len(questions)} questions to {filename}")
        return filename
def main():
    """Collect Stack Overflow questions interactively and write them to CSV.

    Prompts for an optional Stack Exchange API key, fetches questions for
    every target tag, saves the converted rows, and prints a short summary.

    Returns:
        The CSV filename on success, or None when nothing was collected.
    """
    print("🚀 STACK OVERFLOW DATA COLLECTOR")
    print("=" * 50)
    print("📚 Purpose: Collect real-world programming problems from Stack Overflow")
    print("🎯 Goal: Maximum precision model training")
    print()

    # Check for API key. stdin may be closed when run non-interactively;
    # treat that the same as pressing Enter.
    try:
        api_key = input("Enter Stack Exchange API key (optional, for higher rate limits): ").strip()
    except EOFError:
        api_key = ""
    if not api_key:
        # Keyless requests share a much smaller daily quota than keyed ones.
        print("⚠️ No API key provided - using public API (limited to 300 requests/day)")

    fetcher = StackOverflowFetcher(api_key if api_key else None)

    # Fetch questions
    print("\n🔄 Starting collection...")
    questions = fetcher.fetch_comprehensive_questions(max_questions_per_tag=20)
    if not questions:
        print("❌ No questions collected")
        return None

    # Save to CSV
    filename = fetcher.save_to_csv(questions)
    print("\n🎉 SUCCESS!")
    print(f"📁 File: {filename}")
    print(f"📊 Total questions: {len(questions)}")

    # Show sample
    sample = questions[0]
    print("\n📋 Sample question:")
    print(f" Title: {sample['title'][:60]}...")
    print(f" Severity: {sample['severity']}")
    print(f" Component: {sample['component']}")
    print(f" Bug Type: {sample['bug_type']}")
    print(f" Team: {sample['team']}")
    print(f" Score: {sample['score']}")
    print(f" Source: {sample['source']}")

    print("\n🎯 Next steps:")
    print("1. Upload this CSV via UI (Data Import → CSV Import)")
    print("2. Train your model with real Stack Overflow data")
    print("3. Achieve maximum precision!")
    return filename


if __name__ == "__main__":
    main()