Spaces:
Sleeping
Sleeping
import html
import json
import os
import re
import shutil
import subprocess
import textwrap
import time
from datetime import datetime
from pathlib import Path

import gradio as gr
import requests
from dotenv import load_dotenv
from tqdm import tqdm
# Load environment variables from a local .env file, if one is present.
load_dotenv()

# Metadata shown in the UI header and in the start-up banner.
# NOTE: these are fixed stamps, not values computed at run time.
CURRENT_TIME = "2025-05-22 23:15:49"
CURRENT_USER = "ErRickow"

# Base URL of the Ollama HTTP API. Falls back to Ollama's default local
# address when OLLAMA_API is unset or empty, so request URLs are never built
# from the literal string "None". A trailing slash is stripped because every
# caller appends "/api/...".
OLLAMA_API = (os.getenv('OLLAMA_API') or "http://localhost:11434").rstrip('/')

# Model names always offered in the dropdowns, even before the server has
# been queried for the models it actually has installed.
DEFAULT_MODELS = [
    "llama2",
    "codellama",
    "mistral",
    "neural-chat",
    "starling-lm",
    "dolphin-phi",
    "phi",
    "orca-mini",
]
# Custom CSS passed to gr.Blocks(css=...). It styles the HTML emitted by
# format_code_block: the dark ".output-block" container, the ".copy-button"
# pinned to its top-right corner, and wrapping for <pre>/<code> content.
CUSTOM_CSS = """
.output-block {
background-color: #1e1e1e;
border-radius: 5px;
padding: 10px;
margin: 10px 0;
font-family: 'Courier New', monospace;
position: relative;
}
.copy-button {
position: absolute;
top: 5px;
right: 5px;
padding: 5px 10px;
background-color: #2d2d2d;
border: none;
border-radius: 3px;
color: white;
cursor: pointer;
}
.copy-button:hover {
background-color: #3d3d3d;
}
pre {
margin: 0;
white-space: pre-wrap;
word-wrap: break-word;
}
code {
color: #d4d4d4;
}
"""
def format_code_block(text, language=""):
    """Render text as an HTML code block with a "Copy" button.

    Args:
        text: Text to display. It is HTML-escaped before being embedded;
            ``None`` is treated as an empty string.
        language: Optional language tag, emitted as a ``language-<tag>``
            CSS class on the ``<code>`` element (also HTML-escaped).

    Returns:
        An HTML fragment (``str``) using the ``output-block`` and
        ``copy-button`` CSS classes.
    """
    escaped_text = html.escape("" if text is None else str(text))
    escaped_language = html.escape(language or "")
    # The handler copies the rendered <code> text instead of embedding the
    # text in a JavaScript template literal. HTML entities inside an
    # attribute are decoded before the script runs, so a backtick, "${" or
    # backslash in the text would otherwise break the handler or inject code.
    copy_handler = (
        "navigator.clipboard.writeText("
        "this.parentElement.querySelector('code').textContent)"
    )
    return f"""
<div class="output-block">
<button onclick="{copy_handler}" class="copy-button">
Copy
</button>
<pre><code class="language-{escaped_language}">{escaped_text}</code></pre>
</div>
"""
def check_ollama_status():
    """Report whether the Ollama server answers on its model-listing endpoint.

    Returns:
        ``True`` when ``GET {OLLAMA_API}/api/tags`` responds with HTTP 200
        within 10 seconds; ``False`` for any other status code or when the
        request itself fails (connection error, timeout, malformed URL).
    """
    try:
        response = requests.get(f"{OLLAMA_API}/api/tags", timeout=10)
    except requests.RequestException:
        # Only request-level failures mean "not connected"; programming
        # errors are no longer hidden by a bare except.
        return False
    return response.status_code == 200
