# Persian text anonymizer — Hugging Face Spaces app (DeepInfra / Qwen3-14B).
# NOTE(review): the original first lines were Spaces page residue
# ("Spaces:" / "Build error" x2), not source code; replaced with this
# comment header so the file parses.
| import os | |
| import re | |
| import json | |
| import time | |
| import requests | |
| import pandas as pd | |
| import gradio as gr | |
| from typing import Dict, Tuple, List, Any | |
# DeepInfra's OpenAI-compatible chat-completions endpoint.
DEEPINFRA_URL = "https://api.deepinfra.com/v1/openai/chat/completions"
# Default model shown in the UI; users can override it in the Model textbox.
DEFAULT_MODEL = "Qwen/Qwen3-14B"
# Sentence boundary: whitespace after ./!/?/؟ (Arabic question mark), or any newline run.
SENT_SPLIT_RE = re.compile(r'(?<=[\.\!\؟\?])\s+|\n+')
# Placeholder keys accepted from the model, e.g. "company-01", "percent-12".
VALID_PH_RE = re.compile(r"^(company|person|amount|percent)-\d{2,}$")
def split_into_sentences(text: str) -> List[str]:
    """Split *text* into trimmed, non-empty sentences.

    A sentence ends after '.', '!', '؟' or '?' followed by whitespace;
    newlines also act as boundaries. ``None`` and blank input yield [].
    """
    cleaned = (text or "").strip()
    if not cleaned:
        return []
    pieces = re.split(r'(?<=[\.\!\؟\?])\s+|\n+', cleaned)
    return [piece.strip() for piece in pieces if piece.strip()]
def chunk_sentences(sentences: List[str], chunk_size: int = 20) -> List[str]:
    """Join consecutive runs of *chunk_size* sentences into space-separated chunks."""
    chunks: List[str] = []
    for start in range(0, len(sentences), chunk_size):
        chunks.append(" ".join(sentences[start:start + chunk_size]))
    return chunks
def load_system_prompt() -> str:
    """Load the anonymizer system prompt from disk, or fall back to a default.

    The first existing candidate file wins. If the file wraps the prompt in
    a Python ``return \"\"\"...\"\"\"`` block, only that inner text is used;
    otherwise the whole file content is returned, stripped.
    """
    candidates = ["پرامپت.txt", "/mnt/data/پرامپت.txt", "prompt.txt", "/mnt/data/prompt.txt"]
    for path in candidates:
        if not os.path.exists(path):
            continue
        with open(path, "r", encoding="utf-8") as fh:
            content = fh.read()
        match = re.search(r'return\s+"""(.*)"""', content, flags=re.DOTALL)
        if match:
            return match.group(1).strip()
        return content.strip()
    # No prompt file found anywhere: minimal built-in Persian instruction.
    return "شما یک ناشناسساز دقیق متون فارسی هستید. فقط person/company/amount/percent را ناشناس کن."
def deepinfra_post(payload: dict, api_key: str, retries: int = 6) -> dict:
    """POST *payload* to the DeepInfra chat-completions endpoint.

    Retries with exponential backoff (capped at 30s, honoring a numeric
    ``Retry-After`` header) on network-level failures and 429/503 responses.

    Raises:
        RuntimeError: on any other HTTP error, or when retries are exhausted.
    """
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    last = None
    for attempt in range(retries):
        backoff = min(2 ** attempt, 30)
        try:
            r = requests.post(DEEPINFRA_URL, headers=headers, json=payload, timeout=90)
        except requests.RequestException as e:  # was `except Exception`: only retry transport errors
            last = e
            time.sleep(backoff)
            continue
        if r.status_code == 200:
            return r.json()
        if r.status_code in (429, 503):  # rate-limited / temporarily unavailable
            retry_after = r.headers.get("Retry-After")
            delay = backoff
            if retry_after:
                try:
                    delay = float(retry_after)
                except ValueError:  # was a bare `except:`; malformed header falls back to backoff
                    pass
            time.sleep(delay)
            last = r.text[:400]
            continue
        # Any other status is treated as fatal immediately (no retry).
        raise RuntimeError(f"DeepInfra API Error {r.status_code}: {r.text[:800]}")
    raise RuntimeError(f"Rate limit retries exhausted. Last={last}")
