"""Gradio app for chatting with an AI model about a cloned GitHub repository.

Supports three back ends: X.AI (Grok) and Gemini through their
OpenAI-compatible endpoints, and a local Ollama server.
"""

import asyncio
import html
import json
import os
import re
import shutil
import subprocess
import textwrap
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import AsyncGenerator, Generator, List, Optional

import dotenv
import google.generativeai as genai
import gradio as gr
import requests
from openai import AsyncOpenAI

# Load environment variables
dotenv.load_dotenv()

# Metadata
CURRENT_TIME = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
CURRENT_USER = os.getenv("USER", "ErRickow")

# Default API keys (fallback if the user doesn't provide their own).
# Keys are read from the environment only: a credential must never be
# committed to source control, so there is no hard-coded fallback.
DEFAULT_XAI_KEY = os.getenv("XAI_API_KEY")
DEFAULT_GEMINI_KEY = os.getenv("GEMINI_API_KEY")
GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/"

# API settings
OLLAMA_API = os.environ.get("OLLAMA_API", "http://localhost:11434")
XAI_BASE_URL = "https://api.x.ai/v1"

# Seconds to wait for short metadata requests (GitHub API, Ollama tag list).
REQUEST_TIMEOUT = 15
# Seconds to wait for a full, non-streamed Ollama generation.
OLLAMA_GENERATE_TIMEOUT = 300

# Model lists
OLLAMA_MODELS = [
    "llama2",
    "codellama",
    "mistral",
    "neural-chat",
    "starling-lm",
    "dolphin-phi",
    "phi",
    "orca-mini",
]
XAI_MODELS = [
    "grok-2-latest",
    "grok-1",
]
GEMINI_MODELS = [
    "gemini-1.5-mini",
    "gemini-pro-vision",
]

SYSTEM_PROMPT = (
    "Anda adalah asisten AI yang membantu menganalisis repository code. "
    "Berikan respons dalam Bahasa Indonesia."
)

# Help texts
GITHUB_TOKEN_HELP = """
### Cara Mendapatkan GitHub Token:
1. Kunjungi [GitHub Token Settings](https://github.com/settings/tokens)
2. Klik "Generate new token" > "Generate new token (classic)"
3. Beri nama token Anda di "Note"
4. Pilih scope:
   - `repo` (untuk akses repository private)
   - `read:packages` (opsional, untuk akses package)
5. Klik "Generate token"
6. **PENTING**: Salin token segera! Token hanya ditampilkan sekali

Token diperlukan untuk:
- Mengakses repository private
- Clone repository dengan rate limit lebih tinggi
- Mengakses fitur GitHub API
"""

GEMINI_API_HELP = """
### Cara Mendapatkan Gemini API Key:
1. Kunjungi [Google AI Studio](https://makersuite.google.com/app/apikey)
2. Login dengan akun Google Anda
3. Klik "Create API Key"
4. Salin API Key yang dihasilkan

Catatan:
- Gemini memberikan kuota gratis setiap bulan
- Key bisa dibuat ulang jika diperlukan
- Monitor penggunaan di [Google Cloud Console](https://console.cloud.google.com/)
"""

OLLAMA_HELP = """
### Cara Menggunakan Ollama:
1. Install Ollama dari [ollama.ai](https://ollama.ai)
2. Jalankan Ollama di komputer Anda
3. Pastikan Ollama berjalan di http://localhost:11434

Catatan:
- Ollama berjalan secara lokal di komputer Anda
- Tidak memerlukan API key
- Ideal untuk privasi dan penggunaan offline
"""

XAI_API_HELP = """
### Cara Mendapatkan X.AI (Grok) API Key:
1. Kunjungi [X.AI Developer Portal](https://x.ai)
2. Daftar/Login ke akun Anda
3. Buat API Key baru
4. Salin API Key

Note:
- Jika tidak diisi, akan menggunakan API key default
- Masukkan API key Anda sendiri jika default mencapai limit
"""

