import gradio as gr import os from groq import Groq from sentence_transformers import SentenceTransformer import chromadb import pypdf import hashlib # ====================== # إعداد العملاء # ====================== groq_client = Groq(api_key=os.environ["GROQ_API_KEY"]) embedding_model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2") chroma_client = chromadb.Client() collection = chroma_client.get_or_create_collection("knowledge") # ====================== # النماذج بالترتيب # ====================== MODELS = [ "llama-3.3-70b-versatile", "llama-3.1-8b-instant", "llama3-70b-8192", "llama3-8b-8192", ] def call_groq(messages): for model in MODELS: try: response = groq_client.chat.completions.create( model=model, messages=messages, max_tokens=2048 ) return response.choices[0].message.content, model except Exception as e: error = str(e) if "rate_limit_exceeded" in error or "429" in error: print(f"⚠️ {model} وصل للحد، جارٍ التبديل...") continue elif "decommissioned" in error or "model_not_found" in error: print(f"⚠️ {model} متوقف، جارٍ التبديل...") continue else: raise e return None, None # ====================== # تحميل ملفات المعرفة # ====================== def load_pdfs(folder="knowledge"): if not os.path.exists(folder): os.makedirs(folder) return for filename in os.listdir(folder): if filename.endswith(".pdf"): filepath = os.path.join(folder, filename) doc_id = hashlib.md5(filename.encode()).hexdigest() existing = collection.get(where={"source": filename}) if existing["ids"]: continue reader = pypdf.PdfReader(filepath) for i, page in enumerate(reader.pages): text = page.extract_text() if text and text.strip(): chunk_id = f"{doc_id}_page_{i}" embedding = embedding_model.encode(text).tolist() collection.add( ids=[chunk_id], embeddings=[embedding], documents=[text], metadatas=[{"source": filename, "page": i+1}] ) print(f"✅ تم تحميل: {filename}") load_pdfs() # ====================== # البحث في المعرفة # ====================== def search_knowledge(query, n_results=3): if collection.count() == 0: return "" embedding = embedding_model.encode(query).tolist() results = collection.query( query_embeddings=[embedding], n_results=min(n_results, collection.count()) ) if not results["documents"][0]: return "" context = "" for doc, meta in zip(results["documents"][0], results["metadatas"][0]): context += f"\n📄 من ملف ({meta['source']}) صفحة {meta['page']}:\n{doc}\n" return context # ====================== # System Prompt # ====================== SYSTEM_PROMPT = """أنت مساعد متخصص في الأمن السيبراني باللغة العربية. اسمك "درع" وأنت خبير في: - تحليل التهديدات السيبرانية - شرح الثغرات الأمنية وكيفية الحماية منها - مفاهيم اختبار الاختراق الأخلاقي - أمن الشبكات والأنظمة - التوعية الأمنية - معايير الأمن مثل ISO 27001 و NIST - الاستجابة للحوادث الأمنية - تحليل البرمجيات الخبيثة - أمن التطبيقات OWASP قواعد مهمة: - أجب دائماً بالعربية الفصحى الواضحة - إذا وجدت معلومات في السياق المرفق فاستخدمها أولاً - قدم معلومات تعليمية وأخلاقية فقط - لا تساعد في أي نشاط غير قانوني أو ضار - عند شرح الثغرات، ركز على الحماية وليس الاستغلال""" # ====================== # دالة المحادثة # ====================== def chat(message, history): if not message.strip(): return "", history context = search_knowledge(message) user_message = message if context: user_message = f"""السؤال: {message} معلومات ذات صلة من قاعدة المعرفة: {context} أجب على السؤال مستفيداً من المعلومات أعلاه إن كانت مفيدة.""" messages = [{"role": "system", "content": SYSTEM_PROMPT}] # آخر 10 عناصر فقط = 5 أسئلة + 5 أجوبة history_to_send = history[-10:] for item in history_to_send: if isinstance(item, dict): if item.get("role") in ["user", "assistant"]: messages.append({"role": item["role"], "content": item["content"]}) elif isinstance(item, (list, tuple)) and len(item) == 2: messages.append({"role": "user", "content": item[0]}) messages.append({"role": "assistant", "content": item[1]}) messages.append({"role": "user", "content": user_message}) try: reply, used_model = call_groq(messages) if reply is None: reply = "⏳ جميع النماذج وصلت للحد اليومي، يرجى المحاولة بعد ساعتين." else: print(f"✅ استخدم النموذج: {used_model}") history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": reply}) return "", history except Exception as e: history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": f"❌ خطأ غير متوقع: {str(e)}"}) return "", history # ====================== # رفع ملفات جديدة # ====================== def upload_file(file): if file is None: return "⚠️ لم يتم رفع أي ملف" os.makedirs("knowledge", exist_ok=True) filename = os.path.basename(file.name) dest = os.path.join("knowledge", filename) with open(file.name, "rb") as f: with open(dest, "wb") as out: out.write(f.read()) load_pdfs() count = collection.count() return f"✅ تم رفع وفهرسة: {filename} — إجمالي الصفحات: {count}" # ====================== # CSS # ====================== css = """ @import url('https://fonts.googleapis.com/css2?family=Tajawal:wght@400;700&display=swap'); .gradio-container { font-family: 'Tajawal', sans-serif !important; direction: rtl !important; } .prose, .prose p, .prose li { direction: rtl !important; text-align: right !important; font-family: 'Tajawal', sans-serif !important; } .main-header { background: linear-gradient(135deg, #0a0a0a, #1a1a2e, #16213e); padding: 30px; border-radius: 16px; text-align: center; margin-bottom: 20px; border: 1px solid #00ff88; box-shadow: 0 0 30px rgba(0,255,136,0.2); } .tip-box { background: #0d1117; border: 1px solid #30363d; border-right: 4px solid #00ff88; border-radius: 10px; padding: 15px; margin-top: 10px; } .developer-box { background: linear-gradient(135deg, #0d1117, #1a1a2e); border: 1px solid #00ff88; border-radius: 10px; padding: 15px; margin-top: 15px; text-align: center; } footer { display: none !important; } """ # ====================== # الواجهة # ====================== with gr.Blocks(title="درع - المساعد الأمني العربي") as demo: gr.HTML("""

