Spaces:
Runtime error
Runtime error
| # github_ai_agent.py - Improved version with parallel processing and error handling | |
| import os | |
| import re | |
| import time | |
| import json | |
| import datetime | |
| import networkx as nx | |
| from collections import defaultdict, Counter | |
| from itertools import combinations | |
| import numpy as np | |
| from typing import List, Dict, Tuple, Any, Optional, Union | |
| import concurrent.futures | |
| from functools import lru_cache | |
| import google.generativeai as genai | |
| # External libraries | |
| from github import Github, GithubException | |
| from sentence_transformers import SentenceTransformer | |
| import faiss | |
| from gemini_integration import GeminiClient | |
| from visualization_module import RepositoryVisualizer | |
| # Configuration | |
class Config:
    """Settings shared by the components of the GitHub AI Agent.

    Credentials are read from the ``GEMINI_API_KEY`` and
    ``GITHUB_ACCESS_TOKEN`` environment variables at construction time and
    are ``None`` when the variable is unset. All other values are fixed
    defaults that callers may overwrite on the instance.
    """

    def __init__(self):
        # Credentials (None when the environment variable is missing)
        self.gemini_api_key = os.getenv("GEMINI_API_KEY")
        self.github_token = os.getenv("GITHUB_ACCESS_TOKEN")

        # Model names
        self.embedding_model_name = "all-MiniLM-L6-v2"
        self.gemini_model = "gemini-2.0-pro-exp-02-05"

        # Limits and feature switches
        self.max_files_to_load = 100  # safety limit for large repos
        self.max_token_length = 64000  # Gemini Pro context limit
        self.enable_advanced_metrics = True
        self.visualization_node_limit = 150
        self.cache_enabled = True
        self.cache_ttl = 3600  # cache time to live, in seconds

        # File extensions to analyze. Kept as lists because other code
        # concatenates them with ``+``.
        self.code_extensions = [
            ".py", ".js", ".jsx", ".ts", ".tsx", ".java",
            ".c", ".cpp", ".cs", ".go", ".rb", ".php",
            ".swift", ".kt", ".rs", ".hs", ".scala", ".ml",
        ]
        self.doc_extensions = [
            ".md", ".txt", ".rst", ".html",
            ".xml", ".json", ".yaml", ".yml",
        ]