def list_available_models():
    """Return the sorted union of installed models and ``DEFAULT_MODELS``.

    The installed models are read from ``GET {OLLAMA_API}/api/tags``. This is
    best-effort: if the server is unreachable, times out, or replies with an
    unexpected payload, only the defaults are returned.

    Returns:
        A sorted list of unique model names.
    """
    try:
        # A timeout keeps the UI from hanging on an unresponsive server.
        response = requests.get(f"{OLLAMA_API}/api/tags", timeout=10)
        installed = [model['name'] for model in response.json().get('models', [])]
        return sorted(set(installed) | set(DEFAULT_MODELS))
    except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError):
        # Network failure, invalid JSON, or a payload without the expected
        # {"models": [{"name": ...}]} shape.
        return sorted(DEFAULT_MODELS)
def download_model(model_name, progress=gr.Progress()):
    """Ask the Ollama server to pull a model and report progress to the UI.

    Args:
        model_name: Name of the model to pull; an empty value is rejected.
        progress: Gradio progress tracker. The ``gr.Progress()`` default is
            how Gradio injects the tracker into event handlers.

    Returns:
        A user-facing status message (success or failure description).
    """
    if not model_name:
        return "Silakan pilih model terlebih dahulu"
    try:
        progress(0, desc=f"Mulai mengunduh {model_name}")
        # Connect timeout only: a pull can legitimately stay quiet for a long
        # time between streamed lines. The context manager closes the
        # streamed connection on every exit path.
        with requests.post(
            f"{OLLAMA_API}/api/pull",
            headers={"Content-Type": "application/json"},
            json={"name": model_name},
            stream=True,
            timeout=(10, None),
        ) as response:
            if response.status_code != 200:
                return f"Gagal mengunduh model. Status: {response.status_code}"

            # Pseudo-progress for status-only lines, cycled within 0.1..0.9.
            status_progress = 0
            for line in response.iter_lines():
                if not line:
                    continue
                data = json.loads(line)
                if 'error' in data:
                    # The server reports pull failures inside the stream.
                    return f"Error saat mengunduh model: {data['error']}"
                completed = data.get('completed')
                total = data.get('total')
                if completed is not None and total:
                    # Named "fraction" so it does not shadow the `progress`
                    # callable (the original rebinding made the next call
                    # fail with "'float' object is not callable").
                    fraction = min(completed / total, 1.0)
                    progress(fraction, desc=f"Mengunduh {model_name}: {int(fraction*100)}%")
                elif 'status' in data:
                    progress(status_progress, desc=f"Status: {data['status']}")
                    status_progress += 0.1
                    if status_progress > 0.9:
                        status_progress = 0.1
        progress(1.0, desc="Selesai!")
        return f"Model {model_name} berhasil diunduh!"
    except Exception as e:
        # UI boundary: surface any failure as a message instead of crashing.
        return f"Error saat mengunduh model: {str(e)}"
# Directory names accepted as a clone target: plain repository names only.
_REPO_NAME_RE = re.compile(r'[A-Za-z0-9._-]+')


