# ============================================================ # CodeAgent - Sub-agent for code generation and execution # Modified to log inputs & outputs (file + jsonl) # Author: Mustafa Albakkar (modified) # ============================================================ import os import json import atexit import logging import traceback import io import sys import contextlib import tempfile from logging.handlers import RotatingFileHandler import gradio as gr from llama_cpp import Llama # ------------------------------------------------------------ # 🔧 إعداد التسجيل (Logs) — ملفي و console # ------------------------------------------------------------ LOG_DIR = os.environ.get("CODEAGENT_LOG_DIR", "logs") os.makedirs(LOG_DIR, exist_ok=True) log_file = os.path.join(LOG_DIR, "codeagent.log") jsonl_file = os.path.join(LOG_DIR, "records.jsonl") logger = logging.getLogger("CodeAgent") logger.setLevel(logging.INFO) # Console handler ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.INFO) ch_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s") ch.setFormatter(ch_formatter) logger.addHandler(ch) # Rotating file handler fh = RotatingFileHandler(log_file, maxBytes=5 * 1024 * 1024, backupCount=5, encoding="utf-8") fh.setLevel(logging.INFO) fh_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s") fh.setFormatter(fh_formatter) logger.addHandler(fh) # ------------------------------------------------------------ # 🧠 تحميل النموذج (Qwen2.5-Coder-14B-Instruct-GGUF) # ------------------------------------------------------------ llm = None try: logger.info("🔄 Attempting to load model...") llm = Llama.from_pretrained( repo_id="Qwen/Qwen2.5-Coder-14B-Instruct-GGUF", filename="qwen2.5-coder-14b-instruct-q6_k.gguf", n_gpu_layers=-1, n_ctx=4096, n_threads=4, ) logger.info("✅ Model loaded successfully.") except Exception as e: logger.exception("❌ Model load failed: %s", e) llm = None # ------------------------------------------------------------ # 🧩 دوال مساعدة # ------------------------------------------------------------ def safe_text(x): """تحويل النصوص أو القيم المعقدة إلى نص نظيف وصالح للعرض.""" if isinstance(x, str): return x.strip() try: return json.dumps(x, ensure_ascii=False) except Exception: return str(x) def compute_safe_max_tokens(prompt: str, max_ctx=4096): """تقدير عدد التوكنات الآمنة حسب طول الإدخال.""" try: l = max(1, len(prompt.split())) avail = max_ctx - l - 1 return max(128, min(1024, avail)) except Exception: return 256 def append_log_record(prompt: str, model_raw: str, code_clean: str, exec_result: str, extra: dict = None): """ يضيف سطر JSON (JSONL) في ملف للمعالجة اللاحقة. كل سطر يمثل طلب واحد مع الطوابع الزمنية. """ try: record = { "ts": logging.Formatter().formatTime(logging.LogRecord("x", logging.INFO, "", 0, "", (), None)), "prompt": prompt, "model_raw": model_raw, "code_clean": code_clean, "exec_result": exec_result, } if extra: record.update(extra) with open(jsonl_file, "a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") except Exception: logger.exception("Failed to append JSONL record.") # ------------------------------------------------------------ # 🧮 أداة تنفيذ الكود بأمان # ------------------------------------------------------------ def execute_python_code(code_str: str) -> str: """ تقوم هذه الدالة بتنفيذ الكود البرمجي الناتج من النموذج داخل بيئة معزولة، وتعيد المخرجات النصية أو الأخطاء. """ code_str = code_str.strip() # تنظيف الكود من علامات أو أمثلة غير ضرورية code_str = code_str.replace("```python", "").replace("```", "").strip() buffer = io.StringIO() try: # إعادة توجيه stdout و stderr with contextlib.redirect_stdout(buffer), contextlib.redirect_stderr(buffer): # نحدد local_env فقط (لا نعطي globals حساسة) local_env = {} # تنفيذ الكود — ضع في اعتبارك مخاطر exec عند تشغيل كود غير موثوق exec(code_str, {}, local_env) result = buffer.getvalue().strip() if not result: result = "✅ Code executed successfully (no printed output)." return result except Exception: tb = traceback.format_exc() return f"❌ Code execution error:\n{tb}" finally: buffer.close() # ------------------------------------------------------------ # 🎯 الدالة الأساسية للتوليد والتنفيذ (مع لوق مفصّل) # ------------------------------------------------------------ def generate_and_execute_fn(prompt: str) -> str: """ - توليد الكود البرمجي باستخدام نموذج Qwen2.5-Coder - تنفيذ الكود مباشرة - تسجيل المدخلات والمخرجات """ try: prompt = safe_text(prompt) if not prompt: return "⚠️ No prompt provided." if llm is None: logger.error("Model not loaded when requested.") return "❌ Model not loaded." # --- تحضير إدخال النموذج بصيغة محادثة --- formatted_prompt = ( f"<|im_start|>system\n" f"You are CodeAgent, a skilled Python coding assistant. " f"Generate correct, fully functional Python code that solves the given task.\n" f"Always wrap your code in triple backticks and do not include explanations.\n" f"<|im_end|>\n" f"<|im_start|>user\n{prompt}\n<|im_end|>\n" f"<|im_start|>assistant\n" ) max_tokens = compute_safe_max_tokens(formatted_prompt) # سجل المدخل (prompt) logger.info("🔔 New request received.") logger.info("➡️ Prompt: %s", prompt) # --- تنفيذ التوليد --- try: out = llm( formatted_prompt, max_tokens=max_tokens, temperature=0.2, stop=["<|im_end|>"] ) except TypeError: out = llm( formatted_prompt, max_new_tokens=max_tokens, temperature=0.2, stop=["<|im_end|>"] ) except Exception: logger.exception("Model generation failed.") return "❌ Model generation failed." # --- استخراج النص الناتج --- if isinstance(out, dict): # قد يختلف هيكل الإرجاع باختلاف binding try: text = out.get("choices", [{}])[0].get("text", "") or out.get("text", "") except Exception: text = json.dumps(out, ensure_ascii=False) else: text = str(out) model_raw = text or "" if not model_raw.strip(): logger.warning("Empty response from model.") return "⚠️ Empty response from model." logger.info("🧾 Model raw output (truncated 1000 chars):\n%s", model_raw[:1000]) # --- تنظيف النص لاستخراج الكود فقط --- # إذا كان النموذج يلف الكود بثلاثي backticks، نفرّغها code_candidate = model_raw.replace("```python", "").replace("```", "").strip() logger.info("🛠 Cleaned code (first 1000 chars):\n%s", code_candidate[:1000]) # --- تنفيذ الكود وإعادة النتيجة --- exec_result = execute_python_code(code_candidate) logger.info("📤 Execution result (truncated 2000 chars):\n%s", exec_result[:2000]) # إضافة سجل JSONL try: append_log_record(prompt=prompt, model_raw=model_raw, code_clean=code_candidate, exec_result=exec_result) except Exception: logger.exception("Failed to write jsonl record.") final_output = ( f"🧠 **Prompt:** {prompt}\n\n" f"💻 **Generated Code:**\n{code_candidate}\n\n" f"🧾 **Execution Result:**\n{exec_result}" ) return final_output except Exception: logger.exception("Generation/Execution error") return "❌ Internal error during generation/execution." # ------------------------------------------------------------ # 🧱 واجهة Gradio # ------------------------------------------------------------ iface = gr.Interface( fn=generate_and_execute_fn, inputs=gr.Textbox(lines=4, placeholder="💬 اكتب سؤالك البرمجي هنا..."), outputs=gr.Textbox(lines=15, label="💡 ناتج CodeAgent"), title="🤖 CodeAgent — Code Generator & Executor", description="وكيل يقوم بتوليد الكود البرمجي بلغة بايثون وتنفيذه مباشرة باستخدام نموذج Qwen2.5-Coder-14B-Instruct-GGUF." ) # ------------------------------------------------------------ # 🧹 إغلاق آمن للنموذج # ------------------------------------------------------------ def safe_close(): global llm try: if llm is not None: llm.close() logger.info("🧩 Model closed successfully.") except Exception: logger.exception("Error closing model.") finally: llm = None atexit.register(safe_close) # ------------------------------------------------------------ # 🚀 الإطلاق المتوافق مع الوكيل الأساسي # ------------------------------------------------------------ if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) logger.info(f"🚀 Launching CodeAgent on port {port}") iface.launch(server_name="0.0.0.0", server_port=port)