Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| from nicegui import ui, run, app | |
| import requests | |
| import zipfile | |
| import io | |
| import ast | |
| import json | |
| import tiktoken | |
| # --- Core Helper Functions --- | |
def is_file_type(file_path: str, file_extension: str) -> bool:
    """Report whether ``file_path`` ends with ``file_extension``.

    The comparison is a plain, case-sensitive suffix match on the string.
    """
    has_suffix = file_path.endswith(file_extension)
    return has_suffix
def is_likely_useful_file(file_path: str, lang: str = "python") -> bool:
    """Return True when ``file_path`` looks like real source worth keeping.

    A path is rejected when any of the following holds:

    * one of its ``/``-separated segments starts with a dot,
    * it contains ``test`` (case-insensitive) anywhere,
    * it lies in one of the skipped directories for ``lang``,
    * it contains one of the skipped file-name fragments for ``lang``.
    """
    skipped_dirs = ["docs", "examples", "tests", "test", "scripts", "utils", "benchmarks"]
    # Fragments are matched as plain substrings of the whole path.
    skipped_fragments = [".github", ".gitignore", "LICENSE"]

    if lang == "python":
        skipped_dirs.append("__pycache__")
        skipped_fragments += ["hubconf.py", "setup.py", "stale.py", "gen-card-", "write_model_card"]
    elif lang == "go":
        skipped_dirs.append("vendor")
        skipped_fragments += ["go.mod", "go.sum", "Makefile"]

    # Hidden files and directories (dot-prefixed segments) are never useful.
    segments = file_path.split("/")
    if any(segment.startswith(".") for segment in segments):
        return False

    if "test" in file_path.lower():
        return False

    in_skipped_dir = any(
        f"/{directory}/" in file_path or file_path.startswith(directory + "/")
        for directory in skipped_dirs
    )
    if in_skipped_dir:
        return False

    return not any(fragment in file_path for fragment in skipped_fragments)
def is_test_file(file_content: str, lang: str) -> bool:
    """Report whether Python source imports a testing library.

    The source is parsed with ``ast`` and every absolute ``import`` /
    ``from ... import`` statement is inspected. A file counts as a test file
    when the top-level package of any imported module is ``unittest`` or
    ``pytest``, so ``import unittest.mock`` and ``from pytest.x import y``
    are detected as well as the bare package names.

    Args:
        file_content: Full text of the source file.
        lang: Language of the file. Only ``"python"`` is inspected; any other
            value yields ``False``.

    Returns:
        ``True`` if a testing-library import is found, ``False`` otherwise,
        including when the source cannot be parsed.
    """
    if lang != "python":
        return False

    test_packages = {"unittest", "pytest"}

    try:
        module = ast.parse(file_content)
    except (SyntaxError, ValueError):
        # ValueError covers sources containing NUL bytes on older Pythons.
        return False

    for node in ast.walk(module):
        if isinstance(node, ast.Import):
            if any(alias.name.split(".")[0] in test_packages for alias in node.names):
                return True
        elif isinstance(node, ast.ImportFrom):
            # level > 0 is a relative import of a local module, not the library;
            # node.module is None for ``from . import x``.
            if node.level == 0 and node.module and node.module.split(".")[0] in test_packages:
                return True
    return False
| # --- Token Calculation Function --- | |
def get_token_count(text: str) -> int:
    """Count the tokens in ``text`` using tiktoken's ``cl100k_base`` encoding.

    Args:
        text: The string to tokenize.

    Returns:
        The number of tokens, or ``0`` if tokenization fails for any reason
        (the failure is printed to stdout).
    """
    try:
        encoding = tiktoken.get_encoding("cl100k_base")
        # Repository sources may contain special-token text such as
        # "<|endoftext|>". By default encode() raises on such text, which would
        # make the count fall back to 0; disallowed_special=() encodes it as
        # ordinary text instead.
        return len(encoding.encode(text, disallowed_special=()))
    except Exception as e:
        # Best-effort: the token count is informational, so never fail the UI.
        print(f"Could not calculate tokens: {e}")
        return 0