def clone_repository(repo_url, github_token, branch=None):
    """Clone a Git repository into a directory named after the repository.

    Any existing file or directory with the target name is removed first.

    Args:
        repo_url: Repository URL; the last path segment (minus a trailing
            ``.git``) becomes the local directory name.
        github_token: Optional GitHub token. When given, the clone uses
            ``https://<token>@github.com/<owner>/<repo>``.
        branch: Optional branch name to check out.

    Returns:
        ``(True, repo_name)`` on success, otherwise ``(False, message)``.
        The token is redacted from any returned or printed error text.
    """
    repo_url = (repo_url or "").strip()
    github_token = (github_token or "").strip()

    repo_name = repo_url.rstrip('/').split('/')[-1]
    if repo_name.endswith('.git'):
        # Strip only a trailing ".git"; str.replace would also remove it
        # from the middle of a name.
        repo_name = repo_name[:-len('.git')]

    # The name is deleted recursively below, so it must never be empty,
    # ".", "..", or contain anything but plain name characters.
    if repo_name in ('', '.', '..') or not _REPO_NAME_RE.fullmatch(repo_name):
        error_msg = f"URL repository tidak valid: {repo_url}"
        print(error_msg)
        return False, error_msg

    def redact(message):
        # Git echoes the remote URL, token included, in its error output.
        return message.replace(github_token, '***') if github_token else message

    print(f"Mengkloning repository: {repo_url} ke {repo_name}")
    try:
        if os.path.exists(repo_name):
            print(f"Menghapus repository yang sudah ada: {repo_name}")
            if os.path.isdir(repo_name) and not os.path.islink(repo_name):
                shutil.rmtree(repo_name)
            else:
                os.remove(repo_name)

        cmd = ['git', 'clone']
        if branch:
            cmd.extend(['--branch', branch])
        if github_token:
            owner_repo = '/'.join(repo_url.rstrip('/').split('/')[-2:])
            source = f"https://{github_token}@github.com/{owner_repo}"
        else:
            source = repo_url
        # "--" ends option parsing so a URL starting with "-" cannot be read
        # as a git option. The explicit target keeps the directory name in
        # sync with repo_name.
        cmd.extend(['--', source, repo_name])

        process = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Never block waiting for interactive credentials.
            env=dict(os.environ, GIT_ASKPASS='echo', GIT_TERMINAL_PROMPT='0')
        )
        if process.returncode == 0:
            print(f"Berhasil mengkloning repository: {repo_name}")
            return True, repo_name

        stderr = redact(process.stderr)
        print(f"Gagal mengkloning repository: {stderr}")
        return False, stderr
    except Exception as e:
        error_msg = redact(f"Error saat mengkloning repository: {str(e)}")
        print(error_msg)
        return False, error_msg
def _stream_ollama_response(payload):
    """Yield the cumulative response text of a streamed /api/generate call.

    Failures are yielded as text (appended to whatever was already received)
    so they reach the caller instead of being lost.
    """
    full_response = ""
    try:
        with requests.post(
            f"{OLLAMA_API}/api/generate",
            headers={"Content-Type": "application/json"},
            json=payload,
            stream=True,
            timeout=60
        ) as response:
            if response.status_code != 200:
                full_response = f"Error API {response.status_code}: {response.text}"
                yield full_response
                return full_response
            for line in response.iter_lines():
                if not line:
                    continue
                data = json.loads(line)
                if 'error' in data:
                    error = f"Error API: {data['error']}"
                    full_response = f"{full_response}\n\n{error}" if full_response else error
                    yield full_response
                elif 'response' in data:
                    full_response += data['response']
                    yield full_response
    except Exception as e:
        error = f"Error saat memproses: {str(e)}"
        full_response = f"{full_response}\n\n{error}" if full_response else error
        yield full_response
    return full_response


def analyze_with_ollama(model_name, text, stream=True):
    """Send a prompt to Ollama's /api/generate endpoint.

    This is deliberately a plain function, not a generator. In the original a
    single ``yield`` made every call return a generator, so ``stream=False``
    never produced a string and all error messages were silently dropped.

    Args:
        model_name: Name of the Ollama model to use.
        text: Prompt text.
        stream: When true, return an iterator of cumulative response text;
            when false, return the complete response as one string.

    Returns:
        ``stream=True``: a generator yielding the response accumulated so far
        after each received chunk (errors are yielded as text too).
        ``stream=False``: the response text, or an error message string.
    """
    print(f"\nMenganalisis dengan {model_name}...")
    payload = {
        "model": model_name,
        "prompt": text,
        "stream": stream,
        "options": {
            "temperature": 0.7,
            "top_p": 0.9,
            # Ollama's option for the output-token limit is "num_predict";
            # "max_tokens" is not a recognised option. The null "stop" entry
            # carried no value and is omitted.
            "num_predict": 2048,
        }
    }
    if stream:
        return _stream_ollama_response(payload)

    try:
        response = requests.post(
            f"{OLLAMA_API}/api/generate",
            headers={"Content-Type": "application/json"},
            json=payload,
            stream=False,
            timeout=60
        )
        if response.status_code != 200:
            return f"Error API {response.status_code}: {response.text}"
        result = response.json()
        if 'response' in result:
            return result['response']
        return "Error: Format respons tidak sesuai"
    except Exception as e:
        # UI boundary: report the failure as text rather than raising.
        return f"Error saat memproses: {str(e)}"