def normalize(s: str) -> str:
    """Canonicalize a (Persian) string for value comparison.

    - unifies Arabic letter variants (ي/ك/ة) to their Persian forms
    - converts Persian AND Arabic-Indic digits to ASCII (generalized:
      the original only handled the Persian ۰-۹ range)
    - replaces bidi/ZWNJ control characters with spaces, collapses whitespace
    - rewrites "N%" (and the Arabic percent sign "N٪") as "N درصد"
    - normalizes "ریالی" to "ریال" and strips trailing punctuation

    ``None`` input returns "".
    """
    if s is None:
        return ""
    s = str(s)
    s = s.replace("ي", "ی").replace("ك", "ک").replace("ة", "ه")
    # Persian (۰-۹) and Arabic-Indic (٠-٩) digits both map to ASCII.
    digit_map = str.maketrans("۰۱۲۳۴۵۶۷۸۹" "٠١٢٣٤٥٦٧٨٩", "0123456789" "0123456789")
    s = s.translate(digit_map)
    # ZWNJ and bidi embedding marks become plain spaces, then collapse.
    for ctrl in ("\u200c", "\u200f", "\u202a", "\u202b", "\u202c"):
        s = s.replace(ctrl, " ")
    s = re.sub(r"\s+", " ", s).strip()
    s = s.replace("٪", "%")  # Arabic percent sign → ASCII so the rule below applies
    s = re.sub(r"(\d+)\s*%", r"\1 درصد", s)
    s = s.replace("ریالی", "ریال")
    return s.strip(" .،؛:!؟\t\r\n")
def fix_percent_mapping(mapping: Dict[str, str]) -> Dict[str, str]:
    """Ensure every ``percent-*`` value mentions a percent word or sign.

    Values already containing "درصد", "%" or "درصدی" are left untouched;
    otherwise the stripped value gets " درصد" appended. Non-percent keys
    pass through unchanged.
    """
    result = dict(mapping)
    for key in [k for k in result if str(k).startswith("percent-")]:
        stripped = str(result[key]).strip()
        if re.search(r"(درصد|%|درصدی)", stripped) is None:
            result[key] = f"{stripped} درصد"
    return result
def sanitize_mapping(mapping: Any) -> Tuple[Dict[str, str], List[Tuple[str, str]]]:
    """Filter a model-produced mapping down to valid placeholder keys.

    Returns ``(clean, dropped)``: *clean* keeps entries whose key looks
    like "company-NN" / "person-NN" / "amount-NN" / "percent-NN" (two or
    more digits); everything else lands in *dropped* as (key, value)
    pairs. A non-dict input is rejected wholesale.
    """
    if not isinstance(mapping, dict):
        return {}, [("<<non-dict>>", str(mapping))]
    clean: Dict[str, str] = {}
    dropped: List[Tuple[str, str]] = []
    for raw_key, raw_value in mapping.items():
        key, value = str(raw_key).strip(), str(raw_value).strip()
        if re.match(r"^(company|person|amount|percent)-\d{2,}$", key):
            clean[key] = value
        else:
            dropped.append((key, value))
    return clean, dropped
def extract_json_object(text: str) -> str:
    """Return the first ``{...}`` span found in *text*.

    Strips markdown code fences first, then takes everything from the
    first '{' to the last '}' — robust against models that wrap the JSON
    in prose. Returns "" when no plausible object is present.
    """
    if not text:
        return ""
    cleaned = text.strip()
    for fence in ("```json", "```JSON", "```"):
        cleaned = cleaned.replace(fence, "")
    cleaned = cleaned.strip()
    first = cleaned.find("{")
    last = cleaned.rfind("}")
    if first < 0 or last <= first:
        return ""
    return cleaned[first:last + 1]