| # GitHub Repository Management | |
class GitHubManager:
    """Fetch and normalize data for a single GitHub repository.

    Wraps a PyGithub client and keeps the most recently loaded repository's
    metadata, files, contributors, commits and issues on the instance as
    plain dictionaries and lists.
    """

    # Accepts both "https://github.com/owner/repo" and
    # "git@github.com:owner/repo"; the repository part stops at a path
    # separator, whitespace, query string or fragment.
    _REPO_URL_PATTERN = re.compile(r'github\.com[/:]([^/\s]+)/([^/\s?#]+)')

    def __init__(self, config: "Config"):
        """Create the client, authenticated when ``config.github_token`` is set."""
        self.config = config
        self.g = Github(config.github_token) if config.github_token else Github()
        self.current_repo = None
        self.repo_data = {}
        self.file_contents = {}
        self.contributors_data = {}
        self.commit_history = []
        self.issues_data = []
        self.file_cache = {}  # path -> processed file record (current repo only)

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_naive(moment):
        """Return *moment* with tzinfo dropped; ``None``/naive values pass through.

        The offset is discarded, not converted.
        NOTE(review): this assumes timestamps are already UTC - confirm.
        """
        if moment is not None and getattr(moment, 'tzinfo', None):
            return moment.replace(tzinfo=None)
        return moment

    @staticmethod
    def _listing(entries) -> List:
        """Normalize a ``get_contents`` result (one item or a list) to a new list."""
        return list(entries) if isinstance(entries, list) else [entries]

    @staticmethod
    def _run_parallel(worker, items, describe) -> List[Tuple[Any, Any]]:
        """Apply *worker* to every item on a thread pool.

        Returns ``(item, result)`` pairs in the order of *items* (not in
        completion order), so the output is deterministic. Items whose worker
        raised are reported with ``describe(item)`` and skipped; falsy
        results are skipped as well.
        """
        completed = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(worker, item) for item in items]
            for item, future in zip(items, futures):
                try:
                    outcome = future.result()
                except Exception as e:
                    print(f"Error processing {describe(item)}: {e}")
                    continue
                if outcome:
                    completed.append((item, outcome))
        return completed

    # ------------------------------------------------------------------
    # Repository
    # ------------------------------------------------------------------
    def load_repository(self, repo_url: str) -> bool:
        """Resolve *repo_url* and load the repository's basic metadata.

        Returns ``True`` on success. On any failure the error is printed,
        ``False`` is returned and the previously loaded state is left
        untouched. On success all per-repository collections are reset, and
        the file cache is dropped when a different repository is loaded so
        that paths shared between repositories (e.g. ``README.md``) cannot
        return another repository's content.
        """
        try:
            repo_name = self._extract_repo_name(repo_url)
            if not repo_name:
                return False

            repo = self.g.get_repo(repo_name)
            repo_data = {
                'name': repo.name,
                'full_name': repo.full_name,
                'description': repo.description,
                'stars': repo.stargazers_count,
                'forks': repo.forks_count,
                'watchers': repo.watchers_count,
                'open_issues': repo.open_issues_count,
                'created_at': repo.created_at,
                'updated_at': repo.updated_at,
                'default_branch': repo.default_branch,
                'language': repo.language,
                'topics': repo.get_topics(),
                'license': repo.license.name if repo.license else None,
            }
        except Exception as e:
            print(f"Error loading repository: {e}")
            return False

        if self.repo_data.get('full_name') != repo_data['full_name']:
            self.file_cache = {}
        self.file_contents = {}
        self.contributors_data = {}
        self.commit_history = []
        self.issues_data = []
        self.current_repo = repo
        self.repo_data = repo_data
        return True

    def _extract_repo_name(self, repo_url: str) -> Optional[str]:
        """Return ``"owner/repo"`` parsed from a GitHub URL, or ``None``.

        Only a trailing ``.git`` is removed, so repository names that merely
        contain that text (``user.github.io``) are preserved.
        """
        match = self._REPO_URL_PATTERN.search(repo_url or "")
        if not match:
            return None

        username, repo = match.groups()
        if repo.endswith('.git'):
            repo = repo[:-len('.git')]
        if not repo:
            return None
        return f"{username}/{repo}"

    # ------------------------------------------------------------------
    # Files
    # ------------------------------------------------------------------
    def load_files(self) -> Dict[str, Dict]:
        """Load text files with a configured extension from the repository.

        Directories are walked breadth-first until
        ``config.max_files_to_load`` matching files have been collected; the
        files are then decoded in parallel. Returns a mapping of path to file
        record (also stored in ``self.file_contents``), or ``{}`` when no
        repository is loaded or an error occurs.
        """
        if not self.current_repo:
            return {}

        try:
            pending = self._listing(self.current_repo.get_contents(""))
            self.file_contents = {}
            wanted = set(self.config.code_extensions) | set(self.config.doc_extensions)
            limit = self.config.max_files_to_load

            # First pass - collect file entries. A moving cursor over a
            # growing list gives FIFO order without O(n) pops from the front.
            file_queue = []
            cursor = 0
            while cursor < len(pending) and len(file_queue) < limit:
                item = pending[cursor]
                cursor += 1

                if item.type == "dir":
                    try:
                        pending.extend(self._listing(self.current_repo.get_contents(item.path)))
                    except Exception as e:
                        print(f"Error accessing directory {item.path}: {e}")
                    continue

                if os.path.splitext(item.path)[1] in wanted:
                    file_queue.append(item)

            # Second pass - decode contents in parallel.
            loaded = {}
            for item, record in self._run_parallel(
                self._process_file, file_queue, lambda f: f"file {f.path}"
            ):
                loaded[item.path] = record

            self.file_contents = loaded
            return self.file_contents
        except Exception as e:
            print(f"Error loading files: {e}")
            return {}

    def _process_file(self, file_content) -> Optional[Dict]:
        """Decode one content entry into a file record (runs on a worker thread).

        Returns ``None`` for unsupported extensions, content that is not
        valid UTF-8, or any other failure (which is printed).
        """
        try:
            cached = self.file_cache.get(file_content.path)
            if cached is not None:
                return cached

            _, ext = os.path.splitext(file_content.path)
            is_code = ext in self.config.code_extensions
            if not is_code and ext not in self.config.doc_extensions:
                return None

            try:
                decoded_content = file_content.decoded_content.decode('utf-8')
            except UnicodeDecodeError:
                # Not UTF-8 text - treat as binary and skip
                return None

            result = {
                'content': decoded_content,
                'type': 'code' if is_code else 'document',
                'size': file_content.size,
                'ext': ext
            }
            self.file_cache[file_content.path] = result
            return result
        except Exception as e:
            print(f"Error processing file {file_content.path}: {e}")
            return None

    # ------------------------------------------------------------------
    # Contributors
    # ------------------------------------------------------------------
    def load_contributors(self) -> List[Dict]:
        """Load contributors into ``self.contributors_data`` (keyed by login).

        Returns the contributor records as a list, or ``[]`` when no
        repository is loaded or an error occurs.
        """
        if not self.current_repo:
            return []

        try:
            contributor_list = list(self.current_repo.get_contributors())
            loaded = {}
            for contributor, record in self._run_parallel(
                self._process_contributor, contributor_list,
                lambda c: f"contributor {c.login}"
            ):
                loaded[contributor.login] = record

            # Publish in one assignment so readers never see a half-built dict
            self.contributors_data = loaded
            return list(self.contributors_data.values())
        except Exception as e:
            print(f"Error loading contributors: {e}")
            return []

    def _process_contributor(self, contributor) -> Optional[Dict]:
        """Build the record for one contributor; ``None`` if reading it fails."""
        try:
            return {
                'login': contributor.login,
                'id': contributor.id,
                'contributions': contributor.contributions,
                'avatar_url': contributor.avatar_url,
                'html_url': contributor.html_url,
                'type': contributor.type,
                'files_modified': [],
                'commit_messages': [],
                'activity_dates': []
            }
        except Exception as e:
            print(f"Error processing contributor {contributor.login}: {e}")
            return None

    # ------------------------------------------------------------------
    # Commits
    # ------------------------------------------------------------------
    def load_commits(self, limit: int = 100) -> List[Dict]:
        """Load up to *limit* commits into ``self.commit_history``.

        Commits keep the order in which the GitHub client returned them.
        Contributors already present in ``self.contributors_data`` get their
        touched files, commit messages and activity dates recorded, so load
        contributors first when that attribution is wanted. Returns ``[]``
        when no repository is loaded or an error occurs.
        """
        if not self.current_repo:
            return []

        try:
            commits_list = list(self.current_repo.get_commits()[:limit])
            self.commit_history = [
                record for _, record in self._run_parallel(
                    self._process_commit, commits_list, lambda c: f"commit {c.sha}"
                )
            ]
            self._update_contributor_file_stats()
            return self.commit_history
        except Exception as e:
            print(f"Error loading commits: {e}")
            return []

    def _process_commit(self, commit) -> Optional[Dict]:
        """Convert one commit into a record (runs on a worker thread).

        File names are appended to the author's ``files_modified`` once per
        changed file; the commit message and date are recorded once per
        commit rather than once per file.
        """
        try:
            commit_date = self._to_naive(commit.commit.author.date)
            author_login = commit.author.login if commit.author else None

            commit_data = {
                'sha': commit.sha,
                'author': author_login if commit.author else 'Unknown',
                'date': commit_date,
                'message': commit.commit.message,
                'files': []
            }
            contributor = (
                self.contributors_data.get(author_login) if commit.author else None
            )

            try:
                for changed in commit.files:
                    commit_data['files'].append({
                        'filename': changed.filename,
                        'additions': changed.additions,
                        'deletions': changed.deletions,
                        'changes': changed.changes,
                        'status': changed.status
                    })
                    if contributor is not None:
                        contributor['files_modified'].append(changed.filename)
            except Exception as e:
                print(f"Error processing files for commit {commit.sha}: {e}")

            if contributor is not None:
                contributor['commit_messages'].append(commit.commit.message)
                contributor['activity_dates'].append(commit_date)

            return commit_data
        except Exception as e:
            print(f"Error processing commit {commit.sha}: {e}")
            return None

    def _update_contributor_file_stats(self):
        """Collapse each contributor's ``files_modified`` into top-10 counts.

        Raw file names count as one modification each. Entries that are
        already ``{'filename', 'count'}`` dicts (from an earlier call) keep
        their count, so the method can safely run more than once.
        """
        for _login, contributor in list(self.contributors_data.items()):
            if 'files_modified' not in contributor:
                continue

            file_counts = Counter()
            for entry in contributor['files_modified']:
                if isinstance(entry, dict):
                    file_counts[entry['filename']] += entry['count']
                else:
                    file_counts[entry] += 1

            contributor['files_modified'] = [
                {'filename': filename, 'count': count}
                for filename, count in file_counts.most_common(10)
            ]

    # ------------------------------------------------------------------
    # Issues
    # ------------------------------------------------------------------
    def load_issues(self, limit: int = 30) -> List[Dict]:
        """Load up to *limit* issues (any state) into ``self.issues_data``.

        Issues keep the order in which the GitHub client returned them.
        Returns ``[]`` when no repository is loaded or an error occurs.
        """
        if not self.current_repo:
            return []

        try:
            issues_list = list(self.current_repo.get_issues(state='all')[:limit])
            self.issues_data = [
                record for _, record in self._run_parallel(
                    self._process_issue, issues_list, lambda i: f"issue #{i.number}"
                )
            ]
            return self.issues_data
        except Exception as e:
            print(f"Error loading issues: {e}")
            return []

    def _process_issue(self, issue) -> Optional[Dict]:
        """Convert one issue and up to 10 of its comments into a record."""
        try:
            issue_data = {
                'number': issue.number,
                'title': issue.title,
                'body': issue.body,
                'user': issue.user.login if issue.user else 'Unknown',
                'state': issue.state,
                'created_at': self._to_naive(issue.created_at),
                'updated_at': self._to_naive(issue.updated_at),
                'closed_at': self._to_naive(issue.closed_at),
                'labels': [label.name for label in issue.labels],
                'comments': []
            }

            # Comment failures are reported but do not discard the issue
            try:
                for comment in issue.get_comments()[:10]:
                    issue_data['comments'].append({
                        'user': comment.user.login if comment.user else 'Unknown',
                        'body': comment.body,
                        'created_at': self._to_naive(comment.created_at)
                    })
            except Exception as e:
                print(f"Error loading comments for issue #{issue.number}: {e}")

            return issue_data
        except Exception as e:
            print(f"Error processing issue #{issue.number}: {e}")
            return None
