Spaces:
Sleeping
Sleeping
| # augmentation utility agent | |
| import re | |
| import difflib | |
| import random | |
| from typing import Dict, Tuple | |
| import ftfy | |
| import langid | |
# Precompiled patterns used by deidentify() to mask personal identifiers.

# Email: local part, "@", dotted domain, and a TLD of at least two letters.
P_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# Phone: optional country code (1-3 digits, optional "+"), optional area code
# (2-4 digits, optionally parenthesised), then two groups of 3-4 digits that
# may be separated by whitespace or a hyphen.
# NOTE(review): every prefix is optional, so a bare run of six or more digits
# (or two 3-4 digit numbers separated by a space/hyphen) also matches --
# confirm this level of over-redaction is acceptable for the data.
P_PHONE = re.compile(r"(?:(?:\+?\d{1,3})?[\s-]?)?(?:\(?\d{2,4}\)?[\s-]?)?\d{3,4}[\s-]?\d{3,4}")
# URL: "http://", "https://" or "www." followed by any non-whitespace run.
P_URL = re.compile(r"https?://\S+|www\.\S+")
# IPv4-shaped dotted quad; octets are not range-checked (999.999.999.999 matches).
P_IP = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
def fix_unicode(s: str) -> str:
    """Run *s* through ``ftfy.fix_text``; falsy input is treated as ``""``."""
    text = s if s else ""
    return ftfy.fix_text(text)
def normalize_whitespace(s: str) -> str:
    """Collapse redundant whitespace while keeping paragraph breaks.

    Steps, in order:
      1. non-breaking spaces become plain spaces;
      2. runs of spaces/tabs collapse to a single space;
      3. whitespace at the end of a line (including ``\\r``) is removed;
      4. three or more consecutive newlines collapse to one blank line;
      5. leading/trailing whitespace is stripped.
    """
    s = s.replace("\u00A0", " ")
    s = re.sub(r"[ \t]+", " ", s)
    # Strip only non-newline whitespace before a newline. The previous
    # pattern (`\s+\n`) also swallowed preceding newlines, which flattened
    # every paragraph break and made the blank-line rule below unreachable.
    s = re.sub(r"[^\S\n]+\n", "\n", s)
    s = re.sub(r"\n{3,}", "\n\n", s)
    return s.strip()
def canonicalize_quotes(s: str) -> str:
    """Replace curly double and single quotation marks with ASCII quotes."""
    straight_for = {
        "“": '"',
        "”": '"',
        "’": "'",
        "‘": "'",
    }
    return s.translate(str.maketrans(straight_for))
def ensure_terminal_punct(s: str) -> str:
    """Append a period unless *s* is empty or already ends in ``.``, ``!`` or ``?``."""
    if s and not s.endswith((".", "!", "?")):
        return s + "."
    # Empty/falsy input and already-terminated text are returned unchanged.
    return s
def deidentify(s: str) -> str:
    """Mask emails, phone numbers, URLs and IPv4 addresses with placeholder tokens.

    The substitutions run in a fixed order (email, phone, URL, IP); each
    pass operates on the output of the previous one.
    """
    masks = (
        (P_EMAIL, "[REDACTED_EMAIL]"),
        (P_PHONE, "[REDACTED_PHONE]"),
        (P_URL, "[REDACTED_URL]"),
        (P_IP, "[REDACTED_IP]"),
    )
    redacted = s
    for pattern, token in masks:
        redacted = pattern.sub(token, redacted)
    return redacted
def lang_is_english(s: str) -> bool:
    """Return True when ``langid`` labels the first 2000 characters as ``"en"``.

    Falsy input is classified as the empty string. Any exception raised
    during classification yields True (the check fails open).
    """
    try:
        sample = (s or "")[:2000]
        code, _score = langid.classify(sample)
        is_english = code == "en"
    except Exception:
        is_english = True
    return is_english
