import gradio as gr
import os
from pathlib import Path
import subprocess
import requests
import json
from datetime import datetime
import textwrap
import time
from tqdm import tqdm
import html
from dotenv import load_dotenv
# Load environment variables via python-dotenv before OLLAMA_API is read with
# os.getenv() further down in this module.
load_dotenv()
# Metadata
# Timestamp captured once at import time, so the UI header and the startup
# banner show when this server process started rather than a fixed date.
CURRENT_TIME = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
CURRENT_USER = "ErRickow"

# Ollama API settings
# Fall back to Ollama's default local endpoint when OLLAMA_API is unset or
# empty; otherwise every request URL would start with the literal text "None".
# The trailing slash is dropped because callers append "/api/..." themselves.
OLLAMA_API = (os.getenv('OLLAMA_API') or "http://localhost:11434").rstrip('/')

# Default models offered in the dropdowns even when the server is unreachable
DEFAULT_MODELS = [
    "llama2",
    "codellama",
    "mistral",
    "neural-chat",
    "starling-lm",
    "dolphin-phi",
    "phi",
    "orca-mini",
]
# Custom CSS handed to gr.Blocks(css=...) in create_ui(). It defines the
# ".output-block" container, the absolutely positioned ".copy-button" inside
# it, and the wrapping/colour rules for <pre> and <code> text.
CUSTOM_CSS = """
.output-block {
background-color: #1e1e1e;
border-radius: 5px;
padding: 10px;
margin: 10px 0;
font-family: 'Courier New', monospace;
position: relative;
}
.copy-button {
position: absolute;
top: 5px;
right: 5px;
padding: 5px 10px;
background-color: #2d2d2d;
border: none;
border-radius: 3px;
color: white;
cursor: pointer;
}
.copy-button:hover {
background-color: #3d3d3d;
}
pre {
margin: 0;
white-space: pre-wrap;
word-wrap: break-word;
}
code {
color: #d4d4d4;
}
"""
def format_code_block(text, language=""):
    """Format text as an HTML code block with a copy button.

    The previous body escaped the text but then returned only a newline, so
    nothing was ever displayed. The markup below uses the ``output-block`` and
    ``copy-button`` classes together with ``<pre>``/``<code>`` elements.

    Args:
        text: Content to display. It is HTML-escaped, so markup inside it is
            shown literally instead of being interpreted.
        language: Optional language name, emitted as a ``language-<name>``
            class on the ``<code>`` element.

    Returns:
        An HTML fragment (str) containing the escaped text.
    """
    escaped_text = html.escape("" if text is None else str(text))
    language_attr = (
        f' class="language-{html.escape(str(language), quote=True)}"'
        if language
        else ""
    )
    # Inline handler that copies the sibling <code> text to the clipboard.
    # NOTE(review): assumes the hosting HTML component keeps inline event
    # attributes - confirm against the Gradio version in use.
    copy_handler = (
        "navigator.clipboard.writeText("
        "this.parentElement.querySelector('code').innerText)"
    )
    return (
        '\n<div class="output-block">'
        f'<button class="copy-button" onclick="{copy_handler}">Copy</button>'
        f'<pre><code{language_attr}>{escaped_text}</code></pre>'
        '</div>\n'
    )
def check_ollama_status():
    """Report whether the Ollama server is reachable.

    Returns:
        True when ``GET {OLLAMA_API}/api/tags`` answers with HTTP 200 within
        10 seconds, False on any other status or on a request failure.
    """
    try:
        response = requests.get(f"{OLLAMA_API}/api/tags", timeout=10)
    except requests.RequestException:
        # Only network/HTTP-level failures mean "not connected"; a bare
        # except would also swallow KeyboardInterrupt and SystemExit.
        return False
    return response.status_code == 200
def list_available_models():
    """Return the sorted union of installed and default model names.

    Installed models are read from ``GET {OLLAMA_API}/api/tags``. If the
    server cannot be reached, answers with an error status, or sends a body
    that is not in the expected shape, only ``DEFAULT_MODELS`` is returned.

    Returns:
        A sorted list of unique model names.
    """
    try:
        # Bounded wait so an unresponsive server cannot hang the page load.
        response = requests.get(f"{OLLAMA_API}/api/tags", timeout=10)
        response.raise_for_status()
        # "models" may be missing or null; treat both as "nothing installed".
        installed_models = [
            model['name'] for model in (response.json().get('models') or [])
        ]
    except (requests.RequestException, ValueError, KeyError,
            TypeError, AttributeError):
        return sorted(DEFAULT_MODELS)
    return sorted(set(installed_models) | set(DEFAULT_MODELS))
