# chunking_test / app.py
# Hugging Face Space file header (leilaghomashchi):
#   "Rename app-chunk (1).py to app.py" — commit f66415c (verified)
import os
import re
import json
import time
import requests
import pandas as pd
import gradio as gr
from typing import Dict, Tuple, List, Any
# DeepInfra's OpenAI-compatible chat-completions endpoint.
DEEPINFRA_URL = "https://api.deepinfra.com/v1/openai/chat/completions"
# Default model identifier served through DeepInfra.
DEFAULT_MODEL = "Qwen/Qwen3-14B"
# Sentence boundary: whitespace following ./!/?/؟ (incl. the Persian question
# mark), or any run of newlines.
SENT_SPLIT_RE = re.compile(r'(?<=[\.\!\؟\?])\s+|\n+')
# Placeholder keys must look like company-01, person-12, amount-03, percent-99, ...
VALID_PH_RE = re.compile(r"^(company|person|amount|percent)-\d{2,}$")
def split_into_sentences(text: str) -> List[str]:
    """Split *text* on SENT_SPLIT_RE boundaries, returning trimmed,
    non-empty sentences. None or blank input yields an empty list."""
    stripped = (text or "").strip()
    if not stripped:
        return []
    pieces = (piece.strip() for piece in SENT_SPLIT_RE.split(stripped))
    return [piece for piece in pieces if piece]
def chunk_sentences(sentences: List[str], chunk_size: int = 20) -> List[str]:
    """Group consecutive sentences into space-joined chunks of at most
    *chunk_size* sentences each; the final chunk may be shorter."""
    chunks: List[str] = []
    for start in range(0, len(sentences), chunk_size):
        chunks.append(" ".join(sentences[start:start + chunk_size]))
    return chunks
def load_system_prompt() -> str:
    """Load the anonymization system prompt from disk.

    Checks a few known locations in order. If the file found is a Python
    snippet of the form ``return \"\"\"...\"\"\"``, only the triple-quoted
    body is returned; otherwise the whole (stripped) file content is used.
    Falls back to a built-in Persian prompt when no file exists.
    """
    candidates = ["پرامپت.txt", "/mnt/data/پرامپت.txt", "prompt.txt", "/mnt/data/prompt.txt"]
    for candidate in candidates:
        if not os.path.exists(candidate):
            continue
        with open(candidate, "r", encoding="utf-8") as fh:
            body = fh.read()
        match = re.search(r'return\s+"""(.*)"""', body, flags=re.DOTALL)
        return match.group(1).strip() if match else body.strip()
    return "شما یک ناشناس‌ساز دقیق متون فارسی هستید. فقط person/company/amount/percent را ناشناس کن."
def deepinfra_post(payload: dict, api_key: str, retries: int = 6) -> dict:
    """POST *payload* to the DeepInfra chat-completions endpoint.

    Retries up to *retries* times on network errors and on HTTP 429/503
    (honoring a numeric Retry-After header when present, otherwise using
    capped exponential backoff). Any other non-200 status raises
    RuntimeError immediately; exhausting the retries also raises
    RuntimeError with the last error seen.

    Fix: the original used a bare ``except:`` (which swallows
    KeyboardInterrupt/SystemExit) and a blanket ``except Exception``
    around the POST; both are narrowed to the exceptions that can
    actually occur here.
    """
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    last = None
    for attempt in range(retries):
        try:
            r = requests.post(DEEPINFRA_URL, headers=headers, json=payload, timeout=90)
        except requests.RequestException as e:
            # Network-level failure (DNS, timeout, connection reset): back off and retry.
            last = e
            time.sleep(min(2 ** attempt, 30))
            continue
        if r.status_code == 200:
            return r.json()
        if r.status_code in (429, 503):
            retry_after = r.headers.get("Retry-After")
            if retry_after:
                try:
                    time.sleep(float(retry_after))
                except ValueError:
                    # Retry-After may be an HTTP-date rather than seconds.
                    time.sleep(min(2 ** attempt, 30))
            else:
                time.sleep(min(2 ** attempt, 30))
            last = r.text[:400]
            continue
        # Non-retryable HTTP error: surface it immediately.
        raise RuntimeError(f"DeepInfra API Error {r.status_code}: {r.text[:800]}")
    raise RuntimeError(f"Rate limit retries exhausted. Last={last}")