🛡️ درع

المساعد الذكي للأمن السيبراني

مدعوم بقاعدة معرفة متخصصة

""") with gr.Row(): with gr.Column(scale=3): chatbot = gr.Chatbot( label="", height=500, show_label=False, rtl=True, avatar_images=( "https://api.dicebear.com/7.x/avataaars/svg?seed=user", "https://api.dicebear.com/7.x/bottts/svg?seed=shield" ) ) with gr.Row(): msg_input = gr.Textbox( label="", placeholder="اسأل عن أي موضوع في الأمن السيبراني...", scale=4, rtl=True, lines=2 ) send_btn = gr.Button("إرسال 🚀", scale=1, variant="primary") clear_btn = gr.Button("🗑️ محادثة جديدة", variant="secondary") with gr.Column(scale=1): gr.Markdown("### 📂 رفع ملفات المعرفة") file_upload = gr.File( label="ارفع PDF", file_types=[".pdf"] ) upload_status = gr.Textbox( label="", interactive=False, rtl=True ) file_upload.change( fn=upload_file, inputs=file_upload, outputs=upload_status ) gr.Markdown("### 💡 أسئلة مقترحة") gr.HTML("""

🔴 ما هي أخطر ثغرات OWASP؟

🔴 كيف أحمي شبكتي من الهجمات؟

🔴 ما هو الفرق بين IDS و IPS؟

🔴 كيف أبدأ تعلم اختبار الاختراق؟

🔴 ما هي أنواع هجمات التصيد؟

🔴 شرح هجوم SQL Injection

""") gr.HTML("""

⚙️ تطوير وإعداد

د. معاذ الصوفي

متخصص في الأمن السيبراني

""") send_btn.click( fn=chat, inputs=[msg_input, chatbot], outputs=[msg_input, chatbot] ) msg_input.submit( fn=chat, inputs=[msg_input, chatbot], outputs=[msg_input, chatbot] ) clear_btn.click( fn=lambda: ([], ""), outputs=[chatbot, msg_input] ) if __name__ == "__main__": demo.launch( css=css, theme=gr.themes.Soft( primary_hue="green", secondary_hue="blue", neutral_hue="slate" ) )