def download_model(model_name, progress=gr.Progress()):
    """Pull a model through Ollama's streaming ``/api/pull`` endpoint.

    Args:
        model_name: Name of the model to pull; an empty value is rejected.
        progress: Gradio progress tracker, called with a 0..1 fraction and a
            description.

    Returns:
        A user-facing status message (success, HTTP failure, or error text).
    """
    if not model_name:
        return "Silakan pilih model terlebih dahulu"
    try:
        progress(0, desc=f"Mulai mengunduh {model_name}")
        # Connect timeout only: a pull legitimately streams for a long time,
        # so the read side is left unbounded.
        with requests.post(
            f"{OLLAMA_API}/api/pull",
            headers={"Content-Type": "application/json"},
            json={"name": model_name},
            stream=True,
            timeout=(10, None),
        ) as response:
            if response.status_code != 200:
                return f"Gagal mengunduh model. Status: {response.status_code}"

            # Pseudo-progress for status-only records that carry no byte
            # counts; cycles between 0.1 and 0.9 to show activity.
            status_progress = 0
            for line in response.iter_lines():
                if not line:
                    continue
                data = json.loads(line)
                if 'error' in data:
                    # Failures can be reported inside the stream even though
                    # the HTTP status was 200.
                    return f"Error saat mengunduh model: {data['error']}"
                completed = data.get('completed')
                total = data.get('total')
                if completed is not None and total:
                    # Use a separate name: rebinding ``progress`` to a float
                    # would make the next progress(...) call fail.
                    fraction = min(completed / total, 1.0)
                    progress(
                        fraction,
                        desc=f"Mengunduh {model_name}: {int(fraction * 100)}%",
                    )
                elif 'status' in data:
                    progress(status_progress, desc=f"Status: {data['status']}")
                    status_progress += 0.1
                    if status_progress > 0.9:
                        status_progress = 0.1
        progress(1.0, desc="Selesai!")
        return f"Model {model_name} berhasil diunduh!"
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"Error saat mengunduh model: {str(e)}"
def clone_repository(repo_url, github_token, branch=None):
    """Clone a git repository into a directory named after the repository.

    An existing directory with the same name is deleted first.

    Args:
        repo_url: Repository URL; a trailing ``/`` and a trailing ``.git``
            are ignored when deriving the directory name.
        github_token: Optional token. When given, the clone URL is rebuilt as
            ``https://<token>@github.com/<owner>/<repo>``.
        branch: Optional branch name passed to ``git clone --branch``.

    Returns:
        ``(True, directory_name)`` on success, otherwise
        ``(False, error_text)``. The token is masked in any error text.
    """
    import shutil  # local import keeps this block self-contained

    def _redact(message):
        # git echoes the authenticated URL in its errors; never surface it.
        if github_token:
            return message.replace(github_token, '***')
        return message

    cleaned_url = (repo_url or '').strip().rstrip('/')
    repo_name = cleaned_url.split('/')[-1]
    # Strip only a trailing ".git"; str.replace would also mangle names such
    # as "user.github.io".
    if repo_name.endswith('.git'):
        repo_name = repo_name[:-len('.git')]
    # The name is used as a delete target below, so refuse anything that
    # could point at the current or parent directory.
    if repo_name in ('', '.', '..'):
        error_msg = (
            "Error saat mengkloning repository: "
            f"URL repository tidak valid: {repo_url!r}"
        )
        print(error_msg)
        return False, error_msg

    print(f"Mengkloning repository: {repo_url} ke {repo_name}")
    try:
        if os.path.exists(repo_name):
            print(f"Menghapus repository yang sudah ada: {repo_name}")
            shutil.rmtree(repo_name)

        cmd = ['git', 'clone']
        if branch:
            cmd.extend(['--branch', branch])
        if github_token:
            owner_repo = '/'.join(cleaned_url.split('/')[-2:])
            source = f"https://{github_token}@github.com/{owner_repo}"
        else:
            source = cleaned_url
        # "--" stops option parsing so a URL starting with "-" cannot be read
        # as a git flag; the explicit target keeps git's directory in sync
        # with the name checked and reported here.
        cmd.extend(['--', source, repo_name])

        process = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            env=dict(os.environ, GIT_ASKPASS='echo', GIT_TERMINAL_PROMPT='0'),
        )
        if process.returncode == 0:
            print(f"Berhasil mengkloning repository: {repo_name}")
            return True, repo_name
        stderr = _redact(process.stderr)
        print(f"Gagal mengkloning repository: {stderr}")
        return False, stderr
    except Exception as e:
        error_msg = _redact(f"Error saat mengkloning repository: {str(e)}")
        print(error_msg)
        return False, error_msg