| # Knowledge Base and Vector Storage | |
| class KnowledgeBase: | |
| """Manages the knowledge base for the repository""" | |
| def __init__(self, config: Config): | |
| self.config = config | |
| self.embeddings = {} | |
| self.embedding_model = SentenceTransformer(config.embedding_model_name) | |
| self.index = None | |
| self.knowledge_graph = nx.Graph() | |
| self.insights = {} | |
| self.insights_cache = {} | |
| self.cache_timestamp = None | |
| def initialize_vector_storage(self, file_contents: Dict[str, Dict]) -> None: | |
| """Initialize vector storage with file contents and batched processing""" | |
| try: | |
| # Clear existing data | |
| self.embeddings = {} | |
| self.knowledge_graph = nx.Graph() | |
| # Process files and create embeddings | |
| texts = [] | |
| ids = [] | |
| # Process files in parallel for large repositories | |
| if len(file_contents) > 50: | |
| with concurrent.futures.ThreadPoolExecutor() as executor: | |
| # Process files in batches | |
| batch_size = 20 | |
| keys = list(file_contents.keys()) | |
| batches = [keys[i:i + batch_size] for i in range(0, len(keys), batch_size)] | |
| # Create a function to process a batch | |
| def process_batch(batch_keys): | |
| batch_texts = [] | |
| batch_ids = [] | |
| for path in batch_keys: | |
| file_data = file_contents[path] | |
| content = file_data['content'] | |
| # Skip very large files to avoid embedding issues | |
| if len(content) > 10000: | |
| content = content[:10000] + "..." | |
| batch_texts.append(content) | |
| batch_ids.append(path) | |
| return batch_texts, batch_ids | |
| # Submit batch processing tasks | |
| futures = [executor.submit(process_batch, batch) for batch in batches] | |
| # Collect results | |
| for future in concurrent.futures.as_completed(futures): | |
| batch_texts, batch_ids = future.result() | |
| texts.extend(batch_texts) | |
| ids.extend(batch_ids) | |
| else: | |
| # For smaller repositories, process sequentially | |
| for path, file_data in file_contents.items(): | |
| content = file_data['content'] | |
| # Skip very large files to avoid embedding issues | |
| if len(content) > 10000: | |
| content = content[:10000] + "..." | |
| texts.append(content) | |
| ids.append(path) | |
| # Add nodes to knowledge graph | |
| for path, file_data in file_contents.items(): | |
| self.knowledge_graph.add_node( | |
| path, | |
| type='file', | |
| file_type=file_data.get('type', 'unknown'), | |
| size=file_data.get('size', 0), | |
| extension=file_data.get('ext', '') | |
| ) | |
| # Create embeddings for all files | |
| if texts: | |
| # Process embeddings in batches to avoid memory issues | |
| batch_size = 32 | |
| file_embeddings = [] | |
| for i in range(0, len(texts), batch_size): | |
| batch_texts = texts[i:i+batch_size] | |
| batch_embeddings = self.embedding_model.encode(batch_texts) | |
| file_embeddings.append(batch_embeddings) | |
| file_embeddings = np.vstack(file_embeddings) | |
| # Initialize FAISS index | |
| dimension = file_embeddings.shape[1] | |
| self.index = faiss.IndexFlatL2(dimension) | |
| self.index.add(np.array(file_embeddings).astype('float32')) | |
| # Store embeddings with their IDs | |
| for i, file_id in enumerate(ids): | |
| self.embeddings[file_id] = { | |
| 'embedding': file_embeddings[i], | |
| 'content': texts[i] | |
| } | |
| except Exception as e: | |
| print(f"Error initializing vector storage: {e}") | |
| def build_knowledge_graph(self, commits: List[Dict], contributors: Dict) -> nx.Graph: | |
| """Build knowledge graph from repository data""" | |
| try: | |
| # Add contributor nodes | |
| for login, data in contributors.items(): | |
| self.knowledge_graph.add_node( | |
| login, | |
| type='contributor', | |
| contributions=data['contributions'] | |
| ) | |
| # Add connections between contributors and files | |
| for login, data in contributors.items(): | |
| for file_data in data['files_modified']: | |
| filename = file_data['filename'] | |
| count = file_data['count'] | |
| # Only add edges if file exists in the graph | |
| if filename in self.knowledge_graph: | |
| if self.knowledge_graph.has_edge(login, filename): | |
| # Update weight if edge exists | |
| self.knowledge_graph[login][filename]['weight'] += count | |
| else: | |
| # Create new edge | |
| self.knowledge_graph.add_edge(login, filename, weight=count) | |
| # Optimized co-occurrence calculation | |
| file_co_occurrence = defaultdict(int) | |
| # Process in batches for large commit histories | |
| batch_size = 50 | |
| for i in range(0, len(commits), batch_size): | |
| batch_commits = commits[i:i+batch_size] | |
| for commit in batch_commits: | |
| # Get all files in this commit | |
| commit_files = [file['filename'] for file in commit['files']] | |
| # Add co-occurrence for each pair of files | |
| from itertools import combinations | |
| for file1, file2 in combinations(commit_files, 2): | |
| if file1 in self.knowledge_graph and file2 in self.knowledge_graph: | |
| file_pair = tuple(sorted([file1, file2])) | |
| file_co_occurrence[file_pair] += 1 | |
| # Add edges for file co-occurrence | |
| for (file1, file2), count in file_co_occurrence.items(): | |
| if count >= 2: # Only add edge if files co-occur at least twice | |
| if self.knowledge_graph.has_edge(file1, file2): | |
| self.knowledge_graph[file1][file2]['weight'] += count | |
| else: | |
| self.knowledge_graph.add_edge(file1, file2, weight=count, type='co-occurrence') | |
| return self.knowledge_graph | |
| except Exception as e: | |
| print(f"Error building knowledge graph: {e}") | |
| return nx.Graph() | |
| def search_similar_files(self, query: str, top_k: int = 5) -> List[Dict]: | |
| """Search for files similar to query with caching""" | |
| try: | |
| if not self.index: | |
| return [] | |
| # Encode query | |
| query_embedding = self.embedding_model.encode([query]) | |
| # Search in FAISS | |
| distances, indices = self.index.search(np.array(query_embedding).astype('float32'), top_k) | |
| # Get results | |
| results = [] | |
| all_ids = list(self.embeddings.keys()) | |
| for i, idx in enumerate(indices[0]): | |
| if idx < len(all_ids): | |
| file_id = all_ids[idx] | |
| results.append({ | |
| 'file': file_id, | |
| 'distance': float(distances[0][i]), | |
| 'content': self.embeddings[file_id]['content'][:1000] + "..." if len(self.embeddings[file_id]['content']) > 1000 else self.embeddings[file_id]['content'] | |
| }) | |
| return results | |
| except Exception as e: | |
| print(f"Error searching similar files: {e}") | |
| return [] | |
| def extract_insights(self, repo_data: Dict, commits: List[Dict], contributors: Dict, issues: List[Dict]) -> Dict: | |
| """Extract insights from repository data with datetime fix and caching""" | |
| # Check if we have a recent cache (less than 10 minutes old) | |
| current_time = time.time() | |
| if self.cache_timestamp and (current_time - self.cache_timestamp < 600) and self.insights_cache: | |
| return self.insights_cache | |
| try: | |
| insights = { | |
| 'basic_stats': {}, | |
| 'activity': {}, | |
| 'contributors': {}, | |
| 'code': {}, | |
| 'issues': {} | |
| } | |
| # Make a deep copy of repo_data to avoid modifying the original | |
| repo_data_copy = {k: v for k, v in repo_data.items()} | |
| # Basic statistics | |
| insights['basic_stats'] = { | |
| 'name': repo_data_copy['name'], | |
| 'description': repo_data_copy['description'], | |
| 'stars': repo_data_copy['stars'], | |
| 'forks': repo_data_copy['forks'], | |
| 'age_days': None, # Will calculate below | |
| 'primary_language': repo_data_copy['language'], | |
| 'topics': repo_data_copy['topics'] | |
| } | |
| # Fix: Normalize datetime objects to be timezone-naive for consistent comparison | |
| created_at = repo_data_copy.get('created_at') | |
| if created_at: | |
| # Remove timezone info if present | |
| if hasattr(created_at, 'tzinfo') and created_at.tzinfo: | |
| created_at = created_at.replace(tzinfo=None) | |
| # Calculate age | |
| now = datetime.datetime.now() | |
| insights['basic_stats']['age_days'] = (now - created_at).days | |
| # Activity insights | |
| if commits: | |
| # Fix: Normalize all datetime objects to be timezone-naive | |
| commit_dates = [] | |
| for commit in commits: | |
| date = commit.get('date') | |
| if date: | |
| # Remove timezone info if present | |
| if hasattr(date, 'tzinfo') and date.tzinfo: | |
| date = date.replace(tzinfo=None) | |
| commit_dates.append(date) | |
| # Sort dates | |
| commit_dates.sort() | |
| if commit_dates: | |
| # Calculate commit frequency | |
| first_commit = commit_dates[0] | |
| last_commit = commit_dates[-1] | |
| days_span = (last_commit - first_commit).days + 1 | |
| insights['activity'] = { | |
| 'total_commits': len(commits), | |
| 'first_commit': first_commit, | |
| 'last_commit': last_commit, | |
| 'days_span': days_span, | |
| 'commits_per_day': round(len(commits) / max(days_span, 1), 2), | |
| } | |
| # Fix: Use Counter for most active day calculation | |
| date_counter = Counter(d.date() for d in commit_dates) | |
| if date_counter: | |
| insights['activity']['most_active_day'] = date_counter.most_common(1)[0][0] | |
| # Commit activity by month | |
| commit_months = [d.strftime('%Y-%m') for d in commit_dates] | |
| month_counts = Counter(commit_months) | |
| insights['activity']['monthly_activity'] = [ | |
| {'month': month, 'commits': count} for month, count in month_counts.most_common(12) | |
| ] | |
| # Contributor insights | |
| if contributors: | |
| top_contributors = sorted(contributors.values(), key=lambda x: x['contributions'], reverse=True)[:10] | |
| insights['contributors'] = { | |
| 'total_contributors': len(contributors), | |
| 'top_contributors': [ | |
| { | |
| 'login': c['login'], | |
| 'contributions': c['contributions'], | |
| 'top_files': [f['filename'] for f in c['files_modified'][:5]] if c['files_modified'] else [] | |
| } for c in top_contributors | |
| ] | |
| } | |
| # Calculate bus factor (simplified) | |
| total_commits = sum(c['contributions'] for c in contributors.values()) | |
| running_sum = 0 | |
| bus_factor = 0 | |
| for c in top_contributors: | |
| running_sum += c['contributions'] | |
| bus_factor += 1 | |
| if running_sum / total_commits > 0.5: | |
| break | |
| insights['contributors']['bus_factor'] = bus_factor | |
| # Code insights | |
| if self.knowledge_graph: | |
| # Get top connected files | |
| file_nodes = [(node, degree) for node, degree in self.knowledge_graph.degree() | |
| if self.knowledge_graph.nodes[node].get('type') == 'file'] | |
| top_files = sorted(file_nodes, key=lambda x: x[1], reverse=True)[:10] | |
| insights['code']['central_files'] = [ | |
| {'filename': filename, 'connections': degree} for filename, degree in top_files | |
| ] | |
| # Most frequently modified files from commits | |
| file_modifications = Counter() | |
| for commit in commits: | |
| for file in commit['files']: | |
| file_modifications[file['filename']] += 1 | |
| insights['code']['frequently_modified_files'] = [ | |
| {'filename': filename, 'modifications': count} | |
| for filename, count in file_modifications.most_common(10) | |
| ] | |
| # File types distribution | |
| file_types = Counter([os.path.splitext(node)[1] for node in self.knowledge_graph.nodes() | |
| if '.' in node and self.knowledge_graph.nodes[node].get('type') == 'file']) | |
| insights['code']['file_types'] = [ | |
| {'extension': ext, 'count': count} for ext, count in file_types.most_common() | |
| ] | |
| # Issue insights | |
| if issues: | |
| # Calculate issue statistics | |
| open_issues = [issue for issue in issues if issue['state'] == 'open'] | |
| closed_issues = [issue for issue in issues if issue['state'] == 'closed'] | |
| insights['issues'] = { | |
| 'total_issues': len(issues), | |
| 'open_issues': len(open_issues), | |
| 'closed_issues': len(closed_issues), | |
| 'resolution_rate': round(len(closed_issues) / max(len(issues), 1), 2) | |
| } | |
| # Calculate average time to close | |
| close_times = [] | |
| for issue in closed_issues: | |
| if issue['created_at'] and issue['closed_at']: | |
| # Fix: Normalize datetime objects to be timezone-naive | |
| created_at = issue['created_at'] | |
| closed_at = issue['closed_at'] | |
| if hasattr(created_at, 'tzinfo') and created_at.tzinfo: | |
| created_at = created_at.replace(tzinfo=None) | |
| if hasattr(closed_at, 'tzinfo') and closed_at.tzinfo: | |
| closed_at = closed_at.replace(tzinfo=None) | |
| close_time = (closed_at - created_at).days | |
| close_times.append(close_time) | |
| if close_times: | |
| insights['issues']['avg_days_to_close'] = round(sum(close_times) / len(close_times), 1) | |
| # Top issue labels | |
| issue_labels = [label for issue in issues for label in issue['labels']] | |
| label_counts = Counter(issue_labels) | |
| insights['issues']['top_labels'] = [ | |
| {'label': label, 'count': count} for label, count in label_counts.most_common(5) | |
| ] | |
| # Update cache | |
| self.insights_cache = insights | |
| self.cache_timestamp = current_time | |
| self.insights = insights | |
| return insights | |
| except Exception as e: | |
| import traceback | |
| print(f"Error extracting insights: {e}") | |
| print(traceback.format_exc()) | |
| return {} | |
| # Main GitHub AI Agent Class | |
| class GitHubAIAgent: | |
| """Main class for GitHub AI Agent""" | |
def __init__(self):
    """Create an agent with default configuration and no clients.

    The GitHub, knowledge-base, Gemini and visualization components start
    as ``None``; ``set_api_keys`` is the method that creates them.
    """
    self.config = Config()

    # Components are created later, once API keys are known
    self.github_manager = self.knowledge_base = None
    self.gemini_client = self.visualization_manager = None

    # State of the currently loaded repository
    self.repository_loaded = False
    self.repository_url = ""
    self.repository_analysis = {}
    self.visualizations = {}

    # Initialize caches (each gets its own dict)
    for cache_name in (
        "file_cache",
        "contributor_cache",
        "commit_cache",
        "issue_cache",
        "query_cache",
    ):
        setattr(self, cache_name, {})
