# Ollama Repository Analyzer — Hugging Face Space (Gradio app)
import json
import os
import shutil
import subprocess
import textwrap
from datetime import datetime
from pathlib import Path

import gradio as gr
import requests
# Metadata displayed in the UI banner.
CURRENT_TIME = "2025-05-22 22:42:10"
CURRENT_USER = "ErRickow"

# Ollama API settings.  Fall back to the standard local Ollama endpoint so the
# app can still start when the environment variable is not set — previously a
# missing OLLAMA_API crashed the whole module at import time with a KeyError.
OLLAMA_API = os.environ.get("OLLAMA_API", "http://localhost:11434")

# Models offered in the dropdown even before any model is installed locally.
DEFAULT_MODELS = [
    "llama2",
    "codellama",
    "mistral",
    "neural-chat",
    "starling-lm",
    "dolphin-phi",
    "phi",
    "orca-mini",
]
def check_ollama_status():
    """Return True when the Ollama server answers /api/tags, else False.

    Used as a lightweight health check for the UI status indicator.
    """
    try:
        response = requests.get(f"{OLLAMA_API}/api/tags", timeout=10)
        return response.status_code == 200
    except requests.RequestException:
        # Narrowed from a bare `except:` so programming errors and
        # KeyboardInterrupt are no longer silently reported as "server down".
        return False
def list_available_models():
    """Return a sorted union of installed and default model names.

    Falls back to the sorted default list when the Ollama server is
    unreachable or returns an unexpected payload.
    """
    try:
        # Timeout added: the original call could hang the UI indefinitely.
        response = requests.get(f"{OLLAMA_API}/api/tags", timeout=10)
        installed = [model['name'] for model in response.json().get('models', [])]
        # Union with the defaults, sorted for stable dropdown presentation.
        return sorted(set(installed) | set(DEFAULT_MODELS))
    except (requests.RequestException, ValueError, KeyError):
        # Narrowed from a bare `except:`.  ValueError covers a non-JSON body,
        # KeyError a model entry without a 'name' field.
        return sorted(DEFAULT_MODELS)
def download_model(model_name):
    """Pull a model onto the Ollama server via /api/pull.

    Args:
        model_name: Name of the model to pull (e.g. "llama2").

    Returns:
        A human-readable status string for the UI.
    """
    if not model_name:
        return "Please select a model to download"
    print(f"Starting download of model: {model_name}")
    try:
        headers = {
            "Content-Type": "application/json",
        }
        response = requests.post(
            f"{OLLAMA_API}/api/pull",
            headers=headers,
            json={"name": model_name},
            stream=True,
            # Timeout added: without one a dead server hangs this handler
            # forever.  With stream=True this bounds connect/read gaps, not
            # the total download time, so long pulls still work.
            timeout=30,
        )
        if response.status_code == 200:
            # Ollama streams newline-delimited JSON progress events.
            for line in response.iter_lines():
                if line:
                    print(f"Download progress: {line.decode()}")
            return f"Successfully downloaded model: {model_name}"
        error_msg = f"Failed to download model. Status: {response.status_code}"
        print(error_msg)
        return error_msg
    except Exception as e:
        # Boundary handler: report the failure to the UI instead of raising.
        error_msg = f"Error downloading model: {str(e)}"
        print(error_msg)
        return error_msg
