# OpenRepoAI / app.py
# ErNewdev0's picture
# feat: adding artefact like chat ui
# 5152acc verified
# raw
# history blame
# 22.2 kB
import gradio as gr
import os
from pathlib import Path
import subprocess
import requests
import json
import time
import asyncio
from datetime import datetime
import textwrap
import google.generativeai as genai
from typing import Generator, AsyncGenerator, List
from openai import AsyncOpenAI
import dotenv
# Load environment variables via python-dotenv. This call runs before the
# os.getenv / os.environ.get lookups that follow it in this module.
dotenv.load_dotenv()
# Metadata
CURRENT_TIME = "2025-05-23 12:57:22"
CURRENT_USER = "ErRickow"

# Default API keys (fallback if the user doesn't provide their own).
# SECURITY: keys are read from the environment only. A literal key must never
# be committed as a fallback value; any key that was previously committed in
# this file should be treated as leaked and revoked. When a variable is unset
# the value is None and callers must handle the missing key.
DEFAULT_XAI_KEY = os.getenv("XAI_API_KEY")
DEFAULT_GEMINI_KEY = os.getenv("GEMINI_API_KEY")

# API settings
OLLAMA_API = os.environ.get("OLLAMA_API", "http://localhost:11434")
XAI_BASE_URL = "https://api.x.ai/v1"

# Model lists, one per provider
OLLAMA_MODELS = [
    "llama2",
    "codellama",
    "mistral",
    "neural-chat",
    "starling-lm",
    "dolphin-phi",
    "phi",
    "orca-mini",
]
XAI_MODELS = [
    "grok-2-latest",
    "grok-1",
]
GEMINI_MODELS = [
    "gemini-1.5-mini",
    "gemini-pro-vision",
]
# Help texts: Markdown snippets (written in Indonesian) explaining how to
# obtain each credential or set up each provider.

# How to create a GitHub personal access token and what it is needed for.
GITHUB_TOKEN_HELP = """
### Cara Mendapatkan GitHub Token:
1. Kunjungi [GitHub Token Settings](https://github.com/settings/tokens)
2. Klik "Generate new token" > "Generate new token (classic)"
3. Beri nama token Anda di "Note"
4. Pilih scope:
- `repo` (untuk akses repository private)
- `read:packages` (opsional, untuk akses package)
5. Klik "Generate token"
6. **PENTING**: Salin token segera! Token hanya ditampilkan sekali
Token diperlukan untuk:
- Mengakses repository private
- Clone repository dengan rate limit lebih tinggi
- Mengakses fitur GitHub API
"""
# How to create a Gemini API key in Google AI Studio.
GEMINI_API_HELP = """
### Cara Mendapatkan Gemini API Key:
1. Kunjungi [Google AI Studio](https://makersuite.google.com/app/apikey)
2. Login dengan akun Google Anda
3. Klik "Create API Key"
4. Salin API Key yang dihasilkan
Catatan:
- Gemini memberikan kuota gratis setiap bulan
- Key bisa dibuat ulang jika diperlukan
- Monitor penggunaan di [Google Cloud Console](https://console.cloud.google.com/)
"""
# How to install and run a local Ollama server (no API key involved).
OLLAMA_HELP = """
### Cara Menggunakan Ollama:
1. Install Ollama dari [ollama.ai](https://ollama.ai)
2. Jalankan Ollama di komputer Anda
3. Pastikan Ollama berjalan di http://localhost:11434
Catatan:
- Ollama berjalan secara lokal di komputer Anda
- Tidak memerlukan API key
- Ideal untuk privasi dan penggunaan offline
"""
# How to obtain an X.AI (Grok) API key; notes that a default key is used
# when the user leaves the field empty.
XAI_API_HELP = """
### Cara Mendapatkan X.AI (Grok) API Key:
1. Kunjungi [X.AI Developer Portal](https://x.ai)
2. Daftar/Login ke akun Anda
3. Buat API Key baru
4. Salin API Key
Note:
- Jika tidak diisi, akan menggunakan API key default
- Masukkan API key Anda sendiri jika default mencapai limit
"""
class AIProvider:
    """String identifiers for the AI back ends the application can talk to."""

    XAI = "xai"
    GEMINI = "gemini"
    OLLAMA = "ollama"
