OpenRepoAI / app.py
ErNewdev0's picture
Update app.py
1034afa verified
raw
history blame
30.2 kB
import gradio as gr
import os
from pathlib import Path
import subprocess
import requests
import json
import time
import asyncio
from datetime import datetime
import textwrap
import google.generativeai as genai
from typing import Generator, AsyncGenerator, List
from openai import AsyncOpenAI
import dotenv
# Load environment variables
dotenv.load_dotenv()
# Metadata
CURRENT_TIME = "2025-05-23 12:57:22"
CURRENT_USER = "ErRickow"
# Default API Keys (fallback if user doesn't provide their own)
DEFAULT_XAI_KEY = os.getenv("XAI_API_KEY", "xai-vfjhklL384Z4HKdItsZomqpFlXubTZJAFnISQUpV7dE8lRnWwYBVPSCxSTlu08wDbAcv720bx2dDiQ9x")
DEFAULT_GEMINI_KEY = os.getenv("GEMINI_API_KEY")
# API settings
OLLAMA_API = os.environ.get("OLLAMA_API", "http://localhost:11434")
XAI_BASE_URL = "https://api.x.ai/v1"
# Model lists
OLLAMA_MODELS = [
"llama2",
"codellama",
"mistral",
"neural-chat",
"starling-lm",
"dolphin-phi",
"phi",
"orca-mini"
]
XAI_MODELS = [
"grok-2-latest",
"grok-1",
]
GEMINI_MODELS = [
"gemini-1.5-mini",
"gemini-pro-vision",
]
GITHUB_TOKEN_HELP = """
### Cara Mendapatkan GitHub Token:
1. Kunjungi [GitHub Token Settings](https://github.com/settings/tokens)
2. Klik "Generate new token" > "Generate new token (classic)"
3. Beri nama token Anda di "Note"
4. Pilih scope:
- `repo` (untuk akses repository private)
- `read:packages` (opsional, untuk akses package)
5. Klik "Generate token"
6. **PENTING**: Salin token segera! Token hanya ditampilkan sekali
Token diperlukan untuk:
- Mengakses repository private
- Clone repository dengan rate limit lebih tinggi
- Mengakses fitur GitHub API
"""
GEMINI_API_HELP = """
### Cara Mendapatkan Gemini API Key:
1. Kunjungi [Google AI Studio](https://makersuite.google.com/app/apikey)
2. Login dengan akun Google Anda
3. Klik "Create API Key"
4. Salin API Key yang dihasilkan
Catatan:
- Gemini memberikan kuota gratis setiap bulan
- Key bisa dibuat ulang jika diperlukan
- Monitor penggunaan di [Google Cloud Console](https://console.cloud.google.com/)
"""
OLLAMA_HELP = """
### Cara Menggunakan Ollama:
1. Install Ollama dari [ollama.ai](https://ollama.ai)
2. Jalankan Ollama di komputer Anda
3. Pastikan Ollama berjalan di http://localhost:11434
Catatan:
- Ollama berjalan secara lokal di komputer Anda
- Tidak memerlukan API key
- Ideal untuk privasi dan penggunaan offline
"""
# Help texts
XAI_API_HELP = """
### Cara Mendapatkan X.AI (Grok) API Key:
1. Kunjungi [X.AI Developer Portal](https://x.ai)
2. Daftar/Login ke akun Anda
3. Buat API Key baru
4. Salin API Key
Note:
- Jika tidak diisi, akan menggunakan API key default
- Masukkan API key Anda sendiri jika default mencapai limit
"""
class AIProvider:
OLLAMA = "ollama"
GEMINI = "gemini"
XAI = "xai"
class RepoAnalyzer:
def __init__(self):
self.current_repo = None
self.repo_content = {}
self.chat_history = []
async def stream_xai_response(self, prompt: str, api_key: str = None, model: str = "grok-2-latest") -> AsyncGenerator[str, None]:
"""Stream response dari X.AI (Grok) API"""
try:
# Use default key if none provided
actual_key = api_key if api_key else DEFAULT_XAI_KEY
if not actual_key:
yield "⚠️ API Key X.AI diperlukan. Gunakan key Anda sendiri atau tunggu reset limit default key."
return
client = AsyncOpenAI(
api_key=actual_key,
base_url=XAI_BASE_URL
)
# Prepare messages with repository context if available
messages = [
{"role": "system", "content": "Anda adalah asisten AI yang membantu menganalisis repository code. Berikan respons dalam Bahasa Indonesia."}
]
if self.current_repo:
context = f"Repository: {self.current_repo}\n\n"
repo_files = "\n".join(list(self.repo_content.keys()))
context += f"Files in repository:\n{repo_files}\n\n"
messages.append({"role": "system", "content": context})
messages.append({"role": "user", "content": prompt})
stream = await client.chat.completions.create(
model=model,
messages=messages,
stream=True
)
full_response = ""
async for chunk in stream:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_response += content
yield content
self.chat_history.append({"role": "user", "content": prompt})
self.chat_history.append({"role": "assistant", "content": full_response})
except Exception as e:
error_msg = f"⚠️ Error dalam X.AI API: {str(e)}"
print(error_msg)
yield error_msg
async def stream_gemini_response(self, prompt: str, api_key: str) -> AsyncGenerator[str, None]:
"""Stream response dari Gemini API"""
try:
if not api_key:
yield "⚠️ API Key Gemini diperlukan. Klik icon bantuan (?) di samping input API Key untuk panduan mendapatkan key."
return
genai.configure(api_key=api_key)
model = genai.GenerativeModel('gemini-pro')
# Tambahkan konteks repository jika ada
if self.current_repo:
context = f"Repository: {self.current_repo}\n\n"
repo_files = "\n".join(list(self.repo_content.keys()))
context += f"Files in repository:\n{repo_files}\n\n"
prompt = context + prompt
response = model.generate_content(
prompt,
generation_config={
"temperature": 0.7,
"top_p": 0.8,
"top_k": 40
},
stream=True
)
full_response = ""
async for chunk in response:
if chunk.text:
full_response += chunk.text
yield chunk.text
self.chat_history.append({"role": "user", "content": prompt})
self.chat_history.append({"role": "assistant", "content": full_response})
except Exception as e:
error_msg = f"⚠️ Error dalam Gemini API: {str(e)}\n\nPastikan API Key valid dan memiliki kuota yang cukup."
print(error_msg)
yield error_msg
def clone_repository(self, repo_url: str, github_token: str, branch: str = None) -> tuple[bool, str]:
"""Clone repository GitHub dengan autentikasi"""
if not repo_url:
return False, "⚠️ URL repository diperlukan"
repo_name = repo_url.split('/')[-1].replace('.git', '')
if os.path.exists(repo_name):
subprocess.run(['rm', '-rf', repo_name], check=True)
try:
owner_repo = '/'.join(repo_url.split('/')[-2:])
# Cek apakah repository private
headers = {'Authorization': f'token {github_token}'} if github_token else {}
repo_check = requests.get(f"https://api.github.com/repos/{owner_repo}", headers=headers)
if repo_check.status_code == 404:
return False, "⚠️ Repository tidak ditemukan. Periksa URL repository."
elif repo_check.status_code == 401:
return False, "⚠️ Token GitHub tidak valid. Klik icon bantuan (?) untuk panduan mendapatkan token."
elif repo_check.status_code == 403 and repo_check.json().get('private', False):
return False, "⚠️ Ini adalah repository private. Token GitHub dengan akses 'repo' diperlukan."
auth_url = f"https://{github_token}@github.com/{owner_repo}" if github_token else f"https://github.com/{owner_repo}"
cmd = ['git', 'clone']
if branch:
cmd.extend(['--branch', branch])
cmd.append(auth_url)
process = subprocess.run(
cmd,
capture_output=True,
text=True,
env=dict(os.environ, GIT_ASKPASS='echo', GIT_TERMINAL_PROMPT='0')
)
if process.returncode == 0:
self.current_repo = repo_name
# Scan dan simpan konten repository
file_count = 0
for file_path in Path(repo_name).rglob('*'):
if file_path.is_file() and '.git' not in str(file_path):
success, content = self.read_file_safely(str(file_path))
if success:
self.repo_content[str(file_path)] = content
file_count += 1
return True, f"✅ Repository berhasil di-clone!\n\nNama: {repo_name}\nJumlah file: {file_count}\n\nAnda sekarang bisa mengajukan pertanyaan tentang repository ini."
else:
return False, f"⚠️ Gagal clone repository:\n{process.stderr}"
except Exception as e:
return False, f"⚠️ Error: {str(e)}"
def read_file_safely(self, file_path: str) -> tuple[bool, str]:
"""Baca file dengan aman menggunakan berbagai encoding"""
encodings = ['utf-8', 'latin-1', 'cp1252']
for encoding in encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
content = f.read()
return True, content
except Exception as e:
continue
return False, "Tidak dapat membaca file dengan encoding yang didukung"
async def handle_chat(message, history, provider_choice, model_name, xai_key, gemini_key, selected_files):
if not analyzer.current_repo:
yield history + [[message, "⚠️ Mohon clone repository terlebih dahulu sebelum mengajukan pertanyaan."]]
return
history = history or []
history.append([message, ""])
try:
# Add context about selected files to the prompt
file_context = ""
if selected_files:
file_context = "\n\nFile yang dipilih:\n"
for file in selected_files:
content = analyzer.repo_content.get(file, "")
escaped_content = content.replace('`', r'\`')
html = (
'<div class="wrapper-artifact">'
f'<div class="header-artifact">'
f'<span>{file}</span>'
f'<button class="copy-button" onclick="copyToClipboard(`{escaped_content}`)">Copy</button>'
'</div>'
'<div class="content-artifact">'
f'<pre><code>{content}</code></pre>'
'</div>'
'</div>'
)
file_context += html
enhanced_message = f"{message}\n{file_context}"
full_response = ""
if provider_choice == AIProvider.XAI:
async for chunk in analyzer.stream_xai_response(enhanced_message, xai_key, model_name):
# Wrap code blocks in custom styling
chunk = process_code_blocks(chunk)
full_response += chunk
await asyncio.sleep(0.08)
history[-1][1] = full_response
yield history
elif provider_choice == AIProvider.GEMINI:
async for chunk in analyzer.stream_gemini_response(enhanced_message, gemini_key or DEFAULT_GEMINI_KEY):
chunk = process_code_blocks(chunk)
full_response += chunk
await asyncio.sleep(0.08)
history[-1][1] = full_response
yield history
else: # OLLAMA
response = analyze_with_ollama(model_name, enhanced_message)
response = process_code_blocks(response)
words = response.split()
for i in range(len(words)):
full_response = " ".join(words[:i+1])
await asyncio.sleep(0.08)
history[-1][1] = full_response
yield history
except Exception as e:
history[-1][1] = f"⚠️ Error: {str(e)}"
yield history
def process_code_blocks(text):
"""Process markdown code blocks to use custom artifact styling"""
import re
# Pattern for code blocks with language specification
pattern = r"```(\w+)\n(.*?)```"
def replace_code_block(match):
language = match.group(1)
code = match.group(2)
escaped_code = code.replace('`', r'\`')
html = (
'<div class="wrapper-artifact">'
f'<div class="header-artifact">'
f'<span>{language}</span>'
f'<button class="copy-button" onclick="copyToClipboard(`{escaped_code}`)">Copy</button>'
'</div>'
'<div class="content-artifact">'
f'<pre><code>{code}</code></pre>'
'</div>'
'</div>'
)
return html
# Replace all code blocks in the text
processed_text = re.sub(pattern, replace_code_block, text, flags=re.DOTALL)
return processed_text
analyzer = RepoAnalyzer()
def create_ui():
# Get local time
local_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
analyzer = RepoAnalyzer()
with gr.Blocks(title="Repository Chat Analysis", theme=gr.themes.Soft()) as app:
gr.Markdown("""
<style>
.container { max-width: 100% !important; padding: 1rem; }
.mobile-full { width: 100% !important; }
.file-list { margin: 10px 0; padding: 10px; border: 1px solid #ddd; border-radius: 4px; }
.file-item { display: flex; justify-content: space-between; padding: 5px 0; }
.file-remove { color: red; cursor: pointer; }
/* Enhanced code block styling */
.wrapper-artifact {
border: 1px solid #e0e0e0;
border-radius: 8px;
margin: 16px 0;
background: #f8f9fa;
}
.header-artifact {
padding: 8px 16px;
background: #f1f3f4;
border-bottom: 1px solid #e0e0e0;
border-radius: 8px 8px 0 0;
font-family: monospace;
display: flex;
justify-content: space-between;
align-items: center;
}
.content-artifact {
padding: 16px;
overflow-x: auto;
background: #ffffff;
border-radius: 0 0 8px 8px;
}
.content-artifact pre {
margin: 0;
padding: 0;
}
.copy-button {
background: #e0e0e0;
border: none;
padding: 4px 8px;
border-radius: 4px;
cursor: pointer;
font-size: 12px;
}
.copy-button:hover {
background: #d0d0d0;
}
/* Chat interface styling */
.fullscreen-chat {
height: calc(100vh - 200px) !important;
margin: 20px 0;
}
.chat-input {
position: sticky;
bottom: 0;
background: white;
padding: 10px 0;
border-top: 1px solid #eee;
}
.chat-message {
margin: 8px 0;
}
@media (max-width: 768px) {
.gr-form { flex-direction: column !important; }
.gr-group { margin: 0.5rem 0 !important; }
.fullscreen-chat {
height: calc(100vh - 300px) !important;
}
}
</style>
<script>
function copyToClipboard(text) {
navigator.clipboard.writeText(text).then(() => {
const button = event.target;
button.textContent = 'Copied!';
setTimeout(() => {
button.textContent = 'Copy';
}, 2000);
});
}
</script>
""")
# Header
with gr.Row(elem_classes="container"):
gr.Markdown(f"# 🤖 Repository Chat Analysis\n\n📅 {local_time}")
# Main Interface
with gr.Tabs() as tabs:
# Configuration Tab
with gr.Tab("🛠️ Konfigurasi", elem_classes="mobile-full"):
provider = gr.Radio(
choices=[AIProvider.XAI, AIProvider.GEMINI, AIProvider.OLLAMA],
label="Penyedia AI",
value=AIProvider.XAI,
interactive=True,
elem_classes="mobile-full"
)
with gr.Group() as api_settings:
# XAI API Key
with gr.Group():
with gr.Row():
xai_key = gr.Textbox(
label="X.AI (Grok) API Key",
type="password",
placeholder="Opsional - Klik icon (?) untuk info",
show_label=True,
scale=3
)
with gr.Column(scale=1):
gr.Markdown(XAI_API_HELP)
# Gemini API Key
with gr.Group():
with gr.Row():
gemini_key = gr.Textbox(
label="Gemini API Key",
type="password",
placeholder="Opsional - Kosongkan untuk gunakan key default",
show_label=True,
scale=3
)
with gr.Column(scale=1):
gr.Markdown(GEMINI_API_HELP)
# Model Selection
with gr.Row():
model_dropdown = gr.Dropdown(
label="Model AI",
choices=XAI_MODELS,
value="grok-2-latest",
interactive=True,
elem_classes="mobile-full"
)
# Repository Analysis Tab
with gr.Tab("📊 Analisis Repository", elem_classes="mobile-full"):
# Repository Input Section
with gr.Group():
with gr.Row():
repo_url = gr.Textbox(
label="URL Repository GitHub",
placeholder="https://github.com/username/repository",
elem_classes="mobile-full"
)
with gr.Row():
with gr.Column(scale=2):
github_token = gr.Textbox(
label="Token GitHub",
type="password",
placeholder="Klik icon (?) untuk panduan",
elem_classes="mobile-full"
)
with gr.Column(scale=1):
branch = gr.Textbox(
label="Branch (opsional)",
placeholder="main",
elem_classes="mobile-full"
)
# Clone Repository Section
clone_button = gr.Button(
"🔄 Clone Repository",
variant="primary",
elem_classes="mobile-full"
)
clone_status = gr.Markdown(
value="",
label="Status Repository",
elem_classes="mobile-full"
)
# File Selection Section
with gr.Group():
gr.Markdown("### 📎 File yang Dipilih")
with gr.Row():
file_selector = gr.Dropdown(
label="Pilih File dari Repository",
choices=[],
multiselect=True,
elem_classes="mobile-full"
)
file_list = gr.HTML(
value="<div class='file-list'>Belum ada file yang dipilih</div>",
label="Daftar File Terpilih"
)
# Chat Interface (Outside tabs for full width)
with gr.Group():
chat_history = gr.Chatbot(
label="📝 Riwayat Chat",
elem_classes="fullscreen-chat",
height=None,
show_label=True,
type="messages"
)
with gr.Group(elem_classes="chat-input"):
chat_input = gr.Textbox(
label="💭 Tanyakan tentang Repository",
placeholder="Ketik pertanyaan Anda di sini...",
lines=3,
elem_classes="mobile-full"
)
with gr.Row():
clear_button = gr.Button("🧹 Bersihkan", variant="secondary")
send_button = gr.Button("📤 Kirim", variant="primary")
# Event Handlers
def clear_chat_history():
return None
def handle_clone(repo_url, github_token, branch):
if not repo_url:
return "⚠️ URL repository diperlukan!", [], []
success, message = analyzer.clone_repository(repo_url, github_token, branch)
if success:
# Get files with metadata for better display
files = []
file_metadata = []
for file_path in analyzer.repo_content.keys():
files.append(file_path)
content = analyzer.repo_content[file_path]
size = len(content.encode('utf-8'))
file_metadata.append({
'path': file_path,
'size': f"{size / 1024:.1f} KB",
'lines': len(content.splitlines())
})
# Create rich HTML display for clone status
status_html = f"""
<div style="padding: 1rem; background: #f0f8ff; border-radius: 8px; border: 1px solid #add8e6;">
<h3 style="margin-top: 0;">✅ Repository berhasil di-clone!</h3>
<p><strong>Nama:</strong> {analyzer.current_repo}</p>
<p><strong>Jumlah file:</strong> {len(files)}</p>
<p><strong>Status:</strong> Siap untuk analisis</p>
</div>
"""
# Create initial file list display
file_list_html = create_file_list_html(file_metadata)
return status_html, files, file_list_html
return message, [], []
def create_file_list_html(file_metadata):
if not file_metadata:
return "<div class='file-list'>Belum ada file yang dipilih</div>"
html = "<div class='file-list'>"
html += "<style>\n"
html += ".file-list { max-height: 300px; overflow-y: auto; }\n"
html += ".file-item { display: flex; justify-content: space-between; padding: 8px; border-bottom: 1px solid #eee; }\n"
html += ".file-item:hover { background: #f5f5f5; }\n"
html += ".file-info { display: flex; gap: 1rem; color: #666; font-size: 0.9em; }\n"
html += "</style>"
for metadata in file_metadata:
html += f"""
<div class='file-item'>
<div class='file-path'>{metadata['path']}</div>
<div class='file-info'>
<span>{metadata['size']}</span>
<span>{metadata['lines']} lines</span>
</div>
</div>
"""
html += "</div>"
return html
def update_file_list(selected):
if not selected:
return "<div class='file-list'>Belum ada file yang dipilih</div>"
file_metadata = []
for file_path in selected:
if file_path in analyzer.repo_content:
content = analyzer.repo_content[file_path]
size = len(content.encode('utf-8'))
file_metadata.append({
'path': file_path,
'size': f"{size / 1024:.1f} KB",
'lines': len(content.splitlines())
})
return create_file_list_html(file_metadata)
# Modified handle_chat function with better file context handling
async def handle_chat(message, history, provider_choice, model_name, xai_key, gemini_key, selected_files):
if not analyzer.current_repo:
yield history + [[message, "⚠️ Mohon clone repository terlebih dahulu sebelum mengajukan pertanyaan."]]
return
history = history or []
history.append([message, ""])
try:
# Improved file context formatting
file_context = ""
if selected_files:
file_context = "\n\nAnalisis berdasarkan file yang dipilih:\n"
for file in selected_files:
content = analyzer.repo_content.get(file, "")
file_context += f"\n### File: {file}\n```\n{content}\n```\n"
enhanced_message = f"{message}\n{file_context}"
# Rest of the handle_chat function remains the same
full_response = ""
if provider_choice == AIProvider.XAI:
async for chunk in analyzer.stream_xai_response(enhanced_message, xai_key, model_name):
chunk = process_code_blocks(chunk)
full_response += chunk
await asyncio.sleep(0.08)
history[-1][1] = full_response
yield history
elif provider_choice == AIProvider.GEMINI:
async for chunk in analyzer.stream_gemini_response(enhanced_message, gemini_key or DEFAULT_GEMINI_KEY):
chunk = process_code_blocks(chunk)
full_response += chunk
await asyncio.sleep(0.08)
history[-1][1] = full_response
yield history
else: # OLLAMA
response = analyze_with_ollama(model_name, enhanced_message)
response = process_code_blocks(response)
words = response.split()
for i in range(len(words)):
full_response = " ".join(words[:i+1])
await asyncio.sleep(0.08)
history[-1][1] = full_response
yield history
except Exception as e:
history[-1][1] = f"⚠️ Error: {str(e)}"
yield history
# Add additional CSS for improved repository display
gr.Markdown("""
<style>
/* Additional styles for repository content */
.repo-status {
margin: 1rem 0;
padding: 1rem;
background: #f0f8ff;
border-radius: 8px;
border: 1px solid #add8e6;
}
.file-list {
border: 1px solid #ddd;
border-radius: 8px;
margin-top: 1rem;
background: white;
}
.file-item {
transition: background-color 0.2s;
}
.file-info {
opacity: 0.7;
}
.file-path {
font-family: monospace;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
/* Improved scrollbars for file list */
.file-list::-webkit-scrollbar {
width: 8px;
height: 8px;
}
.file-list::-webkit-scrollbar-track {
background: #f1f1f1;
border-radius: 4px;
}
.file-list::-webkit-scrollbar-thumb {
background: #888;
border-radius: 4px;
}
.file-list::-webkit-scrollbar-thumb:hover {
background: #555;
}
</style>
""")
return app
if __name__ == "__main__":
print(f"""
🚀 Memulai Repository Chat Analysis
📅 Current Date and Time (UTC): {datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")}
""")
app = create_ui()
app.launch(
share=True,
show_error=True,
server_name="0.0.0.0",
server_port=7860
)