Spaces:
Sleeping
Sleeping
File size: 12,642 Bytes
4fa75b3 1026172 6f0b2dc 15242ce 1026172 4fa75b3 1026172 397e24f 1026172 397e24f 1026172 397e24f 1026172 6f0b2dc 397e24f 6f0b2dc e298e9f 6f0b2dc e298e9f 818d0d7 d808d33 818d0d7 6f0b2dc 15242ce 6f0b2dc 15242ce 6f0b2dc 397e24f 6f0b2dc 397e24f 1026172 6f0b2dc e298e9f 6f0b2dc e298e9f d808d33 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 | import os
import zipfile
import time
import json
import re
import numpy as np
import requests
from bs4 import BeautifulSoup
from langchain_chroma import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from sentence_transformers import CrossEncoder
from rank_bm25 import BM25Okapi
from groq import Groq
from config import *
from utils import normalize_arabic, detect_lang
CATEGORY_KEYWORDS = {
"A1": ["عليا", "superieur", "أ1"],
"A2": ["متوسطة", "moyen", "أ2", "cadre"],
"A3": ["أعوان", "a3", "صنف"],
}
# Keywords that signal a different context (to avoid topic collision)
CONTEXT_PENALTIES = {
"journal": ["مجلة", "مقال", "نشر", "ملكية فكرية", "revue", "article"],
"competition": ["مناظرة", "دخول", "مرحلة", "concours", "cycle", "تسجيل"]
}
class ENAEngine:
def __init__(self, groq_token=None):
# Auto-extract DB if missing
if os.path.exists("chroma_ena_db.zip") and not os.path.exists(CHROMA_PATH):
try:
with zipfile.ZipFile("chroma_ena_db.zip", 'r') as zip_ref:
zip_ref.extractall(".")
except:
pass
self.embeddings = HuggingFaceEmbeddings(
model_name=EMBED_MODEL,
model_kwargs={"device":"cpu"},
encode_kwargs={"normalize_embeddings":True}
)
self.vectordb = Chroma(
persist_directory=CHROMA_PATH,
collection_name=COLLECTION_NAME,
embedding_function=self.embeddings
)
self.reranker = CrossEncoder(RERANK_MODEL, device="cpu")
self.llm = Groq(api_key=groq_token) if groq_token else None
self.bm25 = self._load_bm25()
def _load_bm25(self):
try:
col = self.vectordb._collection.get(include=["documents"])
chunks = col["documents"]
if not chunks: return None
return BM25Okapi([c.lower().split() for c in chunks])
except:
return None
def hybrid_search(self, query: str, k: int = TOP_K_SEARCH):
qn = normalize_arabic(query)
vw, bw = (0.7, 0.3) if detect_lang(qn) == "ar" else (0.8, 0.2)
# Vector Search
vdocs = self.vectordb.similarity_search(qn, k=k)
# If BM25 is not ready, return vector results in the correct format
if not self.bm25:
return [{"content": d.page_content, "meta": d.metadata, "rrf_score": 0.5} for d in vdocs]
vrank = {d.page_content: i for i, d in enumerate(vdocs)}
# BM25 Search
try:
col = self.vectordb._collection.get(include=["documents", "metadatas"])
chunks, metas = col["documents"], col["metadatas"]
bsc = self.bm25.get_scores(qn.lower().split())
btop = np.argsort(bsc)[::-1][:k]
brank = {chunks[i]: j for j, i in enumerate(btop)}
# 1. Detection: Is this about a specific stage?
target_cat = None
for cat, bits in CATEGORY_KEYWORDS.items():
if any(b in qn.lower() for b in bits):
target_cat = cat
break
# 2. Metadata Reconstruction mapping
tmeta = {d.page_content: d.metadata for d in vdocs}
for i in btop:
if chunks[i] not in tmeta: tmeta[chunks[i]] = metas[i]
# 3. RRF Fusion with Categorical Boosting
texts = set(vrank)|set(brank)
fused = {}
for t in texts:
# Rank score
score = vw/(vrank.get(t,k+10)+RRF_K) + bw/(brank.get(t,k+10)+RRF_K)
# Metadata-based Boost/Penalty
m = tmeta.get(t, {})
m_cat = m.get("category", "")
m_url = m.get("url", "")
# Boost صفحات المناظرة عند سؤال الشروط
if any(kw in qn for kw in ["شروط", "ترشح", "condition", "candidature"]):
if m_cat in ("concours_ar", "concours_fr"):
score *= 2.0
# عقوبة للصفحة الرئيسية
if m_url.rstrip("/") in ("https://www.ena.tn/ar", "https://www.ena.tn/fr"):
score *= 0.3
# Boost المعلومات العامة للشروط
if "شروط" in qn or "condition" in qn.lower():
if "informations-generales" in m_url:
score *= 1.5
content_lower = t.lower()
# Boost الاستثناءات القانونية للسن — مهمة جداً
EXCEPTION_KEYWORDS = ["استثناء", "مكتب تشغيل", "مكتب التشغيل",
"سنوات العمل", "الجماعات المحلية",
"الأمر عدد 1031", "dérogation", "bureau d'emploi"]
is_conditions_q = any(kw in qn for kw in ["شروط", "سن", "عمر", "condition", "age"])
has_exception = any(kw in content_lower for kw in EXCEPTION_KEYWORDS)
if is_conditions_q and has_exception:
score *= 3.0 # رفع قوي لضمان ظهور الاستثناءات دائماً
# 4. Contextual Penalty (The "Journal vs Competition" fix)
is_competition_q = any(b in qn.lower() for b in CONTEXT_PENALTIES["competition"])
is_journal_q = any(b in qn.lower() for b in CONTEXT_PENALTIES["journal"])
has_journal_terms = any(b in content_lower for b in CONTEXT_PENALTIES["journal"])
# If it's a competition query, penalize journal content heavily
if is_competition_q and has_journal_terms:
score *= 0.1
# If it's specifically a documents query, boost known registration terms
if "وثائق" in qn or "ملف" in qn:
if "استمارة" in content_lower or "بطاقة تعريف" in content_lower:
score *= 1.5
fused[t] = score
ranked_texts = sorted(fused, key=fused.get, reverse=True)[:k]
# Reconstruct results
results = []
for t in ranked_texts:
results.append({
"content": t,
"meta": tmeta.get(t, {}),
"rrf_score": fused[t],
"is_global": tmeta.get(t, {}).get("category") == "other"
})
return results
except Exception:
# Fallback if anything goes wrong with BM25
return [{"content": d.page_content, "meta": d.metadata, "rrf_score": 0.5} for d in vdocs]
def rerank(self, query: str, results: list):
if not results: return []
cands = results[:20]
scores = self.reranker.predict([(query, r["content"][:1024]) for r in cands])
scored = []
for r, s in zip(cands, scores):
conf = 1 / (1 + np.exp(-float(s))) # Sigmoid
scored.append({**r, "confidence": conf})
return sorted(scored, key=lambda x:x["confidence"], reverse=True)[:TOP_K_RERANK]
def expand_query(self, query: str):
if not self.llm: return [query]
try:
resp = self.llm.chat.completions.create(
model="llama-3.3-70b-versatile",
messages=[{"role":"user","content":f'Generate 3 short alternative search queries for: "{query}" in Arabic and French. Return JSON list only.'}],
max_tokens=100
)
m = re.search(r'\[.*?\]', resp.choices[0].message.content, re.DOTALL)
if m: return [query] + json.loads(m.group())
except:
pass
return [query]
def should_scrape(self, query: str) -> str:
"""Agentic decision: Should I fetch a live page?"""
# Simplified for now, can be LLM-based later
q = query.lower()
triggers = ["جديد", "موعد", "متى", "date", "nouvelle", "actualité"]
if any(t in q for t in triggers):
# Find best URL match
for key, url in PAGE_URLS.items():
if key.replace('_',' ') in q: return url
return None
def scrape_url(self, url: str):
try:
r = requests.get(url, headers={"User-Agent":"Mozilla/5.0"}, timeout=10)
soup = BeautifulSoup(r.text, "html.parser")
for t in soup(["script","style","nav","footer","header"]): t.decompose()
return soup.get_text(" ", strip=True)[:3500]
except:
return None
def get_system_prompt(self, lang="ar"):
if lang == "ar":
return """أنت 'خبير قانوني' مختص حصرياً في قوانين المدرسة الوطنية للإدارة (ENA) بتونس.
### قواعد صارمة للإجابة:
1. **الالتزام بالسياق**: أجب فقط بناءً على النصوص المرفقة (Context). لا تستخدم معلوماتك العامة أبداً.
2. **شروط الترشح — الأركان الأربعة الإلزامية**:
عند أي سؤال عن "شروط الترشح" أو "شروط المناظرة"، يجب ذكر الأركان الأربعة كاملة:
- [السن]: اذكر الرقم بدقة (35 للمرحلة العليا، 40 لأ2 وأ3).
- [الشهادات]: اذكر جميع الشهادات المذكورة في السياق.
- [الجنسية]: الجنسية التونسية.
- [الحقوق المدنية]: التمتع بالحقوق المدنية.
- [الاستثناءات]: ذكر استثناءات السن (مكتب التشغيل + سنوات العمل الإداري) وجوبي إذا وُجدت في السياق.
3. **التمييز بين المناظرة الخارجية والداخلية**:
- الخارجية: للطلبة وحاملي الشهادات من خارج الإدارة.
- الداخلية: للموظفين والأعوان العموميين المرسمين.
ميّز بينهما دائماً إذا وُجدا في السياق.
4. **التمييز بين ملفات الترشح**:
- ملف مناظرة (concours): استمارة + نسخة ب.ت.و + شهادة علمية. هذا المقصود في 99% من الأسئلة.
- ملف المجلة (Journal/Revue): مقال + سيرة داخلية + حقوق ملكية. لا تخلط بينهما أبداً!
5. **التثبت من المصدر**: إذا كان النص يتحدث عن 'المجلة التونسية للإدارة'، فهو ليس ملف المناظرة.
6. **في حال فقدان المعلومة**: إذا لم تجد الإجابة في النصوص المرفقة، قل صراحةً:
"هذه المعلومة غير متوفرة في وثائقي الحالية، يرجى التواصل مع ENA مباشرة: info@ena.tn أو 71 848 300"
7. **الدقة الرقمية**: يُمنع تجاهل أي رقم أو سن أو استثناء قانوني مذكور في السياق.
8. **المصادر**: اذكر رقم المصدر [1] بعد كل معلومة مباشرة."""
else:
return """Tu es un Expert Juridique ENA Tunisie.
### Règles strictes :
1. **Contexte uniquement** : Ne réponds que sur la base du contexte fourni. N'utilise JAMAIS tes connaissances générales.
2. **Conditions de candidature — 4 éléments obligatoires** :
Pour toute question sur les "conditions", mentionner impérativement :
- [Âge] : 35 ans max (Cycle Supérieur), 40 ans (A2/A3).
- [Diplômes] : Tous les diplômes mentionnés dans le contexte.
- [Nationalité] : Nationalité tunisienne.
- [Droits civils] : Jouissance des droits civils.
- [Dérogations] : Dérogations d'âge (bureau d'emploi + service public) si présentes.
3. **Concours Externe vs Interne** :
- Externe : Pour les étudiants et diplômés hors administration.
- Interne : Pour les fonctionnaires titulaires.
4. **Information manquante** : Si l'info n'est pas dans le contexte :
"Information non disponible. Contactez l'ENA : info@ena.tn ou 71 848 300"
5. **Rigueur numérique** : Ne jamais omettre un chiffre, un âge ou une dérogation légale.
6. **Sources** : Citer la référence [1] après chaque information.""" |