Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| import os | |
| import re | |
| import json | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import numpy as np | |
| import pandas as pd | |
| import gradio as gr | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModel, | |
| AutoModelForTokenClassification | |
| ) | |
| from huggingface_hub import snapshot_download | |
| # ------------------------- | |
| # تنظیمات کلی | |
| # ------------------------- | |
| device = torch.device("cpu") | |
| FRAME_DET_REPO = "PooryaPiroozfar/frame-detection-parsbert" | |
| FE_REPO = "PooryaPiroozfar/srl-frame-elements-parsbert" | |
| FRAME_DET_DIR = "models/frame_detection" | |
| FE_BASE_DIR = "models/frame_elements" | |
| TRIPLES_PATH = "frame_triples.xlsx" | |
| THRESHOLD = 0.25 | |
| frame_names = [ | |
| "Activity_finish","Activity_start","Aging","Attaching","Attempt", | |
| "Becoming","Being_born","Borrowing","Causation","Chatting", | |
| "Choosing","Closure","Clothing","Cutting","Damaging","Desiring","Discussion", | |
| "Emphasizing","Food","Installing","Locating","Memory","Morality_evaluation", | |
| "Motion","Offering","Practice","Project","Publishing","Religious_belief", | |
| "Removing","Request","Residence","Sharing","Taking","Telling","Travel", | |
| "Using","Visiting","Waiting","Work" | |
| ] | |
| # ------------------------- | |
| # دانلود مدلها | |
| # ------------------------- | |
| if not os.path.exists(FRAME_DET_DIR): | |
| snapshot_download(repo_id=FRAME_DET_REPO, local_dir=FRAME_DET_DIR) | |
| if not os.path.exists(FE_BASE_DIR): | |
| snapshot_download(repo_id=FE_REPO, local_dir=FE_BASE_DIR) | |
| # ------------------------- | |
| # Encoder | |
| # ------------------------- | |
| encoder_name = "HooshvareLab/bert-base-parsbert-uncased" | |
| sent_tokenizer = AutoTokenizer.from_pretrained(encoder_name) | |
| sent_encoder = AutoModel.from_pretrained(encoder_name).to(device) | |
| sent_encoder.eval() | |
| def get_embedding(text): | |
| inputs = sent_tokenizer( | |
| text, | |
| return_tensors="pt", | |
| truncation=True, | |
| padding=True, | |
| max_length=128 | |
| ).to(device) | |
| with torch.no_grad(): | |
| outputs = sent_encoder(**inputs) | |
| token_embeddings = outputs.last_hidden_state | |
| mask = inputs["attention_mask"].unsqueeze(-1).float() | |
| summed = torch.sum(token_embeddings * mask, dim=1) | |
| lengths = torch.clamp(mask.sum(dim=1), min=1e-9) | |
| return (summed / lengths).squeeze(0) | |
| # ------------------------- | |
| # Frame Detection Model | |
| # ------------------------- | |
| class FrameSimilarityModel(nn.Module): | |
| def __init__(self, emb_dim, frame_emb_init): | |
| super().__init__() | |
| self.proj = nn.Linear(emb_dim, emb_dim) | |
| self.frame_embeddings = nn.Parameter( | |
| torch.tensor(frame_emb_init, dtype=torch.float32) | |
| ) | |
| def forward(self, sent_emb): | |
| sent_proj = F.normalize(self.proj(sent_emb), dim=-1) | |
| frames = F.normalize(self.frame_embeddings, dim=-1) | |
| return torch.matmul(sent_proj, frames.T) | |
| frame_embs = np.load(os.path.join(FRAME_DET_DIR, "trained_frame_embeddings.npy")) | |
| frame_model = FrameSimilarityModel( | |
| emb_dim=768, | |
| frame_emb_init=frame_embs | |
| ).to(device) | |
| frame_model.load_state_dict( | |
| torch.load( | |
| os.path.join(FRAME_DET_DIR, "best_frame_margin_model.pt"), | |
| map_location="cpu" | |
| ) | |
| ) | |
| frame_model.eval() | |
| def predict_frame(sentence): | |
| emb = get_embedding(sentence).unsqueeze(0) | |
| with torch.no_grad(): | |
| sims = frame_model(emb) | |
| max_sim, idx = torch.max(sims, dim=1) | |
| if max_sim.item() < THRESHOLD: | |
| return None, max_sim.item() | |
| return frame_names[idx.item()], max_sim.item() | |
| # ------------------------- | |
| # Frame Elements | |
| # ------------------------- | |
| def predict_frame_elements(sentence, frame_name): | |
| frame_dir = os.path.join(FE_BASE_DIR, frame_name) | |
| if not os.path.exists(frame_dir): | |
| return [] | |
| with open(os.path.join(frame_dir, "label2id.json"), encoding="utf-8") as f: | |
| label2id = json.load(f) | |
| id2label = {int(v): k for k, v in label2id.items()} | |
| tokenizer = AutoTokenizer.from_pretrained(frame_dir) | |
| model = AutoModelForTokenClassification.from_pretrained( | |
| frame_dir, | |
| num_labels=len(label2id), | |
| id2label=id2label, | |
| label2id=label2id | |
| ).to(device) | |
| model.eval() | |
| inputs = tokenizer(sentence, return_tensors="pt", truncation=True, max_length=128) | |
| with torch.no_grad(): | |
| outputs = model(**inputs) | |
| preds = torch.argmax(outputs.logits, dim=-1).squeeze(0).numpy() | |
| tokens = tokenizer.convert_ids_to_tokens(inputs["input_ids"].squeeze(0)) | |
| elements = [] | |
| for tok, lab_id in zip(tokens, preds): | |
| if tok in {"[CLS]", "[SEP]", "[PAD]"}: | |
| continue | |
| label = id2label[lab_id] | |
| if label != "O": | |
| elements.append((tok, label)) | |
| return elements | |
| # ------------------------- | |
| # Triple Extraction | |
| # ------------------------- | |
| triples_df = pd.read_excel(TRIPLES_PATH) | |
| def group_elements(elements): | |
| d = {} | |
| for tok, lab in elements: | |
| d.setdefault(lab, []).append(tok) | |
| return d | |
| def extract_relations(frame_name, elements): | |
| fe_dict = group_elements(elements) | |
| rows = triples_df[triples_df["Frame"] == frame_name] | |
| relations = [] | |
| for _, r in rows.iterrows(): | |
| if r["Subject"] in fe_dict and r["Object"] in fe_dict: | |
| for s in fe_dict[r["Subject"]]: | |
| for o in fe_dict[r["Object"]]: | |
| relations.append({ | |
| "subject": s, | |
| "relation": r["Relation"], | |
| "object": o, | |
| "subject_fe": r["Subject"], | |
| "object_fe": r["Object"] | |
| }) | |
| return relations | |
| # ------------------------- | |
| # Sentence Utilities | |
| # ------------------------- | |
| def split_sentences(text): | |
| sents = re.split(r'[.!؟\n]+', text) | |
| return [s.strip() for s in sents if s.strip()] | |
| CONDITIONAL_PATTERNS = [ | |
| r'^اگر\s', r'\sاگر\s', r'^در صورت\s', | |
| r'^چنانچه\s', r'^هرگاه\s' | |
| ] | |
| def detect_conditional(sentence): | |
| for p in CONDITIONAL_PATTERNS: | |
| if re.search(p, sentence): | |
| return True | |
| return False | |
| def split_condition(sentence): | |
| for sep in ['،', ',']: | |
| if sep in sentence: | |
| return sentence.split(sep, 1) | |
| return None, None | |
| def build_spin_rule(if_triples, then_triples, rule_id): | |
| return { | |
| "rule_id": f"Rule_{rule_id}", | |
| "type": "SPIN", | |
| "if": if_triples, | |
| "then": then_triples | |
| } | |
| # ------------------------- | |
| # Pipeline | |
| # ------------------------- | |
| def analyze_sentence(sentence): | |
| frame, sim = predict_frame(sentence) | |
| if frame is None: | |
| return { | |
| "sentence": sentence, | |
| "frame": "خارج از دامنه", | |
| "similarity": round(sim, 3), | |
| "elements": [], | |
| "relations": [], | |
| "is_conditional": False, | |
| "rule": None | |
| } | |
| elements = predict_frame_elements(sentence, frame) | |
| relations = extract_relations(frame, elements) | |
| is_cond = detect_conditional(sentence) | |
| rule = None | |
| if is_cond: | |
| cond_part, res_part = split_condition(sentence) | |
| if cond_part and res_part: | |
| cond_res = analyze_sentence(cond_part) | |
| res_res = analyze_sentence(res_part) | |
| rule = build_spin_rule( | |
| cond_res["relations"], | |
| res_res["relations"], | |
| rule_id=abs(hash(sentence)) % 10000 | |
| ) | |
| return { | |
| "sentence": sentence, | |
| "frame": frame, | |
| "similarity": round(sim, 3), | |
| "elements": elements, | |
| "relations": relations, | |
| "is_conditional": is_cond, | |
| "rule": rule | |
| } | |
| def analyze_text(text): | |
| sentences = split_sentences(text) | |
| return { | |
| "input_text": text, | |
| "sentences_analysis": [ | |
| analyze_sentence(s) for s in sentences | |
| ] | |
| } | |
| # ------------------------- | |
| # Gradio UI | |
| # ------------------------- | |
| demo = gr.Interface( | |
| fn=analyze_text, | |
| inputs=gr.Textbox( | |
| label="متن فارسی", | |
| placeholder="مثال: اگر علی از تهران به مشهد سفر کند، در هتل اقامت میکند." | |
| ), | |
| outputs=gr.JSON(label="خروجی"), | |
| title="Persian Semantic Frame, Triple & Rule Extractor", | |
| description="تشخیص فریم، عناصر فریم، استخراج triple و قوانین شرطی (SPIN)" | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch(server_name="0.0.0.0", server_port=7860) | |