# NOTE(review): the UI literals below (and the chat/code-block wrappers further
# down) look like the text residue of HTML whose tags were stripped from the
# source as received. Only the visible text is kept here; restore the original
# markup from version control rather than guessing it.
NO_FILES_SELECTED_HTML = "\nBelum ada file yang dipilih\n"

_XAI_KEY_HELP_MD = """
❓ Cara Mendapatkan X.AI API Key
  1. Kunjungi X.AI Developer Portal
  2. Daftar/Login ke akun Anda
  3. Buat API Key baru
  4. Salin API Key

Catatan:

  • Jika tidak diisi, akan menggunakan API key default
  • Masukkan API key Anda sendiri jika default mencapai limit
"""

_GEMINI_KEY_HELP_MD = """
❓ Cara Mendapatkan Gemini API Key
  1. Kunjungi Google AI Studio
  2. Login dengan akun Google Anda
  3. Klik "Create API Key"
  4. Salin API Key yang dihasilkan

Catatan:

  • Gemini memberikan kuota gratis setiap bulan
  • Key bisa dibuat ulang jika diperlukan
  • Monitor penggunaan di Google Cloud Console
"""

_GITHUB_TOKEN_HELP_MD = """
❓ Cara Mendapatkan GitHub Token
  1. Kunjungi GitHub Token Settings
  2. Klik "Generate new token" > "Generate new token (classic)"
  3. Beri nama token Anda di "Note"
  4. Pilih scope:
    • repo (untuk akses repository private)
    • read:packages (opsional, untuk akses package)
  5. Klik "Generate token"
  6. PENTING: Salin token segera! Token hanya ditampilkan sekali
"""

_EXAMPLES_MD = """

🎯 Contoh Pertanyaan

Klik pada kategori untuk melihat contoh pertanyaan yang bisa diajukan

🔍 Analisis Kode
📚 Best Practices
🧪 Testing
📝 Dokumentasi
"""

_CODE_BLOCK_RE = re.compile(r"```(\w+)?\n(.*?)```", flags=re.DOTALL)
_INLINE_CODE_RE = re.compile(r"`([^`]+)`")
_URL_SEGMENT_RE = re.compile(r"[A-Za-z0-9_.-]+")


class AIProvider:
    """String identifiers of the supported AI back ends."""

    OLLAMA = "ollama"
    GEMINI = "gemini"
    XAI = "xai"


# Model offered when the real model list cannot be fetched.
_FALLBACK_MODEL = {
    AIProvider.XAI: "grok-2-latest",
    AIProvider.GEMINI: "gemini-pro",
    AIProvider.OLLAMA: "llama2",
}


async def get_available_models(
    provider: str, api_key: Optional[str] = None
) -> tuple[List[str], Optional[str]]:
    """Fetch the list of models the given provider offers.

    Args:
        provider: One of the ``AIProvider`` constants; any other value is
            treated as Ollama.
        api_key: User-supplied key; falls back to the provider's default key.

    Returns:
        ``(models, error)`` where ``error`` is ``None`` on success, or a
        user-facing warning accompanied by a fallback model list.
    """
    try:
        if provider == AIProvider.XAI:
            if not api_key and not DEFAULT_XAI_KEY:
                return ["grok-2-latest"], "⚠️ API Key diperlukan untuk mendapatkan daftar model lengkap"
            client = AsyncOpenAI(api_key=api_key or DEFAULT_XAI_KEY, base_url=XAI_BASE_URL)
            models = await client.models.list()
            return [m.id for m in models.data if "grok" in m.id.lower()], None

        if provider == AIProvider.GEMINI:
            if not api_key and not DEFAULT_GEMINI_KEY:
                return ["gemini-pro"], "⚠️ API Key diperlukan untuk mendapatkan daftar model lengkap"
            genai.configure(api_key=api_key or DEFAULT_GEMINI_KEY)
            return [m.name for m in genai.list_models() if "gemini" in m.name.lower()], None

        # Ollama
        try:
            # A timeout keeps the UI from hanging when no Ollama server is up.
            response = requests.get(f"{OLLAMA_API}/api/tags", timeout=REQUEST_TIMEOUT)
            if response.status_code == 200:
                return [m["name"] for m in response.json()["models"]], None
            return ["llama2"], f"⚠️ Error mengakses Ollama API: {response.status_code}"
        except Exception as e:
            return ["llama2"], f"⚠️ Error connecting to Ollama: {str(e)}"
    except Exception as e:
        return [], f"⚠️ Error mendapatkan daftar model: {str(e)}"


