import os import zipfile import time import json import re import numpy as np import requests from bs4 import BeautifulSoup from langchain_chroma import Chroma from langchain_huggingface import HuggingFaceEmbeddings from sentence_transformers import CrossEncoder from rank_bm25 import BM25Okapi from groq import Groq from config import * from utils import normalize_arabic, detect_lang CATEGORY_KEYWORDS = { "A1": ["عليا", "superieur", "أ1"], "A2": ["متوسطة", "moyen", "أ2", "cadre"], "A3": ["أعوان", "a3", "صنف"], } # Keywords that signal a different context (to avoid topic collision) CONTEXT_PENALTIES = { "journal": ["مجلة", "مقال", "نشر", "ملكية فكرية", "revue", "article"], "competition": ["مناظرة", "دخول", "مرحلة", "concours", "cycle", "تسجيل"] } class ENAEngine: def __init__(self, groq_token=None): # Auto-extract DB if missing if os.path.exists("chroma_ena_db.zip") and not os.path.exists(CHROMA_PATH): try: with zipfile.ZipFile("chroma_ena_db.zip", 'r') as zip_ref: zip_ref.extractall(".") except: pass self.embeddings = HuggingFaceEmbeddings( model_name=EMBED_MODEL, model_kwargs={"device":"cpu"}, encode_kwargs={"normalize_embeddings":True} ) self.vectordb = Chroma( persist_directory=CHROMA_PATH, collection_name=COLLECTION_NAME, embedding_function=self.embeddings ) self.reranker = CrossEncoder(RERANK_MODEL, device="cpu") self.llm = Groq(api_key=groq_token) if groq_token else None self.bm25 = self._load_bm25() def _load_bm25(self): try: col = self.vectordb._collection.get(include=["documents"]) chunks = col["documents"] if not chunks: return None return BM25Okapi([c.lower().split() for c in chunks]) except: return None def hybrid_search(self, query: str, k: int = TOP_K_SEARCH): qn = normalize_arabic(query) vw, bw = (0.7, 0.3) if detect_lang(qn) == "ar" else (0.8, 0.2) # Vector Search vdocs = self.vectordb.similarity_search(qn, k=k) # If BM25 is not ready, return vector results in the correct format if not self.bm25: return [{"content": d.page_content, "meta": d.metadata, "rrf_score": 0.5} for d in vdocs] vrank = {d.page_content: i for i, d in enumerate(vdocs)} # BM25 Search try: col = self.vectordb._collection.get(include=["documents", "metadatas"]) chunks, metas = col["documents"], col["metadatas"] bsc = self.bm25.get_scores(qn.lower().split()) btop = np.argsort(bsc)[::-1][:k] brank = {chunks[i]: j for j, i in enumerate(btop)} # 1. Detection: Is this about a specific stage? target_cat = None for cat, bits in CATEGORY_KEYWORDS.items(): if any(b in qn.lower() for b in bits): target_cat = cat break # 2. Metadata Reconstruction mapping tmeta = {d.page_content: d.metadata for d in vdocs} for i in btop: if chunks[i] not in tmeta: tmeta[chunks[i]] = metas[i] # 3. RRF Fusion with Categorical Boosting texts = set(vrank)|set(brank) fused = {} for t in texts: # Rank score score = vw/(vrank.get(t,k+10)+RRF_K) + bw/(brank.get(t,k+10)+RRF_K) # Metadata-based Boost/Penalty m = tmeta.get(t, {}) m_cat = m.get("category", "") m_url = m.get("url", "") # Boost صفحات المناظرة عند سؤال الشروط if any(kw in qn for kw in ["شروط", "ترشح", "condition", "candidature"]): if m_cat in ("concours_ar", "concours_fr"): score *= 2.0 # عقوبة للصفحة الرئيسية if m_url.rstrip("/") in ("https://www.ena.tn/ar", "https://www.ena.tn/fr"): score *= 0.3 # Boost المعلومات العامة للشروط if "شروط" in qn or "condition" in qn.lower(): if "informations-generales" in m_url: score *= 1.5 content_lower = t.lower() # Boost الاستثناءات القانونية للسن — مهمة جداً EXCEPTION_KEYWORDS = ["استثناء", "مكتب تشغيل", "مكتب التشغيل", "سنوات العمل", "الجماعات المحلية", "الأمر عدد 1031", "dérogation", "bureau d'emploi"] is_conditions_q = any(kw in qn for kw in ["شروط", "سن", "عمر", "condition", "age"]) has_exception = any(kw in content_lower for kw in EXCEPTION_KEYWORDS) if is_conditions_q and has_exception: score *= 3.0 # رفع قوي لضمان ظهور الاستثناءات دائماً # 4. Contextual Penalty (The "Journal vs Competition" fix) is_competition_q = any(b in qn.lower() for b in CONTEXT_PENALTIES["competition"]) is_journal_q = any(b in qn.lower() for b in CONTEXT_PENALTIES["journal"]) has_journal_terms = any(b in content_lower for b in CONTEXT_PENALTIES["journal"]) # If it's a competition query, penalize journal content heavily if is_competition_q and has_journal_terms: score *= 0.1 # If it's specifically a documents query, boost known registration terms if "وثائق" in qn or "ملف" in qn: if "استمارة" in content_lower or "بطاقة تعريف" in content_lower: score *= 1.5 fused[t] = score ranked_texts = sorted(fused, key=fused.get, reverse=True)[:k] # Reconstruct results results = [] for t in ranked_texts: results.append({ "content": t, "meta": tmeta.get(t, {}), "rrf_score": fused[t], "is_global": tmeta.get(t, {}).get("category") == "other" }) return results except Exception: # Fallback if anything goes wrong with BM25 return [{"content": d.page_content, "meta": d.metadata, "rrf_score": 0.5} for d in vdocs] def rerank(self, query: str, results: list): if not results: return [] cands = results[:20] scores = self.reranker.predict([(query, r["content"][:1024]) for r in cands]) scored = [] for r, s in zip(cands, scores): conf = 1 / (1 + np.exp(-float(s))) # Sigmoid scored.append({**r, "confidence": conf}) return sorted(scored, key=lambda x:x["confidence"], reverse=True)[:TOP_K_RERANK] def expand_query(self, query: str): if not self.llm: return [query] try: resp = self.llm.chat.completions.create( model="llama-3.3-70b-versatile", messages=[{"role":"user","content":f'Generate 3 short alternative search queries for: "{query}" in Arabic and French. Return JSON list only.'}], max_tokens=100 ) m = re.search(r'\[.*?\]', resp.choices[0].message.content, re.DOTALL) if m: return [query] + json.loads(m.group()) except: pass return [query] def should_scrape(self, query: str) -> str: """Agentic decision: Should I fetch a live page?""" # Simplified for now, can be LLM-based later q = query.lower() triggers = ["جديد", "موعد", "متى", "date", "nouvelle", "actualité"] if any(t in q for t in triggers): # Find best URL match for key, url in PAGE_URLS.items(): if key.replace('_',' ') in q: return url return None def scrape_url(self, url: str): try: r = requests.get(url, headers={"User-Agent":"Mozilla/5.0"}, timeout=10) soup = BeautifulSoup(r.text, "html.parser") for t in soup(["script","style","nav","footer","header"]): t.decompose() return soup.get_text(" ", strip=True)[:3500] except: return None def get_system_prompt(self, lang="ar"): if lang == "ar": return """أنت 'خبير قانوني' مختص حصرياً في قوانين المدرسة الوطنية للإدارة (ENA) بتونس. ### قواعد صارمة للإجابة: 1. **الالتزام بالسياق**: أجب فقط بناءً على النصوص المرفقة (Context). لا تستخدم معلوماتك العامة أبداً. 2. **شروط الترشح — الأركان الأربعة الإلزامية**: عند أي سؤال عن "شروط الترشح" أو "شروط المناظرة"، يجب ذكر الأركان الأربعة كاملة: - [السن]: اذكر الرقم بدقة (35 للمرحلة العليا، 40 لأ2 وأ3). - [الشهادات]: اذكر جميع الشهادات المذكورة في السياق. - [الجنسية]: الجنسية التونسية. - [الحقوق المدنية]: التمتع بالحقوق المدنية. - [الاستثناءات]: ذكر استثناءات السن (مكتب التشغيل + سنوات العمل الإداري) وجوبي إذا وُجدت في السياق. 3. **التمييز بين المناظرة الخارجية والداخلية**: - الخارجية: للطلبة وحاملي الشهادات من خارج الإدارة. - الداخلية: للموظفين والأعوان العموميين المرسمين. ميّز بينهما دائماً إذا وُجدا في السياق. 4. **التمييز بين ملفات الترشح**: - ملف مناظرة (concours): استمارة + نسخة ب.ت.و + شهادة علمية. هذا المقصود في 99% من الأسئلة. - ملف المجلة (Journal/Revue): مقال + سيرة داخلية + حقوق ملكية. لا تخلط بينهما أبداً! 5. **التثبت من المصدر**: إذا كان النص يتحدث عن 'المجلة التونسية للإدارة'، فهو ليس ملف المناظرة. 6. **في حال فقدان المعلومة**: إذا لم تجد الإجابة في النصوص المرفقة، قل صراحةً: "هذه المعلومة غير متوفرة في وثائقي الحالية، يرجى التواصل مع ENA مباشرة: info@ena.tn أو 71 848 300" 7. **الدقة الرقمية**: يُمنع تجاهل أي رقم أو سن أو استثناء قانوني مذكور في السياق. 8. **المصادر**: اذكر رقم المصدر [1] بعد كل معلومة مباشرة.""" else: return """Tu es un Expert Juridique ENA Tunisie. ### Règles strictes : 1. **Contexte uniquement** : Ne réponds que sur la base du contexte fourni. N'utilise JAMAIS tes connaissances générales. 2. **Conditions de candidature — 4 éléments obligatoires** : Pour toute question sur les "conditions", mentionner impérativement : - [Âge] : 35 ans max (Cycle Supérieur), 40 ans (A2/A3). - [Diplômes] : Tous les diplômes mentionnés dans le contexte. - [Nationalité] : Nationalité tunisienne. - [Droits civils] : Jouissance des droits civils. - [Dérogations] : Dérogations d'âge (bureau d'emploi + service public) si présentes. 3. **Concours Externe vs Interne** : - Externe : Pour les étudiants et diplômés hors administration. - Interne : Pour les fonctionnaires titulaires. 4. **Information manquante** : Si l'info n'est pas dans le contexte : "Information non disponible. Contactez l'ENA : info@ena.tn ou 71 848 300" 5. **Rigueur numérique** : Ne jamais omettre un chiffre, un âge ou une dérogation légale. 6. **Sources** : Citer la référence [1] après chaque information."""