# Hugging Face Spaces page header (scrape residue) — Space status: Running
import asyncio
import json
import os
import shutil
import subprocess
import textwrap
import time
from datetime import datetime
from pathlib import Path
from typing import Generator, AsyncGenerator, List

import dotenv
import google.generativeai as genai
import gradio as gr
import requests
from openai import AsyncOpenAI
# Load environment variables from a local .env file, if present.
dotenv.load_dotenv()

# Metadata (informational only; CURRENT_TIME is a fixed snapshot, not live).
CURRENT_TIME = "2025-05-23 12:57:22"
CURRENT_USER = "ErRickow"

# Default API keys (fallback if the user doesn't provide their own).
# SECURITY FIX: a live X.AI key was previously hard-coded here as a string
# literal. Secrets must never live in source — read them from the
# environment only, and rotate any key that was committed.
DEFAULT_XAI_KEY = os.getenv("XAI_API_KEY")
DEFAULT_GEMINI_KEY = os.getenv("GEMINI_API_KEY")

# API settings
OLLAMA_API = os.environ.get("OLLAMA_API", "http://localhost:11434")
XAI_BASE_URL = "https://api.x.ai/v1"

# Model lists offered in the UI model dropdown, per provider.
OLLAMA_MODELS = [
    "llama2",
    "codellama",
    "mistral",
    "neural-chat",
    "starling-lm",
    "dolphin-phi",
    "phi",
    "orca-mini",
]
XAI_MODELS = [
    "grok-2-latest",
    "grok-1",
]
GEMINI_MODELS = [
    "gemini-1.5-mini",
    "gemini-pro-vision",
]
# ---------------------------------------------------------------------------
# Help texts (Indonesian), rendered as Markdown beside the credential inputs.
# NOTE: these are user-facing runtime strings — content must not be altered
# casually (including translation); only these surrounding comments are new.
# ---------------------------------------------------------------------------

# Shown next to the GitHub token field: how to create a classic PAT and why
# the app needs it (private repos, higher rate limits, API access).
GITHUB_TOKEN_HELP = """
### Cara Mendapatkan GitHub Token:
1. Kunjungi [GitHub Token Settings](https://github.com/settings/tokens)
2. Klik "Generate new token" > "Generate new token (classic)"
3. Beri nama token Anda di "Note"
4. Pilih scope:
   - `repo` (untuk akses repository private)
   - `read:packages` (opsional, untuk akses package)
5. Klik "Generate token"
6. **PENTING**: Salin token segera! Token hanya ditampilkan sekali
Token diperlukan untuk:
- Mengakses repository private
- Clone repository dengan rate limit lebih tinggi
- Mengakses fitur GitHub API
"""

# Shown next to the Gemini API-key field: how to obtain a key in AI Studio.
GEMINI_API_HELP = """
### Cara Mendapatkan Gemini API Key:
1. Kunjungi [Google AI Studio](https://makersuite.google.com/app/apikey)
2. Login dengan akun Google Anda
3. Klik "Create API Key"
4. Salin API Key yang dihasilkan
Catatan:
- Gemini memberikan kuota gratis setiap bulan
- Key bisa dibuat ulang jika diperlukan
- Monitor penggunaan di [Google Cloud Console](https://console.cloud.google.com/)
"""

# Instructions for running a local Ollama server (no API key required).
# NOTE(review): this constant is defined but not referenced by the visible
# UI code — confirm whether it should be rendered somewhere.
OLLAMA_HELP = """
### Cara Menggunakan Ollama:
1. Install Ollama dari [ollama.ai](https://ollama.ai)
2. Jalankan Ollama di komputer Anda
3. Pastikan Ollama berjalan di http://localhost:11434
Catatan:
- Ollama berjalan secara lokal di komputer Anda
- Tidak memerlukan API key
- Ideal untuk privasi dan penggunaan offline
"""

# Shown next to the X.AI key field: how to obtain a Grok API key.
XAI_API_HELP = """
### Cara Mendapatkan X.AI (Grok) API Key:
1. Kunjungi [X.AI Developer Portal](https://x.ai)
2. Daftar/Login ke akun Anda
3. Buat API Key baru
4. Salin API Key
Note:
- Jika tidak diisi, akan menggunakan API key default
- Masukkan API key Anda sendiri jika default mencapai limit
"""
class AIProvider:
    """Identifiers for the supported AI backends.

    The string values double as the option values of the provider radio
    selector in the UI, so they must stay in sync with its ``choices``.
    """

    OLLAMA = "ollama"  # local Ollama server, no API key needed
    GEMINI = "gemini"  # Google Gemini API
    XAI = "xai"  # X.AI (Grok) API