def _chunk_text(chunk) -> str:
    """Return the text delta of one streamed chunk, or "" when it has none.

    Chunks with an empty ``choices`` list are tolerated instead of raising
    ``IndexError``.
    """
    choices = getattr(chunk, "choices", None)
    if not choices:
        return ""
    return getattr(choices[0].delta, "content", None) or ""


def _capability_type(capability) -> Optional[str]:
    """Return the type tag of a model capability given as str, dict or object."""
    if isinstance(capability, str):
        return capability
    if isinstance(capability, dict):
        return capability.get("type")
    return getattr(capability, "type", None)


def _is_safe_segment(segment: str) -> bool:
    """Return True if *segment* is a plain owner/repo name.

    Rejects empty names, ``.``/``..`` and anything with path or URL
    metacharacters, because the repo name becomes a local directory that is
    deleted and re-created.
    """
    return segment not in ("", ".", "..") and _URL_SEGMENT_RE.fullmatch(segment) is not None


class RepoAnalyzer:
    """Holds the currently cloned repository and streams AI answers about it."""

    def __init__(self):
        self.current_repo = None  # directory name of the cloned repo, or None
        self.repo_content = {}  # file path (str) -> file text
        self.chat_history = []

    def _build_messages(self, prompt: str) -> list:
        """Build the chat messages, prefixing the prompt with repo context."""
        if self.current_repo:
            repo_files = "\n".join(self.repo_content)
            prompt = (
                f"Repository: {self.current_repo}\n\n"
                f"Files in repository:\n{repo_files}\n\n"
                f"{prompt}"
            )
        return [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ]

    async def stream_gemini_response(
        self, prompt: str, api_key: str = None, model: str = "gemini-pro"
    ) -> AsyncGenerator[str, None]:
        """Stream a reply from Gemini through its OpenAI-compatible endpoint.

        Yields text fragments. Failures are yielded as user-facing warning
        strings rather than raised.
        """
        try:
            actual_key = api_key or DEFAULT_GEMINI_KEY
            if not actual_key:
                yield "⚠️ API Key Gemini diperlukan. Klik icon bantuan (?) untuk panduan mendapatkan key."
                return

            client = AsyncOpenAI(api_key=actual_key, base_url=GEMINI_BASE_URL)
            messages = self._build_messages(prompt)

            try:
                stream = await client.chat.completions.create(
                    model=model,
                    messages=messages,
                    stream=True,
                    temperature=0.7,
                    top_p=0.8,
                    max_tokens=4096,
                )
                async for chunk in stream:
                    text = _chunk_text(chunk)
                    if text:
                        yield text
            except Exception as e:
                reason = str(e).lower()
                if "model not found" in reason:
                    yield f"⚠️ Model {model} tidak tersedia di Gemini API"
                elif "rate limit" in reason:
                    yield "⚠️ Rate limit tercapai. Coba lagi nanti atau gunakan API key yang berbeda."
                else:
                    yield f"⚠️ Error saat streaming dari Gemini: {str(e)}"
                return
        except Exception as e:
            error_msg = (
                f"⚠️ Error dalam Gemini API: {str(e)}\n\nPastikan:\n1. API Key valid\n"
                f"2. Model {model} tersedia\n3. Anda memiliki kuota yang cukup"
            )
            print(error_msg)
            yield error_msg

    async def stream_xai_response(
        self, prompt: str, api_key: str = None, model: str = "grok-2-latest"
    ) -> AsyncGenerator[str, None]:
        """Stream a reply from X.AI (Grok).

        Yields text fragments. Failures are yielded as user-facing warning
        strings rather than raised.
        """
        try:
            actual_key = api_key or DEFAULT_XAI_KEY
            if not actual_key:
                yield "⚠️ API Key X.AI diperlukan. Gunakan key Anda sendiri atau tunggu reset limit default key."
                return

            client = AsyncOpenAI(api_key=actual_key, base_url=XAI_BASE_URL)

            # Verify the model exists before starting a completion.
            try:
                model_info = await client.models.retrieve(model)
            except Exception as e:
                yield f"⚠️ Error verifikasi model {model}: {str(e)}"
                return

            # Only reject on capabilities when the API actually reports them;
            # a model object without that field must not be treated as an error.
            capabilities = getattr(model_info, "capabilities", None)
            if capabilities is not None and not any(
                _capability_type(c) == "chat" for c in capabilities
            ):
                yield f"⚠️ Model {model} tidak mendukung chat completion"
                return

            try:
                stream = await client.chat.completions.create(
                    model=model,
                    messages=self._build_messages(prompt),
                    stream=True,
                )
            except Exception as e:
                yield f"⚠️ Error streaming dari model {model}: {str(e)}"
                return

            async for chunk in stream:
                text = _chunk_text(chunk)
                if text:
                    yield text
        except Exception as e:
            yield (
                f"⚠️ Error dalam X.AI API: {str(e)}\nPastikan:\n1. API Key valid\n"
                f"2. Model {model} tersedia\n3. Anda memiliki akses ke model ini"
            )

    def clone_repository(self, repo_url: str, github_token: str, branch: str = None) -> tuple[bool, str]:
        """Clone a GitHub repository and load its text files into memory.

        Args:
            repo_url: ``https://github.com/<owner>/<repo>`` (``.git`` optional).
            github_token: Personal access token, or empty for public repos.
            branch: Branch to check out; the remote default when omitted.

        Returns:
            ``(success, message)`` where ``message`` is user-facing text.
        """
        if not repo_url:
            return False, "⚠️ URL repository diperlukan"

        segments = repo_url.strip().rstrip("/").split("/")
        repo_name = segments[-1].removesuffix(".git")
        owner = segments[-2] if len(segments) >= 2 else ""
        # The repo name becomes a local directory that is removed below, so it
        # must never be something like ".." or contain a path separator.
        if not (_is_safe_segment(owner) and _is_safe_segment(repo_name)):
            return (
                False,
                "⚠️ URL repository tidak valid. Gunakan format https://github.com/username/repository",
            )
        owner_repo = f"{owner}/{repo_name}"

        try:
            # Check whether the repository exists / is private
            headers = {"Authorization": f"token {github_token}"} if github_token else {}
            repo_check = requests.get(
                f"https://api.github.com/repos/{owner_repo}",
                headers=headers,
                timeout=REQUEST_TIMEOUT,
            )

            if repo_check.status_code == 404:
                return False, "⚠️ Repository tidak ditemukan. Periksa URL repository."
            elif repo_check.status_code == 401:
                return (
                    False,
                    "⚠️ Token GitHub tidak valid. Klik icon bantuan (?) untuk panduan mendapatkan token.",
                )
            elif repo_check.status_code == 403 and repo_check.json().get("private", False):
                return (
                    False,
                    "⚠️ Ini adalah repository private. Token GitHub dengan akses 'repo' diperlukan.",
                )

            # Forget the previous clone so its files cannot leak into this one.
            self.current_repo = None
            self.repo_content = {}
            target = Path(repo_name)
            if target.is_symlink() or target.is_file():
                target.unlink()
            elif target.is_dir():
                shutil.rmtree(target)

            auth_url = (
                f"https://{github_token}@github.com/{owner_repo}"
                if github_token
                else f"https://github.com/{owner_repo}"
            )

            cmd = ["git", "clone"]
            if branch:
                cmd.extend(["--branch", branch])
            # Name the target directory explicitly so it always matches repo_name.
            cmd.extend([auth_url, repo_name])

            process = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                env=dict(os.environ, GIT_ASKPASS="echo", GIT_TERMINAL_PROMPT="0"),
            )

            if process.returncode != 0:
                stderr = process.stderr
                if github_token:
                    # git may echo the clone URL; never show the token to the user.
                    stderr = stderr.replace(github_token, "***")
                return False, f"⚠️ Gagal clone repository:\n{stderr}"

            self.current_repo = repo_name

            # Scan and store the repository content, skipping git's own metadata
            # directory (but not files such as .gitignore or .github/).
            for file_path in target.rglob("*"):
                if not file_path.is_file():
                    continue
                if ".git" in file_path.relative_to(target).parts:
                    continue
                success, content = self.read_file_safely(str(file_path))
                if success:
                    self.repo_content[str(file_path)] = content
            file_count = len(self.repo_content)

            return (
                True,
                f"✅ Repository berhasil di-clone!\n\nNama: {repo_name}\nJumlah file: {file_count}\n\nAnda sekarang bisa mengajukan pertanyaan tentang repository ini.",
            )
        except Exception as e:
            return False, f"⚠️ Error: {str(e)}"

    def read_file_safely(self, file_path: str) -> tuple[bool, str]:
        """Read a file as text, trying several encodings in turn.

        Returns:
            ``(True, text)`` on success, ``(False, reason)`` otherwise.
        """
        # latin-1 accepts every byte sequence, so it must come last; listed
        # before cp1252 it would make the cp1252 attempt unreachable.
        for encoding in ("utf-8", "cp1252", "latin-1"):
            try:
                with open(file_path, "r", encoding=encoding) as f:
                    return True, f.read()
            except UnicodeDecodeError:
                continue
            except OSError:
                break  # unreadable file: another encoding will not help
        return False, "Tidak dapat membaca file dengan encoding yang didukung"