def length_cap(s: str, max_chars: int, *, min_sentence_cut: int = 300) -> str:
    """Truncate *s* to roughly *max_chars*, preferring a sentence boundary.

    Args:
        s: Text to cap.
        max_chars: Maximum number of characters kept from *s*.
        min_sentence_cut: The last ``". "`` inside the kept prefix is used as
            the cut point only if it lies beyond this index; otherwise the
            text is cut hard at *max_chars*. Defaults to 300 (the previously
            hard-coded value), which effectively disables sentence cuts for
            small *max_chars*; pass a smaller value for short caps.

    Returns:
        *s* unchanged when it already fits; otherwise the truncated text
        followed by the marker ``" …"``. Note the marker is appended after
        truncation, so the result can be up to two characters longer than
        *max_chars*.
    """
    if len(s) <= max_chars:
        return s
    cut = s[:max_chars]
    # Prefer ending on a full sentence, but not if that discards too much.
    last_dot = cut.rfind(". ")
    if last_dot > min_sentence_cut:
        return cut[:last_dot + 1] + " …"
    return cut + " …"
def fingerprint(instr: str, user: str, out: str) -> str:
    """Return an MD5 hex digest identifying an (instruction, user, output) triple.

    Each field is lower-cased, every run of characters outside ``a-z0-9`` is
    replaced by a single space, and the result is stripped; the three
    normalised fields are joined with ``"||"`` before hashing. Intended for
    cheap de-duplication, not for security.
    """
    import hashlib

    def canonical(field: str) -> str:
        spaced = re.sub(r"[^a-z0-9]+", " ", field.lower())
        return re.sub(r"\s+", " ", spaced).strip()

    joined = "||".join(canonical(part) for part in (instr, user, out))
    return hashlib.md5(joined.encode("utf-8")).hexdigest()
def style_standardize_answer(ans: str) -> str:
    """Soften absolute claims, drop a trailing forum sign-off, end with punctuation.

    Args:
        ans: Answer text; falsy input is returned unchanged.

    Returns:
        The stripped answer with absolute terms replaced by "likely", a
        final sign-off line (e.g. "Thanks, ...", "Best regards") removed,
        and terminal punctuation ensured.
    """
    if not ans:
        return ans
    ans = ans.strip()
    # Avoid absolute guarantees. "100%" is matched without a trailing `\b`:
    # "%" is a non-word character, so `100%\b` could never match before a
    # space or punctuation.
    # NOTE(review): replacing "never" with "likely" inverts the meaning of
    # the sentence -- confirm whether a different replacement is wanted.
    ans = re.sub(
        r"\b(?:guarantee|certainly|always|never)\b|\b100%",
        "likely",
        ans,
        flags=re.I,
    )
    # Remove a sign-off only when it *starts* the final line. Without the
    # line-start anchor, any "thanks"/"regards"/"cheers" inside the last
    # sentence truncated the answer mid-line.
    ans = re.sub(
        r"(?:\A|\n)\s*(?:(?:best|kind|warm|many)\s+)?"
        r"(?:thanks|thank you|regards|cheers)\b[^\n]*\Z",
        "",
        ans,
        flags=re.I,
    )
    return ensure_terminal_punct(ans.rstrip())
def base_cleanup(s: str, max_chars: int, do_deid: bool) -> str:
    """Apply the standard cleaning pipeline to *s*.

    Order: unicode repair, quote canonicalisation, whitespace normalisation,
    optional de-identification (when *do_deid* is true), then length capping
    to *max_chars*.
    """
    cleaned = normalize_whitespace(canonicalize_quotes(fix_unicode(s)))
    if do_deid:
        cleaned = deidentify(cleaned)
    return length_cap(cleaned, max_chars)
def maybe_paraphrase(text: str, ratio: float, paraphraser, difficulty: str) -> Tuple[str, bool]:
    """Paraphrase *text* with probability *ratio*.

    Returns ``(new_text, True)`` when ``paraphraser.paraphrase`` was applied,
    otherwise ``(text, False)``. Nothing is sampled when *ratio* is not
    positive or *text* is empty.
    """
    eligible = not (ratio <= 0) and bool(text)
    if eligible and random.random() < ratio:
        rewritten = paraphraser.paraphrase(text, difficulty=difficulty)
        return rewritten, True
    return text, False
