|
|
import os |
|
|
import re |
|
|
import shutil |
|
|
import tempfile |
|
|
import zipfile |
|
|
from pathlib import Path |
|
|
import gradio as gr |
|
|
from huggingface_hub import InferenceClient |
|
|
|
|
|
|
|
|
ALLOWED_EXTENSIONS = { |
|
|
'.py', '.js', '.html', '.css', '.txt', '.json', '.md', |
|
|
'.php', '.xml', '.csv', '.zip', '.png', '.jpg', '.svg', '.ico' |
|
|
} |
|
|
MODEL_ID = os.environ.get('HF_MODEL_ID', 'Qwen/Qwen3-Coder-480B-A35B-Instruct-FP8') |
|
|
HF_TOKEN = os.environ.get('HF_TOKEN') |
|
|
|
|
|
class ProjectTank: |
|
|
def __init__(self): |
|
|
self.work_dir = tempfile.mkdtemp(prefix="tank_dev_") |
|
|
self.files_cache = {} |
|
|
self.structure = "" |
|
|
|
|
|
def sync_files(self, file_objs): |
|
|
if not file_objs: return "⚠️ لم يتم اختيار ملفات.", [] |
|
|
|
|
|
if os.path.exists(self.work_dir): |
|
|
shutil.rmtree(self.work_dir) |
|
|
self.work_dir = tempfile.mkdtemp(prefix="tank_dev_") |
|
|
|
|
|
files = file_objs if isinstance(file_objs, list) else [file_objs] |
|
|
for f in files: |
|
|
name = os.path.basename(f.name) |
|
|
if name.lower().endswith('.zip'): |
|
|
with zipfile.ZipFile(f.name, 'r') as z: |
|
|
z.extractall(self.work_dir) |
|
|
else: |
|
|
dest = os.path.join(self.work_dir, name) |
|
|
shutil.copy(f.name, dest) |
|
|
|
|
|
self._reload_memory() |
|
|
return f"🚀 تم تحميل {len(self.files_cache)} ملف بنجاح.", list(self.files_cache.keys()) |
|
|
|
|
|
def _reload_memory(self): |
|
|
self.files_cache = {} |
|
|
structure_list = [] |
|
|
for p in Path(self.work_dir).rglob('*'): |
|
|
if p.is_file(): |
|
|
rel = str(p.relative_to(self.work_dir)).replace("\\", "/") |
|
|
structure_list.append(f"- {rel}") |
|
|
try: |
|
|
if p.suffix.lower() in ['.html', '.css', '.js', '.json', '.txt', '.md', '.py']: |
|
|
self.files_cache[rel] = p.read_text(encoding='utf-8', errors='ignore') |
|
|
except: pass |
|
|
self.structure = "\n".join(structure_list) |
|
|
|
|
|
def update_file_content(self, filename, content): |
|
|
path = os.path.join(self.work_dir, filename) |
|
|
os.makedirs(os.path.dirname(path), exist_ok=True) |
|
|
with open(path, 'w', encoding='utf-8') as f: |
|
|
f.write(content) |
|
|
self.files_cache[filename] = content |
|
|
|
|
|
def build_preview(self): |
|
|
"""رادار ذكي للبحث عن ملف التشغيل في أي عمق""" |
|
|
all_keys = list(self.files_cache.keys()) |
|
|
|
|
|
entry = next((k for k in all_keys if k.lower().endswith('index.html')), None) |
|
|
|
|
|
if not entry: |
|
|
|
|
|
entry = next((k for k in all_keys if k.endswith('.html')), None) |
|
|
|
|
|
if not entry: |
|
|
return f"<div style='text-align:center; padding:50px;'>⚠️ لم يتم العثور على ملف HTML للمعاينة.<br>الملفات المكتشفة: {str(all_keys[:3])}</div>" |
|
|
|
|
|
html = self.files_cache[entry] |
|
|
base_dir = os.path.dirname(entry) |
|
|
|
|
|
|
|
|
for name, code in self.files_cache.items(): |
|
|
fname = os.path.basename(name) |
|
|
if name.endswith('.css'): |
|
|
html = re.sub(rf'<link.*?href=["\'].*?{fname}["\'].*?>', f"<style>\n{code}\n</style>", html) |
|
|
if name.endswith('.js'): |
|
|
html = re.sub(rf'<script.*?src=["\'].*?{fname}["\'].*?></script>', f"<script>\n{code}\n</script>", html) |
|
|
|
|
|
return html |
|
|
|
|
|
tank = ProjectTank() |
|
|
client = InferenceClient(api_key=HF_TOKEN) if HF_TOKEN else None |
|
|
|
|
|
|
|
|
|
|
|
def chat_and_code(message, history): |
|
|
if not client: |
|
|
yield history + [{"role": "assistant", "content": "⚠️ مفقود HF_TOKEN"}] |
|
|
return |
|
|
|
|
|
context = f"### PROJECT STRUCTURE ###\n{tank.structure}\n\n### FULL FILE CONTENTS ###\n" |
|
|
for name, content in tank.files_cache.items(): |
|
|
context += f"--- FILE: {name} ---\n{content}\n" |
|
|
|
|
|
system_instr = ( |
|
|
"You are an AI IDE. Provide short explanations then output the FULL modified files using this format:\n" |
|
|
"[TARGET: filename]\n```\nUpdated Code\n```" |
|
|
) |
|
|
|
|
|
messages = [{"role": "system", "content": system_instr}] |
|
|
for m in history: |
|
|
messages.append({"role": m["role"], "content": str(m["content"])}) |
|
|
messages.append({"role": "user", "content": f"{context}\n\nUser Request: {message}"}) |
|
|
|
|
|
response = "" |
|
|
history.append({"role": "user", "content": message}) |
|
|
history.append({"role": "assistant", "content": ""}) |
|
|
|
|
|
try: |
|
|
stream = client.chat_completion(model=MODEL_ID, messages=messages, max_tokens=6000, stream=True) |
|
|
for chunk in stream: |
|
|
if hasattr(chunk, 'choices') and chunk.choices: |
|
|
token = chunk.choices[0].delta.content |
|
|
if token: |
|
|
response += token |
|
|
history[-1]["content"] = response |
|
|
yield history |
|
|
except Exception as e: |
|
|
history[-1]["content"] = f"❌ خطأ: {str(e)}" |
|
|
yield history |
|
|
|
|
|
def apply_and_preview(history): |
|
|
if not history: return "لا توجد بيانات", None, [] |
|
|
content = history[-1]["content"] |
|
|
matches = re.findall(r"\[TARGET:\s*(.*?)\]\s*```.*?\n(.*?)```", content, re.DOTALL | re.IGNORECASE) |
|
|
|
|
|
for filename, code in matches: |
|
|
tank.update_file_content(filename.strip(), code.strip()) |
|
|
|
|
|
tank._reload_memory() |
|
|
return "✅ تم التحديث بنجاح!", tank.build_preview(), list(tank.files_cache.keys()) |
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(theme=gr.themes.Monochrome(), title="AI Tank IDE Pro") as demo: |
|
|
gr.Markdown("# 🛡️ AI Tank IDE Pro\n**نظام المعاينة العميقة والتحصين ضد الأخطاء**") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
upload = gr.UploadButton("📁 ارفع المشروع (ZIP/Files)", file_count="multiple") |
|
|
file_list = gr.Dropdown(label="ملفات المشروع") |
|
|
status = gr.Markdown("🟢 جاهز") |
|
|
|
|
|
chat = gr.Chatbot(type="messages", height=400) |
|
|
msg = gr.Textbox(placeholder="اطلب تعديلاً...", show_label=False) |
|
|
with gr.Row(): |
|
|
send = gr.Button("إرسال", variant="primary") |
|
|
apply = gr.Button("⚡ تطبيق المعاينة", variant="stop") |
|
|
|
|
|
with gr.Column(scale=2): |
|
|
with gr.Tabs(): |
|
|
with gr.TabItem("🌐 المعاينة الفورية"): |
|
|
preview = gr.HTML() |
|
|
with gr.TabItem("📝 المحرر"): |
|
|
editor = gr.Code(label="كود الملف", lines=25, interactive=True) |
|
|
|
|
|
|
|
|
upload.upload(tank.sync_files, upload, [status, file_list]).then(tank.build_preview, None, preview) |
|
|
send.click(chat_and_code, [msg, chat], chat).then(lambda: "", None, msg) |
|
|
apply.click(apply_and_preview, chat, [status, preview, file_list]) |
|
|
file_list.change(lambda n: tank.files_cache.get(n, ""), file_list, editor) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.queue().launch(server_name="0.0.0.0", server_port=7860) |