class RepoAnalyzer:
    """Clone a GitHub repository and stream LLM answers about it.

    The instance remembers the name of the cloned repository, the decoded
    text of its files, and a running chat history that the streaming helpers
    append to after each completed answer.
    """

    def __init__(self):
        self.current_repo = None  # directory name of the cloned repo, or None
        self.repo_content = {}  # str(file path) -> decoded file text
        self.chat_history = []  # {"role": ..., "content": ...} dicts, in order

    def _repo_context(self) -> str:
        """Return a text header naming the repo and listing its file paths."""
        repo_files = "\n".join(self.repo_content)
        return (
            f"Repository: {self.current_repo}\n\n"
            f"Files in repository:\n{repo_files}\n\n"
        )

    async def stream_xai_response(self, prompt: str, api_key: str = None, model: str = "grok-2-latest") -> AsyncGenerator[str, None]:
        """Stream a response from the X.AI (Grok) API.

        Args:
            prompt: The user's question.
            api_key: X.AI key; falls back to ``DEFAULT_XAI_KEY`` when empty.
            model: Name of the X.AI model to query.

        Yields:
            Text fragments as they arrive. On a missing key or any error a
            single human-readable message is yielded instead of raising.
        """
        try:
            # Use default key if none provided
            actual_key = api_key if api_key else DEFAULT_XAI_KEY
            if not actual_key:
                yield "โš ๏ธ API Key X.AI diperlukan. Gunakan key Anda sendiri atau tunggu reset limit default key."
                return

            client = AsyncOpenAI(api_key=actual_key, base_url=XAI_BASE_URL)

            # Prepare messages with repository context if available
            messages = [
                {"role": "system", "content": "Anda adalah asisten AI yang membantu menganalisis repository code. Berikan respons dalam Bahasa Indonesia."}
            ]
            if self.current_repo:
                messages.append({"role": "system", "content": self._repo_context()})
            messages.append({"role": "user", "content": prompt})

            stream = await client.chat.completions.create(
                model=model,
                messages=messages,
                stream=True,
            )

            full_response = ""
            async for chunk in stream:
                # Some stream chunks carry no choices at all; indexing [0]
                # on them would raise IndexError and abort the whole answer.
                if not chunk.choices:
                    continue
                content = chunk.choices[0].delta.content
                if content:
                    full_response += content
                    yield content

            self.chat_history.append({"role": "user", "content": prompt})
            self.chat_history.append({"role": "assistant", "content": full_response})
        except Exception as e:
            error_msg = f"โš ๏ธ Error dalam X.AI API: {str(e)}"
            print(error_msg)
            yield error_msg

    async def stream_gemini_response(self, prompt: str, api_key: str, model: str = "gemini-pro") -> AsyncGenerator[str, None]:
        """Stream a response from the Gemini API.

        Args:
            prompt: The user's question.
            api_key: Gemini API key (required).
            model: Gemini model name; defaults to the previously hard-coded
                ``"gemini-pro"``.

        Yields:
            Text fragments as they arrive. On a missing key or any error a
            single human-readable message is yielded instead of raising.
        """
        try:
            if not api_key:
                yield "โš ๏ธ API Key Gemini diperlukan. Klik icon bantuan (?) di samping input API Key untuk panduan mendapatkan key."
                return

            genai.configure(api_key=api_key)
            gemini_model = genai.GenerativeModel(model)

            # Add repository context if a repository has been cloned. The
            # user's original prompt is kept separately so the chat history
            # records what the user actually typed.
            full_prompt = prompt
            if self.current_repo:
                full_prompt = self._repo_context() + prompt

            # The synchronous generate_content() result cannot be consumed
            # with ``async for``; the async variant must be awaited instead.
            response = await gemini_model.generate_content_async(
                full_prompt,
                generation_config={
                    "temperature": 0.7,
                    "top_p": 0.8,
                    "top_k": 40,
                },
                stream=True,
            )

            full_response = ""
            async for chunk in response:
                if chunk.text:
                    full_response += chunk.text
                    yield chunk.text

            self.chat_history.append({"role": "user", "content": prompt})
            self.chat_history.append({"role": "assistant", "content": full_response})
        except Exception as e:
            error_msg = f"โš ๏ธ Error dalam Gemini API: {str(e)}\n\nPastikan API Key valid dan memiliki kuota yang cukup."
            print(error_msg)
            yield error_msg

    def clone_repository(self, repo_url: str, github_token: str, branch: str = None) -> tuple[bool, str]:
        """Clone a GitHub repository (optionally authenticated) and index it.

        Args:
            repo_url: URL whose last two path segments are ``owner/repo``; a
                trailing ``/`` or ``.git`` suffix is tolerated.
            github_token: Personal access token, or empty for public repos.
            branch: Branch to check out; the remote default when empty.

        Returns:
            ``(True, message)`` on success, after ``current_repo`` and
            ``repo_content`` have been replaced with the new clone's data;
            ``(False, message)`` on any failure.
        """
        import shutil  # local import: only this method needs it

        if not repo_url:
            return False, "โš ๏ธ URL repository diperlukan"

        segments = repo_url.strip().rstrip('/').split('/')
        # removesuffix() only strips a trailing ".git"; str.replace() would
        # also mangle names such as "my.github.io".
        repo_name = segments[-1].removesuffix('.git')
        owner = segments[-2] if len(segments) >= 2 else ''
        # The name is later used as a directory to delete and re-create, so
        # reject anything that is empty or points at the current/parent dir.
        if not owner or repo_name in ('', '.', '..'):
            return False, "โš ๏ธ URL repository tidak valid. Gunakan format https://github.com/username/repository"
        owner_repo = f"{owner}/{repo_name}"

        try:
            # Check that the repository exists and is accessible
            headers = {'Authorization': f'token {github_token}'} if github_token else {}
            repo_check = requests.get(
                f"https://api.github.com/repos/{owner_repo}",
                headers=headers,
                timeout=30,  # never hang the UI on an unresponsive API
            )
            if repo_check.status_code == 404:
                return False, "โš ๏ธ Repository tidak ditemukan. Periksa URL repository."
            elif repo_check.status_code == 401:
                return False, "โš ๏ธ Token GitHub tidak valid. Klik icon bantuan (?) untuk panduan mendapatkan token."
            elif repo_check.status_code == 403 and repo_check.json().get('private', False):
                return False, "โš ๏ธ Ini adalah repository private. Token GitHub dengan akses 'repo' diperlukan."

            # Remove a previous checkout of the same name before cloning.
            if os.path.isdir(repo_name):
                shutil.rmtree(repo_name)
            elif os.path.exists(repo_name):
                os.remove(repo_name)

            auth_url = f"https://{github_token}@github.com/{owner_repo}" if github_token else f"https://github.com/{owner_repo}"
            cmd = ['git', 'clone']
            if branch:
                cmd.extend(['--branch', branch])
            # "--" ends option parsing; the explicit target directory keeps
            # the checkout location in sync with repo_name.
            cmd.extend(['--', auth_url, repo_name])

            process = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                env=dict(os.environ, GIT_ASKPASS='echo', GIT_TERMINAL_PROMPT='0'),
            )
            if process.returncode != 0:
                stderr = process.stderr
                if github_token:
                    # git may echo the remote URL; never show the token.
                    stderr = stderr.replace(github_token, '***')
                return False, f"โš ๏ธ Gagal clone repository:\n{stderr}"

            self.current_repo = repo_name
            # Start from a clean index so files of an earlier clone do not
            # leak into the context of this one.
            self.repo_content = {}
            for file_path in Path(repo_name).rglob('*'):
                # Skip only git's own metadata directory; a substring test
                # would also drop ".github/" and ".gitignore".
                if '.git' in file_path.parts or not file_path.is_file():
                    continue
                success, content = self.read_file_safely(str(file_path))
                if success:
                    self.repo_content[str(file_path)] = content
            file_count = len(self.repo_content)
            return True, f"โœ… Repository berhasil di-clone!\n\nNama: {repo_name}\nJumlah file: {file_count}\n\nAnda sekarang bisa mengajukan pertanyaan tentang repository ini."
        except Exception as e:
            return False, f"โš ๏ธ Error: {str(e)}"

    def read_file_safely(self, file_path: str) -> tuple[bool, str]:
        """Read a text file, trying several encodings in turn.

        Returns:
            ``(True, text)`` with the first successful decoding, or
            ``(False, message)`` when the file cannot be opened or read.
        """
        failure = (False, "Tidak dapat membaca file dengan encoding yang didukung")
        # latin-1 accepts every byte sequence, so it must come last;
        # listing it before cp1252 would make cp1252 unreachable.
        for encoding in ('utf-8', 'cp1252', 'latin-1'):
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    return True, f.read()
            except UnicodeDecodeError:
                continue
            except OSError:
                # Missing/unreadable file: another encoding will not help.
                return failure
        return failure
