import re
import difflib

import numpy as np
import torch
import gradio as gr
import pyarabic.araby as araby
import stanza
from transformers import AutoTokenizer, AutoModel
from transformers import AutoTokenizer as HFTokenizer, AutoModelForSeq2SeqLM
from sentence_transformers import SentenceTransformer, util
import arabert.preprocess
import yake
from bert_score import score as bertscore

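# Runtime setup: run on GPU when available; gradients are never needed at inference time.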
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
torch.set_grad_enabled(False)

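# Pretrained model identifiers (Hugging Face Hub).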
ARAELECTRA_NAME = "aubmindlab/araelectra-base-discriminator"
SBERT_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
QG_MODEL = "Mihakram/AraT5-base-question-generation"

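# Stanza Arabic pipeline: tokenization, POS tagging, lemmatization, dependency parsing.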
stanza.download("ar", verbose=False)
nlp = stanza.Pipeline(
    lang="ar",
    processors="tokenize,pos,lemma,depparse",
    tokenize_no_ssplit=False,
    verbose=False,
)

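# AraBERT-style preprocessing matched to the AraELECTRA checkpoint.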
arabert_prep = arabert.preprocess.ArabertPreprocessor(ARAELECTRA_NAME)

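# AraELECTRA encoder for contextual token embeddings.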
tokenizer_electra = AutoTokenizer.from_pretrained(ARAELECTRA_NAME)
model_electra = AutoModel.from_pretrained(ARAELECTRA_NAME).to(DEVICE)

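# Multilingual Sentence-BERT encoder for phrase/document/sentence similarity.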
sbert = SentenceTransformer(SBERT_MODEL, device=DEVICE)

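# AraT5 sequence-to-sequence model for Arabic question generation.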
qg_tokenizer = HFTokenizer.from_pretrained(QG_MODEL)
qg_model = AutoModelForSeq2SeqLM.from_pretrained(QG_MODEL).to(DEVICE)

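# Text normalization and character-offset alignment utilities.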
def normalize(s: str) -> str:
    """Strip diacritics, unify alef/ya variants, remove tatweel, and collapse whitespace."""
    t = araby.strip_tashkeel(s)
    t = t.replace("آ", "ا").replace("أ", "ا").replace("إ", "ا").replace("ى", "ي")
    t = t.replace("ـ", "")  # tatweel (kashida)
    t = " ".join(t.split())
    return t

def build_char_map(src: str, tgt: str):
    """Map each character index in `src` to an index in `tgt` using a difflib alignment."""
    sm = difflib.SequenceMatcher(a=src, b=tgt)
    src2tgt = [-1] * len(src)
    for tag, i1, i2, j1, j2 in sm.get_opcodes():
        if tag == "equal":
            for k in range(i2 - i1):
                src2tgt[i1 + k] = j1 + k
        elif tag in ("replace", "delete"):
            for k in range(i2 - i1):
                src2tgt[i1 + k] = j1
    # Fill any still-unmapped positions with the last known target index.
    last = 0
    for i in range(len(src2tgt)):
        if src2tgt[i] == -1:
            src2tgt[i] = last
        else:
            last = src2tgt[i]
    return src2tgt

def map_span_src_to_tgt(src2tgt, start, end, tgt_len):
    """Project a [start, end) character span in the source string onto the target string."""
    if start >= len(src2tgt):
        start = max(0, len(src2tgt) - 1)
    if end == 0:
        end = 1
    if end - 1 >= len(src2tgt):
        end = len(src2tgt)
    ts = src2tgt[start]
    te = src2tgt[end - 1] + 1
    ts = max(0, min(ts, max(0, tgt_len - 1)))
    te = max(ts + 1, min(te, tgt_len))
    return ts, te

def token_indices_overlapping_span(offsets, span_start, span_end):
    """Return indices of tokens whose character offsets overlap the given span."""
    idxs = []
    for i, (s, e) in enumerate(offsets):
        if e > span_start and s < span_end:
            idxs.append(i)
    return idxs

def electra_hidden_states(prep_text):
    """Encode the preprocessed text with AraELECTRA and return (token offsets, hidden states)."""
    encoded = tokenizer_electra(
        prep_text,
        return_tensors="pt",
        return_offsets_mapping=True,
        padding=False,
        truncation=True,
    ).to(DEVICE)
    offsets = encoded.pop("offset_mapping")[0].tolist()
    with torch.no_grad():
        out = model_electra(**encoded)
    H = out.last_hidden_state.squeeze(0)
    return offsets, H

def electra_phrase_vec_via_offsets(span_start, span_end, src2tgt, prep_text, offsets, H):
    """Mean-pool AraELECTRA token vectors over the tokens covering a source-text span."""
    ts, te = map_span_src_to_tgt(src2tgt, span_start, span_end, len(prep_text))
    tok_ids = token_indices_overlapping_span(offsets, ts, te)
    if not tok_ids:
        return None
    vecs = [H[i] for i in tok_ids]
    return torch.stack(vecs, dim=0).mean(dim=0)

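# Noun-phrase candidate extraction from the dependency parse.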
def build_noun_phrases(doc, text_norm):
    """Extract candidate noun phrases (head noun plus amod/compound/nmod modifiers) from a Stanza parse."""
    noun_phrases = []
    for sent in doc.sentences:
        words_info = []
        for ti, tok in enumerate(sent.tokens):
            for w in tok.words:
                words_info.append({
                    "id": w.id, "text": w.text, "upos": w.upos, "deprel": w.deprel,
                    "head": w.head, "start": tok.start_char, "end": tok.end_char, "tok_idx": ti,
                })
        for wi in words_info:
            if wi["upos"] not in {"NOUN", "PROPN"}:
                continue
            head = wi
            left_mods, right_mods = [], []
            for cj in words_info:
                if cj["head"] == head["id"] and cj["deprel"] in {"amod", "compound", "nmod"}:
                    (left_mods if cj["start"] <= head["start"] else right_mods).append(cj)
            left_mods = sorted(left_mods, key=lambda x: x["start"])
            right_mods = sorted(right_mods, key=lambda x: x["start"])
            phrase_tokens = left_mods + [head] + right_mods
            # Keep bare heads only when they are proper nouns.
            if len(phrase_tokens) < 2 and head["upos"] != "PROPN":
                continue
            span_start = min(t["start"] for t in phrase_tokens)
            span_end = max(t["end"] for t in phrase_tokens)
            phrase_text = re.sub(r"\s+", " ", text_norm[span_start:span_end].strip())
            if len(phrase_text) >= 2:
                noun_phrases.append({"text": phrase_text, "start": span_start, "end": span_end})

    # Deduplicate by phrase text, keeping the longest span for each.
    uniq = {}
    for np_item in noun_phrases:
        key = np_item["text"]
        if key not in uniq or (np_item["end"] - np_item["start"]) > (uniq[key]["end"] - uniq[key]["start"]):
            uniq[key] = np_item
    return list(uniq.values())

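# Keyphrase ranking: sBERT + AraELECTRA blend with MMR diversification.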
def mmr_select(doc_emb, cand_embs, candidates, k=10, lam=0.7):
    """Maximal Marginal Relevance: trade off document relevance against redundancy among picks."""
    if not candidates:
        return []
    chosen, rest = [], list(range(len(candidates)))
    sim_doc = util.cos_sim(doc_emb, cand_embs)[0].cpu().numpy()
    first = int(np.argmax(sim_doc))
    chosen.append(first)
    rest.remove(first)
    sim_between = util.cos_sim(cand_embs, cand_embs).cpu().numpy()
    while len(chosen) < min(k, len(candidates)) and rest:
        best_i, best_score = None, -1e9
        for i in rest:
            redundancy = max(sim_between[i, j] for j in chosen) if chosen else 0.0
            # Use the lam parameter rather than hard-coded 0.7/0.3 weights.
            score = lam * sim_doc[i] - (1 - lam) * redundancy
            if score > best_score:
                best_score, best_i = score, i
        chosen.append(best_i)
        rest.remove(best_i)
    return [candidates[i] for i in chosen]

def rank_keyphrases(text_norm, nps, alpha=0.8):
    """Rank noun phrases by a blend of sBERT and AraELECTRA similarity to the document,
    and also return an MMR-diversified subset."""
    phrases = [p["text"] for p in nps]
    if not phrases:
        return [], []
    text_prep = arabert_prep.preprocess(text_norm)
    src2tgt = build_char_map(text_norm, text_prep)

    # Sentence-BERT similarity between each phrase and the whole document.
    doc_emb = sbert.encode([text_prep], convert_to_tensor=True)
    phr_embs = sbert.encode(phrases, convert_to_tensor=True)
    sims_sbert = util.cos_sim(doc_emb, phr_embs).cpu().numpy()[0]

    # AraELECTRA cosine similarity between each phrase span and the mean document vector.
    offsets, H = electra_hidden_states(text_prep)
    doc_vec_electra = H.mean(dim=0)
    sims_electra = []
    for p in nps:
        v = electra_phrase_vec_via_offsets(p["start"], p["end"], src2tgt, text_prep, offsets, H)
        if v is None:
            sims_electra.append(0.0)
        else:
            num = torch.dot(doc_vec_electra, v).item()
            den = float(doc_vec_electra.norm().item() * v.norm().item() + 1e-9)
            sims_electra.append(num / den)
    sims_electra = np.array(sims_electra)

    blended = alpha * sims_sbert + (1 - alpha) * sims_electra
    order = np.argsort(-blended)
    ranked = [(phrases[i], float(blended[i]), float(sims_sbert[i]), float(sims_electra[i])) for i in order]
    diverse = mmr_select(doc_emb, phr_embs, phrases, k=min(12, len(phrases)), lam=0.7)
    return ranked, diverse

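# Statistical keyword signal via YAKE, blended with the semantic ranking.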
def yake_scores_for_phrases(text_norm, phrases, max_ngram_size=5, lan="ar"):
    """Look up raw YAKE scores (lower is better) for the given phrases; None when YAKE did not emit one."""
    kw_extractor = yake.KeywordExtractor(lan=lan, n=max_ngram_size, dedupLim=0.9, top=1000)
    scored = kw_extractor.extract_keywords(text_norm)
    norm = lambda s: re.sub(r"\s+", " ", s).strip().lower()
    scored_norm = {norm(k): v for k, v in scored}
    res = {}
    for p in phrases:
        res[p] = scored_norm.get(norm(p))
    return res

def invert_and_minmax_yake(score_map):
    """Convert YAKE scores (lower is better) to a [0, 1] scale where higher is better; missing phrases get 0."""
    vals = [None if v is None else 1 / (1 + v) for v in score_map.values()]
    finite = [x for x in vals if x is not None]
    if not finite:
        return {k: 0.0 for k in score_map.keys()}
    vmin, vmax = min(finite), max(finite)
    rng = (vmax - vmin) if vmax > vmin else 1.0
    out = {}
    for (k, _), pos in zip(score_map.items(), vals):
        out[k] = 0.0 if pos is None else (pos - vmin) / rng
    return out

def blend_semantic_with_yake(ranked_sem, yake_norm, w_sem=0.7, w_yake=0.3):
    """Combine the blended semantic score with the normalized YAKE score and re-rank."""
    merged = []
    for phr, sem_sc, sb, el in ranked_sem:
        y = yake_norm.get(phr, 0.0)
        final = w_sem * sem_sc + w_yake * y
        merged.append((phr, final, sem_sc, y, sb, el))
    merged.sort(key=lambda x: -x[1])
    return merged

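# Sentence splitting and supporting-sentence selection.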
def split_by_dots(text: str):
    """Split the text into sentence-like chunks on runs of periods."""
    parts = re.split(r"\.+\s*", text)
    return [p.strip() for p in parts if p.strip()]

def sentence_kind_from_root(stanza_sentence):
    """Label a sentence as verbal (root is a verb) or nominal, based on its dependency root."""
    root = next((w for w in stanza_sentence.words if w.deprel == "root"), None)
    if not root:
        return "unknown"
    return "verbal" if root.upos == "VERB" else "nominal"

def split_and_tag_nominal_verbal_by_dots(text_norm):
    """Split on periods and tag each chunk as nominal, verbal, or unknown."""
    sents = split_by_dots(text_norm)
    tagged = []
    for s in sents:
        doc_s = nlp(s)
        if not doc_s.sentences:
            tagged.append({"text": s, "kind": "unknown"})
        else:
            tagged.append({"text": s, "kind": sentence_kind_from_root(doc_s.sentences[0])})
    return tagged

def best_support_sentence_by_dots(text_norm, phrase):
    """Return the sentence most similar to the phrase (by sBERT cosine) together with its kind."""
    sentences_tagged = split_and_tag_nominal_verbal_by_dots(text_norm)
    if not sentences_tagged:
        # Keep the (sentence, kind) contract even when nothing was found,
        # so callers can always unpack two values.
        return "", "unknown"
    sent_texts = [m["text"] for m in sentences_tagged]
    sent_embs = sbert.encode(sent_texts, convert_to_tensor=True)
    p_emb = sbert.encode([phrase], convert_to_tensor=True)
    sims = util.cos_sim(p_emb, sent_embs)[0].cpu().numpy()
    best_idx = int(np.argmax(sims))
    return sent_texts[best_idx], sentences_tagged[best_idx]["kind"]

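# Unified question generation with AraT5.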
def gen_unified_question_freeform(phrases, supports, context_text, max_len=96, num_beams=5):
    """Prompt AraT5 to generate one unified Arabic question covering all key phrases."""
    context_short = context_text.strip()[:600]
    items_block = "\n".join([f"- العبارة: {p}\n جملة داعمة: {s}" for p, s in zip(phrases, supports)])
    # Arabic instruction: "Turn the following phrases into a single comprehensive Arabic question
    # based on the context. It must cover all the phrases concisely and clearly."
    prompt = (
        "حوّل العبارات التالية إلى سؤال واحد شامل بالعربية يعتمد على السياق. "
        "يجب أن يغطي جميع العبارات بشكل موجز وواضح.\n"
        f"{items_block}\n"
        f"سياق: {context_short}\n"
        "السؤال الموحد:"
    )
    inputs = qg_tokenizer(prompt, return_tensors="pt", truncation=True).to(DEVICE)
    outputs = qg_model.generate(
        **inputs, max_length=max_len, num_beams=num_beams,
        early_stopping=True, no_repeat_ngram_size=3,
    )
    q = qg_tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    # Normalize the ending to a single Arabic question mark.
    q = q.rstrip("?.؟")
    if q and not q.endswith("؟"):
        q += "؟"
    return q

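# End-to-end pipeline and Gradio interface.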
def run_pipeline(user_text):
    """Full pipeline: normalize, parse, extract and rank noun phrases, pick supports, generate the question."""
    if not user_text or len(user_text.strip()) < 5:
        # "Please enter a longer Arabic text."
        return "رجاءً أدخل نصًا عربيًا أطول.", "", "", "", ""

    # Normalization and dependency parsing.
    text_norm = normalize(user_text)
    doc = nlp(text_norm)

    # Candidate noun phrases.
    nps = build_noun_phrases(doc, text_norm)
    if not nps:
        # "No noun phrases were extracted."
        return "لم تُستخرج عبارات اسمية.", "", "", "", ""

    # Semantic ranking (sBERT + AraELECTRA) and MMR diversification.
    ranked_sem, diverse = rank_keyphrases(text_norm, nps, alpha=0.8)

    # Blend with normalized YAKE scores.
    phrases = [r[0] for r in ranked_sem]
    yake_raw = yake_scores_for_phrases(text_norm, phrases, max_ngram_size=5, lan="ar")
    yake_norm = invert_and_minmax_yake(yake_raw)
    ranked_blended = blend_semantic_with_yake(ranked_sem, yake_norm, w_sem=0.7, w_yake=0.3)

    # Best supporting sentence for each of the top phrases.
    top_n = min(5, len(ranked_blended))
    top_phrases = [ranked_blended[i][0] for i in range(top_n)]
    supports = []
    kinds = []
    for p in top_phrases:
        s, kind = best_support_sentence_by_dots(text_norm, p)
        supports.append(s)
        kinds.append(kind)

    # Unified question via AraT5.
    unified_q = gen_unified_question_freeform(top_phrases, supports, text_norm)

    # Format the outputs for the UI.
    nps_str = "\n".join(f"- {p['text']}" for p in nps[:20])
    ranked_str = "\n".join(f"{i+1:>2}. {t[0]} (score={t[1]:.3f})" for i, t in enumerate(ranked_blended[:15]))
    support_str = "\n".join(f"{i+1:>2}. [{kinds[i]}] {top_phrases[i]} → {supports[i]}" for i in range(top_n))
    diverse_str = "\n".join(f"- {d}" for d in diverse[:10])

    return unified_q, ranked_str, support_str, diverse_str, nps_str

title = "Arabic Main Question Generation (Hybrid Pipeline)"
# Arabic description: "Enter an Arabic text; we extract the noun phrases, rank them
# (sBERT + ELECTRA + YAKE + MMR), choose supporting sentences, and generate a unified question with AraT5."
desc = "أدخل نصًا عربيًا؛ سنستخرج العبارات الاسمية، نرتّبها (sBERT + ELECTRA + YAKE + MMR)، نختار جملًا داعمة، ونولّد سؤالًا موحّدًا بـ AraT5."

with gr.Blocks(title=title) as demo:
    gr.Markdown(f"# {title}\n{desc}")

    with gr.Row():
        inp = gr.Textbox(lines=12, label="النص العربي")  # "Arabic text"
        btn = gr.Button("تشغيل الـPipeline")  # "Run the pipeline"

    out_unified = gr.Textbox(label="السؤال الموحد (AraT5)")  # "Unified question (AraT5)"
    out_ranked = gr.Textbox(label="Top Noun Phrases (Blended Ranking)")
    out_support = gr.Textbox(label="أفضل الجمل الداعمة لأول 5 عبارات")  # "Best supporting sentences for the top 5 phrases"
    out_diverse = gr.Textbox(label="MMR Diverse Selection")
    out_nps = gr.Textbox(label="العبارات الاسمية المستخرجة (أول 20)")  # "Extracted noun phrases (first 20)"

    btn.click(run_pipeline, inputs=inp, outputs=[out_unified, out_ranked, out_support, out_diverse, out_nps])

demo.launch()