def clone_repository(repo_url, github_token, branch=None):
    """Clone a GitHub repository using token authentication.

    Args:
        repo_url: HTTPS URL of the repository (e.g. https://github.com/owner/repo).
        github_token: Personal access token embedded in the clone URL.
        branch: Optional branch to check out; defaults to the repo's default branch.

    Returns:
        (True, repo_directory_name) on success, (False, error_message) on failure.
    """
    repo_name = repo_url.split('/')[-1].replace('.git', '')
    print(f"Cloning repository: {repo_url} to {repo_name}")
    # Remove any stale checkout so the clone starts from a clean slate.
    if os.path.exists(repo_name):
        print(f"Removing existing repository: {repo_name}")
        # shutil.rmtree replaces the non-portable `rm -rf` subprocess call.
        shutil.rmtree(repo_name)
    try:
        owner_repo = '/'.join(repo_url.split('/')[-2:])
        # The token is embedded in the URL; never print auth_url — it is a secret.
        auth_url = f"https://{github_token}@github.com/{owner_repo}"
        cmd = ['git', 'clone']
        if branch:
            cmd.extend(['--branch', branch])
        cmd.append(auth_url)
        process = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Disable interactive credential prompts so a bad token fails fast
            # instead of blocking the server process on stdin.
            env=dict(os.environ, GIT_ASKPASS='echo', GIT_TERMINAL_PROMPT='0')
        )
        if process.returncode == 0:
            print(f"Successfully cloned repository: {repo_name}")
            return True, repo_name
        print(f"Failed to clone repository: {process.stderr}")
        return False, process.stderr
    except Exception as e:
        # Boundary handler: surface the failure as a message for the UI.
        error_msg = f"Error cloning repository: {str(e)}"
        print(error_msg)
        return False, error_msg
def analyze_with_ollama(model_name, text):
    """Send *text* to Ollama's /api/generate and return the model's reply.

    Args:
        model_name: Name of an installed Ollama model.
        text: The prompt to analyze.

    Returns:
        The model's response string, or a human-readable error message.
    """
    print(f"\nAnalyzing with {model_name}...")
    try:
        payload = {
            "model": model_name,
            "prompt": text,
            "stream": False,
            "options": {
                "temperature": 0.7,
                "top_p": 0.9,
                # Ollama's generate options name the token budget
                # "num_predict"; the previous "max_tokens" key was silently
                # ignored by the server.  The no-op `"stop": None` was dropped.
                "num_predict": 2048,
            }
        }
        print("Sending request to Ollama API...")
        response = requests.post(
            f"{OLLAMA_API}/api/generate",
            headers={"Content-Type": "application/json"},
            json=payload,
            timeout=60
        )
        print(f"Response status: {response.status_code}")
        if response.status_code == 200:
            result = response.json()
            if 'response' in result:
                print("Got response from model")
                return result['response']
            print("Unexpected response format:", result)
            return "Error: Unexpected response format from model"
        error_msg = f"API Error {response.status_code}: {response.text}"
        print(error_msg)
        return error_msg
    except Exception as e:
        # Boundary handler: report the failure to the UI instead of raising.
        error_msg = f"Error processing with model: {str(e)}"
        print(error_msg)
        return error_msg
def chunk_text(text, max_length=4000):
    """Split *text* into word-boundary chunks of at most *max_length* chars.

    Long words and hyphenated words are kept intact, so a single token longer
    than *max_length* may exceed the limit.  Returns [] for empty input.
    """
    wrapper = textwrap.TextWrapper(
        width=max_length,
        break_long_words=False,
        break_on_hyphens=False,
    )
    return wrapper.wrap(text)
def read_file_safely(file_path):
    """Read a text file, trying several encodings in order of likelihood.

    Returns (True, file_contents) on success, or (False, error_message) when
    the file cannot be opened or decoded with any supported encoding.
    """
    for candidate in ('utf-8', 'latin-1', 'cp1252'):
        try:
            with open(file_path, 'r', encoding=candidate) as handle:
                text = handle.read()
        except UnicodeDecodeError:
            # Wrong encoding for this file — fall through to the next one.
            continue
        except Exception as exc:
            error_msg = f"Error reading file: {str(exc)}"
            print(error_msg)
            return False, error_msg
        print(f"Successfully read file with {candidate} encoding")
        return True, text
    return False, "Unable to read file with supported encodings"
