# OpenRepoAI / app.py
# ErNewdev0's picture
# chore: fixes groq
# 8766ecc verified
import asyncio
import html
import json
import os
import shutil
import subprocess
import textwrap
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import AsyncGenerator, Generator, List, Optional

import dotenv
import google.generativeai as genai
import gradio as gr
import requests
from openai import AsyncOpenAI
# Let python-dotenv populate the process environment before the lookups below.
dotenv.load_dotenv()

# Fallback API keys, used when the user does not supply their own.
# Each is None when the corresponding environment variable is unset.
DEFAULT_XAI_KEY = os.environ.get("XAI_API_KEY")
DEFAULT_GEMINI_KEY = os.environ.get("GEMINI_API_KEY")

# Service endpoints. The X.AI and Gemini URLs are handed to AsyncOpenAI as
# base_url; the Ollama address can be overridden through OLLAMA_API.
XAI_BASE_URL = "https://api.x.ai/v1"
GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/"
OLLAMA_API = os.getenv("OLLAMA_API", "http://localhost:11434")
# Model lists
# Static model names per provider.
OLLAMA_MODELS = [
    "llama2",
    "codellama",
    "mistral",
    "neural-chat",
    "starling-lm",
    "dolphin-phi",
    "phi",
    "orca-mini",
]
XAI_MODELS = [
    "grok-2-latest",
    "grok-1",
]
# Fallback Gemini model names. get_available_models() returns GEMINI_MODELS
# when the live listing is empty or fails, but the name was never defined,
# so that fallback path raised NameError instead of returning a list.
GEMINI_MODELS = [
    "gemini-1.5-flash",
    "gemini-pro",
]
# Help texts
# Markdown snippets (user-facing, in Indonesian) explaining how to obtain the
# credentials the app asks for.

# How to create a GitHub personal access token; rendered next to the GitHub
# token field in create_ui.
GITHUB_TOKEN_HELP = """
### Cara Mendapatkan GitHub Token:
1. Kunjungi [GitHub Token Settings](https://github.com/settings/tokens)
2. Klik "Generate new token" > "Generate new token (classic)"
3. Beri nama token Anda di "Note"
4. Pilih scope:
- `repo` (untuk akses repository private)
- `read:packages` (opsional, untuk akses package)
5. Klik "Generate token"
6. **PENTING**: Salin token segera! Token hanya ditampilkan sekali
Token diperlukan untuk:
- Mengakses repository private
- Clone repository dengan rate limit lebih tinggi
- Mengakses fitur GitHub API
"""
# How to obtain a Gemini API key; rendered next to the Gemini key field.
GEMINI_API_HELP = """
### Cara Mendapatkan Gemini API Key:
1. Kunjungi [Google AI Studio](https://makersuite.google.com/app/apikey)
2. Login dengan akun Google Anda
3. Klik "Create API Key"
4. Salin API Key yang dihasilkan
Catatan:
- Gemini memberikan kuota gratis setiap bulan
- Key bisa dibuat ulang jika diperlukan
- Monitor penggunaan di [Google Cloud Console](https://console.cloud.google.com/)
"""
# How to run Ollama locally. NOTE(review): not referenced anywhere else in
# the visible file.
OLLAMA_HELP = """
### Cara Menggunakan Ollama:
1. Install Ollama dari [ollama.ai](https://ollama.ai)
2. Jalankan Ollama di komputer Anda
3. Pastikan Ollama berjalan di http://localhost:11434
Catatan:
- Ollama berjalan secara lokal di komputer Anda
- Tidak memerlukan API key
- Ideal untuk privasi dan penggunaan offline
"""
# How to obtain an X.AI (Grok) API key; rendered next to the X.AI key field.
XAI_API_HELP = """
### Cara Mendapatkan X.AI (Grok) API Key:
1. Kunjungi [X.AI Developer Portal](https://x.ai)
2. Daftar/Login ke akun Anda
3. Buat API Key baru
4. Salin API Key
Note:
- Jika tidak diisi, akan menggunakan API key default
- Masukkan API key Anda sendiri jika default mencapai limit
"""
async def get_available_models(
    provider: str, api_key: Optional[str] = None
) -> tuple[List[str], Optional[str]]:
    """Return the model names available for *provider*.

    Args:
        provider: One of the AIProvider constants. Any value other than
            AIProvider.XAI or AIProvider.GEMINI is treated as Ollama.
        api_key: User-supplied key; the matching DEFAULT_*_KEY is used when
            it is empty. Ignored for Ollama.

    Returns:
        A ``(models, error)`` pair. ``error`` is None on success, otherwise a
        user-facing warning; in that case ``models`` is a fallback list
        (possibly empty when an unexpected exception was raised).
    """
    try:
        if provider == AIProvider.XAI:
            key = api_key or DEFAULT_XAI_KEY
            if not key:
                return ["grok-2-latest"], "⚠️ API Key diperlukan untuk mendapatkan daftar model lengkap"
            # The context manager closes the HTTP client before the caller's
            # event loop goes away (callers use asyncio.run).
            async with AsyncOpenAI(api_key=key, base_url=XAI_BASE_URL) as client:
                models = await client.models.list()
            return [m.id for m in models.data if "grok" in m.id.lower()], None

        if provider == AIProvider.GEMINI:
            key = api_key or DEFAULT_GEMINI_KEY
            if not key:
                return ["gemini-pro"], "⚠️ API Key diperlukan untuk mendapatkan daftar model lengkap"
            client = AsyncOpenAI(api_key=key, base_url=GEMINI_BASE_URL)
            try:
                async with client:
                    models = await client.models.list()
            except Exception as e:
                return GEMINI_MODELS, f"⚠️ Error listing models: {str(e)}"
            # Model ids may be reported as "models/<name>"; strip only that
            # leading prefix.
            available_models = [
                m.id.removeprefix("models/")
                for m in (getattr(models, "data", None) or [])
                if "gemini" in m.id.lower()
            ]
            # Fall back to the static list when the listing had no Gemini model.
            return available_models or GEMINI_MODELS, None

        # OLLAMA
        try:
            # requests is blocking: run it in a worker thread and bound the
            # wait so an unreachable server cannot hang the UI.
            response = await asyncio.to_thread(
                requests.get, f"{OLLAMA_API}/api/tags", timeout=10
            )
            if response.status_code == 200:
                return [m["name"] for m in response.json()["models"]], None
            return ["llama2"], f"⚠️ Error mengakses Ollama API: {response.status_code}"
        except Exception as e:
            return ["llama2"], f"⚠️ Error connecting to Ollama: {str(e)}"
    except Exception as e:
        return [], f"⚠️ Error mendapatkan daftar model: {str(e)}"