def _request_ollama_generation(model_name, text, stream):
    """POST a prompt to Ollama's /api/generate endpoint; return the response."""
    payload = {
        "model": model_name,
        "prompt": text,
        "stream": stream,
        "options": {
            "temperature": 0.7,
            "top_p": 0.9,
            # Ollama's option for capping generated tokens is "num_predict";
            # "max_tokens" is not one of its options.
            "num_predict": 2048,
        },
    }
    return requests.post(
        f"{OLLAMA_API}/api/generate",
        headers={"Content-Type": "application/json"},
        json=payload,
        stream=stream,
        timeout=60,
    )


def _stream_ollama_generation(model_name, text):
    """Yield the accumulated answer after each fragment; errors are yielded."""
    print(f"\nMenganalisis dengan {model_name}...")
    try:
        response = _request_ollama_generation(model_name, text, stream=True)
        if response.status_code != 200:
            yield f"Error API {response.status_code}: {response.text}"
            return
        full_response = ""
        for line in response.iter_lines():
            if not line:
                continue
            data = json.loads(line)
            if 'error' in data:
                yield f"Error API {response.status_code}: {data['error']}"
                return
            if 'response' in data:
                full_response += data['response']
                yield full_response
    except Exception as e:
        # UI boundary: surface the failure as text instead of raising.
        yield f"Error saat memproses: {str(e)}"


def analyze_with_ollama(model_name, text, stream=True):
    """Send a prompt to an Ollama model and return its answer.

    Previously the function body contained ``yield``, which made every call
    return a generator: ``stream=False`` never produced a string and all
    ``return "Error ..."`` messages were lost to callers iterating it.

    Args:
        model_name: Name of the Ollama model to query.
        text: Prompt text.
        stream: When True, return a generator that yields the accumulated
            answer after every streamed fragment. When False, return the
            complete answer as a string.

    Returns:
        A generator of str (``stream=True``) or a str (``stream=False``).
        Failures are reported as "Error ..." text, never raised.
    """
    if stream:
        return _stream_ollama_generation(model_name, text)

    print(f"\nMenganalisis dengan {model_name}...")
    try:
        response = _request_ollama_generation(model_name, text, stream=False)
        if response.status_code != 200:
            return f"Error API {response.status_code}: {response.text}"
        result = response.json()
        if 'response' in result:
            return result['response']
        return "Error: Format respons tidak sesuai"
    except Exception as e:
        return f"Error saat memproses: {str(e)}"
def chunk_text(text, max_length=4000):
    """Split text into chunks of at most ``max_length`` characters.

    Chunks break on line boundaries and keep every character, so joining them
    reproduces the input. ``textwrap.wrap`` was unsuitable for source code: it
    replaces newlines with spaces and collapses whitespace.

    Args:
        text: Text to split.
        max_length: Maximum chunk size in characters; must be positive.

    Returns:
        A list of chunks; empty when ``text`` is empty.

    Raises:
        ValueError: If ``max_length`` is not positive.
    """
    if max_length <= 0:
        raise ValueError(f"invalid max_length {max_length!r} (must be > 0)")

    chunks = []
    current = []
    current_len = 0
    for line in text.splitlines(keepends=True):
        # A single line longer than the limit is hard-split so that no chunk
        # ever exceeds max_length.
        for start in range(0, len(line), max_length):
            piece = line[start:start + max_length]
            if current and current_len + len(piece) > max_length:
                chunks.append("".join(current))
                current = []
                current_len = 0
            current.append(piece)
            current_len += len(piece)
    if current:
        chunks.append("".join(current))
    return chunks
