import gradio as gr import os from pathlib import Path import subprocess import requests import json import time import asyncio from datetime import datetime import textwrap import google.generativeai as genai from typing import Generator, AsyncGenerator, List from openai import AsyncOpenAI import dotenv # Load environment variables dotenv.load_dotenv() # Metadata CURRENT_TIME = "2025-05-23 12:57:22" CURRENT_USER = "ErRickow" # Default API Keys (fallback if user doesn't provide their own) DEFAULT_XAI_KEY = os.getenv("XAI_API_KEY", "xai-vfjhklL384Z4HKdItsZomqpFlXubTZJAFnISQUpV7dE8lRnWwYBVPSCxSTlu08wDbAcv720bx2dDiQ9x") DEFAULT_GEMINI_KEY = os.getenv("GEMINI_API_KEY") # API settings OLLAMA_API = os.environ.get("OLLAMA_API", "http://localhost:11434") XAI_BASE_URL = "https://api.x.ai/v1" # Model lists OLLAMA_MODELS = [ "llama2", "codellama", "mistral", "neural-chat", "starling-lm", "dolphin-phi", "phi", "orca-mini" ] XAI_MODELS = [ "grok-2-latest", "grok-1", ] GEMINI_MODELS = [ "gemini-1.5-mini", "gemini-pro-vision", ] GITHUB_TOKEN_HELP = """ ### Cara Mendapatkan GitHub Token: 1. Kunjungi [GitHub Token Settings](https://github.com/settings/tokens) 2. Klik "Generate new token" > "Generate new token (classic)" 3. Beri nama token Anda di "Note" 4. Pilih scope: - `repo` (untuk akses repository private) - `read:packages` (opsional, untuk akses package) 5. Klik "Generate token" 6. **PENTING**: Salin token segera! Token hanya ditampilkan sekali Token diperlukan untuk: - Mengakses repository private - Clone repository dengan rate limit lebih tinggi - Mengakses fitur GitHub API """ GEMINI_API_HELP = """ ### Cara Mendapatkan Gemini API Key: 1. Kunjungi [Google AI Studio](https://makersuite.google.com/app/apikey) 2. Login dengan akun Google Anda 3. Klik "Create API Key" 4. Salin API Key yang dihasilkan Catatan: - Gemini memberikan kuota gratis setiap bulan - Key bisa dibuat ulang jika diperlukan - Monitor penggunaan di [Google Cloud Console](https://console.cloud.google.com/) """ OLLAMA_HELP = """ ### Cara Menggunakan Ollama: 1. Install Ollama dari [ollama.ai](https://ollama.ai) 2. Jalankan Ollama di komputer Anda 3. Pastikan Ollama berjalan di http://localhost:11434 Catatan: - Ollama berjalan secara lokal di komputer Anda - Tidak memerlukan API key - Ideal untuk privasi dan penggunaan offline """ # Help texts XAI_API_HELP = """ ### Cara Mendapatkan X.AI (Grok) API Key: 1. Kunjungi [X.AI Developer Portal](https://x.ai) 2. Daftar/Login ke akun Anda 3. Buat API Key baru 4. Salin API Key Note: - Jika tidak diisi, akan menggunakan API key default - Masukkan API key Anda sendiri jika default mencapai limit """ class AIProvider: OLLAMA = "ollama" GEMINI = "gemini" XAI = "xai" class RepoAnalyzer: def __init__(self): self.current_repo = None self.repo_content = {} self.chat_history = [] async def stream_xai_response(self, prompt: str, api_key: str = None, model: str = "grok-2-latest") -> AsyncGenerator[str, None]: """Stream response dari X.AI (Grok) API""" try: # Use default key if none provided actual_key = api_key if api_key else DEFAULT_XAI_KEY if not actual_key: yield "⚠️ API Key X.AI diperlukan. Gunakan key Anda sendiri atau tunggu reset limit default key." return client = AsyncOpenAI( api_key=actual_key, base_url=XAI_BASE_URL ) # Prepare messages with repository context if available messages = [ {"role": "system", "content": "Anda adalah asisten AI yang membantu menganalisis repository code. Berikan respons dalam Bahasa Indonesia."} ] if self.current_repo: context = f"Repository: {self.current_repo}\n\n" repo_files = "\n".join(list(self.repo_content.keys())) context += f"Files in repository:\n{repo_files}\n\n" messages.append({"role": "system", "content": context}) messages.append({"role": "user", "content": prompt}) stream = await client.chat.completions.create( model=model, messages=messages, stream=True ) full_response = "" async for chunk in stream: if chunk.choices[0].delta.content: content = chunk.choices[0].delta.content full_response += content yield content self.chat_history.append({"role": "user", "content": prompt}) self.chat_history.append({"role": "assistant", "content": full_response}) except Exception as e: error_msg = f"⚠️ Error dalam X.AI API: {str(e)}" print(error_msg) yield error_msg async def stream_gemini_response(self, prompt: str, api_key: str) -> AsyncGenerator[str, None]: """Stream response dari Gemini API""" try: if not api_key: yield "⚠️ API Key Gemini diperlukan. Klik icon bantuan (?) di samping input API Key untuk panduan mendapatkan key." return genai.configure(api_key=api_key) model = genai.GenerativeModel('gemini-pro') # Tambahkan konteks repository jika ada if self.current_repo: context = f"Repository: {self.current_repo}\n\n" repo_files = "\n".join(list(self.repo_content.keys())) context += f"Files in repository:\n{repo_files}\n\n" prompt = context + prompt response = model.generate_content( prompt, generation_config={ "temperature": 0.7, "top_p": 0.8, "top_k": 40 }, stream=True ) full_response = "" async for chunk in response: if chunk.text: full_response += chunk.text yield chunk.text self.chat_history.append({"role": "user", "content": prompt}) self.chat_history.append({"role": "assistant", "content": full_response}) except Exception as e: error_msg = f"⚠️ Error dalam Gemini API: {str(e)}\n\nPastikan API Key valid dan memiliki kuota yang cukup." print(error_msg) yield error_msg def clone_repository(self, repo_url: str, github_token: str, branch: str = None) -> tuple[bool, str]: """Clone repository GitHub dengan autentikasi""" if not repo_url: return False, "⚠️ URL repository diperlukan" repo_name = repo_url.split('/')[-1].replace('.git', '') if os.path.exists(repo_name): subprocess.run(['rm', '-rf', repo_name], check=True) try: owner_repo = '/'.join(repo_url.split('/')[-2:]) # Cek apakah repository private headers = {'Authorization': f'token {github_token}'} if github_token else {} repo_check = requests.get(f"https://api.github.com/repos/{owner_repo}", headers=headers) if repo_check.status_code == 404: return False, "⚠️ Repository tidak ditemukan. Periksa URL repository." elif repo_check.status_code == 401: return False, "⚠️ Token GitHub tidak valid. Klik icon bantuan (?) untuk panduan mendapatkan token." elif repo_check.status_code == 403 and repo_check.json().get('private', False): return False, "⚠️ Ini adalah repository private. Token GitHub dengan akses 'repo' diperlukan." auth_url = f"https://{github_token}@github.com/{owner_repo}" if github_token else f"https://github.com/{owner_repo}" cmd = ['git', 'clone'] if branch: cmd.extend(['--branch', branch]) cmd.append(auth_url) process = subprocess.run( cmd, capture_output=True, text=True, env=dict(os.environ, GIT_ASKPASS='echo', GIT_TERMINAL_PROMPT='0') ) if process.returncode == 0: self.current_repo = repo_name # Scan dan simpan konten repository file_count = 0 for file_path in Path(repo_name).rglob('*'): if file_path.is_file() and '.git' not in str(file_path): success, content = self.read_file_safely(str(file_path)) if success: self.repo_content[str(file_path)] = content file_count += 1 return True, f"✅ Repository berhasil di-clone!\n\nNama: {repo_name}\nJumlah file: {file_count}\n\nAnda sekarang bisa mengajukan pertanyaan tentang repository ini." else: return False, f"⚠️ Gagal clone repository:\n{process.stderr}" except Exception as e: return False, f"⚠️ Error: {str(e)}" def read_file_safely(self, file_path: str) -> tuple[bool, str]: """Baca file dengan aman menggunakan berbagai encoding""" encodings = ['utf-8', 'latin-1', 'cp1252'] for encoding in encodings: try: with open(file_path, 'r', encoding=encoding) as f: content = f.read() return True, content except Exception as e: continue return False, "Tidak dapat membaca file dengan encoding yang didukung" async def handle_chat(message, history, provider_choice, model_name, xai_key, gemini_key, selected_files): if not analyzer.current_repo: yield history + [[message, "⚠️ Mohon clone repository terlebih dahulu sebelum mengajukan pertanyaan."]] return history = history or [] history.append([message, ""]) try: # Add context about selected files to the prompt file_context = "" if selected_files: file_context = "\n\nFile yang dipilih:\n" for file in selected_files: content = analyzer.repo_content.get(file, "") escaped_content = content.replace('`', r'\`') html = ( '
' f'
' f'{file}' f'' '
' '
' f'
{content}
' '
' '
' ) file_context += html enhanced_message = f"{message}\n{file_context}" full_response = "" if provider_choice == AIProvider.XAI: async for chunk in analyzer.stream_xai_response(enhanced_message, xai_key, model_name): # Wrap code blocks in custom styling chunk = process_code_blocks(chunk) full_response += chunk await asyncio.sleep(0.08) history[-1][1] = full_response yield history elif provider_choice == AIProvider.GEMINI: async for chunk in analyzer.stream_gemini_response(enhanced_message, gemini_key or DEFAULT_GEMINI_KEY): chunk = process_code_blocks(chunk) full_response += chunk await asyncio.sleep(0.08) history[-1][1] = full_response yield history else: # OLLAMA response = analyze_with_ollama(model_name, enhanced_message) response = process_code_blocks(response) words = response.split() for i in range(len(words)): full_response = " ".join(words[:i+1]) await asyncio.sleep(0.08) history[-1][1] = full_response yield history except Exception as e: history[-1][1] = f"⚠️ Error: {str(e)}" yield history def process_code_blocks(text): """Process markdown code blocks to use custom artifact styling""" import re # Pattern for code blocks with language specification pattern = r"```(\w+)\n(.*?)```" def replace_code_block(match): language = match.group(1) code = match.group(2) escaped_code = code.replace('`', r'\`') html = ( '
' f'
' f'{language}' f'' '
' '
' f'
{code}
' '
' '
' ) return html # Replace all code blocks in the text processed_text = re.sub(pattern, replace_code_block, text, flags=re.DOTALL) return processed_text analyzer = RepoAnalyzer() def create_ui(): # Get local time local_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") analyzer = RepoAnalyzer() with gr.Blocks(title="Repository Chat Analysis", theme=gr.themes.Soft()) as app: gr.Markdown(""" """) # Header with gr.Row(elem_classes="container"): gr.Markdown(f"# 🤖 Repository Chat Analysis\n\n📅 {local_time}") # Main Interface with gr.Tabs() as tabs: # Configuration Tab with gr.Tab("🛠️ Konfigurasi", elem_classes="mobile-full"): provider = gr.Radio( choices=[AIProvider.XAI, AIProvider.GEMINI, AIProvider.OLLAMA], label="Penyedia AI", value=AIProvider.XAI, interactive=True, elem_classes="mobile-full" ) with gr.Group() as api_settings: # XAI API Key with gr.Group(): with gr.Row(): xai_key = gr.Textbox( label="X.AI (Grok) API Key", type="password", placeholder="Opsional - Klik icon (?) untuk info", show_label=True, scale=3 ) with gr.Column(scale=1): gr.Markdown(XAI_API_HELP) # Gemini API Key with gr.Group(): with gr.Row(): gemini_key = gr.Textbox( label="Gemini API Key", type="password", placeholder="Opsional - Kosongkan untuk gunakan key default", show_label=True, scale=3 ) with gr.Column(scale=1): gr.Markdown(GEMINI_API_HELP) # Model Selection with gr.Row(): model_dropdown = gr.Dropdown( label="Model AI", choices=XAI_MODELS, value="grok-2-latest", interactive=True, elem_classes="mobile-full" ) # Repository Analysis Tab with gr.Tab("📊 Analisis Repository", elem_classes="mobile-full"): # Repository Input Section with gr.Group(): with gr.Row(): repo_url = gr.Textbox( label="URL Repository GitHub", placeholder="https://github.com/username/repository", elem_classes="mobile-full" ) with gr.Row(): with gr.Column(scale=2): github_token = gr.Textbox( label="Token GitHub", type="password", placeholder="Klik icon (?) untuk panduan", elem_classes="mobile-full" ) with gr.Column(scale=1): branch = gr.Textbox( label="Branch (opsional)", placeholder="main", elem_classes="mobile-full" ) # Clone Repository Section clone_button = gr.Button( "🔄 Clone Repository", variant="primary", elem_classes="mobile-full" ) clone_status = gr.Markdown( value="", label="Status Repository", elem_classes="mobile-full" ) # File Selection Section with gr.Group(): gr.Markdown("### 📎 File yang Dipilih") with gr.Row(): file_selector = gr.Dropdown( label="Pilih File dari Repository", choices=[], multiselect=True, elem_classes="mobile-full" ) file_list = gr.HTML( value="
Belum ada file yang dipilih
", label="Daftar File Terpilih" ) # Chat Interface (Outside tabs for full width) with gr.Group(): chat_history = gr.Chatbot( label="📝 Riwayat Chat", elem_classes="fullscreen-chat", height=None, show_label=True, type="messages" ) with gr.Group(elem_classes="chat-input"): chat_input = gr.Textbox( label="💭 Tanyakan tentang Repository", placeholder="Ketik pertanyaan Anda di sini...", lines=3, elem_classes="mobile-full" ) with gr.Row(): clear_button = gr.Button("🧹 Bersihkan", variant="secondary") send_button = gr.Button("📤 Kirim", variant="primary") # Event Handlers def clear_chat_history(): return None def handle_clone(repo_url, github_token, branch): if not repo_url: return "⚠️ URL repository diperlukan!", [], [] success, message = analyzer.clone_repository(repo_url, github_token, branch) if success: # Get files with metadata for better display files = [] file_metadata = [] for file_path in analyzer.repo_content.keys(): files.append(file_path) content = analyzer.repo_content[file_path] size = len(content.encode('utf-8')) file_metadata.append({ 'path': file_path, 'size': f"{size / 1024:.1f} KB", 'lines': len(content.splitlines()) }) # Create rich HTML display for clone status status_html = f"""

✅ Repository berhasil di-clone!

Nama: {analyzer.current_repo}

Jumlah file: {len(files)}

Status: Siap untuk analisis

""" # Create initial file list display file_list_html = create_file_list_html(file_metadata) return status_html, files, file_list_html return message, [], [] def create_file_list_html(file_metadata): if not file_metadata: return "
Belum ada file yang dipilih
" html = "
" html += "" for metadata in file_metadata: html += f"""
{metadata['path']}
{metadata['size']} {metadata['lines']} lines
""" html += "
" return html def update_file_list(selected): if not selected: return "
Belum ada file yang dipilih
" file_metadata = [] for file_path in selected: if file_path in analyzer.repo_content: content = analyzer.repo_content[file_path] size = len(content.encode('utf-8')) file_metadata.append({ 'path': file_path, 'size': f"{size / 1024:.1f} KB", 'lines': len(content.splitlines()) }) return create_file_list_html(file_metadata) # Modified handle_chat function with better file context handling async def handle_chat(message, history, provider_choice, model_name, xai_key, gemini_key, selected_files): if not analyzer.current_repo: yield history + [[message, "⚠️ Mohon clone repository terlebih dahulu sebelum mengajukan pertanyaan."]] return history = history or [] history.append([message, ""]) try: # Improved file context formatting file_context = "" if selected_files: file_context = "\n\nAnalisis berdasarkan file yang dipilih:\n" for file in selected_files: content = analyzer.repo_content.get(file, "") file_context += f"\n### File: {file}\n```\n{content}\n```\n" enhanced_message = f"{message}\n{file_context}" # Rest of the handle_chat function remains the same full_response = "" if provider_choice == AIProvider.XAI: async for chunk in analyzer.stream_xai_response(enhanced_message, xai_key, model_name): chunk = process_code_blocks(chunk) full_response += chunk await asyncio.sleep(0.08) history[-1][1] = full_response yield history elif provider_choice == AIProvider.GEMINI: async for chunk in analyzer.stream_gemini_response(enhanced_message, gemini_key or DEFAULT_GEMINI_KEY): chunk = process_code_blocks(chunk) full_response += chunk await asyncio.sleep(0.08) history[-1][1] = full_response yield history else: # OLLAMA response = analyze_with_ollama(model_name, enhanced_message) response = process_code_blocks(response) words = response.split() for i in range(len(words)): full_response = " ".join(words[:i+1]) await asyncio.sleep(0.08) history[-1][1] = full_response yield history except Exception as e: history[-1][1] = f"⚠️ Error: {str(e)}" yield history # Add additional CSS for improved repository display gr.Markdown(""" """) return app if __name__ == "__main__": print(f""" 🚀 Memulai Repository Chat Analysis 📅 Current Date and Time (UTC): {datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")} """) app = create_ui() app.launch( share=True, show_error=True, server_name="0.0.0.0", server_port=7860 )