class AIProvider:
    """Namespace of plain string identifiers for the supported AI providers."""

    XAI = "xai"
    GEMINI = "gemini"
    OLLAMA = "ollama"
class RepoAnalyzer:
    """Clone a GitHub repository, cache its text files and stream AI answers.

    Attributes:
        current_repo: Directory name of the most recently cloned repository,
            or None when nothing has been cloned yet.
        repo_content: Maps file path (as a string) to the decoded text of
            that file for the current repository.
        chat_history: Starts as an empty list; no method of this class reads
            or writes it.
    """

    # System prompt sent to both providers (asks for answers in Indonesian).
    _SYSTEM_PROMPT = (
        "Anda adalah asisten AI yang membantu menganalisis repository code. "
        "Berikan respons dalam Bahasa Indonesia."
    )

    def __init__(self):
        self.current_repo = None
        self.repo_content = {}
        self.chat_history = []

    def _build_messages(self, prompt: str) -> list:
        """Return the chat messages for *prompt*, prefixed with repo context."""
        if self.current_repo:
            repo_files = "\n".join(self.repo_content)
            prompt = (
                f"Repository: {self.current_repo}\n\n"
                f"Files in repository:\n{repo_files}\n\n"
                f"{prompt}"
            )
        return [
            {"role": "system", "content": self._SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ]

    @staticmethod
    async def _stream_text(stream) -> AsyncGenerator[str, None]:
        """Yield the non-empty text deltas of a chat-completion stream."""
        async for chunk in stream:
            # Skip chunks without choices instead of failing on choices[0].
            if not chunk.choices:
                continue
            text = chunk.choices[0].delta.content
            if text:
                yield text

    async def stream_gemini_response(
        self, prompt: str, api_key: Optional[str] = None, model: str = "gemini-1.5-flash"
    ) -> AsyncGenerator[str, None]:
        """Stream a Gemini answer through the OpenAI-compatible client.

        Args:
            prompt: User question (file excerpts may already be appended).
            api_key: User key; DEFAULT_GEMINI_KEY is used when empty.
            model: Gemini model name, with or without the "models/" prefix.

        Yields:
            Text fragments of the answer, or a single warning message when
            the key is missing or the request fails. Errors are reported as
            yielded text, not raised.
        """
        try:
            actual_key = api_key or DEFAULT_GEMINI_KEY
            if not actual_key:
                yield "⚠️ API Key Gemini diperlukan. Klik icon bantuan (?) untuk panduan mendapatkan key."
                return

            client = AsyncOpenAI(api_key=actual_key, base_url=GEMINI_BASE_URL)
            api_model = model if model.startswith("models/") else f"models/{model}"
            messages = self._build_messages(prompt)

            try:
                stream = await client.chat.completions.create(
                    model=api_model,
                    messages=messages,
                    stream=True,
                    temperature=0.7,
                    top_p=0.8,
                    max_tokens=4096,
                )
                async for text in self._stream_text(stream):
                    yield text
            except Exception as e:
                reason = str(e).lower()
                if "model not found" in reason:
                    yield f"⚠️ Model {model} tidak tersedia di Gemini API"
                elif "rate limit" in reason:
                    yield "⚠️ Rate limit tercapai. Coba lagi nanti atau gunakan API key yang berbeda."
                else:
                    yield f"⚠️ Error saat streaming dari Gemini: {str(e)}"
                return
        except Exception as e:
            error_msg = f"⚠️ Error dalam Gemini API: {str(e)}\n\nPastikan:\n1. API Key valid\n2. Model {model} tersedia\n3. Anda memiliki kuota yang cukup"
            print(error_msg)
            yield error_msg

    async def stream_xai_response(
        self, prompt: str, api_key: Optional[str] = None, model: str = "grok-2-latest"
    ) -> AsyncGenerator[str, None]:
        """Stream an X.AI (Grok) answer.

        Args:
            prompt: User question (file excerpts may already be appended).
            api_key: User key; DEFAULT_XAI_KEY is used when empty.
            model: X.AI model name.

        Yields:
            Text fragments of the answer, or a single warning message when
            the key is missing or the request fails. Errors are reported as
            yielded text, not raised.
        """
        try:
            actual_key = api_key or DEFAULT_XAI_KEY
            if not actual_key:
                yield "⚠️ API Key X.AI diperlukan. Gunakan key Anda sendiri atau tunggu reset limit default key."
                return

            client = AsyncOpenAI(api_key=actual_key, base_url=XAI_BASE_URL)
            try:
                stream = await client.chat.completions.create(
                    model=model,
                    # Same repository context as the Gemini path, so both
                    # providers answer with knowledge of the file list.
                    messages=self._build_messages(prompt),
                    stream=True,
                )
            except Exception as e:
                yield f"⚠️ Error streaming dari model {model}: {str(e)}"
                return

            async for text in self._stream_text(stream):
                yield text
        except Exception as e:
            yield f"⚠️ Error dalam X.AI API: {str(e)}\nPastikan:\n1. API Key valid\n2. Model {model} tersedia\n3. Anda memiliki akses ke model ini"

    def clone_repository(
        self, repo_url: str, github_token: str, branch: Optional[str] = None
    ) -> tuple[bool, str]:
        """Clone a GitHub repository into the working directory and index it.

        Args:
            repo_url: Repository URL; a trailing "/" or ".git" is accepted.
            github_token: Optional token, sent to the GitHub API and embedded
                in the clone URL.
            branch: Optional branch passed to ``git clone --branch``.

        Returns:
            ``(True, message)`` on success, after ``current_repo`` and
            ``repo_content`` have been replaced; ``(False, message)``
            otherwise. Failures are reported in the message, not raised.
        """
        if not repo_url:
            return False, "⚠️ URL repository diperlukan"

        # Drop empty segments so a trailing "/" does not produce an empty name.
        segments = [part for part in repo_url.strip().split("/") if part]
        if len(segments) < 2:
            return False, "⚠️ URL repository tidak valid. Gunakan format https://github.com/username/repository"
        owner = segments[-2]
        # Strip only a trailing ".git"; names such as "user.github.io" must
        # stay intact.
        repo_name = segments[-1].removesuffix(".git")
        # The name becomes a local path that is deleted below: never accept
        # "", "." or "..".
        if repo_name in ("", ".", "..") or owner in (".", ".."):
            return False, "⚠️ URL repository tidak valid. Gunakan format https://github.com/username/repository"
        owner_repo = f"{owner}/{repo_name}"

        try:
            # Check that the repository exists and the token is accepted.
            headers = {"Authorization": f"token {github_token}"} if github_token else {}
            repo_check = requests.get(
                f"https://api.github.com/repos/{owner_repo}",
                headers=headers,
                timeout=15,
            )
            if repo_check.status_code == 404:
                return False, "⚠️ Repository tidak ditemukan. Periksa URL repository."
            if repo_check.status_code == 401:
                return (
                    False,
                    "⚠️ Token GitHub tidak valid. Klik icon bantuan (?) untuk panduan mendapatkan token.",
                )
            if repo_check.status_code == 403 and repo_check.json().get("private", False):
                return (
                    False,
                    "⚠️ Ini adalah repository private. Token GitHub dengan akses 'repo' diperlukan.",
                )

            # Remove a previous clone only once the checks have passed.
            # NOTE(review): the target lives in the current working
            # directory, so an unrelated entry with the same name is
            # removed as well.
            target = Path(repo_name)
            if target.is_dir() and not target.is_symlink():
                shutil.rmtree(target)
            elif target.exists() or target.is_symlink():
                target.unlink()

            auth_url = (
                f"https://{github_token}@github.com/{owner_repo}"
                if github_token
                else f"https://github.com/{owner_repo}"
            )
            cmd = ["git", "clone"]
            if branch:
                cmd.extend(["--branch", branch])
            # Name the target explicitly so the directory scanned below is
            # the one git created.
            cmd.extend([auth_url, repo_name])
            process = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                env=dict(os.environ, GIT_ASKPASS="echo", GIT_TERMINAL_PROMPT="0"),
            )
            if process.returncode != 0:
                stderr = process.stderr
                if github_token:
                    # git may echo the clone URL; keep the token out of the UI.
                    stderr = stderr.replace(github_token, "***")
                return False, f"⚠️ Gagal clone repository:\n{stderr}"

            # Index every readable file outside the .git directory. Compare
            # path components relative to the clone so ".github",
            # ".gitignore" and repository names containing ".git" are kept.
            contents = {}
            for file_path in target.rglob("*"):
                if not file_path.is_file():
                    continue
                if ".git" in file_path.relative_to(target).parts:
                    continue
                success, content = self.read_file_safely(str(file_path))
                if success:
                    contents[str(file_path)] = content

            # Replace the cache so files of an earlier clone do not linger.
            self.repo_content = contents
            self.current_repo = repo_name
            file_count = len(contents)
            return (
                True,
                f"✅ Repository berhasil di-clone!\n\nNama: {repo_name}\nJumlah file: {file_count}\n\nAnda sekarang bisa mengajukan pertanyaan tentang repository ini.",
            )
        except Exception as e:
            return False, f"⚠️ Error: {str(e)}"

    def read_file_safely(self, file_path: str) -> tuple[bool, str]:
        """Read a text file, trying several encodings in turn.

        Args:
            file_path: Path of the file to read.

        Returns:
            ``(True, content)`` on success, or ``(False, message)`` when the
            file cannot be opened or decoded.
        """
        # latin-1 accepts every byte sequence, so it has to come last;
        # listing it before cp1252 made the cp1252 attempt unreachable.
        encodings = ["utf-8", "cp1252", "latin-1"]
        for encoding in encodings:
            try:
                with open(file_path, "r", encoding=encoding) as f:
                    return True, f.read()
            except UnicodeDecodeError:
                continue
            except (OSError, ValueError):
                # Missing or unreadable file: another encoding will not help.
                break
        return False, "Tidak dapat membaca file dengan encoding yang didukung"
