import gradio as gr import os from pathlib import Path import subprocess import requests import json from datetime import datetime import textwrap import time from tqdm import tqdm import html from dotenv import load_dotenv # Load environment variables load_dotenv() # Metadata CURRENT_TIME = "2025-05-22 23:15:49" CURRENT_USER = "ErRickow" # Ollama API settings OLLAMA_API = os.getenv('OLLAMA_API') # Default models DEFAULT_MODELS = [ "llama2", "codellama", "mistral", "neural-chat", "starling-lm", "dolphin-phi", "phi", "orca-mini" ] # Custom CSS CUSTOM_CSS = """ .output-block { background-color: #1e1e1e; border-radius: 5px; padding: 10px; margin: 10px 0; font-family: 'Courier New', monospace; position: relative; } .copy-button { position: absolute; top: 5px; right: 5px; padding: 5px 10px; background-color: #2d2d2d; border: none; border-radius: 3px; color: white; cursor: pointer; } .copy-button:hover { background-color: #3d3d3d; } pre { margin: 0; white-space: pre-wrap; word-wrap: break-word; } code { color: #d4d4d4; } """ def format_code_block(text, language=""): """Format text as a code block with copy button""" escaped_text = html.escape(text) return f"""
{escaped_text}
""" def check_ollama_status(): try: response = requests.get(f"{OLLAMA_API}/api/tags", timeout=10) return response.status_code == 200 except: return False def list_available_models(): try: response = requests.get(f"{OLLAMA_API}/api/tags") installed_models = [model['name'] for model in response.json().get('models', [])] all_models = list(set(installed_models + DEFAULT_MODELS)) return sorted(all_models) except: return sorted(DEFAULT_MODELS) def download_model(model_name, progress=gr.Progress()): if not model_name: return "Silakan pilih model terlebih dahulu" try: progress(0, desc=f"Mulai mengunduh {model_name}") headers = {"Content-Type": "application/json"} response = requests.post( f"{OLLAMA_API}/api/pull", headers=headers, json={"name": model_name}, stream=True ) if response.status_code == 200: total_progress = 0 for line in response.iter_lines(): if line: data = json.loads(line) if 'completed' in data and 'total' in data: progress = min(data['completed'] / data['total'], 1.0) progress(progress, desc=f"Mengunduh {model_name}: {int(progress*100)}%") elif 'status' in data: progress(total_progress, desc=f"Status: {data['status']}") total_progress += 0.1 if total_progress > 0.9: total_progress = 0.1 progress(1.0, desc="Selesai!") return f"Model {model_name} berhasil diunduh!" else: return f"Gagal mengunduh model. Status: {response.status_code}" except Exception as e: return f"Error saat mengunduh model: {str(e)}" def clone_repository(repo_url, github_token, branch=None): repo_name = repo_url.split('/')[-1].replace('.git', '') print(f"Mengkloning repository: {repo_url} ke {repo_name}") if os.path.exists(repo_name): print(f"Menghapus repository yang sudah ada: {repo_name}") subprocess.run(['rm', '-rf', repo_name], check=True) try: cmd = ['git', 'clone'] if branch: cmd.extend(['--branch', branch]) if github_token: owner_repo = '/'.join(repo_url.split('/')[-2:]) auth_url = f"https://{github_token}@github.com/{owner_repo}" cmd.append(auth_url) else: cmd.append(repo_url) process = subprocess.run( cmd, capture_output=True, text=True, env=dict(os.environ, GIT_ASKPASS='echo', GIT_TERMINAL_PROMPT='0') ) if process.returncode == 0: print(f"Berhasil mengkloning repository: {repo_name}") return True, repo_name else: print(f"Gagal mengkloning repository: {process.stderr}") return False, process.stderr except Exception as e: error_msg = f"Error saat mengkloning repository: {str(e)}" print(error_msg) return False, error_msg def analyze_with_ollama(model_name, text, stream=True): print(f"\nMenganalisis dengan {model_name}...") try: payload = { "model": model_name, "prompt": text, "stream": stream, "options": { "temperature": 0.7, "top_p": 0.9, "max_tokens": 2048, "stop": None } } response = requests.post( f"{OLLAMA_API}/api/generate", headers={"Content-Type": "application/json"}, json=payload, stream=stream, timeout=60 ) if response.status_code == 200: if stream: full_response = "" for line in response.iter_lines(): if line: data = json.loads(line) if 'response' in data: chunk = data['response'] full_response += chunk yield full_response return full_response else: result = response.json() if 'response' in result: return result['response'] else: return "Error: Format respons tidak sesuai" else: return f"Error API {response.status_code}: {response.text}" except Exception as e: return f"Error saat memproses: {str(e)}" def chunk_text(text, max_length=4000): return textwrap.wrap(text, max_length, break_long_words=False, break_on_hyphens=False) def read_file_safely(file_path): encodings = ['utf-8', 'latin-1', 'cp1252'] for encoding in encodings: try: with open(file_path, 'r', encoding=encoding) as f: content = f.read() print(f"Berhasil membaca file dengan encoding {encoding}") return True, content except UnicodeDecodeError: continue except Exception as e: error_msg = f"Error membaca file: {str(e)}" print(error_msg) return False, error_msg return False, "Tidak dapat membaca file dengan encoding yang didukung" def handle_clone(url, token, branch_name): """Handle repository cloning and file listing""" print(f"\nMengkloning repository: {url}") success, result = clone_repository(url, token, branch_name if branch_name else None) if success: try: files = [str(p) for p in Path(result).rglob('*') if p.is_file() and '.git' not in str(p)] print(f"Menemukan {len(files)} file dalam repository") return f"Berhasil mengkloning repository ke: {result}", gr.Dropdown(choices=files) except Exception as e: error_msg = f"Error saat mencari file: {str(e)}" print(error_msg) return error_msg, None return f"Gagal mengkloning: {result}", None def create_ui(): with gr.Blocks(title="Analisis Repository dengan Ollama", css=CUSTOM_CSS) as app: gr.Markdown(f""" # 🚀 Analisis Repository dengan Ollama Waktu Server: {CURRENT_TIME} Pengguna: {CURRENT_USER} """) with gr.Tab("Manajemen Model"): model_status = gr.Textbox(label="Status Ollama", interactive=False) available_models = gr.Dropdown( label="Model Tersedia", choices=DEFAULT_MODELS, interactive=True ) with gr.Row(): download_button = gr.Button("Unduh Model") cancel_button = gr.Button("Batalkan") download_status = gr.Textbox(label="Status Unduhan", interactive=False) progress = gr.Progress() def update_status(): status = "Terhubung" if check_ollama_status() else "Tidak Terhubung" models = list_available_models() return status, gr.Dropdown(choices=models) download_button.click( fn=download_model, inputs=[available_models], outputs=[download_status] ) with gr.Tab("Analisis Repository"): with gr.Row(): repo_url = gr.Textbox(label="URL Repository") github_token = gr.Textbox(label="Token GitHub", type="password") branch = gr.Textbox(label="Branch (opsional)") clone_button = gr.Button("Clone Repository") clone_status = gr.Textbox(label="Status Clone", interactive=False) with gr.Row(): file_list = gr.Dropdown( label="File dalam Repository", multiselect=True, info="Pilih file yang akan dianalisis" ) selected_model = gr.Dropdown( label="Model untuk Analisis", choices=DEFAULT_MODELS, interactive=True ) with gr.Row(): stream_checkbox = gr.Checkbox( label="Streaming Response", value=True, info="Tampilkan hasil analisis secara real-time" ) code_block_checkbox = gr.Checkbox( label="Format Output sebagai Code Block", value=True, info="Tampilkan hasil dalam format code block yang bisa di-copy" ) analyze_button = gr.Button("Analisis File") debug_output = gr.HTML(label="Info Debug") analysis_output = gr.HTML() def analyze_files(files, model_name, stream=True, use_code_block=True): if not files: return ("Silakan pilih file untuk dianalisis", format_code_block("Tidak ada file dipilih")) debug_info = [] results = [] debug_info.append(f"Mulai analisis dengan model: {model_name}") debug_info.append(f"File yang akan dianalisis: {len(files)}") for file_path in files: debug_info.append(f"\nMemproses file: {file_path}") success, content = read_file_safely(file_path) if success: chunks = chunk_text(content) debug_info.append(f"Dibagi menjadi {len(chunks)} bagian") analysis = [] for i, chunk in enumerate(chunks, 1): debug_info.append(f"Menganalisis bagian {i}/{len(chunks)}") prompt = f""" Analisis kode berikut: File: {file_path} Bagian {i} dari {len(chunks)} ``` {chunk} ``` Berikan: 1. Ringkasan singkat 2. Fungsi utama 3. Pola dan masalah yang ditemukan 4. Saran perbaikan (jika ada) """ if stream: for response_chunk in analyze_with_ollama(model_name, prompt, stream=True): formatted_response = (format_code_block(response_chunk) if use_code_block else response_chunk) yield formatted_response, format_code_block("\n".join(debug_info)) else: response = analyze_with_ollama(model_name, prompt, stream=False) analysis.append(response) formatted_results = "\n\n---\n\n".join(results + [response]) if use_code_block: formatted_results = format_code_block(formatted_results) yield (formatted_results, format_code_block("\n".join(debug_info))) if not stream: results.append(f"### Analisis {file_path}\n\n" + "\n\n=== Bagian Selanjutnya ===\n\n".join(analysis)) else: error_msg = f"Error membaca {file_path}: {content}" debug_info.append(error_msg) results.append(error_msg) formatted_results = "\n\n---\n\n".join(results) if use_code_block: formatted_results = format_code_block(formatted_results) yield formatted_results, format_code_block("\n".join(debug_info)) clone_button.click( fn=handle_clone, inputs=[repo_url, github_token, branch], outputs=[clone_status, file_list] ) analyze_button.click( fn=analyze_files, inputs=[ file_list, selected_model, stream_checkbox, code_block_checkbox ], outputs=[analysis_output, debug_output] ) app.load(update_status, outputs=[model_status, available_models]) return app if __name__ == "__main__": print(f""" Memulai Analisis Repository Waktu: {CURRENT_TIME} API: {OLLAMA_API} """) app = create_ui() app.launch()