"""Gradio front end for pulling Ollama models and analyzing files of a cloned GitHub repository."""

import json
import os
import shutil
import subprocess
import textwrap
from datetime import datetime
from pathlib import Path

import gradio as gr
import requests

# Metadata
# NOTE(review): these are fixed stamps, not computed at runtime, even though
# the UI labels the first one "Current Time" -- confirm that is intended.
CURRENT_TIME = "2025-05-22 22:42:10"
CURRENT_USER = "ErRickow"

# Ollama API settings.  Falls back to the conventional local endpoint instead
# of raising KeyError at import time when the variable is unset; a trailing
# slash is dropped so the f"{OLLAMA_API}/api/..." URLs stay well-formed.
OLLAMA_API = os.environ.get("OLLAMA_API", "http://localhost:11434").rstrip("/")

# Seconds to wait for the short metadata calls (and to connect for downloads).
_STATUS_TIMEOUT = 10

# Default available models
DEFAULT_MODELS = [
    "llama2",
    "codellama",
    "mistral",
    "neural-chat",
    "starling-lm",
    "dolphin-phi",
    "phi",
    "orca-mini",
]


def check_ollama_status():
    """Return True when ``GET {OLLAMA_API}/api/tags`` answers with HTTP 200."""
    try:
        response = requests.get(f"{OLLAMA_API}/api/tags", timeout=_STATUS_TIMEOUT)
    except requests.RequestException:
        # Only network/HTTP failures mean "not connected"; anything else
        # (e.g. KeyboardInterrupt) is allowed to propagate.
        return False
    return response.status_code == 200


def list_available_models():
    """Return the sorted union of installed model names and DEFAULT_MODELS.

    Falls back to the sorted defaults when the server is unreachable or its
    answer does not have the expected ``{"models": [{"name": ...}]}`` shape.
    """
    try:
        response = requests.get(f"{OLLAMA_API}/api/tags", timeout=_STATUS_TIMEOUT)
        installed = [model['name'] for model in response.json().get('models', [])]
        return sorted(set(installed) | set(DEFAULT_MODELS))
    except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError):
        return sorted(DEFAULT_MODELS)


def _extract_pull_error(line):
    """Return the ``error`` text of one decoded pull progress line, else None."""
    try:
        event = json.loads(line)
    except ValueError:
        return None
    if isinstance(event, dict) and event.get("error"):
        return str(event["error"])
    return None


def download_model(model_name):
    """Ask Ollama to pull *model_name* and return a human-readable status string.

    Progress lines streamed by the server are echoed to stdout.  A streamed
    line carrying an ``error`` field is reported as a failure rather than
    being counted as a successful download.
    """
    if not model_name:
        return "Please select a model to download"

    print(f"Starting download of model: {model_name}")
    try:
        # Connect timeout only: a large pull may legitimately stream for a
        # long time, so reads are left unbounded.
        with requests.post(
            f"{OLLAMA_API}/api/pull",
            headers={"Content-Type": "application/json"},
            json={"name": model_name},
            stream=True,
            timeout=(_STATUS_TIMEOUT, None),
        ) as response:
            if response.status_code != 200:
                error_msg = f"Failed to download model. Status: {response.status_code}"
                print(error_msg)
                return error_msg

            for line in response.iter_lines():
                if not line:
                    continue
                decoded = line.decode()
                print(f"Download progress: {decoded}")
                pull_error = _extract_pull_error(decoded)
                if pull_error:
                    error_msg = f"Failed to download model: {pull_error}"
                    print(error_msg)
                    return error_msg
            return f"Successfully downloaded model: {model_name}"
    except Exception as e:
        # UI boundary: surface any failure as text instead of crashing the app.
        error_msg = f"Error downloading model: {str(e)}"
        print(error_msg)
        return error_msg


def _redact(text, secret):
    """Return *text* with every occurrence of *secret* masked (no-op if empty)."""
    return text.replace(secret, '***') if secret else text