def set_api_keys(self, gemini_api_key: str, github_token: Optional[str] = None) -> None:
    """Store the API keys and (re)create every client component.

    Args:
        gemini_api_key: Gemini key; exported as ``GEMINI_API_KEY`` and
            stored on the config.
        github_token: Optional GitHub token; when given it is exported as
            ``GITHUB_ACCESS_TOKEN`` and stored on the config. When omitted,
            a token the config already holds (for example one read from the
            environment) is kept instead of being replaced by ``None``.
    """
    os.environ["GEMINI_API_KEY"] = gemini_api_key
    self.config.gemini_api_key = gemini_api_key

    if github_token:
        os.environ["GITHUB_ACCESS_TOKEN"] = github_token
        self.config.github_token = github_token

    # Initialize clients with the updated configuration
    self.github_manager = GitHubManager(self.config)
    self.knowledge_base = KnowledgeBase(self.config)
    self.gemini_client = GeminiClient(self.config.gemini_api_key, self.config.gemini_model)
    self.visualization_manager = RepositoryVisualizer(self.config)
def load_repository(self, repository_url: str) -> Dict:
    """Load a GitHub repository, analyze it and build its visualizations.

    Steps: load repository metadata; fetch files, issues and
    contributors-then-commits in parallel; build the vector store and the
    knowledge graph; extract insights; finally run the Gemini analysis and
    the visualization generation in parallel.

    Contributors are always loaded before commits because commit
    processing records touched files on the already-loaded contributor
    entries; loading both at once would race on that shared dictionary.

    Returns:
        A dict with ``'success'``, ``'message'``, ``'repo_data'``,
        ``'file_count'`` and ``'contributor_count'``. Failures never raise;
        they are reported through ``'success': False`` and ``'message'``.
    """
    result = {
        'success': False,
        'message': '',
        'repo_data': {},
        'file_count': 0,
        'contributor_count': 0
    }

    try:
        # Reset state
        self.repository_loaded = False
        self.repository_url = ""
        self.repository_analysis = {}
        self.visualizations = {}

        components = (
            self.github_manager,
            self.knowledge_base,
            self.gemini_client,
            self.visualization_manager,
        )
        if any(component is None for component in components):
            result['message'] = (
                "Agent is not configured. Call set_api_keys() before loading a repository."
            )
            return result

        # Load repository basic info
        print(f"Loading repository: {repository_url}")
        if not self.github_manager.load_repository(repository_url):
            result['message'] = "Failed to load repository. Check the URL and your GitHub access token."
            return result

        self.repository_url = repository_url

        def load_contributors_then_commits():
            # Sequential on purpose: commits annotate contributor records
            loaded_contributors = self.github_manager.load_contributors()
            return loaded_contributors, self.github_manager.load_commits()

        # Independent data sets are fetched in parallel
        with concurrent.futures.ThreadPoolExecutor() as executor:
            files_future = executor.submit(self.github_manager.load_files)
            people_future = executor.submit(load_contributors_then_commits)
            issues_future = executor.submit(self.github_manager.load_issues)

            files = files_future.result()
            contributors, commits = people_future.result()
            issues = issues_future.result()

        result['file_count'] = len(files)
        result['contributor_count'] = len(contributors)

        # Kept sequential: each step depends on the previous one
        print("Building knowledge base")
        self.knowledge_base.initialize_vector_storage(files)
        knowledge_graph = self.knowledge_base.build_knowledge_graph(
            commits, self.github_manager.contributors_data
        )

        print("Extracting repository insights")
        insights = self.knowledge_base.extract_insights(
            self.github_manager.repo_data,
            commits,
            self.github_manager.contributors_data,
            issues
        )

        def analyze_with_gemini():
            print("Analyzing repository with Gemini")
            return self.gemini_client.analyze_repository(
                self.github_manager.repo_data,
                files,
                commits,
                self.github_manager.contributors_data,
                insights
            )

        def create_visualizations():
            print("Creating repository visualizations")
            visualizer = self.visualization_manager
            repo_graph_path = visualizer.create_repository_graph(knowledge_graph)
            activity_chart_path = visualizer.create_commit_activity_chart(commits)
            contributor_network_path = visualizer.create_contributor_network(
                self.github_manager.contributors_data, commits
            )
            dependency_graph_path = visualizer.create_file_dependency_graph(files)
            return {
                'repository_graph': repo_graph_path,
                'activity_chart': activity_chart_path,
                'contributor_network': contributor_network_path,
                'dependency_graph': dependency_graph_path,
            }

        # Gemini analysis and visualization generation do not depend on
        # each other, so they run side by side
        with concurrent.futures.ThreadPoolExecutor() as executor:
            analysis_future = executor.submit(analyze_with_gemini)
            viz_future = executor.submit(create_visualizations)

            self.repository_analysis = analysis_future.result()
            self.visualizations = viz_future.result()

        result['success'] = True
        result['message'] = f"Successfully loaded and analyzed repository: {self.github_manager.repo_data['full_name']}"
        result['repo_data'] = self.github_manager.repo_data
        self.repository_loaded = True
        return result
    except Exception as e:
        import traceback
        print(f"Error loading repository: {str(e)}")
        print(traceback.format_exc())
        result['message'] = f"Error loading repository: {str(e)}"
        return result