def normalize(s: str) -> str:
    """Canonicalize a Persian string for duplicate comparison.

    Unifies Arabic/Persian letter variants, converts Persian digits to
    ASCII, strips zero-width/directional marks, collapses whitespace,
    rewrites ``N%`` as ``N درصد``, maps ``ریالی`` to ``ریال``, and trims
    surrounding punctuation. None becomes "".
    """
    if s is None:
        return ""
    text = str(s)
    # Arabic letter variants -> Persian counterparts.
    text = text.replace("ي", "ی").replace("ك", "ک").replace("ة", "ه")
    # Persian digits -> ASCII digits.
    text = text.translate(str.maketrans("۰۱۲۳۴۵۶۷۸۹", "0123456789"))
    # Zero-width joiner/non-joiner and directional embedding marks -> plain spaces.
    for invisible in ("\u200c", "\u200f", "\u202a", "\u202b", "\u202c"):
        text = text.replace(invisible, " ")
    text = re.sub(r"\s+", " ", text).strip()
    # "50%" -> "50 درصد" so percent values compare consistently.
    text = re.sub(r"(\d+)\s*%", r"\1 درصد", text)
    text = text.replace("ریالی", "ریال")
    return text.strip(" .،؛:!؟\t\r\n")
def fix_percent_mapping(mapping: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of *mapping* where every ``percent-*`` value carries
    an explicit percent marker; values already containing درصد/%/درصدی
    are left untouched."""
    result = dict(mapping)
    for key in list(result):
        if not str(key).startswith("percent-"):
            continue
        value = str(result[key]).strip()
        if not re.search(r"(درصد|%|درصدی)", value):
            result[key] = f"{value} درصد"
    return result
def sanitize_mapping(mapping: Any) -> Tuple[Dict[str, str], List[Tuple[str, str]]]:
    """Split a raw model-produced mapping into valid and rejected entries.

    Returns (accepted, rejected): *accepted* keeps only keys matching
    VALID_PH_RE (e.g. "company-01"); *rejected* collects everything else
    as (key, value) pairs. A non-dict input rejects the whole payload
    under the sentinel key "<<non-dict>>".
    """
    if not isinstance(mapping, dict):
        return {}, [("<<non-dict>>", str(mapping))]
    accepted: Dict[str, str] = {}
    rejected: List[Tuple[str, str]] = []
    for raw_key, raw_value in mapping.items():
        key = str(raw_key).strip()
        value = str(raw_value).strip()
        if VALID_PH_RE.match(key):
            accepted[key] = value
        else:
            rejected.append((key, value))
    return accepted, rejected
def extract_json_object(text: str) -> str:
    """Return the outermost ``{...}`` span of *text*, or "" if none exists.

    Tolerates models wrapping the JSON in Markdown code fences or
    surrounding prose by taking the first '{' through the last '}'.
    """
    if not text:
        return ""
    cleaned = text.strip()
    for fence in ("```json", "```JSON", "```"):
        cleaned = cleaned.replace(fence, "")
    cleaned = cleaned.strip()
    first = cleaned.find("{")
    last = cleaned.rfind("}")
    if first == -1 or last <= first:
        return ""
    return cleaned[first:last + 1]
def collapse_duplicates(mapping: Dict[str, str]) -> Tuple[Dict[str, str], Dict[str, str], List[str]]:
    """Collapse placeholders whose values normalize to the same string.

    Returns (deduped mapping, {duplicate -> canonical}, log lines). The
    canonical placeholder for a duplicated value is the one that sorts
    first by (type, numeric suffix); placeholders with unparsable
    suffixes sort last.
    """
    by_value: Dict[str, List[str]] = {}
    for placeholder, value in mapping.items():
        by_value.setdefault(normalize(value), []).append(placeholder)

    def order(placeholder: str):
        kind = placeholder.split("-")[0]
        try:
            num = int(placeholder.split("-")[1])
        except (IndexError, ValueError):
            num = 10 ** 9
        return (kind, num)

    deduped = dict(mapping)
    alias: Dict[str, str] = {}
    logs: List[str] = []
    for norm_val, placeholders in by_value.items():
        if len(placeholders) < 2:
            continue
        ordered = sorted(placeholders, key=order)
        keeper = ordered[0]
        for dup in ordered[1:]:
            alias[dup] = keeper
            deduped.pop(dup, None)
            logs.append(f"INTRA-DEDUP: {dup} -> {keeper} | value='{norm_val}'")
    return deduped, alias, logs
def remap_text(text: str, remap: Dict[str, str]) -> str:
    """Apply placeholder substitutions to *text*, longest keys first so a
    shorter placeholder never clobbers part of a longer one."""
    by_length = sorted(remap.items(), key=lambda item: len(item[0]), reverse=True)
    for old_ph, new_ph in by_length:
        text = text.replace(old_ph, new_ph)
    return text
def jaccard(a: str, b: str) -> float:
    """Jaccard similarity of the normalized word sets of two strings;
    0.0 when either side normalizes to no tokens."""
    tokens_a = set(normalize(a).split())
    tokens_b = set(normalize(b).split())
    if not tokens_a or not tokens_b:
        return 0.0
    overlap = tokens_a & tokens_b
    union = tokens_a | tokens_b
    return len(overlap) / len(union)
def max_counter(mapping: Dict[str, str], typ: str) -> int:
    """Return the highest numeric suffix among placeholders of type *typ*.

    Keys look like "company-03"; a key with a non-numeric counter segment
    (e.g. "company-xx") is skipped. Returns 0 when no well-formed
    placeholder of the given type exists.

    Fix: the original bare ``except:`` (which also swallows
    KeyboardInterrupt/SystemExit) is narrowed to ``ValueError`` — the only
    exception ``int()`` can raise here, since the startswith() guard
    guarantees a counter segment exists after the dash.
    """
    prefix = typ + "-"
    highest = 0
    for ph in mapping.keys():
        if not ph.startswith(prefix):
            continue
        try:
            highest = max(highest, int(ph.split("-")[1]))
        except ValueError:
            # Non-numeric counter segment — ignore this key.
            pass
    return highest
def merge_tables(t1: Dict[str, str], t2: Dict[str, str], threshold: float = 0.75) -> Tuple[Dict[str, str], Dict[str, str], List[str]]:
    """Merge chunk-2's mapping table (*t2*) into chunk-1's table (*t1*).

    Returns (merged table, {t2 placeholder -> global placeholder}, log
    lines). Values identical after normalize() reuse the existing
    placeholder; company values additionally get fuzzy Jaccard matching
    against existing companies at *threshold*; anything else is assigned
    a freshly numbered placeholder of its type.
    """
    merged = dict(t1)
    # Reverse index: normalized value -> existing placeholder, for exact reuse.
    exact = {normalize(v): k for k, v in merged.items()}
    # Continue numbering after the highest suffix already present in t1.
    counters = {
        "company": max_counter(merged, "company"),
        "person": max_counter(merged, "person"),
        "amount": max_counter(merged, "amount"),
        "percent": max_counter(merged, "percent"),
    }
    remap2: Dict[str, str] = {}
    logs: List[str] = []
    def existing_companies():
        # Current snapshot of company entries in the merged table.
        return [(ph, val) for ph, val in merged.items() if ph.startswith("company-")]
    for ph2, val2 in t2.items():
        typ = ph2.split("-")[0]
        if typ not in counters:
            # Unknown placeholder type — skip (it gets no remap entry).
            continue
        nv = normalize(val2)
        if nv in exact:
            # Exact duplicate of an existing value: reuse its placeholder.
            remap2[ph2] = exact[nv]
            continue
        if typ == "company":
            # Fuzzy pass: find the existing company with the best Jaccard score.
            best_ph = None
            best_score = 0.0
            best_val = ""
            for ph_exist, val_exist in existing_companies():
                score = jaccard(val2, val_exist)
                if score > best_score:
                    best_score = score
                    best_ph = ph_exist
                    best_val = val_exist
            if best_ph and best_score >= threshold:
                remap2[ph2] = best_ph
                exact[nv] = best_ph
                logs.append(f"FUZZY MATCH: '{val2}' ~ '{best_val}' score={best_score:.2f} => {best_ph}")
                continue
            elif best_ph:
                # Below threshold: log the near-miss, then fall through to mint a new entry.
                logs.append(f"FUZZY NO-MATCH: '{val2}' best='{best_val}' score={best_score:.2f} < {threshold:.2f}")
        # No match anywhere: mint a new zero-padded placeholder of this type.
        counters[typ] += 1
        new_ph = f"{typ}-{counters[typ]:02d}"
        merged[new_ph] = val2
        exact[nv] = new_ph
        remap2[ph2] = new_ph
    return merged, remap2, logs
def anonymize_and_map(chunk_text: str, api_key: str, system_prompt: str, model: str) -> Tuple[str, Dict[str, str], List[str]]:
    """
    Anonymize one text chunk via two LLM calls and extract its mapping.

    Call 1 produces the anonymized text; call 2 asks for a JSON object
    mapping placeholder keys (company-NN / person-NN / amount-NN /
    percent-NN) to the original values.

    Returns: anonymized_text, mapping_clean, logs
    logs include: dropped keys + parse errors + intra-dedup actions.
    On mapping-parse failure the anonymized text is still returned with
    an empty mapping.
    """
    logs: List[str] = []
    if not chunk_text.strip():
        return "", {}, logs
    # Step1: anonymize (text only)
    resp1 = deepinfra_post(
        {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"متن زیر را ناشناس کن و فقط متن ناشناس‌شده را برگردان:\n\n{chunk_text}"}
            ],
            "max_tokens": 3000,
            "temperature": 0.1,
        },
        api_key,
    )
    anon = resp1["choices"][0]["message"]["content"].strip()
    # Step2: mapping JSON only (must include original + anonymized)
    prompt2 = f"""
متن اصلی:
{chunk_text}
متن ناشناس شده:
{anon}
فقط و فقط یک JSON object برگردان که کلیدهای آن دقیقاً یکی از این الگوها باشد:
company-01, company-02, ...
person-01, person-02, ...
amount-01, amount-02, ...
percent-01, percent-02, ...
هیچ کلید دیگری مجاز نیست.
هیچ توضیح اضافی و هیچ کدبلاک ننویس.
""".strip()
    resp2 = deepinfra_post(
        {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt2}
            ],
            "max_tokens": 2000,
            "temperature": 0.1,
        },
        api_key,
    )
    raw = resp2["choices"][0]["message"]["content"].strip()
    json_str = extract_json_object(raw)
    if not json_str:
        # Model returned no JSON at all: log the raw output for debugging.
        logs.append("ERROR: Mapping output did not contain a JSON object.")
        logs.append("RAW_MAPPING_OUTPUT:")
        logs.append(raw[:2000])
        return anon, {}, logs
    try:
        mapping_obj = json.loads(json_str)
    except Exception as e:
        # Extracted span was not valid JSON: keep the anonymized text, log everything.
        logs.append(f"ERROR: JSON parsing failed: {e}")
        logs.append("RAW_MAPPING_JSON_EXTRACT:")
        logs.append(json_str[:2000])
        logs.append("RAW_MAPPING_OUTPUT:")
        logs.append(raw[:2000])
        return anon, {}, logs
    # Normalize keys/values, force percent markers, then drop invalid keys.
    mapping_obj = {str(k).strip(): str(v).strip() for k, v in mapping_obj.items()}
    mapping_obj = fix_percent_mapping(mapping_obj)
    mapping_clean, dropped = sanitize_mapping(mapping_obj)
    for k, v in dropped[:100]:
        logs.append(f"DROPPED KEY: {k} = {v}")
    # Intra-dedup: merge placeholders that map to the same normalized value.
    mapping_clean, intra_remap, intra_logs = collapse_duplicates(mapping_clean)
    anon = remap_text(anon, intra_remap)
    logs.extend(intra_logs)
    return anon, mapping_clean, logs
def run_pipeline(text: str, threshold: float, model: str):
    """Gradio callback: anonymize the first two 20-sentence chunks of *text*.

    Returns a 5-tuple for the UI outputs: (final anonymized text,
    per-chunk mapping DataFrames #1 and #2, merged global mapping
    DataFrame, combined log text). Error states return a message string
    with empty DataFrames. Requires the DEEPINFRA_API_KEY env var.
    """
    api_key = os.getenv("DEEPINFRA_API_KEY", "").strip()
    if not api_key:
        return "DEEPINFRA_API_KEY is not set.", pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), ""
    system_prompt = load_system_prompt()
    sentences = split_into_sentences(text)
    if len(sentences) < 1:
        return "متن خالی است.", pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), ""
    chunks = chunk_sentences(sentences, 20)
    # The demo requires at least two chunks to exercise table merging.
    if len(chunks) < 2:
        return "متن کمتر از 20 جمله است؛ برای ساخت دو chunk، متن طولانی‌تر بده.", pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), ""
    chunk1 = chunks[0]
    chunk2 = chunks[1]
    anon1, map1, log1 = anonymize_and_map(chunk1, api_key, system_prompt, model)
    anon2, map2, log2 = anonymize_and_map(chunk2, api_key, system_prompt, model)
    # Merge chunk-2 placeholders into chunk-1's table, then rewrite chunk 2.
    merged, remap2, fuzzy_log = merge_tables(map1, map2, threshold)
    anon2_fixed = remap_text(anon2, remap2)
    final_text = (anon1 + "\n\n" + anon2_fixed).strip()
    df1 = pd.DataFrame([{"Placeholder": k, "Original": v} for k, v in sorted(map1.items())])
    df2 = pd.DataFrame([{"Placeholder": k, "Original": v} for k, v in sorted(map2.items())])
    dfm = pd.DataFrame([{"Placeholder": k, "Original": v} for k, v in sorted(merged.items())])
    # Assemble one log text block with section headers per source.
    combined_logs: List[str] = []
    if log1:
        combined_logs.append("LOGS CHUNK 1:")
        combined_logs.extend([" " + x for x in log1])
    if log2:
        combined_logs.append("\nLOGS CHUNK 2:")
        combined_logs.extend([" " + x for x in log2])
    if fuzzy_log:
        combined_logs.append("\nFUZZY LOG (companies):")
        combined_logs.extend([" " + x for x in fuzzy_log])
    if remap2:
        combined_logs.append("\nREMAP (table2 -> global):")
        combined_logs.extend([" " + f"{k} -> {v}" for k, v in sorted(remap2.items())])
    log_text = "\n".join(combined_logs).strip()
    return final_text, df1, df2, dfm, log_text
# --- Gradio UI: input textbox, threshold slider, model name, run button,
# --- output text, three mapping tables, and a log panel.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("DeepInfra (Qwen3-14B) Chunking (20 sentences) + 2 Mapping Tables + Merge + Remap")
    input_text = gr.Textbox(lines=12, label="متن ورودی")
    with gr.Row():
        # Jaccard threshold for fuzzy company matching in merge_tables().
        thr = gr.Slider(0.60, 0.95, value=0.75, step=0.05, label="Company Fuzzy Threshold (Jaccard)")
        model = gr.Textbox(value=DEFAULT_MODEL, label="Model")
    btn = gr.Button("Run")
    out_text = gr.Textbox(lines=12, label="متن ناشناس‌شده نهایی")
    with gr.Row():
        df1 = gr.Dataframe(label="Mapping Table #1", interactive=False)
        df2 = gr.Dataframe(label="Mapping Table #2", interactive=False)
        dfm = gr.Dataframe(label="Merged Global Mapping", interactive=False)
    logs = gr.Textbox(lines=14, label="Logs (Dropped + Intra-Dedup + Fuzzy + Remap + Raw mapping errors)")
    # Wire the button to the pipeline; outputs map 1:1 to run_pipeline's return tuple.
    btn.click(run_pipeline, inputs=[input_text, thr, model], outputs=[out_text, df1, df2, dfm, logs])
if __name__ == "__main__":
    # Bind on all interfaces; port 7860 is the Hugging Face Spaces convention.
    demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)