# Shared module-level RepoAnalyzer instance: handle_chat takes it as the
# default value of its `analyzer` parameter, and the UI handlers defined in
# create_ui call it to clone repositories and read the indexed files.
analyzer = RepoAnalyzer()
def analyze_with_ollama(model_name: str, prompt: str) -> str:
    """Send *prompt* to the Ollama server and return the complete answer.

    handle_chat called this function, but it was not defined anywhere in the
    file, so choosing the Ollama provider always ended in a NameError.

    Args:
        model_name: Name of the Ollama model to run.
        prompt: Full prompt text.

    Returns:
        The generated text, or a warning message when the request fails.
    """
    try:
        response = requests.post(
            f"{OLLAMA_API}/api/generate",
            # stream=False asks for one JSON document instead of a stream.
            json={"model": model_name, "prompt": prompt, "stream": False},
            timeout=300,
        )
        response.raise_for_status()
        return response.json().get("response", "")
    except Exception as e:
        return f"⚠️ Error dari Ollama: {str(e)}"


async def handle_chat(
    message,
    history,
    provider_choice,
    model_name,
    xai_key,
    gemini_key,
    selected_files,
    analyzer=analyzer,
):
    """Answer a chat message about the cloned repository.

    Args:
        message: Text typed by the user.
        history: Chat history as a list of {"role", "content"} dicts, or None.
        provider_choice: One of the AIProvider constants; anything other than
            XAI or GEMINI is handled as Ollama.
        model_name: Model selected in the UI.
        xai_key: Optional user X.AI key.
        gemini_key: Optional user Gemini key.
        selected_files: Paths whose cached content is appended to the prompt.
        analyzer: RepoAnalyzer holding the cloned repository.

    Yields:
        The updated history after each received fragment. Failures are
        written into the last assistant message instead of being raised.
    """
    history = history or []
    history.append({"role": "user", "content": message})

    if not analyzer.current_repo:
        history.append(
            {
                "role": "assistant",
                "content": "⚠️ Mohon clone repository terlebih dahulu sebelum mengajukan pertanyaan.",
            }
        )
        yield history
        return

    history.append({"role": "assistant", "content": ""})
    try:
        # Append the content of the selected files to the prompt.
        file_context = ""
        if selected_files:
            file_context = "\n\nFile yang dipilih:\n"
            for file in selected_files:
                content = analyzer.repo_content.get(file, "")
                if content:  # only files present in the cache
                    file_context += f"\n{file}:\n```\n{content}\n```\n"
        enhanced_message = f"{message}\n{file_context}"

        if provider_choice == AIProvider.XAI:
            stream = analyzer.stream_xai_response(
                enhanced_message, xai_key, model_name or "grok-2-latest"
            )
        elif provider_choice == AIProvider.GEMINI:
            # Forward the selected model; it was previously dropped, so every
            # Gemini request used the method's default model.
            stream = analyzer.stream_gemini_response(
                enhanced_message,
                gemini_key or DEFAULT_GEMINI_KEY,
                model_name or "gemini-1.5-flash",
            )
        else:
            stream = None

        if stream is not None:
            full_response = ""
            async for chunk in stream:
                full_response += chunk
                # Deliberate pacing between chunks, kept from the original.
                await asyncio.sleep(0.5)
                history[-1]["content"] = full_response
                yield history
        else:  # OLLAMA
            # The HTTP call is blocking; run it off the event loop.
            response = await asyncio.to_thread(
                analyze_with_ollama, model_name, enhanced_message
            )
            # Simulated streaming. Splitting on single spaces (rather than
            # on all whitespace) keeps newlines and code formatting intact.
            shown = ""
            for index, piece in enumerate(response.split(" ")):
                shown += piece if index == 0 else f" {piece}"
                if not piece.strip():
                    continue
                await asyncio.sleep(1)
                history[-1]["content"] = shown
                yield history
            if history[-1]["content"] != shown:
                # Flush trailing whitespace that was not yielded above.
                history[-1]["content"] = shown
                yield history
    except Exception as e:
        history[-1]["content"] = f"⚠️ Error: {str(e)}"
        yield history