def answer_query(self, query: str) -> Dict:
    """Answer a natural language query about the loaded repository.

    When ``self.config.cache_enabled`` is true, successful answers are stored
    in ``self.query_cache`` and reused until they are older than
    ``self.config.cache_ttl`` seconds.

    Args:
        query: The question to answer.

    Returns:
        A dict with the keys ``'success'``, ``'message'``, ``'answer'`` and
        ``'relevant_files'``. On failure ``'answer'`` is ``""`` and
        ``'relevant_files'`` is ``[]``; errors raised while searching or
        querying Gemini are reported in ``'message'`` rather than raised.
    """
    if not self.repository_loaded:
        return {
            'success': False,
            'message': "No repository loaded. Please load a repository first.",
            'answer': "",
            'relevant_files': []
        }

    # Key on the full query text. Keying on hash(query) lets two different
    # queries with colliding hashes share an entry, which would serve one
    # query's cached answer for the other.
    cache_key = f"query_{query}"
    if self.config.cache_enabled:
        cached_entry = self.query_cache.get(cache_key)
        if cached_entry is not None:
            age = time.time() - cached_entry['timestamp']
            # Expired entries are simply recomputed and overwritten below.
            if age < self.config.cache_ttl:
                return cached_entry['result']

    try:
        # Search for relevant files
        similar_files = self.knowledge_base.search_similar_files(query)
        # Get answer from Gemini
        answer = self.gemini_client.answer_query(
            query,
            self.github_manager.repo_data,
            similar_files,
            self.knowledge_base.insights
        )
        result = {
            'success': True,
            'message': "Query answered successfully",
            'answer': answer,
            'relevant_files': [f['file'] for f in similar_files]
        }
        # Only successful answers are cached, so a transient failure is
        # retried on the next call instead of being replayed.
        if self.config.cache_enabled:
            self.query_cache[cache_key] = {
                'result': result,
                'timestamp': time.time()
            }
        return result
    except Exception as e:
        return {
            'success': False,
            'message': f"Error answering query: {str(e)}",
            'answer': "",
            'relevant_files': []
        }
