OpenRepoAI / app.py
ErNewdev0's picture
Update app.py
a988fb6 verified
raw
history blame
11 kB
import gradio as gr
import os
from pathlib import Path
import subprocess
import requests
import json
from datetime import datetime
import textwrap
# Metadata
CURRENT_TIME = "2025-05-22 22:42:10"
CURRENT_USER = "ErRickow"
# Ollama API settings
OLLAMA_API = os.environ["OLLAMA_API"]
# Default available models
DEFAULT_MODELS = [
"llama2",
"codellama",
"mistral",
"neural-chat",
"starling-lm",
"dolphin-phi",
"phi",
"orca-mini"
]
def check_ollama_status():
try:
response = requests.get(f"{OLLAMA_API}/api/tags", timeout=10)
return response.status_code == 200
except:
return False
def list_available_models():
try:
response = requests.get(f"{OLLAMA_API}/api/tags")
installed_models = [model['name'] for model in response.json().get('models', [])]
# Combine installed and default models
all_models = list(set(installed_models + DEFAULT_MODELS))
return sorted(all_models) # Sort for better presentation
except:
return sorted(DEFAULT_MODELS)
def download_model(model_name):
if not model_name:
return "Please select a model to download"
print(f"Starting download of model: {model_name}")
try:
headers = {
"Content-Type": "application/json",
}
response = requests.post(
f"{OLLAMA_API}/api/pull",
headers=headers,
json={"name": model_name},
stream=True
)
if response.status_code == 200:
for line in response.iter_lines():
if line:
print(f"Download progress: {line.decode()}")
return f"Successfully downloaded model: {model_name}"
else:
error_msg = f"Failed to download model. Status: {response.status_code}"
print(error_msg)
return error_msg
except Exception as e:
error_msg = f"Error downloading model: {str(e)}"
print(error_msg)
return error_msg
def clone_repository(repo_url, github_token, branch=None):
"""Clone a repository with authentication"""
repo_name = repo_url.split('/')[-1].replace('.git', '')
print(f"Cloning repository: {repo_url} to {repo_name}")
if os.path.exists(repo_name):
print(f"Removing existing repository: {repo_name}")
subprocess.run(['rm', '-rf', repo_name], check=True)
try:
owner_repo = '/'.join(repo_url.split('/')[-2:])
auth_url = f"https://{github_token}@github.com/{owner_repo}"
cmd = ['git', 'clone']
if branch:
cmd.extend(['--branch', branch])
cmd.append(auth_url)
process = subprocess.run(
cmd,
capture_output=True,
text=True,
env=dict(os.environ, GIT_ASKPASS='echo', GIT_TERMINAL_PROMPT='0')
)
if process.returncode == 0:
print(f"Successfully cloned repository: {repo_name}")
return True, repo_name
else:
print(f"Failed to clone repository: {process.stderr}")
return False, process.stderr
except Exception as e:
error_msg = f"Error cloning repository: {str(e)}"
print(error_msg)
return False, error_msg
def analyze_with_ollama(model_name, text):
"""Process text with Ollama model"""
print(f"\nAnalyzing with {model_name}...")
try:
payload = {
"model": model_name,
"prompt": text,
"stream": False,
"options": {
"temperature": 0.7,
"top_p": 0.9,
"max_tokens": 2048,
"stop": None
}
}
print("Sending request to Ollama API...")
response = requests.post(
f"{OLLAMA_API}/api/generate",
headers={"Content-Type": "application/json"},
json=payload,
timeout=60
)
print(f"Response status: {response.status_code}")
if response.status_code == 200:
result = response.json()
if 'response' in result:
print("Got response from model")
return result['response']
else:
print("Unexpected response format:", result)
return "Error: Unexpected response format from model"
else:
error_msg = f"API Error {response.status_code}: {response.text}"
print(error_msg)
return error_msg
except Exception as e:
error_msg = f"Error processing with model: {str(e)}"
print(error_msg)
return error_msg
def chunk_text(text, max_length=4000):
return textwrap.wrap(text, max_length, break_long_words=False, break_on_hyphens=False)
def read_file_safely(file_path):
encodings = ['utf-8', 'latin-1', 'cp1252']
for encoding in encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
content = f.read()
print(f"Successfully read file with {encoding} encoding")
return True, content
except UnicodeDecodeError:
continue
except Exception as e:
error_msg = f"Error reading file: {str(e)}"
print(error_msg)
return False, error_msg
return False, "Unable to read file with supported encodings"
def create_ui():
with gr.Blocks(title="Ollama Repository Analyzer") as app:
gr.Markdown(f"""
# Ollama Repository Analyzer
Current Time: {CURRENT_TIME}
User: {CURRENT_USER}
""")
with gr.Tab("Model Management"):
model_status = gr.Textbox(label="Ollama Status", interactive=False)
available_models = gr.Dropdown(
label="Available Models",
choices=DEFAULT_MODELS,
interactive=True
)
download_button = gr.Button("Download Selected Model")
download_status = gr.Textbox(label="Download Status", interactive=False)
def update_status():
status = "Connected" if check_ollama_status() else "Not Connected"
models = list_available_models()
return status, gr.Dropdown(choices=models)
download_button.click(
fn=download_model,
inputs=[available_models],
outputs=[download_status]
)
with gr.Tab("Repository Analysis"):
repo_url = gr.Textbox(label="Repository URL")
github_token = gr.Textbox(label="GitHub Token", type="password")
branch = gr.Textbox(label="Branch (optional)")
clone_button = gr.Button("Clone Repository")
clone_status = gr.Textbox(label="Clone Status", interactive=False)
with gr.Row():
file_list = gr.Dropdown(label="Files in Repository", multiselect=True)
selected_model = gr.Dropdown(
label="Select Model for Analysis",
choices=DEFAULT_MODELS,
interactive=True
)
analyze_button = gr.Button("Analyze Selected Files")
debug_output = gr.Textbox(label="Debug Output", interactive=False)
analysis_output = gr.Markdown()
def handle_clone(url, token, branch_name):
print(f"\nCloning repository: {url}")
success, result = clone_repository(url, token, branch_name if branch_name else None)
if success:
files = [str(p) for p in Path(result).rglob('*')
if p.is_file() and '.git' not in str(p)]
print(f"Found {len(files)} files in repository")
return f"Successfully cloned: {result}", gr.Dropdown(choices=files)
return f"Clone failed: {result}", None
def analyze_files(files, model_name):
if not files:
return "Please select files to analyze", "No files selected"
debug_info = []
results = []
debug_info.append(f"Starting analysis with model: {model_name}")
debug_info.append(f"Files to analyze: {len(files)}")
for file_path in files:
debug_info.append(f"\nProcessing file: {file_path}")
success, content = read_file_safely(file_path)
if success:
chunks = chunk_text(content)
debug_info.append(f"Split into {len(chunks)} chunks")
analysis = []
for i, chunk in enumerate(chunks, 1):
debug_info.append(f"Analyzing chunk {i}/{len(chunks)}")
prompt = f"""
Analyze this code/content:
File: {file_path}
Part {i}/{len(chunks)}
```
{chunk}
```
Provide:
1. Brief overview
2. Key functionality
3. Notable patterns or concerns
4. Suggestions (if any)
"""
response = analyze_with_ollama(model_name, prompt)
debug_info.append(f"Got response of length: {len(response)}")
analysis.append(response)
results.append(f"### Analysis of {file_path}\n\n" +
"\n\n=== Next Part ===\n\n".join(analysis))
else:
error_msg = f"Error reading {file_path}: {content}"
debug_info.append(error_msg)
results.append(error_msg)
return "\n\n---\n\n".join(results), "\n".join(debug_info)
clone_button.click(
fn=handle_clone,
inputs=[repo_url, github_token, branch],
outputs=[clone_status, file_list]
)
analyze_button.click(
fn=analyze_files,
inputs=[file_list, selected_model],
outputs=[analysis_output, debug_output]
)
# Update status every 30 seconds
app.load(update_status, outputs=[model_status, available_models])
return app
# Launch the app
if __name__ == "__main__":
print(f"""
Starting Ollama Repository Analyzer
Time: {CURRENT_TIME}
User: {CURRENT_USER}
""")
app = create_ui()
app.launch(share=True)