def read_file_safely(file_path):
    """Read a text file, trying several encodings in turn.

    Args:
        file_path: Path of the file to read.

    Returns:
        ``(True, content)`` for the first encoding that decodes the file,
        otherwise ``(False, error_text)``.
    """
    # latin-1 decodes any byte sequence, so it has to come last; placed before
    # cp1252 it made the cp1252 attempt unreachable.
    encodings = ['utf-8', 'cp1252', 'latin-1']
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                content = f.read()
        except UnicodeDecodeError:
            continue
        except Exception as e:
            error_msg = f"Error membaca file: {str(e)}"
            print(error_msg)
            return False, error_msg
        print(f"Berhasil membaca file dengan encoding {encoding}")
        return True, content
    return False, "Tidak dapat membaca file dengan encoding yang didukung"
def handle_clone(url, token, branch_name):
    """Handle repository cloning and file listing.

    Args:
        url: Repository URL typed by the user.
        token: Optional GitHub token.
        branch_name: Optional branch; blank means the default branch.

    Returns:
        ``(status_message, dropdown_update)``. The second item is a
        ``gr.Dropdown`` carrying the repository's files on success and
        ``None`` on failure.
    """
    url = (url or "").strip()
    if not url:
        return "Gagal mengkloning: URL repository kosong", None

    print(f"\nMengkloning repository: {url}")
    branch = (branch_name or "").strip() or None
    success, result = clone_repository(url, (token or "").strip(), branch)
    if not success:
        return f"Gagal mengkloning: {result}", None

    try:
        # Skip only git's metadata directory. A substring test on the whole
        # path would also hide ".github/" and ".gitignore".
        files = sorted(
            str(p) for p in Path(result).rglob('*')
            if p.is_file() and '.git' not in p.parts
        )
        print(f"Menemukan {len(files)} file dalam repository")
        return f"Berhasil mengkloning repository ke: {result}", gr.Dropdown(choices=files)
    except Exception as e:
        error_msg = f"Error saat mencari file: {str(e)}"
        print(error_msg)
        return error_msg, None