# Placeholder shown while no file is selected.
_EMPTY_FILE_LIST_HTML = "<div class='file-list'>Belum ada file yang dipilih</div>"

# Stylesheet injected through a Markdown component at the top of the page.
_APP_CSS = """
<style>
.container { max-width: 100% !important; padding: 1rem; }
.mobile-full { width: 100% !important; }
.file-list { margin: 10px 0; padding: 10px; border: 1px solid #ddd; border-radius: 4px; }
.file-item { display: flex; justify-content: space-between; padding: 5px 0; }
.file-remove { color: red; cursor: pointer; }
.example-list {
padding: 20px;
background: #f8f9fa;
border-radius: 8px;
border: 1px solid #e9ecef;
margin: 10px 0;
}
.example-list h3 {
color: #2c3e50;
margin-top: 20px;
margin-bottom: 10px;
font-size: 1.2em;
}
.example-list h4 {
color: #34495e;
margin-top: 15px;
margin-bottom: 5px;
font-size: 1.1em;
}
.example-list ul {
margin: 10px 0;
padding-left: 20px;
}
.example-list li {
margin: 8px 0;
line-height: 1.5;
list-style-type: disc;
}
.example-list code {
background: #e9ecef;
padding: 2px 5px;
border-radius: 4px;
font-family: monospace;
font-size: 0.9em;
}
.example-list strong {
color: #2c3e50;
font-weight: bold;
}
.example-list p {
margin: 10px 0;
line-height: 1.5;
}
@media (max-width: 768px) {
.gr-form { flex-direction: column !important; }
.gr-group { margin: 0.5rem 0 !important; }
}
</style>
"""