def create_ui():
    """Assemble the Gradio interface: a model-management tab plus a
    repository-analysis tab, wired to the module-level Ollama helpers."""
    with gr.Blocks(title="Ollama Repository Analyzer") as app:
        gr.Markdown(f"""
# Ollama Repository Analyzer
Current Time: {CURRENT_TIME}
User: {CURRENT_USER}
""")

        with gr.Tab("Model Management"):
            model_status = gr.Textbox(label="Ollama Status", interactive=False)
            available_models = gr.Dropdown(
                label="Available Models",
                choices=DEFAULT_MODELS,
                interactive=True,
            )
            download_button = gr.Button("Download Selected Model")
            download_status = gr.Textbox(label="Download Status", interactive=False)

            def update_status():
                # Report server connectivity and refresh the model list together.
                status = "Connected" if check_ollama_status() else "Not Connected"
                return status, gr.Dropdown(choices=list_available_models())

            download_button.click(
                fn=download_model,
                inputs=[available_models],
                outputs=[download_status],
            )

        with gr.Tab("Repository Analysis"):
            repo_url = gr.Textbox(label="Repository URL")
            github_token = gr.Textbox(label="GitHub Token", type="password")
            branch = gr.Textbox(label="Branch (optional)")
            clone_button = gr.Button("Clone Repository")
            clone_status = gr.Textbox(label="Clone Status", interactive=False)
            with gr.Row():
                file_list = gr.Dropdown(label="Files in Repository", multiselect=True)
                selected_model = gr.Dropdown(
                    label="Select Model for Analysis",
                    choices=DEFAULT_MODELS,
                    interactive=True,
                )
            analyze_button = gr.Button("Analyze Selected Files")
            debug_output = gr.Textbox(label="Debug Output", interactive=False)
            analysis_output = gr.Markdown()

            def handle_clone(url, token, branch_name):
                # Clone the repo, then offer every non-.git file in the dropdown.
                print(f"\nCloning repository: {url}")
                success, result = clone_repository(url, token, branch_name if branch_name else None)
                if success:
                    files = [str(p) for p in Path(result).rglob('*')
                             if p.is_file() and '.git' not in str(p)]
                    print(f"Found {len(files)} files in repository")
                    return f"Successfully cloned: {result}", gr.Dropdown(choices=files)
                return f"Clone failed: {result}", None

            def analyze_files(files, model_name):
                # Chunk each selected file and feed the chunks to the model,
                # collecting both the analyses and a step-by-step debug log.
                if not files:
                    return "Please select files to analyze", "No files selected"
                debug_info = []
                results = []
                debug_info.append(f"Starting analysis with model: {model_name}")
                debug_info.append(f"Files to analyze: {len(files)}")
                for file_path in files:
                    debug_info.append(f"\nProcessing file: {file_path}")
                    success, content = read_file_safely(file_path)
                    if not success:
                        error_msg = f"Error reading {file_path}: {content}"
                        debug_info.append(error_msg)
                        results.append(error_msg)
                        continue
                    chunks = chunk_text(content)
                    debug_info.append(f"Split into {len(chunks)} chunks")
                    analysis = []
                    for i, chunk in enumerate(chunks, 1):
                        debug_info.append(f"Analyzing chunk {i}/{len(chunks)}")
                        prompt = f"""
Analyze this code/content:
File: {file_path}
Part {i}/{len(chunks)}
```
{chunk}
```
Provide:
1. Brief overview
2. Key functionality
3. Notable patterns or concerns
4. Suggestions (if any)
"""
                        response = analyze_with_ollama(model_name, prompt)
                        debug_info.append(f"Got response of length: {len(response)}")
                        analysis.append(response)
                    results.append(f"### Analysis of {file_path}\n\n" +
                                   "\n\n=== Next Part ===\n\n".join(analysis))
                return "\n\n---\n\n".join(results), "\n".join(debug_info)

            clone_button.click(
                fn=handle_clone,
                inputs=[repo_url, github_token, branch],
                outputs=[clone_status, file_list],
            )
            analyze_button.click(
                fn=analyze_files,
                inputs=[file_list, selected_model],
                outputs=[analysis_output, debug_output],
            )

        # Refresh the connection status and model list once per page load.
        # NOTE(review): app.load fires on load only — the original comment
        # claimed a 30-second refresh, but no timer is wired up here.
        app.load(update_status, outputs=[model_status, available_models])
    return app
# Launch the app only when executed as a script, not on import.
if __name__ == "__main__":
    print(f"""
Starting Ollama Repository Analyzer
Time: {CURRENT_TIME}
User: {CURRENT_USER}
""")
    app = create_ui()
    # share=True additionally exposes a public Gradio tunnel URL.
    app.launch(share=True)