"""Gradio demo: anonymize Persian text in two chunks with DeepInfra and merge the mappings.

Pipeline (see ``run_pipeline``):
  1. split the input into sentences and group them into 20-sentence chunks;
  2. for each of the first two chunks, ask the model for an anonymized text
     and then for a placeholder -> original-value JSON mapping;
  3. merge the second mapping into the first (exact match on normalized
     values, Jaccard fuzzy match for companies) and renumber the second
     chunk's placeholders accordingly.
"""

import os
import re
import json
import time
from typing import Dict, Tuple, List, Any, Optional

import pandas as pd

# The HTTP client and the UI toolkit are only needed to call the API and to
# serve the app.  Importing them lazily-tolerantly keeps the pure text helpers
# importable on their own; a missing package is reported at the point of use.
try:
    import requests
except ImportError:  # reported by deepinfra_post()
    requests = None

try:
    import gradio as gr
except ImportError:  # reported by build_demo()
    gr = None


DEEPINFRA_URL = "https://api.deepinfra.com/v1/openai/chat/completions"
DEFAULT_MODEL = "Qwen/Qwen3-14B"

SENT_SPLIT_RE = re.compile(r'(?<=[\.\!\؟\?])\s+|\n+')
VALID_PH_RE = re.compile(r"^(company|person|amount|percent)-\d{2,}$")


def split_into_sentences(text: str) -> List[str]:
    """Split *text* on whitespace following ``. ! ? ؟`` or on newlines.

    Returns the non-empty, stripped pieces; ``None``/blank input gives ``[]``.
    """
    text = (text or "").strip()
    if not text:
        return []
    return [s.strip() for s in SENT_SPLIT_RE.split(text) if s.strip()]


def chunk_sentences(sentences: List[str], chunk_size: int = 20) -> List[str]:
    """Join consecutive groups of *chunk_size* sentences with single spaces."""
    return [
        " ".join(sentences[i:i + chunk_size])
        for i in range(0, len(sentences), chunk_size)
    ]


def load_system_prompt() -> str:
    """Return the system prompt from the first existing prompt file.

    If the file contains ``return \"\"\"...\"\"\"`` only the text between the
    triple quotes is used; otherwise the whole file is used.  When no file
    exists a built-in default prompt is returned.
    """
    for path in ["پرامپت.txt", "/mnt/data/پرامپت.txt", "prompt.txt", "/mnt/data/prompt.txt"]:
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8") as f:
                content = f.read()
            m = re.search(r'return\s+"""(.*)"""', content, flags=re.DOTALL)
            if m:
                return m.group(1).strip()
            return content.strip()
    return "شما یک ناشناس‌ساز دقیق متون فارسی هستید. فقط person/company/amount/percent را ناشناس کن."


def _retry_delay(retry_after: Optional[str], fallback: float) -> float:
    """Return the delay requested by a ``Retry-After`` header, or *fallback*.

    *fallback* is used when the header is absent, is not a number (e.g. the
    HTTP-date form), or is negative / NaN / infinite, all of which would make
    ``time.sleep`` raise.
    """
    if not retry_after:
        return fallback
    try:
        seconds = float(retry_after)
    except (TypeError, ValueError):
        return fallback
    # The chained comparison is False for NaN, negatives and +inf alike.
    if not 0 <= seconds < float("inf"):
        return fallback
    return seconds


def deepinfra_post(payload: dict, api_key: str, retries: int = 6) -> dict:
    """POST *payload* to the DeepInfra chat-completions endpoint and return the JSON body.

    Network errors and HTTP 429/503 responses are retried up to *retries*
    times with exponential backoff (capped at 30 s), honouring a numeric
    ``Retry-After`` header when present.

    Raises:
        RuntimeError: on any other non-200 status, when all retries are
            exhausted, or when the ``requests`` package is not installed.
    """
    if requests is None:
        raise RuntimeError("The 'requests' package is required to call the DeepInfra API.")

    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    last: Any = None
    for attempt in range(retries):
        backoff = min(2 ** attempt, 30)
        try:
            r = requests.post(DEEPINFRA_URL, headers=headers, json=payload, timeout=90)
        except requests.RequestException as exc:
            # Only transport-level failures are worth retrying.
            last = exc
            delay = backoff
        else:
            if r.status_code == 200:
                return r.json()
            if r.status_code not in (429, 503):
                raise RuntimeError(f"DeepInfra API Error {r.status_code}: {r.text[:800]}")
            last = r.text[:400]
            delay = _retry_delay(r.headers.get("Retry-After"), backoff)
        # No point in waiting when there is no further attempt.
        if attempt < retries - 1:
            time.sleep(delay)
    raise RuntimeError(f"Rate limit retries exhausted. Last={last}")