def _render_file_list(selected):
    """Return the HTML list of the selected files (names are HTML-escaped)."""
    if not selected:
        return _EMPTY_FILE_LIST_HTML
    items = "".join(
        f"<div class='file-item'><span>{html.escape(str(name))}</span></div>"
        for name in selected
    )
    return f"<div class='file-list'>{items}</div>"


def _example_questions(raw_name, shown_name):
    """Return three example questions suited to the file type of *raw_name*."""
    lowered = raw_name.lower()
    ext = lowered.rsplit(".", 1)[-1] if "." in lowered else ""
    if ext in ("py", "js", "java", "cpp", "c", "go"):
        return [
            f"Jelaskan fungsi-fungsi utama di {shown_name}",
            f"Bagaimana cara mengoptimalkan performa di {shown_name}?",
            f"Buat unit test untuk fungsi-fungsi di {shown_name}",
        ]
    if ext in ("html", "css"):
        return [
            f"Analisis struktur dan layout dari {shown_name}",
            f"Bagaimana cara membuat {shown_name} lebih responsif?",
            f"Optimasi untuk mobile view di {shown_name}",
        ]
    if ext == "md":
        return [
            f"Ringkas isi dokumentasi dari {shown_name}",
            f"Buat tabel konten untuk {shown_name}",
            f"Perbaiki formatting di {shown_name}",
        ]
    if ext in ("json", "yaml", "yml"):
        return [
            f"Validasi struktur data di {shown_name}",
            f"Jelaskan konfigurasi di {shown_name}",
            f"Optimasi format di {shown_name}",
        ]
    # A plain "Dockerfile" has no extension, so match the file name as well.
    if ext == "dockerfile" or lowered == "dockerfile" or lowered.startswith("dockerfile."):
        return [
            f"Analisis keamanan dari {shown_name}",
            f"Optimasi multi-stage build di {shown_name}",
            f"Best practices untuk {shown_name}",
        ]
    return [
        f"Analisis isi dari {shown_name}",
        f"Jelaskan struktur dan tujuan {shown_name}",
        f"Saran perbaikan untuk {shown_name}",
    ]


