"""Repository Chat Analysis.

Gradio app that clones a GitHub repository and lets the user chat about its
contents using one of three AI providers: X.AI (Grok), Google Gemini, or a
local Ollama server. Responses from the hosted providers are streamed into
the chat UI.
"""

import asyncio
import json
import os
import shutil
import subprocess
import textwrap
from datetime import datetime
from pathlib import Path
from typing import AsyncGenerator, Generator

import dotenv
import google.generativeai as genai
import gradio as gr
import requests
from openai import AsyncOpenAI

# Load environment variables from a local .env file, if present.
dotenv.load_dotenv()

# Metadata displayed in the UI header.
CURRENT_TIME = "2025-05-23 12:57:22"
CURRENT_USER = "ErRickow"

# Default API keys (fallback when the user doesn't supply their own).
# SECURITY FIX: the original source hard-coded a live X.AI key here.
# Secrets must only come from the environment, never from source control.
DEFAULT_XAI_KEY = os.getenv("XAI_API_KEY", "")
DEFAULT_GEMINI_KEY = os.getenv("GEMINI_API_KEY")

# API settings
OLLAMA_API = os.environ.get("OLLAMA_API", "http://localhost:11434")
XAI_BASE_URL = "https://api.x.ai/v1"

# Model lists
OLLAMA_MODELS = [
    "llama2",
    "codellama",
    "mistral",
    "neural-chat",
    "starling-lm",
    "dolphin-phi",
    "phi",
    "orca-mini",
]
XAI_MODELS = [
    "grok-2-latest",
    "grok-1",
]
# BUG FIX: "gemini-pro" must be listed — update_model_list() selects it as
# the default and stream_gemini_response() uses it. The original listed a
# non-existent "gemini-1.5-mini" instead.
GEMINI_MODELS = [
    "gemini-pro",
    "gemini-pro-vision",
]

# Help texts
XAI_API_HELP = """
### Cara Mendapatkan X.AI (Grok) API Key:
1. Kunjungi [X.AI Developer Portal](https://x.ai)
2. Daftar/Login ke akun Anda
3. Buat API Key baru
4. Salin API Key

Note:
- Jika tidak diisi, akan menggunakan API key default
- Masukkan API key Anda sendiri jika default mencapai limit
"""

# BUG FIX: the two help texts below were referenced by create_ui() but never
# defined in the original source (NameError at startup).
GEMINI_API_HELP = """
### Cara Mendapatkan Gemini API Key:
1. Kunjungi [Google AI Studio](https://aistudio.google.com/app/apikey)
2. Login dengan akun Google Anda
3. Buat API Key baru
4. Salin API Key
"""

GITHUB_TOKEN_HELP = """
### Cara Mendapatkan Token GitHub:
1. Kunjungi [GitHub Settings > Tokens](https://github.com/settings/tokens)
2. Klik "Generate new token (classic)"
3. Centang scope 'repo' untuk akses repository private
4. Salin token yang dihasilkan
"""


class AIProvider:
    """Enumeration of the supported AI backends."""

    OLLAMA = "ollama"
    GEMINI = "gemini"
    XAI = "xai"


def analyze_with_ollama(model: str, prompt: str) -> str:
    """Send *prompt* to a local Ollama server and return its full response.

    BUG FIX: this function was called by create_ui() but missing from the
    original source, causing a NameError whenever the OLLAMA provider was
    selected.

    Args:
        model: Name of a locally installed Ollama model.
        prompt: User prompt to send.

    Returns:
        The model's response text, or an error message prefixed with "⚠️".
    """
    try:
        resp = requests.post(
            f"{OLLAMA_API}/api/generate",
            json={"model": model, "prompt": prompt, "stream": False},
            timeout=300,
        )
        resp.raise_for_status()
        return resp.json().get("response", "")
    except Exception as e:
        return f"⚠️ Error dalam Ollama API: {str(e)}"