def chunk_text(text, max_length=4000):
    """Split text into chunks of at most ``max_length`` characters.

    Chunks break on line boundaries and keep every character, so
    ``"".join(chunk_text(t)) == t`` for non-blank input. The earlier
    ``textwrap.wrap`` version collapsed newlines and indentation into single
    spaces, which destroyed the source code being analysed.

    Args:
        text: Text to split.
        max_length: Maximum chunk size in characters; must be positive.

    Returns:
        A list of chunks; empty when ``text`` is empty or only whitespace.

    Raises:
        ValueError: If ``max_length`` is not positive.
    """
    if max_length <= 0:
        raise ValueError(f"invalid max_length {max_length!r} (must be > 0)")
    if not text or not text.strip():
        return []

    chunks = []
    current = []
    current_len = 0
    for line in text.splitlines(keepends=True):
        # A single line longer than the limit (e.g. minified code) is
        # hard-split so that no chunk can exceed max_length.
        for start in range(0, len(line), max_length):
            piece = line[start:start + max_length]
            if current and current_len + len(piece) > max_length:
                chunks.append("".join(current))
                current = []
                current_len = 0
            current.append(piece)
            current_len += len(piece)
    if current:
        chunks.append("".join(current))
    return chunks
def read_file_safely(file_path):
    """Read a text file, trying several encodings in turn.

    Encodings are tried from strictest to most permissive: UTF-8, then
    cp1252, then latin-1. latin-1 accepts every byte sequence, so it must
    come last; in the earlier order it made the cp1252 attempt unreachable.

    Args:
        file_path: Path of the file to read.

    Returns:
        ``(True, content)`` on success, otherwise ``(False, error_message)``.
    """
    encodings = ['utf-8', 'cp1252', 'latin-1']
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                content = f.read()
        except UnicodeDecodeError:
            # Wrong guess; fall through to the next, more permissive encoding.
            continue
        except (OSError, ValueError, TypeError) as e:
            # Missing file, permission problem, or an unusable path value.
            error_msg = f"Error membaca file: {str(e)}"
            print(error_msg)
            return False, error_msg
        print(f"Berhasil membaca file dengan encoding {encoding}")
        return True, content
    # Defensive fallback; not expected to be reached while latin-1 is listed.
    return False, "Tidak dapat membaca file dengan encoding yang didukung"
def handle_clone(url, token, branch_name):
    """Clone a repository and list its files for the file dropdown.

    Args:
        url: Repository URL.
        token: Optional GitHub token.
        branch_name: Optional branch; an empty value means the default branch.

    Returns:
        ``(status_message, dropdown_update)``. The second item is a
        ``gr.Dropdown`` carrying the sorted file list on success, ``None``
        on failure.
    """
    print(f"\nMengkloning repository: {url}")
    success, result = clone_repository(url, token, branch_name if branch_name else None)
    if not success:
        return f"Gagal mengkloning: {result}", None
    try:
        # Exclude only files inside a ".git" directory. A substring test on
        # the whole path also dropped ".github/...", ".gitignore" and any
        # name that merely contains ".git".
        files = sorted(
            str(p) for p in Path(result).rglob('*')
            if p.is_file() and '.git' not in p.parts
        )
        print(f"Menemukan {len(files)} file dalam repository")
        return f"Berhasil mengkloning repository ke: {result}", gr.Dropdown(choices=files)
    except Exception as e:
        error_msg = f"Error saat mencari file: {str(e)}"
        print(error_msg)
        return error_msg, None