def _build_examples_html(selected_files):
    """Return the HTML for the example-questions tab for *selected_files*."""
    if not selected_files:
        return """
<div class='example-list'>
<h3>Pilih File Terlebih Dahulu</h3>
<p>Silakan pilih file di tab Analisis Repository untuk melihat contoh pertanyaan yang relevan.</p>
</div>
"""
    # File names come from the cloned repository or from free text typed by
    # the user, so they are escaped before being placed into HTML.
    names = [Path(str(f)).name for f in selected_files]
    shown = [html.escape(name) for name in names]
    file_names = ", ".join(f"<code>{name}</code>" for name in shown)

    parts = [
        "<div class='example-list'>",
        "<h3>Contoh Pertanyaan Umum:</h3>",
        "<ul>",
        f'<li><strong>Analisis Kode:</strong> "Jelaskan logika dan fungsi utama dari {file_names}"</li>',
        '<li><strong>Deteksi Bug:</strong> "Apakah ada potensi bug atau masalah keamanan di file-file ini?"</li>',
        '<li><strong>Best Practices:</strong> "Bagaimana cara mengoptimalkan kode di file-file ini?"</li>',
        # This closing tag was missing, leaving the later headings nested
        # inside the first list.
        "</ul>",
    ]
    for raw_name, shown_name in zip(names, shown):
        parts.append(f"<h4>Contoh untuk {shown_name}:</h4>")
        parts.append("<ul>")
        parts.extend(
            f'<li>"{question}"</li>'
            for question in _example_questions(raw_name, shown_name)
        )
        parts.append("</ul>")
    parts.extend(
        [
            "<h3>Tips:</h3>",
            "<ul>",
            "<li>Gunakan pertanyaan yang spesifik dan fokus pada bagian tertentu</li>",
            "<li>Sebutkan nama file jika bertanya tentang file tertentu</li>",
            "<li>Jelaskan konteks atau masalah yang ingin diselesaikan</li>",
            "</ul>",
            "</div>",
        ]
    )
    return "\n".join(parts)