class RepoAnalyzer:
    """Clones a GitHub repository and answers questions about it via LLM APIs.

    Holds the name of the currently cloned repository, a ``path -> text``
    cache of its files, and a running chat history.
    """

    def __init__(self):
        self.current_repo = None  # name of the last successfully cloned repo
        self.repo_content = {}  # file path -> decoded file contents
        self.chat_history = []  # [{"role": ..., "content": ...}, ...]

    async def stream_xai_response(
        self, prompt: str, api_key: str = None, model: str = "grok-2-latest"
    ) -> AsyncGenerator[str, None]:
        """Stream a response from the X.AI (Grok) chat-completions API.

        Yields text chunks as they arrive. On any failure a single
        user-facing error message is yielded instead of raising, so the UI
        never sees an exception.
        """
        try:
            # Fall back to the key from the environment when none is supplied.
            actual_key = api_key if api_key else DEFAULT_XAI_KEY
            if not actual_key:
                yield "⚠️ API Key X.AI diperlukan. Gunakan key Anda sendiri atau tunggu reset limit default key."
                return
            client = AsyncOpenAI(api_key=actual_key, base_url=XAI_BASE_URL)
            # System prompt, plus a file listing of the cloned repo if any.
            messages = [
                {
                    "role": "system",
                    "content": "Anda adalah asisten AI yang membantu menganalisis repository code. Berikan respons dalam Bahasa Indonesia.",
                }
            ]
            if self.current_repo:
                context = f"Repository: {self.current_repo}\n\n"
                repo_files = "\n".join(self.repo_content.keys())
                context += f"Files in repository:\n{repo_files}\n\n"
                messages.append({"role": "system", "content": context})
            messages.append({"role": "user", "content": prompt})
            stream = await client.chat.completions.create(
                model=model, messages=messages, stream=True
            )
            full_response = ""
            async for chunk in stream:
                if chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    full_response += content
                    yield content
            # Record the exchange only after the stream completed.
            self.chat_history.append({"role": "user", "content": prompt})
            self.chat_history.append({"role": "assistant", "content": full_response})
        except Exception as e:
            error_msg = f"⚠️ Error dalam X.AI API: {str(e)}"
            print(error_msg)
            yield error_msg

    async def stream_gemini_response(
        self, prompt: str, api_key: str
    ) -> AsyncGenerator[str, None]:
        """Stream a response from the Gemini API.

        Yields text chunks; on failure yields one user-facing error message.
        """
        try:
            if not api_key:
                yield "⚠️ API Key Gemini diperlukan. Klik icon bantuan (?) di samping input API Key untuk panduan mendapatkan key."
                return
            genai.configure(api_key=api_key)
            model = genai.GenerativeModel("gemini-pro")
            # Prepend repository context when a repo has been cloned.
            if self.current_repo:
                context = f"Repository: {self.current_repo}\n\n"
                repo_files = "\n".join(self.repo_content.keys())
                context += f"Files in repository:\n{repo_files}\n\n"
                prompt = context + prompt
            response = model.generate_content(
                prompt,
                generation_config={"temperature": 0.7, "top_p": 0.8, "top_k": 40},
                stream=True,
            )
            full_response = ""
            # BUGFIX: generate_content(stream=True) returns a *synchronous*
            # iterator; the previous `async for` raised TypeError at runtime.
            for chunk in response:
                if chunk.text:
                    full_response += chunk.text
                    yield chunk.text
            self.chat_history.append({"role": "user", "content": prompt})
            self.chat_history.append({"role": "assistant", "content": full_response})
        except Exception as e:
            error_msg = f"⚠️ Error dalam Gemini API: {str(e)}\n\nPastikan API Key valid dan memiliki kuota yang cukup."
            print(error_msg)
            yield error_msg

    def clone_repository(
        self, repo_url: str, github_token: str, branch: str = None
    ) -> tuple[bool, str]:
        """Clone a GitHub repository (optionally authenticated) and index it.

        Returns ``(success, user_facing_message)``. On success the repo's
        readable files are cached in ``self.repo_content``.
        """
        if not repo_url:
            return False, "⚠️ URL repository diperlukan"
        repo_name = repo_url.split("/")[-1].replace(".git", "")
        # Remove a stale checkout from a previous run. shutil.rmtree is
        # portable, unlike shelling out to `rm -rf`.
        if os.path.exists(repo_name):
            shutil.rmtree(repo_name)
        try:
            owner_repo = "/".join(repo_url.split("/")[-2:])
            # Probe the GitHub API first for clearer error messages.
            headers = {"Authorization": f"token {github_token}"} if github_token else {}
            repo_check = requests.get(
                f"https://api.github.com/repos/{owner_repo}", headers=headers
            )
            if repo_check.status_code == 404:
                return False, "⚠️ Repository tidak ditemukan. Periksa URL repository."
            elif repo_check.status_code == 401:
                return (
                    False,
                    "⚠️ Token GitHub tidak valid. Klik icon bantuan (?) untuk panduan mendapatkan token.",
                )
            elif repo_check.status_code == 403 and repo_check.json().get(
                "private", False
            ):
                return (
                    False,
                    "⚠️ Ini adalah repository private. Token GitHub dengan akses 'repo' diperlukan.",
                )
            # NOTE(review): embedding the token in the clone URL exposes it in
            # the local process list; acceptable for a single-user Space.
            auth_url = (
                f"https://{github_token}@github.com/{owner_repo}"
                if github_token
                else f"https://github.com/{owner_repo}"
            )
            cmd = ["git", "clone"]
            if branch:
                cmd.extend(["--branch", branch])
            cmd.append(auth_url)
            # GIT_TERMINAL_PROMPT=0 prevents git from blocking on a password
            # prompt when authentication fails.
            process = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                env=dict(os.environ, GIT_ASKPASS="echo", GIT_TERMINAL_PROMPT="0"),
            )
            if process.returncode == 0:
                self.current_repo = repo_name
                # BUGFIX: reset the cache so files from a previously cloned
                # repository do not leak into this repo's context.
                self.repo_content = {}
                file_count = 0
                for file_path in Path(repo_name).rglob("*"):
                    if file_path.is_file() and ".git" not in str(file_path):
                        success, content = self.read_file_safely(str(file_path))
                        if success:
                            self.repo_content[str(file_path)] = content
                            file_count += 1
                return (
                    True,
                    f"✅ Repository berhasil di-clone!\n\nNama: {repo_name}\nJumlah file: {file_count}\n\nAnda sekarang bisa mengajukan pertanyaan tentang repository ini.",
                )
            else:
                return False, f"⚠️ Gagal clone repository:\n{process.stderr}"
        except Exception as e:
            return False, f"⚠️ Error: {str(e)}"

    def read_file_safely(self, file_path: str) -> tuple[bool, str]:
        """Read a text file, trying several encodings.

        Returns ``(True, contents)`` or ``(False, error_message)``. Since
        latin-1 can decode any byte sequence, in practice this only fails on
        I/O errors (missing file, permissions, directories).
        """
        for encoding in ("utf-8", "latin-1", "cp1252"):
            try:
                with open(file_path, "r", encoding=encoding) as f:
                    return True, f.read()
            except (OSError, UnicodeDecodeError):
                continue
        return False, "Tidak dapat membaca file dengan encoding yang didukung"