| # --- Core Processing Logic --- | |
def download_and_process_repo(repo_url: str, branch_or_tag: str, log: ui.log) -> str | None:
    """Download a GitHub repository archive and concatenate its useful Python files.

    Args:
        repo_url: Base URL of the repository, e.g.
            ``https://github.com/owner/repo``. Surrounding whitespace, a
            trailing ``/`` and a trailing ``.git`` are ignored.
        branch_or_tag: Name of the branch or tag to fetch. The branch archive
            is tried first, then the tag archive.
        log: UI log element that receives progress messages via ``push``.

    Returns:
        The concatenated contents of the selected files, each preceded by a
        ``# File: <path>`` header line, or ``None`` if the archive could not be
        downloaded or opened.
    """
    lang = "python"

    base_url = repo_url.strip().rstrip("/")
    if base_url.endswith(".git"):
        base_url = base_url[: -len(".git")]

    # The original URL only covered branches; fall back to the tag archive so
    # that the "branch or tag" input works for both.
    candidate_urls = [
        f"{base_url}/archive/refs/heads/{branch_or_tag}.zip",
        f"{base_url}/archive/refs/tags/{branch_or_tag}.zip",
    ]

    archive_bytes = None
    for download_url in candidate_urls:
        log.push(f"Attempting to download from: {download_url}")
        try:
            response = requests.get(download_url, timeout=30)
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            # The server answered with an error status; try the next candidate.
            log.push(f"⚠️ Download failed: {e}")
            continue
        except requests.exceptions.RequestException as e:
            # Connection-level failure; another URL on the same host will not help.
            log.push(f"❌ Error: Failed to download the repository. {e}")
            return None
        archive_bytes = response.content
        break

    if archive_bytes is None:
        log.push("❌ Error: Failed to download the repository. No branch or tag archive could be fetched.")
        return None

    log.push("✅ Download successful. Processing files...")

    try:
        zip_file = zipfile.ZipFile(io.BytesIO(archive_bytes))
    except zipfile.BadZipFile as e:
        log.push(f"❌ Error: Downloaded data is not a valid zip archive. {e}")
        return None

    sections = []
    with zip_file:
        all_files = zip_file.namelist()
        log.push(f"Found {len(all_files)} total files in the archive.")

        for file_path in all_files:
            # Drop the first path segment (the archive's top-level folder).
            cleaned_path = "/".join(file_path.split('/')[1:])
            if not cleaned_path or file_path.endswith("/"):
                continue
            if not is_file_type(cleaned_path, ".py") or not is_likely_useful_file(cleaned_path, lang):
                continue

            try:
                file_content = zip_file.read(file_path).decode("utf-8")
            except Exception as e:
                # Deliberately broad: one unreadable member must not abort the run.
                log.push(f"⚠️ Skipping (read/decode error): {cleaned_path} - {e}")
                continue

            if is_test_file(file_content, lang):
                log.push(f"Skipping (test file): {cleaned_path}")
                continue

            log.push(f"Appending: {cleaned_path}")
            sections.append(f"# File: {cleaned_path}\n{file_content}\n\n")

    log.push("✅ Processing complete.")
    return "".join(sections)
| # --- Dedicated health check endpoint --- | |
def health_check():
    """Return a static ``("OK", 200)`` pair for health probes.

    NOTE(review): no route registration for this function is visible in this
    file; confirm how (or whether) it is exposed to the health checker.
    """
    body, status_code = "OK", 200
    return body, status_code