def create_ui():
    """Build the Gradio Blocks application and wire up its event handlers.

    Returns:
        The gr.Blocks app, ready for ``launch()``. The handlers operate on
        the module-level ``analyzer`` instance.
    """
    # The header says UTC, so take the timestamp in UTC, not local time.
    current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")

    with gr.Blocks(title="Open Repo AI", theme=gr.themes.Soft()) as app:
        # CSS styling
        gr.Markdown(_APP_CSS)

        # Header
        with gr.Row(elem_classes="container"):
            gr.Markdown(f"""
# AI Github Repository Chat
- Current Date and Time (UTC): {current_time}
""")

        # Main tabs container
        with gr.Tabs() as tabs:
            # Configuration tab
            with gr.Tab("Konfigurasi"):
                provider = gr.Radio(
                    choices=[AIProvider.XAI, AIProvider.GEMINI, AIProvider.OLLAMA],
                    label="AI Providers",
                    value=AIProvider.XAI,
                )
                with gr.Group():
                    with gr.Row():
                        xai_key = gr.Textbox(
                            label="X.AI (Grok) API Key (opsional)",
                            type="password",
                            placeholder="Memakai Apikey Kamu Sendiri",
                            show_label=True,
                            scale=3,
                        )
                        with gr.Column(scale=1):
                            gr.Markdown(XAI_API_HELP)
                    with gr.Row():
                        gemini_key = gr.Textbox(
                            label="Gemini API Key",
                            type="password",
                            placeholder="Opsional - Kosongkan untuk gunakan key default",
                            show_label=True,
                            scale=3,
                        )
                        with gr.Column(scale=1):
                            gr.Markdown(GEMINI_API_HELP)
                with gr.Row():
                    model_dropdown = gr.Dropdown(
                        label="Model AI",
                        choices=XAI_MODELS,
                        value="grok-2-latest",
                        interactive=True,
                    )

            # Repository analysis tab
            with gr.Tab("Analisis Repository"):
                with gr.Group():
                    # Repository URL and token inputs
                    with gr.Row():
                        repo_url = gr.Textbox(
                            label="URL Repository GitHub",
                            placeholder="https://github.com/username/repository",
                            elem_classes="mobile-full",
                        )
                    with gr.Row():
                        with gr.Column(scale=2):
                            github_token = gr.Textbox(
                                label="Token GitHub (opsional)",
                                type="password",
                                placeholder="Masukkan github token jika repo private",
                                elem_classes="mobile-full",
                            )
                            gr.Markdown(GITHUB_TOKEN_HELP)
                        with gr.Column(scale=1):
                            branch = gr.Textbox(
                                label="Branch (opsional)",
                                placeholder="main",
                                elem_classes="mobile-full",
                            )
                    clone_button = gr.Button(
                        "Analisa Repo",
                        variant="primary",
                        elem_classes="mobile-full",
                    )
                    clone_status = gr.Markdown(
                        value="", label="Status Repository", elem_classes="mobile-full"
                    )
                # File selection
                with gr.Group():
                    gr.Markdown("### File yang Dipilih")
                    with gr.Row():
                        file_selector = gr.Dropdown(
                            label="Pilih File dari Repository",
                            choices=[],
                            multiselect=True,
                            value=[],
                            allow_custom_value=True,
                            max_choices=None,
                            elem_classes="mobile-full",
                        )
                    file_list = gr.HTML(
                        value=_EMPTY_FILE_LIST_HTML,
                        label="Daftar File Terpilih",
                    )

            # Examples tab
            with gr.Tab("Ide Cepat"):
                example_output = gr.HTML(
                    value="Pilih file di tab Analisis Repository untuk melihat contoh pertanyaan.",
                    label="Contoh Pertanyaan",
                )

        # Chat interface (outside the tabs)
        with gr.Group():
            chat_history = gr.Chatbot(
                label="Open Repo AI Assistant",
                height=300,
                show_label=True,
                type="messages",
                elem_classes="mobile-full",
            )
            with gr.Row():
                chat_input = gr.Textbox(
                    label="Chat Dengan Repository",
                    placeholder="Ketik di sini...",
                    lines=3,
                    elem_classes="mobile-full",
                )
                send_button = gr.Button("Kirim", variant="primary")
                clear_button = gr.Button("Bersihkan", variant="secondary")

        # Event handlers
        def handle_clone(repo_url, github_token, branch):
            """Clone the repository and refresh the file selector."""
            if not repo_url:
                return (
                    "URL repository diperlukan!",
                    gr.Dropdown(choices=[]),
                    _EMPTY_FILE_LIST_HTML,
                )
            success, message = analyzer.clone_repository(repo_url, github_token, branch)
            if success:
                files = sorted(analyzer.repo_content)
                return (
                    message,
                    gr.Dropdown(choices=files, value=[]),
                    _EMPTY_FILE_LIST_HTML,
                )
            return message, gr.Dropdown(choices=[]), _EMPTY_FILE_LIST_HTML

        def clear_chat_history():
            """Reset the chat window."""
            return []

        def update_model_list(provider_choice, api_key=None):
            """Return a model dropdown populated for *provider_choice*."""
            try:
                models, error = asyncio.run(get_available_models(provider_choice, api_key))
                label = f"Model AI ({error})" if error else "Model AI"
                return gr.Dropdown(
                    choices=models,
                    value=models[0] if models else None,
                    label=label,
                )
            except Exception as e:
                # Offer a default that belongs to the chosen provider (the
                # old fallback suggested a Gemini model for Ollama).
                fallback = {
                    AIProvider.XAI: "grok-2-latest",
                    AIProvider.GEMINI: "gemini-1.5-flash",
                }.get(provider_choice, "llama2")
                return gr.Dropdown(
                    choices=[fallback],
                    value=fallback,
                    label=f"Model AI (Error: {str(e)})",
                )

        def on_provider_change(provider_choice, xai_value, gemini_value):
            """Reload the model list with the key of the selected provider."""
            key = {
                AIProvider.XAI: xai_value,
                AIProvider.GEMINI: gemini_value,
            }.get(provider_choice)
            return update_model_list(provider_choice, key)

        def on_xai_key_change(provider_choice, key):
            """Reload the model list only while X.AI is the active provider."""
            if provider_choice != AIProvider.XAI:
                # gr.update() leaves the dropdown untouched; returning None
                # would clear its value.
                return gr.update()
            return update_model_list(provider_choice, key)

        def on_gemini_key_change(provider_choice, key):
            """Reload the model list only while Gemini is the active provider."""
            if provider_choice != AIProvider.GEMINI:
                return gr.update()
            return update_model_list(provider_choice, key)

        # Connect events
        provider.change(
            fn=on_provider_change,
            inputs=[provider, xai_key, gemini_key],
            outputs=[model_dropdown],
        )
        xai_key.change(
            fn=on_xai_key_change,
            inputs=[provider, xai_key],
            outputs=[model_dropdown],
        )
        gemini_key.change(
            fn=on_gemini_key_change,
            inputs=[provider, gemini_key],
            outputs=[model_dropdown],
        )
        clone_button.click(
            fn=handle_clone,
            inputs=[repo_url, github_token, branch],
            outputs=[clone_status, file_selector, file_list],
        )
        file_selector.change(
            fn=_render_file_list, inputs=[file_selector], outputs=[file_list]
        )
        # Refresh the examples when a tab is selected ...
        tabs.select(
            fn=_build_examples_html,
            inputs=[file_selector],
            outputs=[example_output],
            api_name=False,
        )
        # ... and whenever the file selection changes.
        file_selector.change(
            fn=_build_examples_html, inputs=[file_selector], outputs=[example_output]
        )

        # Chat events: the send button and Enter in the textbox share one
        # wiring; the input box is emptied once the answer has finished.
        clear_button.click(fn=clear_chat_history, outputs=[chat_history])
        chat_inputs = [
            chat_input,
            chat_history,
            provider,
            model_dropdown,
            xai_key,
            gemini_key,
            file_selector,
        ]
        for trigger in (send_button.click, chat_input.submit):
            trigger(
                fn=handle_chat,
                inputs=chat_inputs,
                outputs=chat_history,
                show_progress=True,
            ).then(fn=lambda: gr.update(value=""), outputs=chat_input)

    return app
if __name__ == "__main__":
    # Startup banner: the message with an empty line before and after it.
    print("\n🚀 Memulai Repository Chat Analysis\n")
    # Build the interface and start serving; share=True is passed to launch().
    app = create_ui()
    app.launch(share=True)