diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -5,98 +5,33 @@
 import base64
 import re
 import ast
 import networkx as nx
-# Make sure radon is installed: pip install radon
-try:
-    import radon.metrics as metrics
-    import radon.complexity as complexity
-except ImportError:
-    print("Warning: Radon library not found. Code complexity analysis will be limited.")
-    # Provide dummy functions if radon is not available
-    class DummyRadon:
-        def cc_visit(self, *args, **kwargs): return 0
-        def cc_visit_ast(self, *args, **kwargs): return 0
-        def mi_visit(self, *args, **kwargs): return None
-    metrics = DummyRadon()
-    complexity = DummyRadon()
-
+import radon.metrics as metrics
+import radon.complexity as complexity
 from datetime import datetime, timedelta
 from collections import defaultdict, Counter
 import pandas as pd
-import matplotlib.pyplot as plt
-import matplotlib.dates as mdates
-# Ensure IPython is available or handle its absence
-try:
-    from IPython.display import display, Markdown, HTML
-    IPYTHON_AVAILABLE = True
-except ImportError:
-    IPYTHON_AVAILABLE = False
-    # Define dummy display functions if not in IPython
-    def display(*args, **kwargs): print(*args)
-    def Markdown(text): print(f"--- Markdown ---\n{text}\n---------------")
-    def HTML(text): print(f"----- HTML -----\n{text}\n--------------")
-
 import numpy as np
-# Ensure PyGithub is installed: pip install PyGithub
-try:
-    from github import Github, GithubException
-except ImportError:
-    print("Warning: PyGithub library not found. Some features might be limited.")
-    Github = None  # Set to None if not available
-    GithubException = Exception  # Use base Exception
-
+from github import Github, GithubException
 import time
-# Ensure python-dotenv is installed: pip install python-dotenv
-try:
-    from dotenv import load_dotenv
-except ImportError:
-    print("Warning: python-dotenv not found. .env file will not be loaded.")
-    def load_dotenv(): pass  # Dummy function
+from dotenv import load_dotenv
 
-# Import Neo4j and Gemini libraries
-# Ensure neo4j is installed: pip install neo4j
-try:
-    from neo4j import GraphDatabase, basic_auth
-except ImportError:
-    print("Warning: Neo4j library not found. Graph features will be disabled.")
-    GraphDatabase = None  # Set to None
-    basic_auth = None
+# Visualization imports
+import vizro.plotly.express as px
+import vizro
+import vizro.models as vzm
+import plotly.graph_objects as go
+import gradio as gr
+from pyvis.network import Network
 
-# Ensure google-generativeai is installed: pip install google-generativeai
+# Google Gemini AI (optional)
 try:
     import google.generativeai as genai
+    GEMINI_AVAILABLE = True
 except ImportError:
-    print("Warning: google-generativeai library not found. Gemini features will be disabled.")
-    genai = None  # Set to None
+    GEMINI_AVAILABLE = False
+    print("Google Generative AI package not found. PR summarization feature will be disabled.")
 
-# Import Vizro and Gradio
-# Ensure vizro, vizro-plotly, plotly, gradio are installed
-# pip install vizro vizro-plotly plotly gradio pandas networkx matplotlib numpy
-try:
-    import vizro.plotly.express as px
-    import vizro
-    import vizro.models as vzm
-    import plotly.graph_objects as go
-except ImportError:
-    print("Critical Error: Vizro or Plotly libraries not found. Dashboard generation will fail.")
-    # Define dummy classes/functions to avoid NameErrors later, though functionality will be broken
-    class DummyVzm:
-        Card = lambda **kwargs: None
-        Graph = lambda **kwargs: None
-        Page = lambda **kwargs: None
-        Dashboard = lambda **kwargs: type('obj', (object,), {'save': lambda self, path: print(f"Vizro not installed, cannot save to {path}")})()
-    vzm = DummyVzm()
-    px = None
-    go = None
-    vizro = None
-
-try:
-    import gradio as gr
-except ImportError:
-    print("Critical Error: Gradio library not found. Cannot launch the UI.")
-    gr = None  # Set to None
-
-# --- GitHubRepoInfo Class (Keep as provided, ensuring dependencies like PyGithub are handled) ---
 class GitHubRepoInfo:
     """Enhanced class to get comprehensive information about a GitHub repository."""
 
@@ -110,37 +45,23 @@ class GitHubRepoInfo:
         # Set up authentication
         if token:
             self.headers["Authorization"] = f"token {token}"
-            if Github:  # Check if PyGithub was imported
-                try:
-                    self.github = Github(token)
-                    self.github.get_user().login  # Test connection
-                except Exception as e:
-                    print(f"Warning: Failed to initialize PyGithub with token: {e}")
-                    self.github = Github()  # Fallback to unauthenticated
-            else:
-                print("Warning: PyGithub not installed. Cannot use authenticated PyGithub client.")
-                self.github = None  # Explicitly set to None
-
+            try:
+                self.github = Github(token)
+                self.github.get_user().login  # Test connection
+            except Exception as e:
+                print(f"Warning: Failed to initialize PyGithub with token: {e}")
+                self.github = Github()  # Fallback to unauthenticated
         elif os.environ.get("GITHUB_TOKEN"):
             self.token = os.environ.get("GITHUB_TOKEN")
             self.headers["Authorization"] = f"token {self.token}"
-            if Github:
-                try:
-                    self.github = Github(self.token)
-                    self.github.get_user().login  # Test connection
-                except Exception as e:
-                    print(f"Warning: Failed to initialize PyGithub with token: {e}")
-                    self.github = Github()  # Fallback to unauthenticated
-            else:
-                print("Warning: PyGithub not installed. Cannot use authenticated PyGithub client.")
-                self.github = None
+            try:
+                self.github = Github(self.token)
+                self.github.get_user().login  # Test connection
+            except Exception as e:
+                print(f"Warning: Failed to initialize PyGithub with token: {e}")
+                self.github = Github()  # Fallback to unauthenticated
         else:
-            if Github:
-                self.github = Github()  # Unauthenticated
-            else:
-                print("Warning: PyGithub not installed. Cannot use authenticated PyGithub client.")
-                self.github = None
-
+            self.github = Github()  # Unauthenticated
 
         # Configure rate limit handling
         self.rate_limit_remaining = 5000  # Assume higher limit if authenticated
@@ -152,54 +73,25 @@ class GitHubRepoInfo:
             self.rate_limit_remaining = rate_limit.core.remaining
             self.rate_limit_reset = datetime.fromtimestamp(rate_limit.core.reset)
         except Exception as e:
-            # Don't print warning if self.github is None
-            if self.github is not None:
-                print(f"Warning: Could not get initial rate limit from PyGithub: {e}")
-            # Check rate limit via REST if PyGithub failed or wasn't used
-            elif self.token:
-                try:
-                    response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers)
-                    if response.status_code == 200:
-                        rate_data = response.json()
-                        self.rate_limit_remaining = rate_data["resources"]["core"]["remaining"]
-                        self.rate_limit_reset = datetime.fromtimestamp(rate_data["resources"]["core"]["reset"])
-                    else:
-                        print(f"Warning: Could not get initial rate limit via REST: Status {response.status_code}")
-                except Exception as e:
-                    print(f"Warning: Could not get initial rate limit via REST: {e}")
-
+            print(f"Warning: Could not get initial rate limit from PyGithub: {e}")
 
     def _check_rate_limit(self):
         """Check API rate limit and wait if necessary."""
-        # Update rate limit info before checking
-        try:
-            response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers)
-            if response.status_code == 200:
-                rate_data = response.json()
-                # Ensure keys exist before accessing
-                core_limits = rate_data.get("resources", {}).get("core", {})
-                self.rate_limit_remaining = core_limits.get("remaining", self.rate_limit_remaining)  # Use old value if missing
-                reset_timestamp = core_limits.get("reset")
-                if reset_timestamp:
-                    self.rate_limit_reset = datetime.fromtimestamp(reset_timestamp)
-            # No else needed, just use previous values if update fails
-        except Exception as e:
-            print(f"Warning: Failed to update rate limit info: {e}")
-            # Proceed with potentially outdated values
-
         if self.rate_limit_remaining <= 10:
             reset_time = self.rate_limit_reset
-            # Use timezone-naive comparison
             current_time = datetime.now()
             if reset_time > current_time:
                 wait_time = (reset_time - current_time).total_seconds() + 10  # Add buffer
-                if wait_time > 0:  # Only wait if reset time is in the future
-                    print(f"Rate limit nearly exhausted. Waiting {wait_time:.0f} seconds for reset.")
-                    time.sleep(wait_time)
-                    # Re-fetch rate limit after waiting
-                    self._check_rate_limit()
+                print(f"Rate limit nearly exhausted. Waiting {wait_time:.0f} seconds for reset.")
+                time.sleep(wait_time)
 
+        # Update rate limit info after each API call
+        response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers)
+        if response.status_code == 200:
+            rate_data = response.json()
+            self.rate_limit_remaining = rate_data["resources"]["core"]["remaining"]
+            self.rate_limit_reset = datetime.fromtimestamp(rate_data["resources"]["core"]["reset"])
 
     def _paginated_get(self, url, params=None, max_items=None):
         """Handle paginated API responses with rate limit awareness."""
@@ -208,41 +100,33 @@ class GitHubRepoInfo:
         items = []
         page = 1
 
-        # Use a smaller default per_page to be safer with rate limits if unauthenticated
-        default_per_page = 100 if self.token else 30
-        per_page = min(100, params.get("per_page", default_per_page))
+        per_page = min(100, params.get("per_page", 30))
         params["per_page"] = per_page
 
         while True:
-            self._check_rate_limit()  # Check before each request
+            self._check_rate_limit()
            params["page"] = page
-            try:
-                response = requests.get(url, headers=self.headers, params=params, timeout=20)  # Add timeout
-                response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
+            response = requests.get(url, headers=self.headers, params=params)
 
-                page_items = response.json()
-                if not page_items:  # No more items
+            if response.status_code == 200:
+                page_items = response.json()
+                if not page_items:
                     break
-                items.extend(page_items)
-                page += 1
+                items.extend(page_items)
+                page += 1
 
-                # Check if we've reached the requested limit
-                if max_items and len(items) >= max_items:
+                # Check if we've reached the requested limit
+                if max_items and len(items) >= max_items:
                     return items[:max_items]
 
-                # Check if we've reached the end (GitHub returns fewer items than requested)
-                if len(page_items) < per_page:
+                # Check if we've reached the end (GitHub returns fewer items than requested)
+                if len(page_items) < per_page:
                     break
-
-            except requests.exceptions.RequestException as e:
-                print(f"Error during paginated request to {url} (page {page}): {e}")
-                # Decide whether to break or retry (here we break)
-                break
-            except json.JSONDecodeError as e:
-                print(f"Error decoding JSON response from {url} (page {page}): {e}")
-                break
+            else:
+                print(f"Error {response.status_code}: {response.text}")
+                break
 
         return items
 
@@ -250,90 +134,96 @@ class GitHubRepoInfo:
         """Get basic repository information."""
         self._check_rate_limit()
         url = f"{self.base_url}/repos/{owner}/{repo}"
-        try:
-            response = requests.get(url, headers=self.headers, timeout=15)
-            response.raise_for_status()  # Check for 4xx/5xx errors
+        response = requests.get(url, headers=self.headers)
+
+        if response.status_code == 200:
             return response.json()
-        except requests.exceptions.RequestException as e:
-            print(f"Error getting repository info for {owner}/{repo}: {e}")
-            return None  # Return None on failure
+        else:
+            print(f"Error {response.status_code}: {response.text}")
+            return None
+
+    def get_contributors(self, owner, repo, max_contributors=None):
+        """Get repository contributors with pagination support."""
+        url = f"{self.base_url}/repos/{owner}/{repo}/contributors"
+        return self._paginated_get(url, max_items=max_contributors)
 
-    # ... (other GitHubRepoInfo methods - assume they return sensible defaults like [] or {} on failure) ...
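# --- Illustrative sketch (not part of the patch): the pagination contract that
# _paginated_get implements, shown standalone. Assumes a GITHUB_TOKEN env var;
# the repo URL is a placeholder. GitHub returns at most `per_page` items per
# request, and a short or empty page signals the end of the collection.
import os
import requests

def fetch_all(url, max_items=300, per_page=100):
    headers = {"Authorization": f"token {os.environ['GITHUB_TOKEN']}"}
    items, page = [], 1
    while len(items) < max_items:
        resp = requests.get(url, headers=headers,
                            params={"per_page": per_page, "page": page}, timeout=20)
        resp.raise_for_status()
        batch = resp.json()
        items.extend(batch)
        if len(batch) < per_page:  # short page: nothing left to fetch
            break
        page += 1
    return items[:max_items]

# commits = fetch_all("https://api.github.com/repos/octocat/Hello-World/commits")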
-    # --- Add safe defaults to methods that might return None unexpectedly ---
     def get_languages(self, owner, repo):
         """Get languages used in the repository."""
         self._check_rate_limit()
         url = f"{self.base_url}/repos/{owner}/{repo}/languages"
-        try:
-            response = requests.get(url, headers=self.headers, timeout=15)
-            response.raise_for_status()
-            return response.json()
-        except requests.exceptions.RequestException as e:
-            print(f"Error getting languages for {owner}/{repo}: {e}")
-            return {}  # Return empty dict on failure
+        response = requests.get(url, headers=self.headers)
 
-    def get_contributors(self, owner, repo, max_contributors=None):
-        """Get repository contributors with pagination support."""
-        url = f"{self.base_url}/repos/{owner}/{repo}/contributors"
-        # _paginated_get should already handle errors and return a list
-        return self._paginated_get(url, max_items=max_contributors) or []  # Ensure list return
+        if response.status_code == 200:
+            return response.json()
+        else:
+            print(f"Error getting languages: {response.status_code}")
+            return {}
 
     def get_commits(self, owner, repo, params=None, max_commits=None):
         """Get commits with enhanced filtering and pagination."""
         url = f"{self.base_url}/repos/{owner}/{repo}/commits"
-        return self._paginated_get(url, params=params, max_items=max_commits) or []  # Ensure list return
-
-    def _get_stats_with_retry(self, url):
-        """Helper for stats endpoints that might return 202."""
-        retries = 3
-        delay = 5  # Initial delay in seconds
-        for i in range(retries):
-            self._check_rate_limit()
-            try:
-                response = requests.get(url, headers=self.headers, timeout=30)  # Longer timeout for stats
-                if response.status_code == 200:
-                    return response.json()
-                elif response.status_code == 202 and i < retries - 1:
-                    print(f"GitHub is computing statistics for {url.split('/stats/')[1]}, waiting {delay}s and retrying ({i+1}/{retries})...")
-                    time.sleep(delay)
-                    delay *= 2  # Exponential backoff
-                    continue
-                elif response.status_code == 204:  # No content, valid response but empty data
-                    print(f"No content (204) returned for {url.split('/stats/')[1]}. Returning empty list.")
-                    return []
-                else:
-                    print(f"Error getting stats from {url}: Status {response.status_code}, Body: {response.text[:200]}")
-                    return []  # Return empty list on other errors
-            except requests.exceptions.RequestException as e:
-                print(f"Request error getting stats from {url}: {e}")
-                return []  # Return empty list on request error
-        print(f"Failed to get stats from {url} after {retries} retries.")
-        return []  # Return empty list after all retries fail
+        return self._paginated_get(url, params=params, max_items=max_commits)
 
-    def get_commit_activity(self, owner, repo):
+    def get_commit_activity(self, owner, repo, _retries=3):
         """Get commit activity stats for the past year."""
+        self._check_rate_limit()
         url = f"{self.base_url}/repos/{owner}/{repo}/stats/commit_activity"
-        return self._get_stats_with_retry(url)
+        response = requests.get(url, headers=self.headers)
+
+        if response.status_code == 200:
+            return response.json()
+        elif response.status_code == 202 and _retries > 0:
+            # GitHub is computing the statistics; wait and retry a bounded
+            # number of times rather than recursing indefinitely
+            print("GitHub is computing statistics, waiting and retrying...")
+            time.sleep(2)
+            return self.get_commit_activity(owner, repo, _retries - 1)
+        else:
+            print(f"Error getting commit activity: {response.status_code}")
+            return []
 
-    def get_code_frequency(self, owner, repo):
+    def get_code_frequency(self, owner, repo, _retries=3):
         """Get weekly code addition and deletion statistics."""
+        self._check_rate_limit()
         url = f"{self.base_url}/repos/{owner}/{repo}/stats/code_frequency"
-        return self._get_stats_with_retry(url)
+        response = requests.get(url, headers=self.headers)
+
+        if response.status_code == 200:
+            return response.json()
+        elif response.status_code == 202 and _retries > 0:
+            # GitHub is computing the statistics; bounded wait-and-retry
+            print("GitHub is computing statistics, waiting and retrying...")
+            time.sleep(2)
+            return self.get_code_frequency(owner, repo, _retries - 1)
+        else:
+            print(f"Error getting code frequency: {response.status_code}")
+            return []
 
-    def get_contributor_activity(self, owner, repo):
+    def get_contributor_activity(self, owner, repo, _retries=3):
         """Get contributor commit activity over time."""
+        self._check_rate_limit()
         url = f"{self.base_url}/repos/{owner}/{repo}/stats/contributors"
-        return self._get_stats_with_retry(url)
+        response = requests.get(url, headers=self.headers)
+
+        if response.status_code == 200:
+            return response.json()
+        elif response.status_code == 202 and _retries > 0:
+            # GitHub is computing the statistics; bounded wait-and-retry
+            print("GitHub is computing statistics, waiting and retrying...")
+            time.sleep(2)
+            return self.get_contributor_activity(owner, repo, _retries - 1)
+        else:
+            print(f"Error getting contributor activity: {response.status_code}")
+            return []
 
     def get_branches(self, owner, repo):
         """Get repository branches."""
         url = f"{self.base_url}/repos/{owner}/{repo}/branches"
-        return self._paginated_get(url) or []
+        return self._paginated_get(url)
 
     def get_releases(self, owner, repo, max_releases=None):
         """Get repository releases with pagination support."""
         url = f"{self.base_url}/repos/{owner}/{repo}/releases"
-        return self._paginated_get(url, max_items=max_releases) or []
+        return self._paginated_get(url, max_items=max_releases)
 
     def get_issues(self, owner, repo, state="all", max_issues=None, params=None):
         """Get repository issues with enhanced filtering."""
@@ -341,7 +231,54 @@ class GitHubRepoInfo:
         if params is None:
             params = {}
         params["state"] = state
-        return self._paginated_get(url, params=params, max_items=max_issues) or []
+        return self._paginated_get(url, params=params, max_items=max_issues)
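# --- Illustrative sketch (not part of the patch): GitHub's /stats/* endpoints
# answer 202 while statistics are computed server-side. A bounded poll with
# backoff, as in the retry cap added above, avoids hammering the API or
# looping forever. The URL and headers are placeholders.
import time
import requests

def poll_stats(url, headers, attempts=5, delay=2.0):
    for _ in range(attempts):
        resp = requests.get(url, headers=headers, timeout=30)
        if resp.status_code == 200:
            return resp.json()
        if resp.status_code != 202:
            break  # a real error; polling again will not help
        time.sleep(delay)
        delay *= 2  # exponential backoff between polls
    return []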
+
+    def get_issue_timeline(self, owner, repo, days_back=180):
+        """Analyze issue creation and closing over time."""
+        # Get issues including closed ones
+        issues = self.get_issues(owner, repo, state="all")
+
+        # Prepare timeline data
+        end_date = datetime.now()
+        start_date = end_date - timedelta(days=days_back)
+
+        # Initialize daily counters
+        date_range = pd.date_range(start=start_date, end=end_date)
+        created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
+        closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
+
+        # Collect issue creation and closing dates
+        for issue in issues:
+            created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ')
+            if created_at >= start_date:
+                created_counts[created_at.strftime('%Y-%m-%d')] += 1
+
+            if issue['state'] == 'closed' and issue.get('closed_at'):
+                closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
+                if closed_at >= start_date:
+                    closed_counts[closed_at.strftime('%Y-%m-%d')] += 1
+
+        # Calculate resolution times for closed issues
+        resolution_times = []
+        for issue in issues:
+            if issue['state'] == 'closed' and issue.get('closed_at'):
+                created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ')
+                closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
+                resolution_time = (closed_at - created_at).total_seconds() / 3600  # hours
+                resolution_times.append(resolution_time)
+
+        # Calculate issue labels distribution
+        label_counts = defaultdict(int)
+        for issue in issues:
+            for label in issue.get('labels', []):
+                label_counts[label['name']] += 1
+
+        return {
+            'created': created_counts,
+            'closed': closed_counts,
+            'resolution_times': resolution_times,
+            'labels': dict(label_counts)
+        }
 
     def get_pull_requests(self, owner, repo, state="all", max_prs=None, params=None):
         """Get repository pull requests with enhanced filtering."""
@@ -349,7 +286,71 @@ class GitHubRepoInfo:
         if params is None:
             params = {}
         params["state"] = state
-        return self._paginated_get(url, params=params, max_items=max_prs) or []
+        return self._paginated_get(url, params=params, max_items=max_prs)
+
+    def get_pr_timeline(self, owner, repo, days_back=180):
+        """Analyze PR creation, closing, and metrics over time."""
+        # Get PRs including closed and merged ones
+        prs = self.get_pull_requests(owner, repo, state="all")
+
+        # Prepare timeline data
+        end_date = datetime.now()
+        start_date = end_date - timedelta(days=days_back)
+
+        # Initialize daily counters
+        date_range = pd.date_range(start=start_date, end=end_date)
+        created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
+        closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
+        merged_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
+
+        # Track metrics
+        merge_times = []
+        pr_sizes = []
+
+        # Collect PR data
+        for pr in prs:
+            created_at = datetime.strptime(pr['created_at'], '%Y-%m-%dT%H:%M:%SZ')
+            if created_at >= start_date:
+                created_counts[created_at.strftime('%Y-%m-%d')] += 1
+
+            # Get PR size (additions + deletions)
+            # Note: the list endpoint omits these fields; they are only present
+            # when a PR is fetched individually, so this block is skipped for
+            # PRs returned by get_pull_requests.
+            if pr.get('additions') is not None and pr.get('deletions') is not None:
+                pr_sizes.append({
+                    'additions': pr['additions'],
+                    'deletions': pr['deletions'],
+                    'total': pr['additions'] + pr['deletions'],
+                    'files_changed': pr.get('changed_files', 0)
+                })
+
+            # Check if PR is closed
+            if pr['state'] == 'closed':
+                closed_at = datetime.strptime(pr['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
+                if closed_at >= start_date:
+                    closed_counts[closed_at.strftime('%Y-%m-%d')] += 1
+
+                # Check if PR was merged
+                if pr['merged_at']:
+                    merged_at = datetime.strptime(pr['merged_at'], '%Y-%m-%dT%H:%M:%SZ')
+                    if merged_at >= start_date:
+                        merged_counts[merged_at.strftime('%Y-%m-%d')] += 1
+
+                    # Calculate time to merge
+                    merge_time = (merged_at - created_at).total_seconds() / 3600  # hours
+                    merge_times.append(merge_time)
+
+        # Calculate acceptance rate
+        total_closed = sum(closed_counts.values())
+        total_merged = sum(merged_counts.values())
+        acceptance_rate = (total_merged / total_closed) * 100 if total_closed > 0 else 0
+
+        return {
+            'created': created_counts,
+            'closed': closed_counts,
+            'merged': merged_counts,
+            'merge_times': merge_times,
+            'pr_sizes': pr_sizes,
+            'acceptance_rate': acceptance_rate
+        }
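# --- Worked example of the acceptance-rate arithmetic in get_pr_timeline, on
# toy counts (on GitHub, merged PRs are a subset of closed PRs):
#   closed per day:  {"2024-01-01": 4, "2024-01-02": 6}  -> total_closed = 10
#   merged per day:  {"2024-01-01": 3, "2024-01-02": 5}  -> total_merged = 8
#   acceptance_rate = (8 / 10) * 100 = 80.0
# The `if total_closed > 0` guard keeps quiet repositories from dividing by zero.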
 
     def get_contents(self, owner, repo, path="", ref=None):
         """Get repository contents at the specified path."""
@@ -359,17 +360,13 @@
         if ref:
             params["ref"] = ref
 
-        try:
-            response = requests.get(url, headers=self.headers, params=params, timeout=15)
-            response.raise_for_status()
+        response = requests.get(url, headers=self.headers, params=params)
+
+        if response.status_code == 200:
             return response.json()
-        except requests.exceptions.RequestException as e:
-            # Handle 404 specifically for contents
-            if hasattr(e, 'response') and e.response is not None and e.response.status_code == 404:
-                print(f"Contents not found at path '{path}' in {owner}/{repo}.")
-            else:
-                print(f"Error getting contents for {owner}/{repo} at path '{path}': {e}")
-            return []  # Return empty list on failure
+        else:
+            print(f"Error getting contents: {response.status_code}")
+            return []
 
     def get_readme(self, owner, repo, ref=None):
         """Get repository README file."""
@@ -379,27 +376,20 @@
         if ref:
             params["ref"] = ref
 
-        try:
-            response = requests.get(url, headers=self.headers, params=params, timeout=15)
-            response.raise_for_status()
+        response = requests.get(url, headers=self.headers, params=params)
+
+        if response.status_code == 200:
             data = response.json()
             if data.get("content"):
-                try:
-                    content = base64.b64decode(data["content"]).decode("utf-8")
-                    return {
-                        "name": data.get("name", "README"),
-                        "path": data.get("path", "README.md"),
-                        "content": content
-                    }
-                except (UnicodeDecodeError, base64.binascii.Error) as decode_error:
-                    print(f"Error decoding README content: {decode_error}")
-                    return None  # Cannot decode
-            return None  # No content key
-        except requests.exceptions.RequestException as e:
-            if hasattr(e, 'response') and e.response is not None and e.response.status_code == 404:
-                print(f"README not found for {owner}/{repo}.")
-            else:
-                print(f"Error getting README for {owner}/{repo}: {e}")
+                content = base64.b64decode(data["content"]).decode("utf-8")
+                return {
+                    "name": data["name"],
+                    "path": data["path"],
+                    "content": content
+                }
+            return data
+        else:
+            print(f"README not found or error: {response.status_code}")
             return None
 
     def get_file_content(self, owner, repo, path, ref=None):
@@ -410,310 +400,440 @@
         if ref:
             params["ref"] = ref
 
-        try:
-            response = requests.get(url, headers=self.headers, params=params, timeout=15)
-            response.raise_for_status()
+        response = requests.get(url, headers=self.headers, params=params)
+
+        if response.status_code == 200:
             data = response.json()
-            if data.get("type") == "file" and data.get("content"):
+            if data.get("content"):
                 try:
                     content = base64.b64decode(data["content"]).decode("utf-8")
                     return content
-                except (UnicodeDecodeError, base64.binascii.Error):
-                    # Don't print error here, return indicator
+                except UnicodeDecodeError:
                     return "[Binary file content not displayed]"
-            elif data.get("type") != "file":
-                print(f"Path '{path}' is not a file.")
-                return None
-            else:
-                # File exists but no content? Unlikely but handle.
-                return ""  # Return empty string for empty file
-        except requests.exceptions.RequestException as e:
-            if hasattr(e, 'response') and e.response is not None and e.response.status_code == 404:
-                print(f"File not found at path '{path}' in {owner}/{repo}.")
-            else:
-                print(f"Error getting file content for {owner}/{repo}, path '{path}': {e}")
-        return None
-
-    # --- Methods like is_text_file, analyze_ast, analyze_js_ts are generally okay ---
-    # ... (keep them as they are) ...
-
-    # --- Ensure get_all_text_files handles errors from get_contents/get_file_content ---
-    def get_all_text_files(self, owner, repo, path="", max_files=50, ref=None, _current_count=0):
-        """Get content of all text files in the repository (with limit)."""
-        if _current_count >= max_files:
-            return [], _current_count
+            return None
+        else:
+            print(f"Error getting file content: {response.status_code}")
+            return None
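# --- Illustrative sketch (not part of the patch): the contents API returns
# file bodies as base64, often with embedded newlines; this is the decode step
# get_readme and get_file_content rely on. The payload below is synthetic.
import base64

payload = {"content": base64.b64encode("# Demo\n".encode("utf-8")).decode("ascii")}
text = base64.b64decode(payload["content"]).decode("utf-8")
assert text == "# Demo\n"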
 
-        # Get contents for the current path
-        contents = self.get_contents(owner, repo, path, ref)  # Returns [] on error
-
-        text_files = []
-        file_count = _current_count
+    def is_text_file(self, file_path):
+        """Determine if a file is likely a text file based on extension."""
+        text_extensions = [
+            '.txt', '.md', '.rst', '.py', '.js', '.html', '.css', '.java', '.c',
+            '.cpp', '.h', '.hpp', '.json', '.xml', '.yaml', '.yml', '.toml',
+            '.ini', '.cfg', '.conf', '.sh', '.bat', '.ps1', '.rb', '.pl', '.php',
+            '.go', '.rs', '.ts', '.jsx', '.tsx', '.vue', '.swift', '.kt', '.scala',
+            '.groovy', '.lua', '.r', '.dart', '.ex', '.exs', '.erl', '.hrl',
+            '.clj', '.hs', '.elm', '.f90', '.f95', '.f03', '.sql', '.gitignore',
+            '.dockerignore', '.env', '.editorconfig', '.htaccess', '.cs', '.ipynb',
+            '.R', '.Rmd', '.jl', '.fs', '.ml', '.mli', '.d', '.scm', '.lisp',
+            '.el', '.m', '.mm', '.vb', '.asm', '.s', '.Dockerfile', '.gradle'
+        ]
 
-        if not isinstance(contents, list):
-            print(f"Warning: get_contents did not return a list for path '{path}'. Skipping.")
-            return [], file_count
+        extension = os.path.splitext(file_path)[1].lower()
+        return extension in text_extensions
+
+    def get_recursive_contents(self, owner, repo, path="", max_depth=3, current_depth=0, max_files=1000, ref=None):
+        """Recursively get repository contents with a depth limit and file count limit."""
+        if current_depth >= max_depth:
+            return []
+
+        contents = self.get_contents(owner, repo, path, ref)
+        results = []
+        file_count = 0
 
-        # Process current directory
         for item in contents:
             if file_count >= max_files:
                 break
-            # Ensure item is a dictionary and has 'type' and 'name'
-            if not isinstance(item, dict) or 'type' not in item or 'name' not in item:
-                print(f"Warning: Skipping malformed item in contents: {item}")
-                continue
-
-            item_path = item.get("path")  # Get path safely
-            if not item_path:
-                print(f"Warning: Skipping item with missing path: {item}")
-                continue
+            if item["type"] == "dir":
+                # For directories, add the directory itself and recursively get contents
+                dir_item = {
+                    "type": "dir",
+                    "name": item["name"],
+                    "path": item["path"],
+                    "contents": self.get_recursive_contents(
+                        owner, repo, item["path"], max_depth, current_depth + 1,
+                        max_files - file_count, ref
+                    )
+                }
+                results.append(dir_item)
+            else:
+                # For files, add the file info
+                results.append({
+                    "type": "file",
+                    "name": item["name"],
+                    "path": item["path"],
+                    "size": item["size"],
+                    "url": item["html_url"]
+                })
+                file_count += 1
+
+        return results
+
+    def get_all_text_files(self, owner, repo, path="", max_files=50, ref=None):
+        """Get content of all text files in the repository (with limit)."""
+        contents = self.get_contents(owner, repo, path, ref)
+        text_files = []
+        file_count = 0
 
         # Process current directory
         for item in contents:
             if file_count >= max_files:
                 break
             if item["type"] == "file" and self.is_text_file(item["name"]):
-                content = self.get_file_content(owner, repo, item_path, ref)
-                # Check if content is valid text (not None or binary indicator)
+                content = self.get_file_content(owner, repo, item["path"], ref)
                 if content and content != "[Binary file content not displayed]":
                     text_files.append({
                         "name": item["name"],
-                        "path": item_path,
+                        "path": item["path"],
                         "content": content
                     })
                     file_count += 1
             elif item["type"] == "dir":
                 # Recursively get text files from subdirectories
-                if file_count < max_files:
-                    try:
-                        subdir_files, file_count = self.get_all_text_files(
-                            owner, repo, item_path, max_files, ref, file_count
-                        )
-                        text_files.extend(subdir_files)
-                    except Exception as e_rec:
-                        print(f"Error processing subdirectory '{item_path}': {e_rec}")
-                        # Continue with other items in the current directory
-
-        return text_files, file_count  # Return count for recursive calls
-
-    # --- Ensure get_documentation_files handles errors ---
+                subdir_files = self.get_all_text_files(
+                    owner, repo, item["path"], max_files - file_count, ref
+                )
+                text_files.extend(subdir_files)
+                file_count += len(subdir_files)
+
+        return text_files
+
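# --- Illustrative sketch (not part of the patch): is_text_file above is purely
# extension-based. A cheap content-level complement, the same heuristic git
# uses, is to treat data containing NUL bytes as binary. This helper is an
# assumption for illustration, not part of the class.
def looks_like_text(sample: bytes) -> bool:
    return b"\x00" not in sample

assert looks_like_text(b"print('hi')\n")
assert not looks_like_text(b"\x89PNG\r\n\x1a\n\x00")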
".github/PULL_REQUEST_TEMPLATE.md" ] + doc_files = [] - # 1. Get top-level files first - root_contents = self.get_contents(owner, repo, "", ref) - if isinstance(root_contents, list): - for item in root_contents: - if isinstance(item, dict) and item.get("type") == "file" and item.get("name") in doc_paths: - path = item.get("path") - if path: - content = self.get_file_content(owner, repo, path, ref) - if content and content != "[Binary file content not displayed]": - doc_files.append({ - "name": item["name"], - "path": path, - "content": content - }) - - # 2. Check specific doc directories - doc_dirs_to_check = ["docs", "doc", "documentation", "wiki", ".github"] - for doc_dir in doc_dirs_to_check: + # Try to get each documentation file/directory + for path in doc_paths: try: - dir_contents = self.get_contents(owner, repo, doc_dir, ref) - if isinstance(dir_contents, list): # It's a directory - for item in dir_contents: - if isinstance(item, dict) and item.get("type") == "file": - item_name = item.get("name", "").lower() - item_path = item.get("path") - if item_path and item_name.endswith((".md", ".rst", ".txt")): - content = self.get_file_content(owner, repo, item_path, ref) - if content and content != "[Binary file content not displayed]": - doc_files.append({ - "name": item["name"], - "path": item_path, - "content": content - }) - except Exception as e: - print(f"Error processing documentation path '{doc_dir}': {e}") - continue # Skip this path + contents = self.get_contents(owner, repo, path, ref) + + # If it's a directory, get all markdown files in it + if isinstance(contents, list): + for item in contents: + if item["type"] == "file" and item["name"].lower().endswith((".md", ".rst", ".txt")): + content = self.get_file_content(owner, repo, item["path"], ref) + if content: + doc_files.append({ + "name": item["name"], + "path": item["path"], + "content": content + }) + # If it's a file, get its content + elif isinstance(contents, dict) and contents.get("type") == "file": + content = self.get_file_content(owner, repo, path, ref) + if content: + doc_files.append({ + "name": contents["name"], + "path": contents["path"], + "content": content + }) + except: + # Path doesn't exist or access issues + continue return doc_files + def analyze_ast(self, code, file_path): + """Analyze Python code using AST (Abstract Syntax Tree).""" + if not file_path.endswith('.py'): + return None - # ... (rest of GitHubRepoInfo, display methods, etc. - keep as provided but be mindful of data access in display) ... - # Add specific error handling in display methods if needed, though Gradio errors often hide underlying data issues. - def get_all_info(self, owner, repo): - """Get comprehensive information about a repository with enhanced metrics.""" - print(f"--- Fetching data for {owner}/{repo} ---") - result = { - "timestamp": datetime.now().isoformat() - } - - print("Getting basic repo info...") - basic_info = self.get_repo_info(owner, repo) - if not basic_info: - print(f"CRITICAL: Could not retrieve basic repository information for {owner}/{repo}. 
Aborting analysis.") - return None # Cannot proceed without basic info - result["basic_info"] = basic_info - - print("Getting languages...") - result["languages"] = self.get_languages(owner, repo) # Returns {} on error - print("Getting contributors...") - result["contributors"] = self.get_contributors(owner, repo, max_contributors=30) # Returns [] on error - print("Getting recent commits...") - result["recent_commits"] = self.get_commits(owner, repo, max_commits=30) # Returns [] on error - print("Getting branches...") - result["branches"] = self.get_branches(owner, repo) # Returns [] on error - print("Getting releases...") - result["releases"] = self.get_releases(owner, repo, max_releases=10) # Returns [] on error - print("Getting open issues...") - result["open_issues"] = self.get_issues(owner, repo, state="open", max_issues=50) # Returns [] on error - print("Getting open pull requests...") - result["open_pull_requests"] = self.get_pull_requests(owner, repo, state="open", max_prs=50) # Returns [] on error - print("Getting root contents...") - result["root_contents"] = self.get_contents(owner, repo) # Returns [] on error - - print("Analyzing repository content (README, Docs, Code Files)...") - # This relies on other methods returning sensible defaults try: - # Call get_all_text_files outside get_repo_text_summary to pass count correctly - all_text_files_content, _ = self.get_all_text_files(owner, repo, max_files=30) - # Pass the fetched content to get_repo_text_summary to avoid redundant API calls - result["text_content"] = self.get_repo_text_summary(owner, repo, pre_fetched_files=all_text_files_content) - except Exception as e: - print(f"Error during text content analysis: {e}") - result["text_content"] = {"error": str(e)} # Store error indicator + tree = ast.parse(code) + + # Extract more detailed information using AST + functions = [] + classes = [] + imports = [] + function_complexities = {} + + for node in ast.walk(tree): + # Get function definitions with arguments + if isinstance(node, ast.FunctionDef): + args = [] + defaults = len(node.args.defaults) + args_count = len(node.args.args) - defaults + + # Get positional args + for arg in node.args.args[:args_count]: + if hasattr(arg, 'arg'): # Python 3 + args.append(arg.arg) + else: # Python 2 + args.append(arg.id) + + # Get args with defaults + for i, arg in enumerate(node.args.args[args_count:]): + if hasattr(arg, 'arg'): # Python 3 + args.append(f"{arg.arg}=...") + else: # Python 2 + args.append(f"{arg.id}=...") + + # Calculate function complexity + func_complexity = complexity.cc_visit(node) + function_complexities[node.name] = func_complexity + + # Get docstring if available + docstring = ast.get_docstring(node) + + functions.append({ + 'name': node.name, + 'args': args, + 'complexity': func_complexity, + 'docstring': docstring + }) + # Get class definitions + elif isinstance(node, ast.ClassDef): + methods = [] + class_docstring = ast.get_docstring(node) + + # Get class methods + for child in node.body: + if isinstance(child, ast.FunctionDef): + method_complexity = complexity.cc_visit(child) + method_docstring = ast.get_docstring(child) + + methods.append({ + 'name': child.name, + 'complexity': method_complexity, + 'docstring': method_docstring + }) + + classes.append({ + 'name': node.name, + 'methods': methods, + 'docstring': class_docstring + }) - print("Analyzing repository activity over time...") - # This relies on stats methods returning [] on error/202 timeout - try: - result["temporal_analysis"] = 
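# --- Illustrative sketch (not part of the patch): what analyze_ast extracts,
# run on a tiny snippet. Requires radon; cc_visit is given source text here,
# one of the input forms radon accepts.
import ast
from radon.complexity import cc_visit

SRC = '''
def add(a, b=1):
    """Add two numbers."""
    return a + b
'''

tree = ast.parse(SRC)
func = next(n for n in ast.walk(tree) if isinstance(n, ast.FunctionDef))
print(func.name, [a.arg for a in func.args.args])  # add ['a', 'b']
print(ast.get_docstring(func))                     # Add two numbers.
print(cc_visit(SRC)[0].complexity)                 # 1 (no branching)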
-        # Get key code files if not provided
-        if pre_fetched_files is None:
-            print("Fetching text files within get_repo_text_summary...")
-            text_files, _ = self.get_all_text_files(owner, repo, max_files=max_files)  # Returns [] on error
-        else:
-            print("Using pre-fetched text files in get_repo_text_summary.")
-            text_files = pre_fetched_files  # Use the provided list
+    def analyze_js_ts(self, code, file_path):
+        """Analyze JavaScript/TypeScript code using regex with improved patterns."""
+        if not file_path.endswith(('.js', '.ts', '.jsx', '.tsx')):
+            return None
 
-        # Analyze code files
-        code_summary = {}
-        complexity_metrics = {
-            'cyclomatic_complexity': [],
-            'maintainability_index': [],
-            'comment_ratios': []
+        # More sophisticated regex patterns for JS/TS analysis
+        results = {
+            'functions': [],
+            'classes': [],
+            'imports': [],
+            'exports': [],
+            'hooks': []  # For React hooks
         }
 
-        for file in text_files:
-            # Basic check for file structure
-            if not isinstance(file, dict) or 'name' not in file or 'content' not in file or 'path' not in file:
-                print(f"Skipping malformed file data in text summary: {file}")
-                continue
+        # Function patterns (covering various declaration styles)
+        function_patterns = [
+            # Regular functions
+            r'function\s+(\w+)\s*\(([^)]*)\)',
+            # Arrow functions assigned to variables
+            r'(?:const|let|var)\s+(\w+)\s*=\s*(?:\([^)]*\)|[^=]*)\s*=>\s*{',
+            # Class methods
+            r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{',
+            # Object methods
+            r'(\w+)\s*:\s*function\s*\(([^)]*)\)'
+        ]
 
-            ext = os.path.splitext(file["name"])[1].lower()
-            if ext in ['.py', '.js', '.ts', '.jsx', '.tsx']:  # Add other relevant code extensions if needed
-                try:
-                    file_summary = self.extract_code_summary(file["content"], file["path"])
-                    if file_summary:  # Ensure summary generation didn't fail
-                        code_summary[file["path"]] = file_summary
-
-                        # Collect complexity metrics safely
-                        if file_summary.get('complexity'):
-                            cc = file_summary['complexity'].get('overall')
-                            # Ensure cc is a number before appending
-                            if isinstance(cc, (int, float)):
-                                complexity_metrics['cyclomatic_complexity'].append((file["path"], cc))
-
-                            mi = file_summary['complexity'].get('maintainability_index')
-                            # Ensure mi is a number before appending
-                            if isinstance(mi, (int, float)):
-                                complexity_metrics['maintainability_index'].append((file["path"], mi))
-
-                        if file_summary.get('metrics'):
-                            comment_ratio = file_summary['metrics'].get('comment_ratio')
-                            # Ensure ratio is a number before appending
-                            if isinstance(comment_ratio, (int, float)):
-                                complexity_metrics['comment_ratios'].append((file["path"], comment_ratio))
-                except Exception as e_sum:
-                    print(f"Error extracting code summary for {file.get('path', 'unknown file')}: {e_sum}")
-
-        # Analyze dependencies (can be slow, consider limiting files further if needed)
-        # Use the already fetched text_files for dependency analysis
-        dependencies = self.analyze_dependencies(owner, repo, pre_fetched_code_files=text_files)
+        for pattern in function_patterns:
+            for match in re.finditer(pattern, code):
+                func_name = match.group(1)
+                args = match.group(2).strip() if len(match.groups()) > 1 else ""
+                results['functions'].append({
+                    'name': func_name,
+                    'args': args
+                })
+
+        # Class pattern
+        class_pattern = r'class\s+(\w+)(?:\s+extends\s+(\w+))?\s*{([^}]*)}'
+        for match in re.finditer(class_pattern, code, re.DOTALL):
+            class_name = match.group(1)
+            parent_class = match.group(2) if match.group(2) else None
+            class_body = match.group(3)
+
+            # Find methods in class
+            methods = []
+            method_pattern = r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{([^}]*)}'
+            for method_match in re.finditer(method_pattern, class_body):
+                method_name = method_match.group(1)
+                methods.append(method_name)
+
+            results['classes'].append({
+                'name': class_name,
+                'extends': parent_class,
+                'methods': methods
+            })
+
+        # Import patterns
+        import_patterns = [
+            # ES6 imports
+            r'import\s+(?:{([^}]*)}|\*\s+as\s+(\w+)|(\w+))\s+from\s+[\'"]([^\'"]+)[\'"]',
+            # CommonJS requires
+            r'(?:const|let|var)\s+(?:{([^}]*)}|(\w+))\s*=\s*require\([\'"]([^\'"]+)[\'"]\)'
+        ]
 
-        # Summarize repository content by file type
-        file_types = defaultdict(int)
-        for file in text_files:
-            if isinstance(file, dict) and 'name' in file:  # Check again
-                ext = os.path.splitext(file["name"])[1].lower()
-                if ext:  # Avoid counting files with no extension
-                    file_types[ext] += 1
-
-        # Calculate aggregate code metrics safely
-        total_code_lines = 0
-        total_comment_lines = 0
-        analyzed_code_files = 0
-        for path, summary in code_summary.items():
-            if summary and summary.get('metrics'):
-                analyzed_code_files += 1
-                total_code_lines += summary['metrics'].get('code_lines', 0) or 0
-                total_comment_lines += summary['metrics'].get('comment_lines', 0) or 0
+        for pattern in import_patterns:
+            for match in re.finditer(pattern, code):
+                groups = match.groups()
+                if groups[0]:  # Destructured import
+                    imports = [name.strip() for name in groups[0].split(',')]
+                    for imp in imports:
+                        results['imports'].append(imp)
+                elif groups[1]:  # Namespace import (import * as X)
+                    results['imports'].append(groups[1])
+                elif groups[2]:  # Default import
+                    results['imports'].append(groups[2])
+                elif groups[3]:  # Module name
+                    results['imports'].append(groups[3])
+
+        # React hooks detection (for React files)
+        if file_path.endswith(('.jsx', '.tsx')):
+            hook_pattern = r'use([A-Z]\w+)\s*\('
+            for match in re.finditer(hook_pattern, code):
+                hook_name = 'use' + match.group(1)
+                results['hooks'].append(hook_name)
+
+        # Export patterns
+        export_patterns = [
+            # Named exports
+            r'export\s+(?:const|let|var|function|class)\s+(\w+)',
+            # Default exports
+            r'export\s+default\s+(?:function|class)?\s*(\w+)?'
+        ]
+
+        for pattern in export_patterns:
+            for match in re.finditer(pattern, code):
+                if match.group(1):
+                    results['exports'].append(match.group(1))
 
-        aggregate_metrics = {
-            'total_files_analyzed': len(text_files),  # All text files fetched
-            'code_files_summarized': analyzed_code_files,  # Files where summary succeeded
-            'total_code_lines': total_code_lines,
-            'total_comment_lines': total_comment_lines,
-            'average_comment_ratio': (total_comment_lines / total_code_lines) if total_code_lines > 0 else 0
-        }
-
-        return {
-            "readme": readme,  # Can be None
-            "documentation": docs,  # Should be list
-            "code_summary": code_summary,  # Dict of summaries
-            "complexity_metrics": complexity_metrics,  # Dict of lists
-            "dependencies": dependencies,  # Dict
-            "file_type_counts": dict(file_types),  # Dict
-            "aggregate_metrics": aggregate_metrics,  # Dict
-            "text_files": text_files  # List of fetched files
-        }
+        return results
 
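# --- Illustrative sketch (not part of the patch): two of the analyze_js_ts
# patterns above run against a small JS sample. Regex parsing is approximate
# by design; it trades accuracy for not needing a real JavaScript parser.
import re

js = "const greet = (name) => {\n  return name;\n};\nfunction add(a, b) { return a + b; }"
arrow = re.findall(r'(?:const|let|var)\s+(\w+)\s*=\s*(?:\([^)]*\)|[^=]*)\s*=>\s*{', js)
plain = re.findall(r'function\s+(\w+)\s*\(([^)]*)\)', js)
print(arrow)  # ['greet']
print(plain)  # [('add', 'a, b')]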
+    def extract_code_summary(self, file_content, file_path):
+        """Extract comprehensive summary information from code files."""
+        extension = os.path.splitext(file_path)[1].lower()
+
+        # Initialize summary
+        summary = {
+            "functions": [],
+            "classes": [],
+            "imports": [],
+            "description": "",
+            "complexity": None
+        }
+
+        # Extract Python definitions with AST
+        if extension == '.py':
+            ast_result = self.analyze_ast(file_content, file_path)
+            if ast_result:
+                summary["functions"] = [f["name"] for f in ast_result["functions"]]
+                summary["classes"] = [c["name"] for c in ast_result["classes"]]
+                summary["imports"] = ast_result["imports"]
+                summary["complexity"] = ast_result["complexity"]
+
+                # Try to extract module docstring
+                try:
+                    tree = ast.parse(file_content)
+                    module_docstring = ast.get_docstring(tree)
+                    if module_docstring:
+                        summary["description"] = module_docstring
+                except:
+                    pass
+
+                # Add detailed function and class info
+                summary["detailed_functions"] = ast_result["functions"]
+                summary["detailed_classes"] = ast_result["classes"]
+
+        # Extract JavaScript/TypeScript definitions
+        elif extension in ['.js', '.ts', '.jsx', '.tsx']:
+            js_result = self.analyze_js_ts(file_content, file_path)
+            if js_result:
+                summary["functions"] = [f["name"] for f in js_result["functions"]]
+                summary["classes"] = [c["name"] for c in js_result["classes"]]
+                summary["imports"] = js_result["imports"]
+
+                # Add detailed function and class info
+                summary["detailed_functions"] = js_result["functions"]
+                summary["detailed_classes"] = js_result["classes"]
+                summary["hooks"] = js_result.get("hooks", [])
+                summary["exports"] = js_result.get("exports", [])
+
+        # Calculate basic code metrics for any text file
+        if file_content:
+            lines = file_content.split('\n')
+            code_lines = 0
+            comment_lines = 0
+            blank_lines = 0
+
+            comment_prefixes = ['#', '//', '/*', '*', '
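# --- Illustrative sketch: the shape of the line-classification loop this
# section is building (the section is cut off above; the prefix set here is an
# assumption based on the visible fragment).
SRC = "# header\n\nx = 1\n// a C-style comment\n"
code = comments = blank = 0
for line in SRC.split('\n'):
    stripped = line.strip()
    if not stripped:
        blank += 1
    elif stripped.startswith(('#', '//', '/*', '*')):
        comments += 1
    else:
        code += 1
print(code, comments, blank)  # -> 1 2 2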