def collapse_duplicates(mapping: Dict[str, str]) -> Tuple[Dict[str, str], Dict[str, str], List[str]]:
    """Merge placeholders within one mapping that share the same normalized value.

    Returns ``(new_map, remap, logs)``:
      - new_map: *mapping* with duplicates removed, keeping one canonical
        placeholder per value (lowest counter within its type).
      - remap: dropped placeholder -> canonical placeholder.
      - logs: one "INTRA-DEDUP" line per removed duplicate.
    """
    groups: Dict[str, List[str]] = {}
    for ph, val in mapping.items():
        groups.setdefault(normalize(val), []).append(ph)
    remap: Dict[str, str] = {}
    logs: List[str] = []
    new_map = dict(mapping)

    def sort_key(ph: str):
        # Order placeholders as (type, counter); unparsable counters sort last.
        typ = ph.split("-")[0]
        try:
            num = int(ph.split("-")[1])
        except (IndexError, ValueError):  # was a bare `except:`; be explicit
            num = 10 ** 9
        return (typ, num)

    for norm_val, ph_list in groups.items():
        if len(ph_list) <= 1:
            continue
        ph_list = sorted(ph_list, key=sort_key)
        canonical = ph_list[0]
        for dup in ph_list[1:]:
            remap[dup] = canonical
            new_map.pop(dup, None)
            logs.append(f"INTRA-DEDUP: {dup} -> {canonical} | value='{norm_val}'")
    return new_map, remap, logs
def remap_text(text: str, remap: Dict[str, str]) -> str:
    """Rewrite placeholders in *text* according to *remap*.

    Replacements run longest-key-first so a shorter placeholder (e.g.
    "company-1") cannot clobber a longer one it prefixes ("company-10").
    """
    ordered = sorted(remap.items(), key=lambda item: len(item[0]), reverse=True)
    for old_ph, new_ph in ordered:
        text = text.replace(old_ph, new_ph)
    return text
def jaccard(a: str, b: str) -> float:
    """Jaccard similarity (0.0–1.0) of the normalized word sets of *a* and *b*.

    Returns 0.0 when either side normalizes to an empty word set.
    """
    words_a = set(normalize(a).split())
    words_b = set(normalize(b).split())
    if not words_a or not words_b:
        return 0.0
    return len(words_a & words_b) / len(words_a | words_b)
def max_counter(mapping: Dict[str, str], typ: str) -> int:
    """Return the highest numeric counter used by placeholders of *typ*.

    E.g. {"company-03": ..., "company-10": ...} with typ="company" -> 10.
    Placeholders with a malformed counter are ignored; returns 0 if none match.
    """
    highest = 0
    for ph in mapping:
        if not ph.startswith(typ + "-"):
            continue
        try:
            highest = max(highest, int(ph.split("-")[1]))
        except (IndexError, ValueError):  # was a bare `except:`; skip malformed keys
            pass
    return highest
def merge_tables(t1: Dict[str, str], t2: Dict[str, str], threshold=0.75) -> Tuple[Dict[str, str], Dict[str, str], List[str]]:
    """Merge placeholder table *t2* into *t1*, producing a global table.

    Returns ``(merged, remap2, logs)``:
      - merged: t1 plus every t2 value not already present, renumbered.
      - remap2: t2 placeholder -> its placeholder in *merged*.
      - logs: company fuzzy-match decisions.

    Per t2 entry, matching order is: exact normalized-value match first;
    then (companies only) word-set Jaccard similarity >= *threshold*;
    otherwise a brand-new placeholder with the next free counter.
    """
    merged = dict(t1)
    # Reverse index: normalized value -> existing placeholder (exact matches).
    exact = {normalize(v): k for k, v in merged.items()}
    # Next-counter bookkeeping, seeded from the highest counters already in t1.
    counters = {
        "company": max_counter(merged, "company"),
        "person": max_counter(merged, "person"),
        "amount": max_counter(merged, "amount"),
        "percent": max_counter(merged, "percent"),
    }
    remap2: Dict[str, str] = {}
    logs: List[str] = []
    def existing_companies():
        # Snapshot of company entries currently in the merged table
        # (grows as new companies are added during the loop below).
        return [(ph, val) for ph, val in merged.items() if ph.startswith("company-")]
    for ph2, val2 in t2.items():
        typ = ph2.split("-")[0]
        if typ not in counters:
            # Unknown placeholder type: skipped entirely (no remap entry).
            continue
        nv = normalize(val2)
        if nv in exact:
            # Exact normalized-value hit: reuse the existing placeholder.
            remap2[ph2] = exact[nv]
            continue
        if typ == "company":
            # Fuzzy pass: find the most similar existing company name.
            best_ph = None
            best_score = 0.0
            best_val = ""
            for ph_exist, val_exist in existing_companies():
                score = jaccard(val2, val_exist)
                if score > best_score:
                    best_score = score
                    best_ph = ph_exist
                    best_val = val_exist
            if best_ph and best_score >= threshold:
                remap2[ph2] = best_ph
                exact[nv] = best_ph
                logs.append(f"FUZZY MATCH: '{val2}' ~ '{best_val}' score={best_score:.2f} => {best_ph}")
                continue
            elif best_ph:
                logs.append(f"FUZZY NO-MATCH: '{val2}' best='{best_val}' score={best_score:.2f} < {threshold:.2f}")
        # No match anywhere: mint a new placeholder for this type.
        counters[typ] += 1
        new_ph = f"{typ}-{counters[typ]:02d}"
        merged[new_ph] = val2
        exact[nv] = new_ph
        remap2[ph2] = new_ph
    return merged, remap2, logs