def normalize(s: str) -> str:
    """Canonicalize a value for comparison.

    Unifies Arabic/Persian letter variants, converts Persian digits to ASCII,
    replaces ZWNJ and directional marks with spaces, collapses whitespace,
    rewrites ``N%`` as ``N درصد``, maps ``ریالی`` to ``ریال`` and strips
    surrounding punctuation.  ``None`` becomes ``""``.
    """
    if s is None:
        return ""
    s = str(s)
    s = s.replace("ي", "ی").replace("ك", "ک").replace("ة", "ه")
    fa_to_en = str.maketrans("۰۱۲۳۴۵۶۷۸۹", "0123456789")
    s = s.translate(fa_to_en)
    s = s.replace("\u200c", " ")
    s = s.replace("\u200f", " ")
    s = s.replace("\u202a", " ").replace("\u202b", " ").replace("\u202c", " ")
    s = re.sub(r"\s+", " ", s).strip()
    s = re.sub(r"(\d+)\s*%", r"\1 درصد", s)
    s = s.replace("ریالی", "ریال")
    return s.strip(" .،؛:!؟\t\r\n")


def fix_percent_mapping(mapping: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of *mapping* where every ``percent-*`` value mentions a percent unit."""
    out = dict(mapping)
    for k, v in list(out.items()):
        if str(k).startswith("percent-"):
            vv = str(v).strip()
            if not re.search(r"(درصد|%|درصدی)", vv):
                out[k] = f"{vv} درصد"
    return out


def sanitize_mapping(mapping: Any) -> Tuple[Dict[str, str], List[Tuple[str, str]]]:
    """Split *mapping* into entries with valid placeholder keys and dropped entries.

    Returns ``(clean, dropped)``.  A non-dict input yields an empty mapping
    and a single dropped pair carrying its string form.
    """
    clean: Dict[str, str] = {}
    dropped: List[Tuple[str, str]] = []
    if not isinstance(mapping, dict):
        return {}, [("<>", str(mapping))]
    for k, v in mapping.items():
        kk = str(k).strip()
        vv = str(v).strip()
        if VALID_PH_RE.match(kk):
            clean[kk] = vv
        else:
            dropped.append((kk, vv))
    return clean, dropped


def extract_json_object(text: str) -> str:
    """Return the substring from the first ``{`` to the last ``}`` of *text*.

    Markdown code fences are removed first.  Returns ``""`` when no such
    span exists.  The result is not validated as JSON.
    """
    if not text:
        return ""
    t = text.strip()
    t = t.replace("```json", "").replace("```JSON", "").replace("```", "").strip()
    start = t.find("{")
    end = t.rfind("}")
    if start == -1 or end == -1 or end <= start:
        return ""
    return t[start:end + 1]


def _placeholder_sort_key(ph: str) -> Tuple[str, int]:
    """Order placeholders by type, then by number; unparsable numbers sort last."""
    parts = ph.split("-")
    try:
        n = int(parts[1])
    except (IndexError, ValueError):
        n = 10 ** 9
    return (parts[0], n)


def collapse_duplicates(mapping: Dict[str, str]) -> Tuple[Dict[str, str], Dict[str, str], List[str]]:
    """Merge placeholders of one table whose normalized values are equal.

    Returns ``(new_map, remap, logs)``: the mapping without the duplicates,
    a ``duplicate -> canonical`` placeholder remap, and one log line per
    removed duplicate.  The canonical placeholder is the first one in
    (type, number) order.
    """
    groups: Dict[str, List[str]] = {}
    for ph, val in mapping.items():
        groups.setdefault(normalize(val), []).append(ph)

    remap: Dict[str, str] = {}
    logs: List[str] = []
    new_map = dict(mapping)

    for norm_val, ph_list in groups.items():
        if len(ph_list) <= 1:
            continue
        ph_list = sorted(ph_list, key=_placeholder_sort_key)
        canonical = ph_list[0]
        for dup in ph_list[1:]:
            remap[dup] = canonical
            new_map.pop(dup, None)
            logs.append(f"INTRA-DEDUP: {dup} -> {canonical} | value='{norm_val}'")
    return new_map, remap, logs


def remap_text(text: str, remap: Dict[str, str]) -> str:
    """Replace every placeholder in *text* according to *remap* in a single pass.

    All substitutions are applied simultaneously, so a remap such as
    ``{"company-01": "company-02", "company-02": "company-03"}`` does not
    cascade (text already rewritten is never rewritten again).  A placeholder
    is not matched when it is immediately followed by another digit, so
    ``company-01`` is left alone inside ``company-011``.
    """
    if not text or not remap:
        return text
    # Longest alternatives first so the regex prefers the most specific key.
    keys = sorted(remap, key=len, reverse=True)
    pattern = re.compile("(?:" + "|".join(re.escape(k) for k in keys) + r")(?!\d)")
    return pattern.sub(lambda m: remap[m.group(0)], text)


def jaccard(a: str, b: str) -> float:
    """Return the Jaccard similarity of the normalized word sets of *a* and *b*."""
    sa = set(normalize(a).split())
    sb = set(normalize(b).split())
    if not sa or not sb:
        return 0.0
    return len(sa & sb) / len(sa | sb)


def max_counter(mapping: Dict[str, str], typ: str) -> int:
    """Return the highest number used by ``<typ>-NN`` placeholders in *mapping* (0 if none)."""
    m = 0
    for ph in mapping:
        if ph.startswith(typ + "-"):
            try:
                m = max(m, int(ph.split("-")[1]))
            except (IndexError, ValueError):
                # Malformed placeholder number: it cannot collide with new ones.
                pass
    return m


def merge_tables(
    t1: Dict[str, str],
    t2: Dict[str, str],
    threshold: float = 0.75,
) -> Tuple[Dict[str, str], Dict[str, str], List[str]]:
    """Merge mapping *t2* into *t1* and compute how *t2*'s placeholders must be renamed.

    For each entry of *t2*:
      * an entry whose normalized value already exists reuses that placeholder;
      * a company whose best Jaccard score against the known companies is at
        least *threshold* reuses the best-matching placeholder;
      * otherwise a new placeholder ``<type>-NN`` is allocated after the
        highest number of that type.

    Returns ``(merged, remap2, logs)`` where ``remap2`` maps every handled
    *t2* placeholder to its global placeholder and ``logs`` records the
    fuzzy-matching decisions.
    """
    merged = dict(t1)
    exact = {normalize(v): k for k, v in merged.items()}
    counters = {
        "company": max_counter(merged, "company"),
        "person": max_counter(merged, "person"),
        "amount": max_counter(merged, "amount"),
        "percent": max_counter(merged, "percent"),
    }
    remap2: Dict[str, str] = {}
    logs: List[str] = []

    for ph2, val2 in t2.items():
        typ = ph2.split("-")[0]
        if typ not in counters:
            continue

        nv = normalize(val2)
        if nv in exact:
            remap2[ph2] = exact[nv]
            continue

        if typ == "company":
            best_ph = None
            best_score = 0.0
            best_val = ""
            # `merged` grows during the loop, so companies added from t2
            # earlier are candidates for later t2 entries as well.
            for ph_exist, val_exist in merged.items():
                if not ph_exist.startswith("company-"):
                    continue
                score = jaccard(val2, val_exist)
                if score > best_score:
                    best_score = score
                    best_ph = ph_exist
                    best_val = val_exist

            if best_ph and best_score >= threshold:
                remap2[ph2] = best_ph
                exact[nv] = best_ph
                logs.append(f"FUZZY MATCH: '{val2}' ~ '{best_val}' score={best_score:.2f} => {best_ph}")
                continue
            elif best_ph:
                logs.append(f"FUZZY NO-MATCH: '{val2}' best='{best_val}' score={best_score:.2f} < {threshold:.2f}")

        counters[typ] += 1
        new_ph = f"{typ}-{counters[typ]:02d}"
        merged[new_ph] = val2
        exact[nv] = new_ph
        remap2[ph2] = new_ph

    return merged, remap2, logs


def _message_content(resp: dict) -> str:
    """Return the stripped text of the first choice; a missing/null content gives ``""``."""
    return (resp["choices"][0]["message"].get("content") or "").strip()


def anonymize_and_map(
    chunk_text: str,
    api_key: str,
    system_prompt: str,
    model: str,
) -> Tuple[str, Dict[str, str], List[str]]:
    """Anonymize one chunk and obtain its placeholder mapping with two API calls.

    Returns ``(anonymized_text, mapping, logs)``.  ``logs`` contains dropped
    keys, JSON extraction/parsing errors (with the raw model output) and
    intra-chunk de-duplication actions.  On a mapping failure the anonymized
    text is still returned, with an empty mapping.
    """
    logs: List[str] = []
    if not chunk_text.strip():
        return "", {}, logs

    # Step 1: anonymized text only.
    resp1 = deepinfra_post(
        {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"متن زیر را ناشناس کن و فقط متن ناشناس‌شده را برگردان:\n\n{chunk_text}"},
            ],
            "max_tokens": 3000,
            "temperature": 0.1,
        },
        api_key,
    )
    anon = _message_content(resp1)

    # Step 2: mapping as a JSON object; the model gets both texts to align them.
    prompt2 = (
        f"متن اصلی: {chunk_text} متن ناشناس شده: {anon} "
        "فقط و فقط یک JSON object برگردان که کلیدهای آن دقیقاً یکی از این الگوها باشد: "
        "company-01, company-02, ... person-01, person-02, ... "
        "amount-01, amount-02, ... percent-01, percent-02, ... "
        "هیچ کلید دیگری مجاز نیست. هیچ توضیح اضافی و هیچ کدبلاک ننویس."
    ).strip()

    resp2 = deepinfra_post(
        {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt2},
            ],
            "max_tokens": 2000,
            "temperature": 0.1,
        },
        api_key,
    )
    raw = _message_content(resp2)

    json_str = extract_json_object(raw)
    if not json_str:
        logs.append("ERROR: Mapping output did not contain a JSON object.")
        logs.append("RAW_MAPPING_OUTPUT:")
        logs.append(raw[:2000])
        return anon, {}, logs

    try:
        mapping_obj = json.loads(json_str)
    except ValueError as e:  # json.JSONDecodeError is a ValueError
        logs.append(f"ERROR: JSON parsing failed: {e}")
        logs.append("RAW_MAPPING_JSON_EXTRACT:")
        logs.append(json_str[:2000])
        logs.append("RAW_MAPPING_OUTPUT:")
        logs.append(raw[:2000])
        return anon, {}, logs

    mapping_obj = {str(k).strip(): str(v).strip() for k, v in mapping_obj.items()}
    mapping_obj = fix_percent_mapping(mapping_obj)
    mapping_clean, dropped = sanitize_mapping(mapping_obj)
    for k, v in dropped[:100]:
        logs.append(f"DROPPED KEY: {k} = {v}")

    # Collapse placeholders that stand for the same value inside this chunk.
    mapping_clean, intra_remap, intra_logs = collapse_duplicates(mapping_clean)
    anon = remap_text(anon, intra_remap)
    logs.extend(intra_logs)

    return anon, mapping_clean, logs


def _mapping_frame(mapping: Dict[str, str]) -> pd.DataFrame:
    """Build the Placeholder/Original table shown in the UI, sorted by placeholder."""
    return pd.DataFrame([{"Placeholder": k, "Original": v} for k, v in sorted(mapping.items())])


def _early_result(message: str):
    """Return the 5-tuple expected by the UI for a run that stops before any API call."""
    return message, pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), ""


def run_pipeline(text: str, threshold: float, model: str):
    """Anonymize the first two 20-sentence chunks of *text* and merge their mappings.

    Returns ``(final_text, table1, table2, merged_table, log_text)``.  The
    API key is read from the ``DEEPINFRA_API_KEY`` environment variable.  A
    missing key, empty text, or text that yields fewer than two chunks
    returns a message with empty tables.  Chunks beyond the second are not
    processed; this is reported in the log.
    """
    api_key = os.getenv("DEEPINFRA_API_KEY", "").strip()
    if not api_key:
        return _early_result("DEEPINFRA_API_KEY is not set.")

    # A blank model textbox would otherwise be sent to the API as-is.
    model = (model or "").strip() or DEFAULT_MODEL

    system_prompt = load_system_prompt()

    sentences = split_into_sentences(text)
    if len(sentences) < 1:
        return _early_result("متن خالی است.")

    chunks = chunk_sentences(sentences, 20)
    if len(chunks) < 2:
        return _early_result("متن کمتر از 20 جمله است؛ برای ساخت دو chunk، متن طولانی‌تر بده.")

    chunk1 = chunks[0]
    chunk2 = chunks[1]

    anon1, map1, log1 = anonymize_and_map(chunk1, api_key, system_prompt, model)
    anon2, map2, log2 = anonymize_and_map(chunk2, api_key, system_prompt, model)

    merged, remap2, fuzzy_log = merge_tables(map1, map2, threshold)
    anon2_fixed = remap_text(anon2, remap2)

    final_text = (anon1 + "\n\n" + anon2_fixed).strip()

    df1 = _mapping_frame(map1)
    df2 = _mapping_frame(map2)
    dfm = _mapping_frame(merged)

    combined_logs: List[str] = []
    if log1:
        combined_logs.append("LOGS CHUNK 1:")
        combined_logs.extend([" " + x for x in log1])
    if log2:
        combined_logs.append("\nLOGS CHUNK 2:")
        combined_logs.extend([" " + x for x in log2])
    if fuzzy_log:
        combined_logs.append("\nFUZZY LOG (companies):")
        combined_logs.extend([" " + x for x in fuzzy_log])
    if remap2:
        combined_logs.append("\nREMAP (table2 -> global):")
        combined_logs.extend([" " + f"{k} -> {v}" for k, v in sorted(remap2.items())])
    if len(chunks) > 2:
        # Make the truncation visible instead of silently dropping text.
        combined_logs.append(
            f"\nWARNING: input produced {len(chunks)} chunks; only the first 2 were "
            "processed and the remaining text is not part of the output."
        )

    log_text = "\n".join(combined_logs).strip()
    return final_text, df1, df2, dfm, log_text


def build_demo():
    """Build the Gradio Blocks UI wired to ``run_pipeline``.

    Raises:
        RuntimeError: when the ``gradio`` package is not installed.
    """
    if gr is None:
        raise RuntimeError("The 'gradio' package is required to build the UI.")

    with gr.Blocks(theme=gr.themes.Soft()) as blocks:
        gr.Markdown("DeepInfra (Qwen3-14B) Chunking (20 sentences) + 2 Mapping Tables + Merge + Remap")

        input_text = gr.Textbox(lines=12, label="متن ورودی")
        with gr.Row():
            thr = gr.Slider(0.60, 0.95, value=0.75, step=0.05, label="Company Fuzzy Threshold (Jaccard)")
            model = gr.Textbox(value=DEFAULT_MODEL, label="Model")

        btn = gr.Button("Run")

        out_text = gr.Textbox(lines=12, label="متن ناشناس‌شده نهایی")
        with gr.Row():
            df1 = gr.Dataframe(label="Mapping Table #1", interactive=False)
            df2 = gr.Dataframe(label="Mapping Table #2", interactive=False)
        dfm = gr.Dataframe(label="Merged Global Mapping", interactive=False)

        logs = gr.Textbox(lines=14, label="Logs (Dropped + Intra-Dedup + Fuzzy + Remap + Raw mapping errors)")

        btn.click(run_pipeline, inputs=[input_text, thr, model], outputs=[out_text, df1, df2, dfm, logs])
    return blocks


# Kept at module level so tools that look for a `demo` object still find it.
demo = build_demo() if gr is not None else None


if __name__ == "__main__":
    if demo is None:
        raise SystemExit("The 'gradio' package is required to run this app.")
    demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)