def clone_repository(repo_url, github_token, branch=None):
    """Clone a GitHub repository into the current directory.

    Args:
        repo_url: Repository URL; only its last two path segments
            (``owner/repo``) are used to build the github.com clone URL.
        github_token: Token embedded in the clone URL.  When empty, the
            plain (unauthenticated) URL is used.
        branch: Optional branch name passed to ``git clone --branch``.

    Returns:
        ``(True, repo_name)`` on success, where ``repo_name`` is the
        directory that was created, or ``(False, error_text)`` on failure.
        The token is masked in any returned or printed error text.
    """
    parts = (repo_url or '').strip().rstrip('/').split('/')
    repo_name = parts[-1]
    # Strip only a trailing ".git"; names such as "foo.github.io" stay intact.
    if repo_name.endswith('.git'):
        repo_name = repo_name[:-len('.git')]
    print(f"Cloning repository: {repo_url} to {repo_name}")

    # The name is about to be deleted recursively, so refuse anything that
    # would resolve to the working directory or its parent.
    if repo_name in ('', '.', '..'):
        error_msg = f"Error cloning repository: invalid repository URL: {repo_url!r}"
        print(error_msg)
        return False, error_msg

    token = (github_token or '').strip()
    branch = branch.strip() if branch else None

    try:
        if os.path.lexists(repo_name):
            print(f"Removing existing repository: {repo_name}")
            if os.path.isdir(repo_name) and not os.path.islink(repo_name):
                shutil.rmtree(repo_name)
            else:
                os.remove(repo_name)

        owner_repo = '/'.join(parts[-2:])
        if token:
            auth_url = f"https://{token}@github.com/{owner_repo}"
        else:
            auth_url = f"https://github.com/{owner_repo}"
        # NOTE(review): git presumably records this URL (token included) as
        # the clone's origin remote -- consider resetting it after cloning.

        cmd = ['git', 'clone']
        if branch:
            cmd.extend(['--branch', branch])
        # Explicit destination keeps the created directory in sync with repo_name.
        cmd.extend([auth_url, repo_name])

        process = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            env=dict(os.environ, GIT_ASKPASS='echo', GIT_TERMINAL_PROMPT='0'),
        )

        if process.returncode == 0:
            print(f"Successfully cloned repository: {repo_name}")
            return True, repo_name

        stderr = _redact(process.stderr, token)
        print(f"Failed to clone repository: {stderr}")
        return False, stderr
    except Exception as e:
        error_msg = _redact(f"Error cloning repository: {str(e)}", token)
        print(error_msg)
        return False, error_msg


def analyze_with_ollama(model_name, text):
    """Send *text* as a prompt to *model_name* and return the model's answer.

    Always returns a string: the generated response on success, or an error
    description when the request fails or the reply has no ``response`` key.
    """
    print(f"\nAnalyzing with {model_name}...")
    try:
        payload = {
            "model": model_name,
            "prompt": text,
            "stream": False,
            "options": {
                "temperature": 0.7,
                "top_p": 0.9,
                # Ollama's length cap is "num_predict"; "max_tokens" is not
                # one of its generate options.
                "num_predict": 2048,
                "stop": None,
            },
        }

        print("Sending request to Ollama API...")
        response = requests.post(
            f"{OLLAMA_API}/api/generate",
            headers={"Content-Type": "application/json"},
            json=payload,
            timeout=60,
        )
        print(f"Response status: {response.status_code}")

        if response.status_code != 200:
            error_msg = f"API Error {response.status_code}: {response.text}"
            print(error_msg)
            return error_msg

        result = response.json()
        if 'response' in result:
            print("Got response from model")
            return result['response']

        print("Unexpected response format:", result)
        return "Error: Unexpected response format from model"
    except Exception as e:
        # UI boundary: report the failure as the analysis text.
        error_msg = f"Error processing with model: {str(e)}"
        print(error_msg)
        return error_msg


def chunk_text(text, max_length=4000):
    """Split *text* into chunks of at most *max_length* characters.

    Chunks break on line boundaries and keep newlines and indentation, so
    concatenating the chunks reproduces *text* exactly.  A single line longer
    than *max_length* is cut into *max_length*-sized pieces.  Empty or
    whitespace-only input yields an empty list.

    Raises:
        ValueError: if *max_length* is not positive.
    """
    if max_length <= 0:
        raise ValueError(f"invalid max_length {max_length!r} (must be > 0)")
    if not text or not text.strip():
        return []

    chunks = []
    current = []
    current_len = 0
    for line in text.splitlines(keepends=True):
        for start in range(0, len(line), max_length):
            piece = line[start:start + max_length]
            # Flush before the piece that would push the chunk over the limit.
            if current and current_len + len(piece) > max_length:
                chunks.append(''.join(current))
                current, current_len = [], 0
            current.append(piece)
            current_len += len(piece)
    if current:
        chunks.append(''.join(current))
    return chunks


def read_file_safely(file_path):
    """Read a text file, trying several encodings in turn.

    Returns:
        ``(True, content)`` on success or ``(False, error_text)`` on failure.
    """
    # latin-1 accepts every byte sequence, so it must come last; otherwise
    # the encodings listed after it can never be tried.
    encodings = ['utf-8', 'cp1252', 'latin-1']

    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                content = f.read()
        except UnicodeDecodeError:
            continue
        except Exception as e:
            error_msg = f"Error reading file: {str(e)}"
            print(error_msg)
            return False, error_msg
        print(f"Successfully read file with {encoding} encoding")
        return True, content

    return False, "Unable to read file with supported encodings"


def _build_prompt(file_path, index, total, chunk):
    """Return the analysis prompt for one chunk of one file."""
    fence = "`" * 3
    return (
        "\nAnalyze this code/content:\n"
        f"File: {file_path}\n"
        f"Part {index}/{total}\n\n"
        f"{fence}\n{chunk}\n{fence}\n\n"
        "Provide:\n"
        "1. Brief overview\n"
        "2. Key functionality\n"
        "3. Notable patterns or concerns\n"
        "4. Suggestions (if any)\n"
    )