analyzer = RepoAnalyzer()


def analyze_with_ollama(model_name: str, prompt: str) -> str:
    """Return the complete (non-streamed) answer of a local Ollama model.

    Raises:
        requests.RequestException: If the server is unreachable or answers
            with an HTTP error status.
    """
    response = requests.post(
        f"{OLLAMA_API}/api/generate",
        json={"model": model_name, "prompt": prompt, "stream": False},
        timeout=OLLAMA_GENERATE_TIMEOUT,
    )
    response.raise_for_status()
    return response.json().get("response", "")


async def _stream_ollama(model_name: str, prompt: str) -> AsyncGenerator[str, None]:
    """Yield an Ollama answer word by word to mimic streaming.

    Each fragment keeps its trailing whitespace so that newlines, and
    therefore fenced code blocks, survive re-assembly.
    """
    try:
        # The HTTP call blocks, so run it off the event loop.
        response = await asyncio.to_thread(analyze_with_ollama, model_name, prompt)
    except Exception as e:
        yield (
            f"⚠️ Error dengan Ollama: {str(e)}\nPastikan:\n"
            f"1. Ollama berjalan di {OLLAMA_API}\n2. Model {model_name} sudah diinstall"
        )
        return
    for fragment in re.findall(r"\S+\s*", response):
        yield fragment