def create_ui():
    """Build and return the Gradio Blocks application.

    Wires a shared RepoAnalyzer into three areas: provider/key configuration,
    repository cloning with file selection, and a streaming chat interface.
    """
    analyzer = RepoAnalyzer()

    with gr.Blocks(title="Open Repo AI", theme=gr.themes.Soft()) as app:
        # Inline CSS for responsive/mobile layout and the selected-file list.
        gr.Markdown("""
        <style>
        .container { max-width: 100% !important; padding: 1rem; }
        .mobile-full { width: 100% !important; }
        .file-list { margin: 10px 0; padding: 10px; border: 1px solid #ddd; border-radius: 4px; }
        .file-item { display: flex; justify-content: space-between; padding: 5px 0; }
        .file-remove { color: red; cursor: pointer; }
        @media (max-width: 768px) {
            .gr-form { flex-direction: column !important; }
            .gr-group { margin: 0.5rem 0 !important; }
        }
        </style>
        """)

        with gr.Row(elem_classes="container"):
            gr.Markdown(f"""
            # AI Github Repository Chat
            📅 {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} UTC
            """)

        with gr.Tabs() as tabs:  # Main tabs container
            # ----- Configuration tab -----------------------------------------
            with gr.Tab("🛠️ Konfigurasi"):
                provider = gr.Radio(
                    choices=[AIProvider.XAI, AIProvider.GEMINI, AIProvider.OLLAMA],
                    label="Penyedia AI",
                    value=AIProvider.XAI,
                )
                with gr.Group():
                    with gr.Row():
                        xai_key = gr.Textbox(
                            label="X.AI (Grok) API Key",
                            type="password",
                            placeholder="Opsional - Klik icon (?) untuk info",
                            show_label=True,
                            scale=3,
                        )
                        with gr.Column(scale=1):
                            gr.Markdown(XAI_API_HELP)
                    with gr.Row():
                        gemini_key = gr.Textbox(
                            label="Gemini API Key",
                            type="password",
                            placeholder="Opsional - Kosongkan untuk gunakan key default",
                            show_label=True,
                            scale=3,
                        )
                        with gr.Column(scale=1):
                            gr.Markdown(GEMINI_API_HELP)
                # Model selection
                with gr.Row():
                    model_dropdown = gr.Dropdown(
                        label="Model AI",
                        choices=XAI_MODELS,
                        value="grok-2-latest",
                        interactive=True,
                    )

                def update_model_choices(provider_choice):
                    """Keep the model dropdown in sync with the provider.

                    BUGFIX: the dropdown previously always showed the X.AI
                    models regardless of the chosen provider.
                    """
                    if provider_choice == AIProvider.GEMINI:
                        return gr.Dropdown(choices=GEMINI_MODELS, value=GEMINI_MODELS[0])
                    if provider_choice == AIProvider.OLLAMA:
                        return gr.Dropdown(choices=OLLAMA_MODELS, value=OLLAMA_MODELS[0])
                    return gr.Dropdown(choices=XAI_MODELS, value="grok-2-latest")

                provider.change(
                    fn=update_model_choices, inputs=provider, outputs=model_dropdown
                )

            # ----- Repository analysis tab -----------------------------------
            with gr.Tab("📊 Analisis Repository"):
                with gr.Group():
                    with gr.Row():
                        repo_url = gr.Textbox(
                            label="URL Repository GitHub",
                            placeholder="https://github.com/username/repository",
                            elem_classes="mobile-full",
                        )
                    with gr.Row():
                        with gr.Column(scale=2):
                            github_token = gr.Textbox(
                                label="Token GitHub",
                                type="password",
                                placeholder="Klik icon (?) untuk panduan",
                                elem_classes="mobile-full",
                            )
                            gr.Markdown(GITHUB_TOKEN_HELP)
                        with gr.Column(scale=1):
                            branch = gr.Textbox(
                                label="Branch (opsional)",
                                placeholder="main",
                                elem_classes="mobile-full",
                            )
                    clone_button = gr.Button(
                        "🔄 Clone Repository", variant="primary", elem_classes="mobile-full"
                    )
                    clone_status = gr.Markdown(
                        value="", label="Status Repository", elem_classes="mobile-full"
                    )

                # File selection for including file contents in the prompt.
                with gr.Group():
                    gr.Markdown("### 📎 File yang Dipilih")
                    with gr.Row():
                        file_selector = gr.Dropdown(
                            label="Pilih File dari Repository",
                            choices=[],
                            multiselect=True,
                            value=[],
                            allow_custom_value=True,
                            max_choices=None,
                            elem_classes="mobile-full",
                        )
                    file_list = gr.HTML(
                        value="<div class='file-list'>Belum ada file yang dipilih</div>",
                        label="Daftar File Terpilih",
                    )

                # ----- Event handlers ----------------------------------------
                def handle_clone(repo_url, github_token, branch):
                    """Clone the repo and refresh the file selector and list."""
                    empty_html = "<div class='file-list'>Belum ada file yang dipilih</div>"
                    if not repo_url:
                        return (
                            "⚠️ URL repository diperlukan!",
                            gr.Dropdown(choices=[]),
                            empty_html,
                        )
                    success, message = analyzer.clone_repository(
                        repo_url, github_token, branch
                    )
                    if success:
                        # Offer the freshly indexed files, sorted for the UI.
                        files = sorted(analyzer.repo_content.keys())
                        return (message, gr.Dropdown(choices=files, value=[]), empty_html)
                    return (message, gr.Dropdown(choices=[]), empty_html)

                def update_file_list(selected):
                    """Render the currently selected files as an HTML list."""
                    if not selected:
                        return "<div class='file-list'>Belum ada file yang dipilih</div>"
                    items = "".join(
                        f"<div class='file-item'><span>{file}</span></div>"
                        for file in selected
                    )
                    return f"<div class='file-list'>{items}</div>"

                clone_button.click(
                    fn=handle_clone,
                    inputs=[repo_url, github_token, branch],
                    outputs=[clone_status, file_selector, file_list],
                )
                file_selector.change(
                    fn=update_file_list, inputs=[file_selector], outputs=[file_list]
                )

                # ----- Chat interface ----------------------------------------
                with gr.Group():
                    chat_history = gr.Chatbot(
                        label="📝 Riwayat Chat",
                        height=500,
                        show_label=True,
                        type="messages",
                        elem_classes="mobile-full",
                    )
                    with gr.Row():
                        chat_input = gr.Textbox(
                            label="💭 Tanyakan tentang Repository",
                            placeholder="Ketik pertanyaan Anda di sini...",
                            lines=3,
                            elem_classes="mobile-full",
                        )
                        send_button = gr.Button("📤 Kirim", variant="primary")
                    loading_indicator = gr.HTML(
                        '<div id="loading" style="display:none">Memproses permintaan...</div>'
                    )

                async def handle_chat(
                    message,
                    history,
                    provider_choice,
                    model_name,
                    xai_key,
                    gemini_key,
                    selected_files,
                ):
                    """Stream an answer about the cloned repository into the chat.

                    BUGFIX: the Chatbot is configured with type="messages", so
                    history entries must be role/content dicts — the previous
                    implementation yielded [user, bot] pair lists.
                    """
                    history = list(history or [])
                    history.append({"role": "user", "content": message})
                    if not analyzer.current_repo:
                        history.append(
                            {
                                "role": "assistant",
                                "content": "⚠️ Mohon clone repository terlebih dahulu sebelum mengajukan pertanyaan.",
                            }
                        )
                        yield history
                        return
                    history.append({"role": "assistant", "content": ""})
                    try:
                        # Inline the contents of any explicitly selected files.
                        file_context = ""
                        if selected_files:
                            file_context = "\n\nFile yang dipilih:\n"
                            for file in selected_files:
                                content = analyzer.repo_content.get(file, "")
                                if content:  # only include files that exist
                                    file_context += f"\n{file}:\n```\n{content}\n```\n"
                        enhanced_message = f"{message}\n{file_context}"
                        full_response = ""
                        if provider_choice == AIProvider.XAI:
                            async for chunk in analyzer.stream_xai_response(
                                enhanced_message, xai_key, model_name
                            ):
                                full_response += chunk
                                await asyncio.sleep(0.05)  # pacing for readability
                                history[-1]["content"] = full_response
                                yield history
                        elif provider_choice == AIProvider.GEMINI:
                            async for chunk in analyzer.stream_gemini_response(
                                enhanced_message, gemini_key or DEFAULT_GEMINI_KEY
                            ):
                                full_response += chunk
                                await asyncio.sleep(0.05)  # pacing for readability
                                history[-1]["content"] = full_response
                                yield history
                        else:  # OLLAMA
                            # BUGFIX: analyze_with_ollama() was never defined
                            # anywhere — call the local Ollama HTTP API directly.
                            resp = requests.post(
                                f"{OLLAMA_API}/api/generate",
                                json={
                                    "model": model_name,
                                    "prompt": enhanced_message,
                                    "stream": False,
                                },
                                timeout=300,
                            )
                            resp.raise_for_status()
                            words = resp.json().get("response", "").split()
                            # Simulate streaming word by word for the UI.
                            for i in range(len(words)):
                                full_response = " ".join(words[: i + 1])
                                await asyncio.sleep(0.05)
                                history[-1]["content"] = full_response
                                yield history
                    except Exception as e:
                        history[-1]["content"] = f"⚠️ Error: {str(e)}"
                        yield history

                # Send via button click or textbox submit; clear input after.
                send_button.click(
                    fn=handle_chat,
                    inputs=[
                        chat_input,
                        chat_history,
                        provider,
                        model_dropdown,
                        xai_key,
                        gemini_key,
                        file_selector,
                    ],
                    outputs=chat_history,
                    show_progress=True,
                ).then(fn=lambda: gr.update(value=""), outputs=chat_input)
                chat_input.submit(
                    fn=handle_chat,
                    inputs=[
                        chat_input,
                        chat_history,
                        provider,
                        model_dropdown,
                        xai_key,
                        gemini_key,
                        file_selector,
                    ],
                    outputs=chat_history,
                    show_progress=True,
                ).then(fn=lambda: gr.update(value=""), outputs=chat_input)

    return app
if __name__ == "__main__":
    # Entry point: build the UI and expose it with a public share link.
    print("""
🚀 Memulai Repository Chat Analysis
""")
    app = create_ui()
    app.launch(share=True)