def _build_analysis_prompt(file_path, index, total, chunk):
    """Build the analysis prompt for one chunk of one file."""
    return (
        "\n"
        "Analisis kode berikut:\n"
        f"File: {file_path}\n"
        f"Bagian {index} dari {total}\n"
        "```\n"
        f"{chunk}\n"
        "```\n"
        "Berikan:\n"
        "1. Ringkasan singkat\n"
        "2. Fungsi utama\n"
        "3. Pola dan masalah yang ditemukan\n"
        "4. Saran perbaikan (jika ada)\n"
    )


def _iter_ollama_text(result):
    """Yield cumulative text from an ``analyze_with_ollama`` result.

    Accepts either a plain string or an iterator. For a generator, a string
    passed to ``return`` is yielded as well when it differs from the last
    yielded item, so text delivered only through the return value is kept.
    """
    if isinstance(result, str):
        yield result
        return
    iterator = iter(result)
    last = None
    while True:
        try:
            last = next(iterator)
        except StopIteration as stop:
            final = stop.value
            break
        yield last
    if isinstance(final, str) and final != last:
        yield final


def _render_plain_text(text):
    """Escape text for display in a ``gr.HTML`` component, keeping line breaks."""
    return f'<div style="white-space: pre-wrap;">{html.escape(text)}</div>'