def maybe_backtranslate(
    text: str,
    ratio: float,
    paraphraser,
    *,
    via_lang: str = "vi",
    max_len_delta: float = 0.5,
    min_similarity: float = 0.45,
    max_similarity: float = 0.98,
) -> Tuple[str, bool]:
    """Back-translate *text* with probability *ratio*, subject to guardrails.

    Args:
        text: Source text.
        ratio: Probability of attempting back-translation; ``<= 0`` disables it.
        paraphraser: Object exposing ``backtranslate(text, via_lang=...)``.
        via_lang: Pivot language passed to the paraphraser.
        max_len_delta: Maximum allowed relative length change.
        min_similarity: Minimum ``difflib`` similarity (rejects excessive drift).
        max_similarity: Maximum ``difflib`` similarity (rejects near-copies).

    Returns:
        ``(back_translated, True)`` when the result passes every guardrail,
        otherwise ``(text, False)``.
    """
    if ratio <= 0 or not text:
        return text, False
    if random.random() >= ratio:
        return text, False

    bt = paraphraser.backtranslate(text, via_lang=via_lang)
    if not bt:
        return text, False

    try:
        len_delta = abs(len(bt) - len(text)) / max(1, len(text))
        if len_delta > max_len_delta:
            return text, False
        sim = difflib.SequenceMatcher(None, text, bt).ratio()
    except TypeError:
        # Fail closed: if the guardrail cannot be evaluated (e.g. the
        # paraphraser returned a non-sequence), keep the original text
        # rather than accepting unverified output.
        return text, False

    if sim < min_similarity or sim > max_similarity:
        return text, False
    return bt, True
def consistency_ok(user: str, out: str, ratio: float, paraphraser) -> bool:
    """Spot-check a (user, output) pair with probability *ratio*.

    Returns True without consulting the paraphraser when *ratio* is not
    positive, when either text is empty, or when the random draw skips the
    check; otherwise returns ``paraphraser.consistency_check(user, out)``.
    """
    nothing_to_check = ratio <= 0 or not user or not out
    if nothing_to_check or random.random() >= ratio:
        return True
    return paraphraser.consistency_check(user, out)
def is_invalid_response(text: str) -> bool:
    """Return True when *text* looks like a failed or refused model response.

    A response is invalid when it is empty or not a string, shorter than
    three characters after lower-casing and stripping, or contains any of
    the marker phrases below. Matching is a plain substring test, so a
    marker embedded in a longer word (e.g. "fail" in "failure") also counts.
    """
    if not (text and isinstance(text, str)):
        return True
    lowered = text.lower().strip()
    if len(lowered) < 3:
        return True
    markers = (
        "fail", "invalid", "i couldn't", "i can't", "i cannot", "unable to",
        "sorry", "error", "not available", "no answer", "insufficient",
        "don't know", "do not know", "not sure", "cannot determine",
        "unable to provide", "not possible", "not applicable", "n/a",
    )
    return any(marker in lowered for marker in markers)
def clean_invalid_response(text: str, fallback: str = "") -> str:
    """Return *text* unless ``is_invalid_response`` rejects it, else *fallback*."""
    return fallback if is_invalid_response(text) else text
def retry_invalid_response(text: str, paraphraser, max_retries: int = 3) -> str:
    """Try to turn an invalid response into a valid one by paraphrasing it.

    Args:
        text: Candidate response; returned unchanged when already valid.
        paraphraser: Object exposing ``paraphrase(text, difficulty=...)``.
        max_retries: Maximum number of paraphrase attempts (default 3).

    Returns:
        The first paraphrase that passes ``is_invalid_response``, or ``""``
        when every attempt fails, signalling that the sample should be dropped.
    """
    if not is_invalid_response(text):
        return text

    # Resolve the logger locally: no module-level `logger` is defined in the
    # visible part of this module, so referencing one from the except handler
    # would raise NameError instead of logging.
    import logging
    log = logging.getLogger(__name__)

    for attempt in range(max_retries):
        # Alternate difficulty: easy, hard, then easy for any later attempt.
        difficulty = "hard" if attempt == 1 else "easy"
        try:
            retry_text = paraphraser.paraphrase(text, difficulty=difficulty)
        except Exception as e:
            # Best-effort: a failing attempt is logged and the next one tried.
            log.warning("Retry attempt %d failed: %s", attempt + 1, e)
            continue
        if retry_text and not is_invalid_response(retry_text):
            return retry_text

    # All retries failed: the empty string tells the caller to drop the sample.
    return ""