def create_ui():
    """Build and return the Gradio Blocks application.

    The UI has two tabs: "Manajemen Model" (server status and model download)
    and "Analisis Repository" (clone a repository, pick files, run analysis).

    Returns:
        The configured ``gr.Blocks`` app, not yet launched.
    """
    with gr.Blocks(title="Analisis Repository dengan Ollama", css=CUSTOM_CSS) as app:
        gr.Markdown(
            "\n"
            "# 🚀 Analisis Repository dengan Ollama\n"
            f"Waktu Server: {CURRENT_TIME}\n"
            f"Pengguna: {CURRENT_USER}\n"
        )

        with gr.Tab("Manajemen Model"):
            model_status = gr.Textbox(label="Status Ollama", interactive=False)
            available_models = gr.Dropdown(
                label="Model Tersedia",
                choices=DEFAULT_MODELS,
                interactive=True,
            )
            with gr.Row():
                download_button = gr.Button("Unduh Model")
                # NOTE(review): no click handler is attached to this button
                # anywhere in this function.
                cancel_button = gr.Button("Batalkan")
            download_status = gr.Textbox(label="Status Unduhan", interactive=False)

            def update_status():
                """Refresh the connection status and both model dropdowns."""
                status = "Terhubung" if check_ollama_status() else "Tidak Terhubung"
                models = list_available_models()
                return (
                    status,
                    gr.Dropdown(choices=models),
                    gr.Dropdown(choices=models),
                )

            download_button.click(
                fn=download_model,
                inputs=[available_models],
                outputs=[download_status],
            )

        with gr.Tab("Analisis Repository"):
            with gr.Row():
                repo_url = gr.Textbox(label="URL Repository")
                github_token = gr.Textbox(label="Token GitHub", type="password")
                branch = gr.Textbox(label="Branch (opsional)")
            clone_button = gr.Button("Clone Repository")
            clone_status = gr.Textbox(label="Status Clone", interactive=False)
            with gr.Row():
                file_list = gr.Dropdown(
                    label="File dalam Repository",
                    multiselect=True,
                    info="Pilih file yang akan dianalisis",
                )
                selected_model = gr.Dropdown(
                    label="Model untuk Analisis",
                    choices=DEFAULT_MODELS,
                    interactive=True,
                )
            with gr.Row():
                stream_checkbox = gr.Checkbox(
                    label="Streaming Response",
                    value=True,
                    info="Tampilkan hasil analisis secara real-time",
                )
                code_block_checkbox = gr.Checkbox(
                    label="Format Output sebagai Code Block",
                    value=True,
                    info="Tampilkan hasil dalam format code block yang bisa di-copy",
                )
            analyze_button = gr.Button("Analisis File")
            debug_output = gr.HTML(label="Info Debug")
            analysis_output = gr.HTML()

            def model_replies(model_name, prompt, stream):
                """Yield successive answer texts for one prompt.

                Accepts a plain string or a generator from
                analyze_with_ollama. A generator's return value, if any, is
                yielded as the last item so a final text or error message
                delivered that way is not lost.
                """
                outcome = analyze_with_ollama(model_name, prompt, stream=stream)
                if isinstance(outcome, str):
                    yield outcome
                    return
                iterator = iter(outcome)
                while True:
                    try:
                        partial = next(iterator)
                    except StopIteration as done:
                        if done.value is not None:
                            yield done.value
                        return
                    yield partial

            def analyze_files(files, model_name, stream=True, use_code_block=True):
                """Analyse the selected files chunk by chunk.

                Generator event handler: every yield is an
                ``(analysis_html, debug_html)`` pair matching
                ``outputs=[analysis_output, debug_output]``.
                """
                # This function is a generator, so messages must be yielded;
                # a ``return value`` here would never reach the UI.
                if not files:
                    yield ("Silakan pilih file untuk dianalisis",
                           format_code_block("Tidak ada file dipilih"))
                    return
                if not model_name:
                    yield ("Silakan pilih model untuk analisis",
                           format_code_block("Tidak ada model dipilih"))
                    return

                part_separator = "\n\n=== Bagian Selanjutnya ===\n\n"
                debug_info = [
                    f"Mulai analisis dengan model: {model_name}",
                    f"File yang akan dianalisis: {len(files)}",
                ]
                results = []

                def render(sections):
                    text = "\n\n---\n\n".join(sections)
                    return format_code_block(text) if use_code_block else text

                def render_debug():
                    return format_code_block("\n".join(debug_info))

                for file_path in files:
                    debug_info.append(f"\nMemproses file: {file_path}")
                    success, content = read_file_safely(file_path)
                    if not success:
                        error_msg = f"Error membaca {file_path}: {content}"
                        debug_info.append(error_msg)
                        results.append(error_msg)
                        continue

                    chunks = chunk_text(content)
                    debug_info.append(f"Dibagi menjadi {len(chunks)} bagian")
                    header = f"### Analisis {file_path}\n\n"
                    analysis = []
                    for i, chunk in enumerate(chunks, 1):
                        debug_info.append(f"Menganalisis bagian {i}/{len(chunks)}")
                        prompt = (
                            "\n"
                            "Analisis kode berikut:\n"
                            f"File: {file_path}\n"
                            f"Bagian {i} dari {len(chunks)}\n"
                            "```\n"
                            f"{chunk}\n"
                            "```\n"
                            "Berikan:\n"
                            "1. Ringkasan singkat\n"
                            "2. Fungsi utama\n"
                            "3. Pola dan masalah yang ditemukan\n"
                            "4. Saran perbaikan (jika ada)\n"
                        )
                        latest = ""
                        for latest in model_replies(model_name, prompt, stream):
                            # Show finished files plus the file in progress so
                            # earlier output is not replaced by the new chunk.
                            in_progress = header + part_separator.join(
                                analysis + [latest]
                            )
                            yield render(results + [in_progress]), render_debug()
                        # Keep the final text in both modes; otherwise the
                        # closing yield would blank out streamed answers.
                        analysis.append(latest)
                    results.append(header + part_separator.join(analysis))

                yield render(results), render_debug()

            clone_button.click(
                fn=handle_clone,
                inputs=[repo_url, github_token, branch],
                outputs=[clone_status, file_list],
            )
            analyze_button.click(
                fn=analyze_files,
                inputs=[
                    file_list,
                    selected_model,
                    stream_checkbox,
                    code_block_checkbox,
                ],
                outputs=[analysis_output, debug_output],
            )

        # Populate status and both model dropdowns when the page loads, so
        # installed models can also be chosen for analysis.
        app.load(
            update_status,
            outputs=[model_status, available_models, selected_model],
        )
    return app
if __name__ == "__main__":
    # Print a short startup banner, then build and serve the UI.
    banner = (
        "\n"
        "Memulai Analisis Repository\n"
        f"Waktu: {CURRENT_TIME}\n"
        f"API: {OLLAMA_API}\n"
    )
    print(banner)
    app = create_ui()
    app.launch()