def analyze_code(self, file_path: str = "", code_snippet: str = "", language: str = "") -> Dict:
    """Run Gemini code analysis on a repository file or a raw snippet.

    If ``file_path`` is given it takes precedence over ``code_snippet``: the
    code is read from ``self.github_manager.file_contents`` and ``language``
    is replaced by the file's extension without the leading dot. Otherwise
    ``code_snippet`` is analyzed with the ``language`` supplied by the caller.

    Returns:
        A dict with ``'success'``, ``'message'`` and ``'analysis'``. Every
        failure (no input, no repository loaded, unknown file, or an
        exception during analysis) yields ``'success': False`` with an empty
        ``'analysis'`` string.
    """
    def failure(reason: str) -> Dict:
        # All failure responses share the same shape.
        return {
            'success': False,
            'message': reason,
            'analysis': ""
        }

    if not (file_path or code_snippet):
        return failure("Please provide a file path or code snippet")

    try:
        if not file_path:
            # Snippet mode: no repository is required.
            source_text = code_snippet
        elif not self.repository_loaded:
            return failure("No repository loaded. Please load a repository first.")
        elif file_path not in self.github_manager.file_contents:
            return failure(f"File not found: {file_path}")
        else:
            source_text = self.github_manager.file_contents[file_path]['content']
            # The file extension overrides any caller-supplied language.
            language = os.path.splitext(file_path)[1].lstrip('.')

        report = self.gemini_client.analyze_code_snippet(source_text, language)
    except Exception as e:
        return failure(f"Error analyzing code: {str(e)}")

    return {
        'success': True,
        'message': "Code analyzed successfully",
        'analysis': report
    }