def _chat_bubble(text: str) -> str:
    """Wrap chat text the way the chat panel expects it.

    NOTE(review): the wrapper markup appears to have been stripped from the
    source as received; only the surrounding newlines remain.
    """
    return f"\n{text}\n"


def _selected_files_context(repo_analyzer, selected_files) -> str:
    """Return a prompt section containing the contents of the selected files."""
    if not selected_files:
        return ""
    sections = ["\n\nFile yang dipilih:\n"]
    for file in selected_files:
        content = repo_analyzer.repo_content.get(file, "")
        if content:
            sections.append(f"\n{file}:\n```\n{content}\n```\n")
    return "".join(sections)


async def handle_chat(
    message,
    history,
    provider_choice,
    model_name,
    xai_key,
    gemini_key,
    selected_files,
    analyzer=analyzer,
):
    """Handle one chat turn, yielding the updated history as the reply streams.

    Args:
        message: The user's question.
        history: Chat history as a list of ``{"role", "content"}`` dicts.
        provider_choice: One of the ``AIProvider`` constants.
        model_name: Model selected in the UI.
        xai_key: User-supplied X.AI key (may be empty).
        gemini_key: User-supplied Gemini key (may be empty).
        selected_files: Repository files whose content is added to the prompt.
        analyzer: The ``RepoAnalyzer`` holding the cloned repository.
    """
    history = history or []

    if not analyzer.current_repo:
        history.append({"role": "user", "content": message})
        history.append(
            {
                "role": "assistant",
                "content": "⚠️ Mohon clone repository terlebih dahulu sebelum mengajukan pertanyaan.",
            }
        )
        yield history
        return

    history.append({"role": "user", "content": _chat_bubble(message)})
    # Placeholder that is overwritten in place while the answer streams in.
    reply = {"role": "assistant", "content": "\n"}
    history.append(reply)

    try:
        enhanced_message = f"{message}\n{_selected_files_context(analyzer, selected_files)}"

        if provider_choice == AIProvider.XAI:
            chunks = analyzer.stream_xai_response(
                enhanced_message, xai_key, model_name or "grok-2-latest"
            )
        elif provider_choice == AIProvider.GEMINI:
            if not gemini_key and not DEFAULT_GEMINI_KEY:
                reply["content"] = _chat_bubble("⚠️ API Key Gemini diperlukan")
                yield history
                return
            # Forward the model picked in the UI instead of always using the default.
            chunks = analyzer.stream_gemini_response(
                enhanced_message, gemini_key or DEFAULT_GEMINI_KEY, model_name or "gemini-pro"
            )
        else:  # Ollama
            chunks = _stream_ollama(model_name, enhanced_message)

        full_response = ""
        async for chunk in chunks:
            full_response += chunk
            reply["content"] = _chat_bubble(format_response(full_response))
            await asyncio.sleep(0.05)
            yield history
    except Exception as e:
        reply["content"] = _chat_bubble(f"⚠️ Error saat streaming response: {str(e)}")
        yield history