def anonymize_and_map(chunk_text: str, api_key: str, system_prompt: str, model: str) -> Tuple[str, Dict[str, str], List[str]]:
    """Anonymize one text chunk via two LLM calls and build its mapping table.

    Call 1 rewrites *chunk_text* with placeholders; call 2 asks the model
    for a JSON object mapping each placeholder back to its original value.

    Returns: anonymized_text, mapping_clean, logs
    logs include: dropped keys + parse errors + intra-dedup actions.
    On mapping failure the anonymized text is still returned with an empty
    mapping, so the pipeline degrades gracefully instead of raising.
    """
    logs: List[str] = []
    if not chunk_text.strip():
        return "", {}, logs
    # Step1: anonymize (text only)
    resp1 = deepinfra_post(
        {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"متن زیر را ناشناس کن و فقط متن ناشناسشده را برگردان:\n\n{chunk_text}"}
            ],
            "max_tokens": 3000,
            "temperature": 0.1,
        },
        api_key,
    )
    anon = resp1["choices"][0]["message"]["content"].strip()
    # Step2: mapping JSON only (must include original + anonymized)
    prompt2 = f"""
متن اصلی:
{chunk_text}
متن ناشناس شده:
{anon}
فقط و فقط یک JSON object برگردان که کلیدهای آن دقیقاً یکی از این الگوها باشد:
company-01, company-02, ...
person-01, person-02, ...
amount-01, amount-02, ...
percent-01, percent-02, ...
هیچ کلید دیگری مجاز نیست.
هیچ توضیح اضافی و هیچ کدبلاک ننویس.
""".strip()
    resp2 = deepinfra_post(
        {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt2}
            ],
            "max_tokens": 2000,
            "temperature": 0.1,
        },
        api_key,
    )
    raw = resp2["choices"][0]["message"]["content"].strip()
    json_str = extract_json_object(raw)
    if not json_str:
        # No JSON at all: log the raw output for debugging, keep the text.
        logs.append("ERROR: Mapping output did not contain a JSON object.")
        logs.append("RAW_MAPPING_OUTPUT:")
        logs.append(raw[:2000])
        return anon, {}, logs
    try:
        mapping_obj = json.loads(json_str)
    except Exception as e:
        # Malformed JSON: same graceful degradation, with both the
        # extracted span and the full raw output logged.
        logs.append(f"ERROR: JSON parsing failed: {e}")
        logs.append("RAW_MAPPING_JSON_EXTRACT:")
        logs.append(json_str[:2000])
        logs.append("RAW_MAPPING_OUTPUT:")
        logs.append(raw[:2000])
        return anon, {}, logs
    # Normalize keys/values, force percent values into canonical form,
    # then drop anything that is not a valid placeholder key.
    mapping_obj = {str(k).strip(): str(v).strip() for k, v in mapping_obj.items()}
    mapping_obj = fix_percent_mapping(mapping_obj)
    mapping_clean, dropped = sanitize_mapping(mapping_obj)
    for k, v in dropped[:100]:
        logs.append(f"DROPPED KEY: {k} = {v}")
    # Intra-dedup
    mapping_clean, intra_remap, intra_logs = collapse_duplicates(mapping_clean)
    anon = remap_text(anon, intra_remap)
    logs.extend(intra_logs)
    return anon, mapping_clean, logs