def find_collaborators(self, requirements: str) -> Dict:
    """Ask Gemini for potential collaborators matching ``requirements``.

    The request is built from the loaded contributor data and the knowledge
    base insights.

    Returns:
        A dict with ``'success'``, ``'message'`` and ``'collaborators'``.
        ``'collaborators'`` is an empty list when no repository is loaded or
        when the Gemini call raises; the error text goes into ``'message'``.
    """
    if self.repository_loaded:
        try:
            candidates = self.gemini_client.identify_potential_collaborators(
                self.github_manager.contributors_data,
                self.knowledge_base.insights,
                requirements
            )
        except Exception as e:
            ok, note, people = False, f"Error finding collaborators: {str(e)}", []
        else:
            ok, note, people = True, "Potential collaborators identified", candidates
    else:
        ok, note, people = False, "No repository loaded. Please load a repository first.", []

    return {
        'success': ok,
        'message': note,
        'collaborators': people
    }
def get_repository_insights(self) -> Dict:
    """Return the stored insights and Gemini analysis for the loaded repository.

    Returns:
        On success, a dict with ``'success'``, ``'message'``, ``'insights'``
        (from ``self.knowledge_base.insights``) and ``'analysis'`` (from
        ``self.repository_analysis``). If no repository is loaded, or reading
        either value raises, a failure dict with an empty ``'insights'`` dict
        and no ``'analysis'`` key.
    """
    if not self.repository_loaded:
        return {
            'success': False,
            'message': "No repository loaded. Please load a repository first.",
            'insights': {}
        }

    try:
        # Read both values first so any lookup error is reported below.
        findings = self.knowledge_base.insights
        summary = self.repository_analysis
    except Exception as e:
        return {
            'success': False,
            'message': f"Error getting repository insights: {str(e)}",
            'insights': {}
        }

    return {
        'success': True,
        'message': "Repository insights retrieved",
        'insights': findings,
        'analysis': summary
    }