| # --- NiceGUI User Interface Definition --- | |
def main_page():
    """Build the web interface and wire up its event handlers.

    The page consists of an input card (repository URL, branch/tag and the
    process button), a processing log, a token-count card and a read-only
    text area holding the concatenated output with a copy button.
    """

    def set_busy(busy: bool) -> None:
        """Swap the process button for the spinner while work is running."""
        process_button.set_visibility(not busy)
        spinner.set_visibility(busy)

    async def process_repository():
        """Handle the button click: download, concatenate and show the result."""
        # 1. Clear previous results
        log.clear()
        output_area.set_value('')
        token_count_label.set_text('0')

        repo_url = (repo_input.value or '').strip()
        branch = (branch_input.value or '').strip()
        if not repo_url:
            ui.notify('Repository URL cannot be empty.', type='negative')
            return

        # 2. Run the blocking I/O function in a separate thread. The finally
        #    clause guarantees the button comes back even if the worker raises;
        #    otherwise the page would be stuck showing the spinner.
        set_busy(True)
        try:
            content = await run.io_bound(download_and_process_repo, repo_url, branch, log)
        except Exception as e:
            log.push(f"❌ Error: Unexpected failure while processing. {e}")
            content = None
        finally:
            set_busy(False)

        # 3. Display results
        if content is None:
            ui.notify('Failed to process repository. Check log for details.', type='negative')
            return

        output_area.set_value(content)
        num_tokens = get_token_count(content)
        token_count_label.set_text(f'{num_tokens:,}')
        ui.notify('Repository processed successfully!', type='positive')

    async def copy_to_clipboard(text: str):
        """Copy the provided text to the user's clipboard."""
        if not text:
            ui.notify('There is no content to copy.', type='warning')
            return
        # Not awaited on purpose: the call is fire-and-forget. The former
        # ``respond=False`` argument is rejected by NiceGUI versions that
        # provide ``run.io_bound``, so it must not be passed.
        ui.run_javascript(f'navigator.clipboard.writeText({json.dumps(text)})')
        ui.notify('Output copied to clipboard!', type='positive')

    # --- UI Layout ---
    ui.query('body').style('background-color: #f4f4f5;')
    with ui.column().classes('w-full items-center gap-4 mx-auto p-4'):
        ui.label('Research MAGIC GitHub Repo to Single File').classes('text-3xl font-bold mt-4')
        ui.label('Concatenate repository Python and Go source files into a single file for Agentic Analysis.').classes(
            'text-lg text-gray-600')

        with ui.card().classes('w-full max-w-4xl shadow-lg'):
            with ui.row().classes('w-full items-center gap-4'):
                repo_input = ui.input(
                    label="GitHub Repository URL",
                    value="https://github.com/rodrigo-masini/github2file"
                ).props('outlined dense').classes('flex-grow')
                branch_input = ui.input(label="Branch or Tag", value="master").props('outlined dense').style(
                    'width: 150px;')
            process_button = ui.button('Process Repository', on_click=process_repository).props('icon=hub')
            spinner = ui.spinner(size='lg').classes('absolute-center')
            spinner.set_visibility(False)

        with ui.card().classes('w-full max-w-4xl shadow-lg'):
            ui.label('Processing Log').classes('text-xl font-semibold')
            log = ui.log().classes('w-full h-48').props('bordered')

        # --- Output information card ---
        with ui.card().classes('w-full max-w-4xl shadow-lg'):
            ui.label('Output Information').classes('text-xl font-semibold')
            with ui.row().classes('w-full items-center'):
                ui.label('Total Tokens (cl100k_base):').classes('text-lg')
                token_count_label = ui.label('0').classes('text-lg font-mono font-bold ml-2')

        with ui.card().classes('w-full max-w-4xl shadow-lg'):
            with ui.row().classes('w-full justify-between items-center'):
                ui.label('Concatenated Repo Data').classes('text-xl font-semibold')
                ui.button(icon='content_copy', on_click=lambda: copy_to_clipboard(output_area.value)).props(
                    'flat dense round')
            output_area = ui.textarea().classes('w-full h-96 font-mono').props(
                'outlined readonly placeholder="Output will appear here..."')
# Register main_page as the builder for the root page. Without this the
# function is only defined, never called, and the served page has no UI.
ui.page('/')(main_page)
# Start the NiceGUI application (NiceGUI's default port is 8080).
ui.run()