def create_ui():
    """Build the Gradio application.

    The UI has two tabs: model management (server status, model download
    with a cancel button) and repository analysis (clone a repository, pick
    files, stream an analysis of each file from Ollama).

    Returns:
        The configured ``gr.Blocks`` application (not yet launched).
    """
    with gr.Blocks(title="Analisis Repository dengan Ollama", css=CUSTOM_CSS) as app:
        gr.Markdown(
            "\n"
            "# 🚀 Analisis Repository dengan Ollama\n"
            f"Waktu Server: {CURRENT_TIME}\n"
            f"Pengguna: {CURRENT_USER}\n"
        )

        with gr.Tab("Manajemen Model"):
            model_status = gr.Textbox(label="Status Ollama", interactive=False)
            available_models = gr.Dropdown(
                label="Model Tersedia",
                choices=DEFAULT_MODELS,
                interactive=True
            )
            with gr.Row():
                download_button = gr.Button("Unduh Model")
                cancel_button = gr.Button("Batalkan")
            download_status = gr.Textbox(label="Status Unduhan", interactive=False)

            download_event = download_button.click(
                fn=download_model,
                inputs=[available_models],
                outputs=[download_status]
            )
            # The cancel button previously had no handler; it now cancels a
            # running download event.
            cancel_button.click(
                fn=None,
                inputs=None,
                outputs=None,
                cancels=[download_event]
            )

        with gr.Tab("Analisis Repository"):
            with gr.Row():
                repo_url = gr.Textbox(label="URL Repository")
                github_token = gr.Textbox(label="Token GitHub", type="password")
                branch = gr.Textbox(label="Branch (opsional)")
            clone_button = gr.Button("Clone Repository")
            clone_status = gr.Textbox(label="Status Clone", interactive=False)
            with gr.Row():
                file_list = gr.Dropdown(
                    label="File dalam Repository",
                    multiselect=True,
                    info="Pilih file yang akan dianalisis"
                )
                selected_model = gr.Dropdown(
                    label="Model untuk Analisis",
                    choices=DEFAULT_MODELS,
                    interactive=True
                )
            with gr.Row():
                stream_checkbox = gr.Checkbox(
                    label="Streaming Response",
                    value=True,
                    info="Tampilkan hasil analisis secara real-time"
                )
                code_block_checkbox = gr.Checkbox(
                    label="Format Output sebagai Code Block",
                    value=True,
                    info="Tampilkan hasil dalam format code block yang bisa di-copy"
                )
            analyze_button = gr.Button("Analisis File")
            debug_output = gr.HTML(label="Info Debug")
            analysis_output = gr.HTML()

            def analyze_files(files, model_name, stream=True, use_code_block=True):
                """Analyse the selected files and yield ``(analysis, debug)`` HTML.

                Every yield carries the full text accumulated so far, so the
                output keeps growing instead of being replaced per chunk.
                """
                # This function is a generator, so user-facing messages must
                # be yielded; a value passed to `return` never reaches the UI.
                if not files:
                    yield ("Silakan pilih file untuk dianalisis",
                           format_code_block("Tidak ada file dipilih"))
                    return
                if not model_name:
                    yield ("Silakan pilih model terlebih dahulu",
                           format_code_block("Tidak ada model dipilih"))
                    return

                file_separator = "\n\n---\n\n"
                part_separator = "\n\n=== Bagian Selanjutnya ===\n\n"
                debug_info = [
                    f"Mulai analisis dengan model: {model_name}",
                    f"File yang akan dianalisis: {len(files)}",
                ]
                results = []  # one finished section per processed file

                def render(text):
                    # Model output is untrusted text and must be escaped
                    # before it is placed into an HTML component.
                    if use_code_block:
                        return format_code_block(text)
                    return _render_plain_text(text)

                def debug_html():
                    return format_code_block("\n".join(debug_info))

                for file_path in files:
                    debug_info.append(f"\nMemproses file: {file_path}")
                    success, content = read_file_safely(file_path)
                    if not success:
                        error_msg = f"Error membaca {file_path}: {content}"
                        debug_info.append(error_msg)
                        results.append(error_msg)
                        continue

                    chunks = chunk_text(content)
                    debug_info.append(f"Dibagi menjadi {len(chunks)} bagian")
                    header = f"### Analisis {file_path}\n\n"
                    analysis = []  # finished responses for this file's chunks

                    for i, chunk in enumerate(chunks, 1):
                        debug_info.append(f"Menganalisis bagian {i}/{len(chunks)}")
                        prompt = _build_analysis_prompt(file_path, i, len(chunks), chunk)
                        partial = ""
                        outcome = analyze_with_ollama(model_name, prompt, stream=stream)
                        for partial in _iter_ollama_text(outcome):
                            if stream:
                                in_progress = header + part_separator.join(analysis + [partial])
                                yield (render(file_separator.join(results + [in_progress])),
                                       debug_html())
                        analysis.append(partial)
                        if not stream:
                            in_progress = header + part_separator.join(analysis)
                            yield (render(file_separator.join(results + [in_progress])),
                                   debug_html())

                    results.append(header + part_separator.join(analysis))

                yield render(file_separator.join(results)), debug_html()

            clone_button.click(
                fn=handle_clone,
                inputs=[repo_url, github_token, branch],
                outputs=[clone_status, file_list]
            )
            analyze_button.click(
                fn=analyze_files,
                inputs=[
                    file_list,
                    selected_model,
                    stream_checkbox,
                    code_block_checkbox
                ],
                outputs=[analysis_output, debug_output]
            )

        def update_status():
            """Refresh the connection status and both model dropdowns."""
            status = "Terhubung" if check_ollama_status() else "Tidak Terhubung"
            models = list_available_models()
            return status, gr.Dropdown(choices=models), gr.Dropdown(choices=models)

        # The analysis dropdown is refreshed as well, so installed models
        # outside DEFAULT_MODELS can be selected for analysis.
        app.load(update_status, outputs=[model_status, available_models, selected_model])
    return app
# Script entry point: print a start-up banner showing the configured
# metadata and Ollama API URL, then build the UI and serve it.
if __name__ == "__main__":
    print(f"""
Memulai Analisis Repository
Waktu: {CURRENT_TIME}
API: {OLLAMA_API}
""")
    app = create_ui()
    # Start the Gradio server with its default launch settings.
    app.launch()