def create_ui():
    """Build and return the Gradio Blocks app (model management + repository analysis)."""
    with gr.Blocks(title="Ollama Repository Analyzer") as app:
        gr.Markdown(f"""
        # Ollama Repository Analyzer
        Current Time: {CURRENT_TIME}
        User: {CURRENT_USER}
        """)

        with gr.Tab("Model Management"):
            model_status = gr.Textbox(label="Ollama Status", interactive=False)
            available_models = gr.Dropdown(
                label="Available Models",
                choices=DEFAULT_MODELS,
                interactive=True,
            )
            download_button = gr.Button("Download Selected Model")
            download_status = gr.Textbox(label="Download Status", interactive=False)

            def update_status():
                """Return connection status plus refreshed choices for both model dropdowns."""
                status = "Connected" if check_ollama_status() else "Not Connected"
                models = list_available_models()
                return status, gr.Dropdown(choices=models), gr.Dropdown(choices=models)

            download_button.click(
                fn=download_model,
                inputs=[available_models],
                outputs=[download_status],
            )

        with gr.Tab("Repository Analysis"):
            repo_url = gr.Textbox(label="Repository URL")
            github_token = gr.Textbox(label="GitHub Token", type="password")
            branch = gr.Textbox(label="Branch (optional)")
            clone_button = gr.Button("Clone Repository")
            clone_status = gr.Textbox(label="Clone Status", interactive=False)

            with gr.Row():
                file_list = gr.Dropdown(label="Files in Repository", multiselect=True)
                selected_model = gr.Dropdown(
                    label="Select Model for Analysis",
                    choices=DEFAULT_MODELS,
                    interactive=True,
                )

            analyze_button = gr.Button("Analyze Selected Files")
            debug_output = gr.Textbox(label="Debug Output", interactive=False)
            analysis_output = gr.Markdown()

            def handle_clone(url, token, branch_name):
                """Clone the repository and return (status text, file dropdown update)."""
                print(f"\nCloning repository: {url}")
                success, result = clone_repository(url, token, branch_name if branch_name else None)

                if not success:
                    # Drop choices from any earlier clone; those files may be gone.
                    return f"Clone failed: {result}", gr.Dropdown(choices=[], value=[])

                # Compare path components so only the .git directory is
                # skipped, not .github/, .gitignore and the like.
                files = sorted(
                    str(p)
                    for p in Path(result).rglob('*')
                    if p.is_file() and '.git' not in p.parts
                )
                print(f"Found {len(files)} files in repository")
                return f"Successfully cloned: {result}", gr.Dropdown(choices=files)

            def analyze_files(files, model_name):
                """Analyze each selected file chunk by chunk.

                Returns (markdown analysis, debug log), matching the order of
                the click handler's outputs.
                """
                if not files:
                    return "Please select files to analyze", "No files selected"
                if not model_name:
                    return "Please select a model for analysis", "No model selected"

                debug_info = [
                    f"Starting analysis with model: {model_name}",
                    f"Files to analyze: {len(files)}",
                ]
                results = []

                for file_path in files:
                    debug_info.append(f"\nProcessing file: {file_path}")
                    success, content = read_file_safely(file_path)

                    if not success:
                        error_msg = f"Error reading {file_path}: {content}"
                        debug_info.append(error_msg)
                        results.append(error_msg)
                        continue

                    chunks = chunk_text(content)
                    total = len(chunks)
                    debug_info.append(f"Split into {total} chunks")

                    analysis = []
                    for i, chunk in enumerate(chunks, 1):
                        debug_info.append(f"Analyzing chunk {i}/{total}")
                        response = analyze_with_ollama(
                            model_name, _build_prompt(file_path, i, total, chunk)
                        )
                        debug_info.append(f"Got response of length: {len(response)}")
                        analysis.append(response)

                    results.append(
                        f"### Analysis of {file_path}\n\n"
                        + "\n\n=== Next Part ===\n\n".join(analysis)
                    )

                return "\n\n---\n\n".join(results), "\n".join(debug_info)

            clone_button.click(
                fn=handle_clone,
                inputs=[repo_url, github_token, branch],
                outputs=[clone_status, file_list],
            )

            analyze_button.click(
                fn=analyze_files,
                inputs=[file_list, selected_model],
                outputs=[analysis_output, debug_output],
            )

        # Refresh the status and both model lists when the page loads
        # (no interval is configured, so this is not a periodic poll).
        app.load(update_status, outputs=[model_status, available_models, selected_model])

    return app


# Launch the app
if __name__ == "__main__":
    print(f"""
    Starting Ollama Repository Analyzer
    Time: {CURRENT_TIME}
    User: {CURRENT_USER}
    """)
    app = create_ui()
    # NOTE(review): share=True asks Gradio for a public link to an app that
    # accepts GitHub tokens and deletes directories -- confirm this exposure
    # is intended.
    app.launch(share=True)