def get_visualizations(self) -> Dict:
    """Return the visualizations stored for the loaded repository.

    Returns:
        A dict with ``'success'``, ``'message'`` and ``'visualizations'``.
        ``'visualizations'`` is ``self.visualizations`` when a repository is
        loaded and an empty dict otherwise.
    """
    if self.repository_loaded:
        return {
            'success': True,
            'message': "Repository visualizations retrieved",
            'visualizations': self.visualizations
        }

    return {
        'success': False,
        'message': "No repository loaded. Please load a repository first.",
        'visualizations': {}
    }
def clear_caches(self) -> None:
    """Clear every cache held by the agent.

    Empties the file, contributor, commit, issue and query caches, then
    resets any ``functools.lru_cache`` attached to ``answer_query`` or to the
    knowledge base's ``search_similar_files``.
    """
    for store in (
        self.file_cache,
        self.contributor_cache,
        self.commit_cache,
        self.issue_cache,
        self.query_cache,
    ):
        store.clear()

    # answer_query is a plain method that caches through query_cache (cleared
    # above), so calling cache_clear() on it unconditionally raises
    # AttributeError. Reset an LRU cache only where one is actually attached.
    possibly_memoised = (
        getattr(self, 'answer_query', None),
        getattr(self.knowledge_base, 'search_similar_files', None),
    )
    for func in possibly_memoised:
        reset = getattr(func, 'cache_clear', None)
        if callable(reset):
            reset()