def create_ui():
    """Build the Gradio Blocks application and return it.

    The UI has a configuration tab (provider, API keys, model) and an
    analysis tab (clone a repository, pick files, chat about them). All
    event handlers are registered here and share one ``RepoAnalyzer``.

    Returns:
        The ``gr.Blocks`` instance, ready for ``.launch()``.
    """
    from datetime import timezone  # local import: only needed for the banner

    # Get UTC time (timezone-aware; datetime.utcnow() is deprecated)
    utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    analyzer = RepoAnalyzer()

    # Which model list the dropdown should offer for each provider.
    models_by_provider = {
        AIProvider.XAI: XAI_MODELS,
        AIProvider.GEMINI: GEMINI_MODELS,
        AIProvider.OLLAMA: OLLAMA_MODELS,
    }

    with gr.Blocks(title="Repository Chat Analysis", theme=gr.themes.Soft()) as app:
        gr.Markdown("""
<style>
.container { max-width: 100% !important; padding: 1rem; }
.mobile-full { width: 100% !important; }
.file-list { margin: 10px 0; padding: 10px; border: 1px solid #ddd; border-radius: 4px; }
.file-item { display: flex; justify-content: space-between; padding: 5px 0; }
.file-remove { color: red; cursor: pointer; }
/* Enhanced code block styling */
.wrapper-artifact {
border: 1px solid #e0e0e0;
border-radius: 8px;
margin: 16px 0;
background: #f8f9fa;
}
.header-artifact {
padding: 8px 16px;
background: #f1f3f4;
border-bottom: 1px solid #e0e0e0;
border-radius: 8px 8px 0 0;
font-family: monospace;
display: flex;
justify-content: space-between;
align-items: center;
}
.content-artifact {
padding: 16px;
overflow-x: auto;
background: #ffffff;
border-radius: 0 0 8px 8px;
}
.content-artifact pre {
margin: 0;
padding: 0;
}
.copy-button {
background: #e0e0e0;
border: none;
padding: 4px 8px;
border-radius: 4px;
cursor: pointer;
font-size: 12px;
}
.copy-button:hover {
background: #d0d0d0;
}
/* Chat interface styling */
.fullscreen-chat {
height: calc(100vh - 200px) !important;
margin: 20px 0;
}
.chat-input {
position: sticky;
bottom: 0;
background: white;
padding: 10px 0;
border-top: 1px solid #eee;
}
.chat-message {
margin: 8px 0;
}
@media (max-width: 768px) {
.gr-form { flex-direction: column !important; }
.gr-group { margin: 0.5rem 0 !important; }
.fullscreen-chat {
height: calc(100vh - 300px) !important;
}
}
</style>
<script>
function copyToClipboard(text) {
navigator.clipboard.writeText(text).then(() => {
const button = event.target;
button.textContent = 'Copied!';
setTimeout(() => {
button.textContent = 'Copy';
}, 2000);
});
}
</script>
""")
        with gr.Row(elem_classes="container"):
            gr.Markdown(f"""
# ๐Ÿค– Repository Chat Analysis
๐Ÿ“… Current Date and Time (UTC): {utc_time}
""")

        with gr.Tabs():
            with gr.Tab("๐Ÿ› ๏ธ Konfigurasi", elem_classes="mobile-full"):
                provider = gr.Radio(
                    choices=[AIProvider.XAI, AIProvider.GEMINI, AIProvider.OLLAMA],
                    label="Penyedia AI",
                    value=AIProvider.XAI,
                    interactive=True,
                    elem_classes="mobile-full"
                )
                with gr.Group():
                    with gr.Group():
                        with gr.Row():
                            xai_key = gr.Textbox(
                                label="X.AI (Grok) API Key",
                                type="password",
                                placeholder="Opsional - Klik icon (?) untuk info",
                                show_label=True,
                                scale=3
                            )
                            with gr.Column(scale=1):
                                gr.Markdown(XAI_API_HELP)
                    with gr.Group():
                        with gr.Row():
                            gemini_key = gr.Textbox(
                                label="Gemini API Key",
                                type="password",
                                placeholder="Opsional - Kosongkan untuk gunakan key default",
                                show_label=True,
                                scale=3
                            )
                            with gr.Column(scale=1):
                                gr.Markdown(GEMINI_API_HELP)
                # Model selection with better layout
                with gr.Row():
                    model_dropdown = gr.Dropdown(
                        label="Model AI",
                        choices=XAI_MODELS,
                        value="grok-2-latest",
                        interactive=True,
                        elem_classes="mobile-full"
                    )

            with gr.Tab("๐Ÿ“Š Analisis Repository", elem_classes="mobile-full"):
                with gr.Group():
                    with gr.Row():
                        repo_url = gr.Textbox(
                            label="URL Repository GitHub",
                            placeholder="https://github.com/username/repository",
                            elem_classes="mobile-full"
                        )
                    with gr.Row():
                        with gr.Column(scale=2):
                            github_token = gr.Textbox(
                                label="Token GitHub",
                                type="password",
                                placeholder="Klik icon (?) untuk panduan",
                                elem_classes="mobile-full"
                            )
                        with gr.Column(scale=1):
                            branch = gr.Textbox(
                                label="Branch (opsional)",
                                placeholder="main",
                                elem_classes="mobile-full"
                            )
                    clone_button = gr.Button(
                        "๐Ÿ”„ Clone Repository",
                        variant="primary",
                        elem_classes="mobile-full"
                    )
                    clone_status = gr.Markdown(
                        value="",
                        label="Status Repository",
                        elem_classes="mobile-full"
                    )
                # File selection components
                with gr.Group():
                    gr.Markdown("### ๐Ÿ“Ž File yang Dipilih")
                    with gr.Row():
                        file_selector = gr.Dropdown(
                            label="Pilih File dari Repository",
                            choices=[],
                            multiselect=True,
                            elem_classes="mobile-full"
                        )
                    file_list = gr.HTML(
                        value="<div class='file-list'>Belum ada file yang dipilih</div>",
                        label="Daftar File Terpilih"
                    )
                # Enhanced chat interface with fullscreen support
                with gr.Group():
                    chat_history = gr.Chatbot(
                        label="๐Ÿ“ Riwayat Chat",
                        elem_classes="fullscreen-chat",
                        height=None,  # Let CSS handle the height
                        show_label=True
                    )
                    with gr.Group(elem_classes="chat-input"):
                        chat_input = gr.Textbox(
                            label="๐Ÿ’ญ Tanyakan tentang Repository",
                            placeholder="Ketik pertanyaan Anda di sini...",
                            lines=3,
                            elem_classes="mobile-full"
                        )
                        with gr.Row():
                            clear_button = gr.Button("๐Ÿงน Bersihkan", variant="secondary")
                            send_button = gr.Button("๐Ÿ“ค Kirim", variant="primary")
                        loading_indicator = gr.HTML(
                            '<div id="loading" style="display:none">Memproses permintaan...</div>'
                        )

        def handle_provider_change(provider_choice):
            """Offer the model list that belongs to the selected provider."""
            choices = models_by_provider.get(provider_choice, XAI_MODELS)
            return gr.update(choices=choices, value=choices[0])

        def handle_clone(url, token, branch_name):
            """Clone the repository and refresh the file picker choices."""
            _ok, status = analyzer.clone_repository(url, token, branch_name or None)
            return status, gr.update(choices=list(analyzer.repo_content), value=[])

        def handle_clear():
            """Reset the visible chat, the input box and the stored history."""
            analyzer.chat_history.clear()
            return [], ""

        async def handle_chat(message, history, provider_choice, model_name, xai_key, gemini_key, selected_files):
            """Answer ``message`` with the chosen provider, streaming updates.

            Yields the chat history (list of ``[user, assistant]`` pairs)
            after every received fragment so the Chatbot refreshes live.
            """
            # Normalise first: Gradio may pass None for an empty Chatbot and
            # ``None + [...]`` would raise TypeError.
            history = history or []
            if not analyzer.current_repo:
                yield history + [[message, "โš ๏ธ Mohon clone repository terlebih dahulu sebelum mengajukan pertanyaan."]]
                return
            history.append([message, ""])
            try:
                # Add the selected files to the prompt as plain fenced code.
                # This text goes to the model, not to the browser, so HTML
                # wrappers would only duplicate the content and waste tokens.
                file_context = ""
                if selected_files:
                    file_context = "\n\nFile yang dipilih:\n"
                    for file in selected_files:
                        content = analyzer.repo_content.get(file, "")
                        file_context += f"\n### {file}\n```\n{content}\n```\n"
                enhanced_message = f"{message}\n{file_context}"

                if provider_choice == AIProvider.XAI:
                    stream = analyzer.stream_xai_response(enhanced_message, xai_key, model_name)
                elif provider_choice == AIProvider.GEMINI:
                    stream = analyzer.stream_gemini_response(enhanced_message, gemini_key or DEFAULT_GEMINI_KEY)
                else:  # OLLAMA
                    stream = None

                if stream is not None:
                    # Accumulate the raw text and re-render the whole answer
                    # each time: a code fence usually spans several chunks,
                    # so styling individual chunks would never match it.
                    raw_response = ""
                    async for chunk in stream:
                        raw_response += chunk
                        history[-1][1] = process_code_blocks(raw_response)
                        yield history
                else:
                    # NOTE(review): analyze_with_ollama is not defined in the
                    # visible part of this file; until it exists the NameError
                    # is reported in the chat by the handler below.
                    response = analyze_with_ollama(model_name, enhanced_message)
                    # Show the answer as-is; re-joining on whitespace would
                    # destroy newlines and indentation inside code blocks.
                    history[-1][1] = process_code_blocks(response)
                    yield history
            except Exception as e:
                history[-1][1] = f"โš ๏ธ Error: {str(e)}"
                yield history

        # Event wiring (must happen inside the Blocks context).
        provider.change(handle_provider_change, inputs=provider, outputs=model_dropdown)
        clone_button.click(
            handle_clone,
            inputs=[repo_url, github_token, branch],
            outputs=[clone_status, file_selector],
        )
        send_button.click(
            handle_chat,
            inputs=[chat_input, chat_history, provider, model_dropdown, xai_key, gemini_key, file_selector],
            outputs=chat_history,
        )
        clear_button.click(handle_clear, outputs=[chat_history, chat_input])

    # The caller invokes .launch() on the returned app.
    return app