class RepoAnalyzer:
    """Holds the cloned repository's contents and drives the AI providers."""

    def __init__(self):
        # Name of the currently cloned repository (None until a clone succeeds).
        self.current_repo = None
        # Mapping of file path -> file content for the cloned repository.
        self.repo_content = {}
        # Full conversation transcript as {"role": ..., "content": ...} dicts.
        self.chat_history = []

    def _repo_context(self) -> str:
        """Build a context string describing the cloned repository's files."""
        context = f"Repository: {self.current_repo}\n\n"
        repo_files = "\n".join(list(self.repo_content.keys()))
        context += f"Files in repository:\n{repo_files}\n\n"
        return context

    async def stream_xai_response(
        self,
        prompt: str,
        api_key: str = None,
        model: str = "grok-2-latest",
    ) -> AsyncGenerator[str, None]:
        """Stream a response from the X.AI (Grok) API.

        Args:
            prompt: User question.
            api_key: Optional user-supplied key; falls back to DEFAULT_XAI_KEY.
            model: X.AI model name.

        Yields:
            Response text chunks, or a single "⚠️ ..." error message.
        """
        try:
            # Use the default key if none was provided by the user.
            actual_key = api_key if api_key else DEFAULT_XAI_KEY
            if not actual_key:
                yield "⚠️ API Key X.AI diperlukan. Gunakan key Anda sendiri atau tunggu reset limit default key."
                return

            client = AsyncOpenAI(api_key=actual_key, base_url=XAI_BASE_URL)

            messages = [
                {
                    "role": "system",
                    "content": "Anda adalah asisten AI yang membantu menganalisis repository code. Berikan respons dalam Bahasa Indonesia.",
                }
            ]
            # Give the model the repository file listing as extra context.
            if self.current_repo:
                messages.append({"role": "system", "content": self._repo_context()})
            messages.append({"role": "user", "content": prompt})

            stream = await client.chat.completions.create(
                model=model,
                messages=messages,
                stream=True,
            )
            full_response = ""
            async for chunk in stream:
                if chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    full_response += content
                    yield content

            # Record the exchange once the full answer has been assembled.
            self.chat_history.append({"role": "user", "content": prompt})
            self.chat_history.append({"role": "assistant", "content": full_response})
        except Exception as e:
            error_msg = f"⚠️ Error dalam X.AI API: {str(e)}"
            print(error_msg)
            yield error_msg

    async def stream_gemini_response(
        self,
        prompt: str,
        api_key: str,
        model: str = "gemini-pro",
    ) -> AsyncGenerator[str, None]:
        """Stream a response from the Gemini API.

        Args:
            prompt: User question.
            api_key: Gemini API key (required).
            model: Gemini model name. New parameter with a backward-compatible
                default; the original hard-coded 'gemini-pro' and ignored the
                UI's model dropdown.

        Yields:
            Response text chunks, or a single "⚠️ ..." error message.
        """
        try:
            if not api_key:
                yield "⚠️ API Key Gemini diperlukan. Klik icon bantuan (?) di samping input API Key untuk panduan mendapatkan key."
                return

            genai.configure(api_key=api_key)
            gen_model = genai.GenerativeModel(model)

            # Prepend repository context when a repo has been cloned.
            if self.current_repo:
                prompt = self._repo_context() + prompt

            response = gen_model.generate_content(
                prompt,
                generation_config={
                    "temperature": 0.7,
                    "top_p": 0.8,
                    "top_k": 40,
                },
                stream=True,
            )
            full_response = ""
            # BUG FIX: generate_content(stream=True) returns a *synchronous*
            # iterable; the original used `async for`, which raises TypeError.
            for chunk in response:
                if chunk.text:
                    full_response += chunk.text
                    yield chunk.text

            self.chat_history.append({"role": "user", "content": prompt})
            self.chat_history.append({"role": "assistant", "content": full_response})
        except Exception as e:
            error_msg = f"⚠️ Error dalam Gemini API: {str(e)}\n\nPastikan API Key valid dan memiliki kuota yang cukup."
            print(error_msg)
            yield error_msg

    def clone_repository(self, repo_url: str, github_token: str, branch: str = None) -> tuple[bool, str]:
        """Clone a GitHub repository (with optional token auth) and index its files.

        Args:
            repo_url: HTTPS URL of the repository.
            github_token: Optional personal access token for private repos.
            branch: Optional branch to clone.

        Returns:
            (success, user-facing status message).
        """
        if not repo_url:
            return False, "⚠️ URL repository diperlukan"

        repo_name = repo_url.split('/')[-1].replace('.git', '')
        if os.path.exists(repo_name):
            # Portable removal — the original shelled out to `rm -rf`.
            shutil.rmtree(repo_name)

        try:
            owner_repo = '/'.join(repo_url.split('/')[-2:])
            # Probe the GitHub API first to produce precise error messages.
            headers = {'Authorization': f'token {github_token}'} if github_token else {}
            repo_check = requests.get(f"https://api.github.com/repos/{owner_repo}", headers=headers)
            if repo_check.status_code == 404:
                return False, "⚠️ Repository tidak ditemukan. Periksa URL repository."
            elif repo_check.status_code == 401:
                return False, "⚠️ Token GitHub tidak valid. Klik icon bantuan (?) untuk panduan mendapatkan token."
            elif repo_check.status_code == 403 and repo_check.json().get('private', False):
                return False, "⚠️ Ini adalah repository private. Token GitHub dengan akses 'repo' diperlukan."

            # Embed the token in the clone URL for private-repo access.
            auth_url = (
                f"https://{github_token}@github.com/{owner_repo}"
                if github_token
                else f"https://github.com/{owner_repo}"
            )
            cmd = ['git', 'clone']
            if branch:
                cmd.extend(['--branch', branch])
            cmd.append(auth_url)

            process = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                # Disable interactive credential prompts so a bad token fails
                # fast instead of hanging the server.
                env=dict(os.environ, GIT_ASKPASS='echo', GIT_TERMINAL_PROMPT='0'),
            )
            if process.returncode == 0:
                self.current_repo = repo_name
                # BUG FIX: reset the index so files from a previously cloned
                # repository do not leak into this one's chat context.
                self.repo_content = {}
                file_count = 0
                for file_path in Path(repo_name).rglob('*'):
                    if file_path.is_file() and '.git' not in str(file_path):
                        success, content = self.read_file_safely(str(file_path))
                        if success:
                            self.repo_content[str(file_path)] = content
                            file_count += 1
                return True, f"✅ Repository berhasil di-clone!\n\nNama: {repo_name}\nJumlah file: {file_count}\n\nAnda sekarang bisa mengajukan pertanyaan tentang repository ini."
            else:
                return False, f"⚠️ Gagal clone repository:\n{process.stderr}"
        except Exception as e:
            return False, f"⚠️ Error: {str(e)}"

    def read_file_safely(self, file_path: str) -> tuple[bool, str]:
        """Read a file, trying several encodings.

        Returns:
            (True, content) on success, (False, error message) otherwise.
        """
        encodings = ['utf-8', 'latin-1', 'cp1252']
        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    return True, f.read()
            except Exception:
                continue
        return False, "Tidak dapat membaca file dengan encoding yang didukung"