def format_response(text):
    """Format response text by rewriting fenced and inline code spans.

    NOTE(review): the replacement templates look like they lost their markup
    in the source as received; as written they emit only the language and the
    code separated by newlines, and inline code loses its backticks.
    """

    def replace_code_block(match):
        language = match.group(1) or ""
        code = match.group(2)
        # Unused in the text as received; presumably interpolated into the
        # stripped markup - TODO confirm.
        block_id = f"code-block-{uuid.uuid4().hex[:8]}"
        return f"\n{language}\n{code}\n"

    # Replace ```language\ncode``` blocks
    text = _CODE_BLOCK_RE.sub(replace_code_block, text)
    # Replace inline code
    text = _INLINE_CODE_RE.sub(r"\1", text)
    return text


def clear_all():
    """Return the reset value for every input and status component."""
    return (
        "",  # repo_url
        "",  # github_token
        "",  # branch
        "",  # clone_status
        [],  # chat_history
        [],  # file_selector
        NO_FILES_SELECTED_HTML,  # file_list
        "",  # xai_key
        "",  # gemini_key
        "grok-2-latest",  # model_dropdown default value
    )


def create_ui():
    """Build the Gradio Blocks application and wire up its event handlers."""
    # The header labels this timestamp as UTC, so compute it in UTC.
    current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    with gr.Blocks(title="Open Repo AI", theme=gr.themes.Soft()) as app:
        # CSS styling
        # NOTE(review): the stylesheet is missing from the source as received.
        gr.Markdown(""" """)

        # Header
        with gr.Row(elem_classes="container"):
            gr.Markdown(
                "\n# AI Github Repository Chat\n"
                f"Current Date and Time (UTC - YYYY-MM-DD HH:MM:SS formatted): {current_time}\n"
            )

        # Main tabs container
        with gr.Tabs():
            # Configuration tab
            with gr.Tab("🛠️ Konfigurasi"):
                provider = gr.Radio(
                    choices=[AIProvider.XAI, AIProvider.GEMINI, AIProvider.OLLAMA],
                    label="Penyedia AI",
                    value=AIProvider.XAI,
                )

                with gr.Group():
                    # X.AI API key section
                    with gr.Row():
                        with gr.Column(scale=3):
                            xai_key = gr.Textbox(
                                label="X.AI (Grok) API Key",
                                type="password",
                                placeholder="Opsional - Klik (?) untuk info",
                                show_label=True,
                            )
                        with gr.Column(scale=1):
                            gr.Markdown(_XAI_KEY_HELP_MD)

                    # Gemini API key section
                    with gr.Row():
                        with gr.Column(scale=3):
                            gemini_key = gr.Textbox(
                                label="Gemini API Key",
                                type="password",
                                placeholder="Opsional - Klik (?) untuk info",
                                show_label=True,
                            )
                        with gr.Column(scale=1):
                            gr.Markdown(_GEMINI_KEY_HELP_MD)

                with gr.Row():
                    model_dropdown = gr.Dropdown(
                        label="Model AI",
                        choices=XAI_MODELS,
                        value="grok-2-latest",
                        interactive=True,
                    )

            # Repository analysis tab
            with gr.Tab("📊 Analisis Repository"):
                with gr.Group():
                    # Repository URL input
                    with gr.Row():
                        repo_url = gr.Textbox(
                            label="URL Repository GitHub",
                            placeholder="https://github.com/username/repository",
                            elem_classes="mobile-full",
                        )

                    # GitHub token section
                    with gr.Row():
                        with gr.Column(scale=2):
                            github_token = gr.Textbox(
                                label="Token GitHub",
                                type="password",
                                placeholder="Klik (?) untuk panduan",
                                elem_classes="mobile-full",
                            )
                            gr.Markdown(_GITHUB_TOKEN_HELP_MD)
                        with gr.Column(scale=1):
                            branch = gr.Textbox(
                                label="Branch (opsional)",
                                placeholder="main",
                                elem_classes="mobile-full",
                            )

                    # Clone button and status
                    clone_button = gr.Button(
                        "🔄 Clone Repository",
                        variant="primary",
                        elem_classes="mobile-full",
                    )
                    clone_status = gr.Markdown(
                        value="", label="Status Repository", elem_classes="mobile-full"
                    )

                # File selection
                with gr.Group():
                    gr.Markdown("### 📎 File yang Dipilih")
                    with gr.Row():
                        file_selector = gr.Dropdown(
                            label="Pilih File dari Repository",
                            choices=[],
                            multiselect=True,
                            value=[],
                            allow_custom_value=True,
                            max_choices=None,
                            elem_classes="mobile-full",
                        )
                    file_list = gr.HTML(
                        value=NO_FILES_SELECTED_HTML,
                        label="Daftar File Terpilih",
                    )

            # Examples tab
            with gr.Tab("💡 Examples"):
                gr.Markdown(_EXAMPLES_MD)

        # Chat interface (outside tabs)
        with gr.Group():
            chat_history = gr.Chatbot(
                label="📝 Riwayat Chat",
                height=500,
                show_label=True,
                type="messages",
                elem_classes="mobile-full",
            )
            with gr.Row():
                chat_input = gr.Textbox(
                    label="💭 Tanyakan tentang Repository",
                    placeholder="Ketik pertanyaan Anda di sini...",
                    lines=3,
                    elem_classes="mobile-full",
                )
                with gr.Column(scale=1):
                    send_button = gr.Button("📤 Kirim", variant="primary")
                    clear_button = gr.Button("🧹 Bersihkan Semua", variant="secondary")

        # Event handlers
        def handle_clone(repo_url, github_token, branch):
            """Clone the repository and refresh the file selector."""
            if not repo_url:
                return (
                    "⚠️ URL repository diperlukan!",
                    gr.Dropdown(choices=[]),
                    NO_FILES_SELECTED_HTML,
                )

            success, message = analyzer.clone_repository(repo_url, github_token, branch)
            if success:
                selector = gr.Dropdown(choices=sorted(analyzer.repo_content), value=[])
            else:
                selector = gr.Dropdown(choices=[])
            return message, selector, NO_FILES_SELECTED_HTML

        def update_file_list(selected):
            """Render the selected file names for the HTML panel."""
            if not selected:
                return NO_FILES_SELECTED_HTML
            # Names can be typed freely (allow_custom_value), so escape them
            # before they reach an HTML component.
            items = "".join(f"\n{html.escape(str(file))}\n" for file in selected)
            return f"\n{items}\n"

        def clear_chat_history():
            return []

        def update_model_list(provider_choice, api_key=None):
            """Rebuild the model dropdown for the given provider."""
            try:
                models, error = asyncio.run(get_available_models(provider_choice, api_key))
                return gr.Dropdown(
                    choices=models,
                    value=models[0] if models else None,
                    label=f"Model AI ({error})" if error else "Model AI",
                )
            except Exception as e:
                fallback = _FALLBACK_MODEL.get(provider_choice, "llama2")
                return gr.Dropdown(
                    choices=[fallback],
                    value=fallback,
                    label=f"Model AI (Error: {str(e)})",
                )

        def refresh_models_for_provider(provider_choice, xai_api_key, gemini_api_key):
            """Refresh the model list with the key belonging to the new provider."""
            keys = {AIProvider.XAI: xai_api_key, AIProvider.GEMINI: gemini_api_key}
            return update_model_list(provider_choice, keys.get(provider_choice))

        def refresh_if_active(target_provider):
            """Build a key-change handler that only acts for *target_provider*."""

            def handler(provider_choice, api_key):
                if provider_choice != target_provider:
                    # Leave the dropdown untouched; returning None would clear it.
                    return gr.update()
                return update_model_list(provider_choice, api_key)

            return handler

        # Connect events
        provider.change(
            fn=refresh_models_for_provider,
            inputs=[provider, xai_key, gemini_key],
            outputs=[model_dropdown],
        )

        xai_key.change(
            fn=refresh_if_active(AIProvider.XAI),
            inputs=[provider, xai_key],
            outputs=[model_dropdown],
        )

        gemini_key.change(
            fn=refresh_if_active(AIProvider.GEMINI),
            inputs=[provider, gemini_key],
            outputs=[model_dropdown],
        )

        clone_button.click(
            fn=handle_clone,
            inputs=[repo_url, github_token, branch],
            outputs=[clone_status, file_selector, file_list],
        )

        file_selector.change(fn=update_file_list, inputs=[file_selector], outputs=[file_list])

        # Chat events. The clear button created in the chat row is wired here;
        # creating a second button would leave the visible one without a handler.
        clear_button.click(
            fn=clear_all,
            outputs=[
                repo_url,
                github_token,
                branch,
                clone_status,
                chat_history,
                file_selector,
                file_list,
                xai_key,
                gemini_key,
                model_dropdown,
            ],
        )

        chat_inputs = [
            chat_input,
            chat_history,
            provider,
            model_dropdown,
            xai_key,
            gemini_key,
            file_selector,
        ]
        # The send button and pressing Enter behave identically.
        for trigger in (send_button.click, chat_input.submit):
            trigger(
                fn=handle_chat,
                inputs=chat_inputs,
                outputs=chat_history,
                show_progress=True,
            ).then(fn=lambda: gr.update(value=""), outputs=chat_input)

    return app
if __name__ == "__main__":
    # Startup banner, then build and serve the UI on all interfaces with a
    # public share link (settings unchanged from the original).
    print("""
🚀 Starting Repository Chat Analysis
====================================
""")
    app = create_ui()
    app.launch(share=True, server_name="0.0.0.0", server_port=7860, debug=True)