def process_code_blocks(text):
    """Convert fenced Markdown code blocks into styled HTML "artifact" boxes.

    Every block of the form ```` ```lang\\n...``` ```` is replaced by a
    ``wrapper-artifact`` div containing a header (language label and a Copy
    button) and the code inside ``<pre><code>``. Text outside such blocks,
    and fenced blocks without a language tag, are returned unchanged.

    Args:
        text: Markdown text, typically model output (treated as untrusted).

    Returns:
        The text with matching code blocks replaced; empty/None input is
        returned as-is.
    """
    import html
    import re

    if not text:
        return text

    # Pattern for code blocks with language specification. The language tag
    # may contain "+", "#", "." and "-" so that e.g. "c++" or "c#" match.
    pattern = r"```([\w+#.-]+)[ \t]*\n(.*?)```"

    def replace_code_block(match):
        # Escape both captures: the code comes from a model and may contain
        # "<", "&" or quotes that would otherwise be interpreted as markup.
        language = html.escape(match.group(1))
        code = html.escape(match.group(2))
        # The Copy button reads the text back from the rendered <code>
        # element instead of embedding the code in a JS template literal,
        # where quotes, backslashes or "${...}" would break or inject script.
        return (
            '\n<div class="wrapper-artifact">\n'
            '<div class="header-artifact">\n'
            f'<span>{language}</span>\n'
            '<button class="copy-button" onclick="copyToClipboard('
            "this.closest('.wrapper-artifact').querySelector('code').textContent"
            ')">Copy</button>\n'
            '</div>\n'
            '<div class="content-artifact">\n'
            f'<pre><code>{code}</code></pre>\n'
            '</div>\n'
            '</div>\n'
        )

    # Replace all code blocks in the text
    return re.sub(pattern, replace_code_block, text, flags=re.DOTALL)
if __name__ == "__main__":
    # Print a start-up banner stamped with the current UTC time.
    started_at = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
    print(f"""
๐Ÿš€ Memulai Repository Chat Analysis
๐Ÿ“… Current Date and Time (UTC): {started_at}
""")

    # Build the UI, then serve it on all interfaces, port 7860, with a
    # public share link and error display enabled.
    launch_options = {
        "share": True,
        "show_error": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
    }
    app = create_ui()
    app.launch(**launch_options)