def create_ui():
    """Build and return the Gradio Blocks application."""
    analyzer = RepoAnalyzer()

    with gr.Blocks(title="Repository Chat Analysis") as app:
        gr.Markdown(f"""
        # 🤖 Repository Chat Analysis
        📅 Waktu: {CURRENT_TIME}
        👤 Pengguna: {CURRENT_USER}
        """)

        with gr.Tab("🛠️ Konfigurasi"):
            provider = gr.Radio(
                choices=[AIProvider.XAI, AIProvider.GEMINI, AIProvider.OLLAMA],
                label="Penyedia AI",
                value=AIProvider.XAI,
            )
            with gr.Group() as api_settings:
                with gr.Row():
                    xai_key = gr.Textbox(
                        label="X.AI (Grok) API Key",
                        type="password",
                        # BUG FIX: the original placeholder contained a stray
                        # embedded newline.
                        placeholder="Opsional - Klik icon (?) untuk info. Kosongkan untuk gunakan key default",
                        show_label=True,
                    )
                    gr.Markdown(XAI_API_HELP)
                with gr.Row():
                    gemini_key = gr.Textbox(
                        label="Gemini API Key",
                        type="password",
                        placeholder="Opsional - Kosongkan untuk gunakan key default",
                        show_label=True,
                    )
                    gr.Markdown(GEMINI_API_HELP)

            # Model selection based on provider
            with gr.Row():
                model_dropdown = gr.Dropdown(
                    label="Model AI",
                    choices=XAI_MODELS,
                    value="grok-2-latest",
                    interactive=True,
                )

            def update_model_list(provider_choice):
                """Swap the model dropdown's choices when the provider changes."""
                if provider_choice == AIProvider.XAI:
                    return gr.Dropdown(choices=XAI_MODELS, value="grok-2-latest")
                elif provider_choice == AIProvider.GEMINI:
                    return gr.Dropdown(choices=GEMINI_MODELS, value="gemini-pro")
                else:  # OLLAMA
                    return gr.Dropdown(choices=OLLAMA_MODELS, value="llama2")

            provider.change(
                fn=update_model_list,
                inputs=[provider],
                outputs=[model_dropdown],
            )

        with gr.Tab("📊 Analisis Repository"):
            with gr.Row():
                repo_url = gr.Textbox(
                    label="URL Repository GitHub",
                    placeholder="Contoh: https://github.com/username/repository",
                )
                with gr.Column():
                    github_token = gr.Textbox(
                        label="Token GitHub",
                        type="password",
                        placeholder="Klik icon (?) untuk panduan mendapatkan token",
                    )
                    gr.Markdown(GITHUB_TOKEN_HELP)
                branch = gr.Textbox(
                    label="Branch (opsional)",
                    placeholder="main",
                )
            clone_button = gr.Button("🔄 Clone Repository", variant="primary")
            clone_status = gr.Markdown(
                label="Status Repository",
            )

            def handle_clone(url, token, branch_name):
                """Clone the repo and report status.

                BUG FIX: the clone button was never wired to any handler in
                the original, so cloning from the UI was impossible.
                """
                _, message = analyzer.clone_repository(url, token, branch_name or None)
                return message

            clone_button.click(
                fn=handle_clone,
                inputs=[repo_url, github_token, branch],
                outputs=clone_status,
            )

            gr.Markdown("""
            ### 💡 Contoh Pertanyaan:
            - "Jelaskan struktur utama dari repository ini"
            - "Apa saja fitur-fitur utama dalam kode ini?"
            - "Bagaimana cara memperbaiki [masalah specific] di repository ini?"
            - "Tolong analisis kualitas kode di file [nama file]"
            """)

            with gr.Row():
                chat_input = gr.Textbox(
                    label="💭 Tanyakan tentang Repository",
                    placeholder="Ketik pertanyaan Anda di sini...",
                    lines=3,
                )
                send_button = gr.Button("📤 Kirim", variant="primary")

            chat_history = gr.Chatbot(
                label="📝 Riwayat Chat",
                height=500,
                show_label=True,
            )

            async def handle_chat(message, history, provider_choice, model_name, xai_key, gemini_key):
                """Route the user's message to the selected provider, streaming the reply."""
                if not analyzer.current_repo:
                    yield history + [[message, "⚠️ Mohon clone repository terlebih dahulu sebelum mengajukan pertanyaan."]]
                    return

                history = history or []
                history.append([message, ""])
                full_response = ""

                if provider_choice == AIProvider.XAI:
                    async for chunk in analyzer.stream_xai_response(message, xai_key, model_name):
                        full_response += chunk
                        history[-1][1] = full_response
                        yield history
                elif provider_choice == AIProvider.GEMINI:
                    # BUG FIX: forward the selected model; the original
                    # ignored the dropdown for Gemini.
                    async for chunk in analyzer.stream_gemini_response(
                        message, gemini_key or DEFAULT_GEMINI_KEY, model_name
                    ):
                        full_response += chunk
                        history[-1][1] = full_response
                        yield history
                else:  # OLLAMA
                    response = analyze_with_ollama(model_name, message)
                    history[-1][1] = response
                    yield history

            send_button.click(
                fn=handle_chat,
                inputs=[
                    chat_input,
                    chat_history,
                    provider,
                    model_dropdown,
                    xai_key,
                    gemini_key,
                ],
                outputs=chat_history,
            ).then(
                fn=lambda: gr.update(value=""),
                outputs=chat_input,
            )

            chat_input.submit(
                fn=handle_chat,
                inputs=[
                    chat_input,
                    chat_history,
                    provider,
                    model_dropdown,
                    xai_key,
                    gemini_key,
                ],
                outputs=chat_history,
            ).then(
                fn=lambda: gr.update(value=""),
                outputs=chat_input,
            )

    return app


if __name__ == "__main__":
    print(f"""
    🚀 Memulai Repository Chat Analysis
    📅 Waktu: {CURRENT_TIME}
    👤 Pengguna: {CURRENT_USER}
    """)
    app = create_ui()
    app.launch(share=True)