def run_pipeline(text: str, threshold: float, model: str):
    """Gradio handler: anonymize the first two 20-sentence chunks of *text*.

    Returns (final_text, df1, df2, merged_df, log_text); on missing API
    key or unsuitable input, the first element is an error message and
    the DataFrames are empty.
    """
    api_key = os.getenv("DEEPINFRA_API_KEY", "").strip()
    if not api_key:
        return "DEEPINFRA_API_KEY is not set.", pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), ""
    system_prompt = load_system_prompt()
    sentences = split_into_sentences(text)
    if len(sentences) < 1:
        return "متن خالی است.", pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), ""
    chunks = chunk_sentences(sentences, 20)
    if len(chunks) < 2:
        # The demo specifically exercises the two-chunk merge path.
        return "متن کمتر از 20 جمله است؛ برای ساخت دو chunk، متن طولانیتر بده.", pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), ""
    # NOTE(review): only the first two chunks are processed; any text
    # beyond the first 40 sentences is silently ignored.
    chunk1 = chunks[0]
    chunk2 = chunks[1]
    anon1, map1, log1 = anonymize_and_map(chunk1, api_key, system_prompt, model)
    anon2, map2, log2 = anonymize_and_map(chunk2, api_key, system_prompt, model)
    # Merge chunk-2 placeholders into chunk-1's table, then rewrite chunk 2
    # so both halves share one consistent numbering.
    merged, remap2, fuzzy_log = merge_tables(map1, map2, threshold)
    anon2_fixed = remap_text(anon2, remap2)
    final_text = (anon1 + "\n\n" + anon2_fixed).strip()
    df1 = pd.DataFrame([{"Placeholder": k, "Original": v} for k, v in sorted(map1.items())])
    df2 = pd.DataFrame([{"Placeholder": k, "Original": v} for k, v in sorted(map2.items())])
    dfm = pd.DataFrame([{"Placeholder": k, "Original": v} for k, v in sorted(merged.items())])
    # Assemble one human-readable log string for the UI textbox.
    combined_logs: List[str] = []
    if log1:
        combined_logs.append("LOGS CHUNK 1:")
        combined_logs.extend([" " + x for x in log1])
    if log2:
        combined_logs.append("\nLOGS CHUNK 2:")
        combined_logs.extend([" " + x for x in log2])
    if fuzzy_log:
        combined_logs.append("\nFUZZY LOG (companies):")
        combined_logs.extend([" " + x for x in fuzzy_log])
    if remap2:
        combined_logs.append("\nREMAP (table2 -> global):")
        combined_logs.extend([" " + f"{k} -> {v}" for k, v in sorted(remap2.items())])
    log_text = "\n".join(combined_logs).strip()
    return final_text, df1, df2, dfm, log_text
# --- Gradio UI ---------------------------------------------------------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("DeepInfra (Qwen3-14B) Chunking (20 sentences) + 2 Mapping Tables + Merge + Remap")
    input_text = gr.Textbox(lines=12, label="متن ورودی")
    with gr.Row():
        # Jaccard threshold for fuzzy-matching company names across chunks.
        thr = gr.Slider(0.60, 0.95, value=0.75, step=0.05, label="Company Fuzzy Threshold (Jaccard)")
        model = gr.Textbox(value=DEFAULT_MODEL, label="Model")
    btn = gr.Button("Run")
    out_text = gr.Textbox(lines=12, label="متن ناشناسشده نهایی")
    with gr.Row():
        df1 = gr.Dataframe(label="Mapping Table #1", interactive=False)
        df2 = gr.Dataframe(label="Mapping Table #2", interactive=False)
        dfm = gr.Dataframe(label="Merged Global Mapping", interactive=False)
    logs = gr.Textbox(lines=14, label="Logs (Dropped + Intra-Dedup + Fuzzy + Remap + Raw mapping errors)")
    btn.click(run_pipeline, inputs=[input_text, thr, model], outputs=[out_text, df1, df2, dfm, logs])
if __name__ == "__main__":
    # 0.0.0.0:7860 is the standard Hugging Face Spaces binding.
    demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)