Spaces:
Sleeping
Sleeping
File size: 5,848 Bytes
80cb919 a7fd3ba 80cb919 19d62ff a7fd3ba 80cb919 5dcfc82 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# Text-augmentation utilities: unicode/whitespace cleanup, PII de-identification,
# paraphrase/backtranslation gating, dedupe fingerprinting, and response validation.
import re
import difflib
import random
from typing import Dict, Tuple
import ftfy
import langid
# Precompiled PII patterns consumed by deidentify().
P_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# Loose phone matcher: optional country code, optional (area) code, then 3-4 digits,
# optional space/hyphen, 3-4 digits. Deliberately permissive — may over-match digit runs.
P_PHONE = re.compile(r"(?:(?:\+?\d{1,3})?[\s-]?)?(?:\(?\d{2,4}\)?[\s-]?)?\d{3,4}[\s-]?\d{3,4}")
# http(s):// or www.-prefixed runs up to the next whitespace.
P_URL = re.compile(r"https?://\S+|www\.\S+")
# Dotted-quad IPv4; octets are not range-checked (e.g. 999.999.999.999 also matches).
P_IP = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
def fix_unicode(s: str) -> str:
    """Repair mojibake/broken Unicode via ftfy; None/empty is treated as ""."""
    text = s if s else ""
    return ftfy.fix_text(text)
def normalize_whitespace(s: str) -> str:
    """Normalize whitespace: NBSP -> space, collapse blank runs, trim line ends.

    Collapses runs of spaces/tabs to one space, strips trailing blanks before
    each newline, squeezes 3+ consecutive newlines down to a single blank
    line, and strips leading/trailing whitespace.

    BUG FIX: the trailing-blank rule previously used r"\\s+\\n" — since \\s
    matches "\\n" itself, it collapsed EVERY run of newlines to one "\\n",
    destroying paragraph breaks and making the \\n{3,} rule below dead code.
    Restricting it to [ \\t]+ restores the intended "at most one blank line".
    """
    s = s.replace("\u00A0", " ")          # non-breaking space behaves like a space
    s = re.sub(r"[ \t]+", " ", s)         # runs of spaces/tabs -> single space
    s = re.sub(r"[ \t]+\n", "\n", s)      # strip trailing blanks before newline only
    s = re.sub(r"\n{3,}", "\n\n", s)      # at most one blank line in a row
    return s.strip()
def canonicalize_quotes(s: str) -> str:
    """Map curly (typographic) quotes to their plain ASCII equivalents."""
    table = str.maketrans({"“": '"', "”": '"', "’": "'", "‘": "'"})
    return s.translate(table)
def ensure_terminal_punct(s: str) -> str:
    """Append a period unless the text is empty or already ends with ., !, or ?."""
    if s and s[-1] not in ".!?":
        return s + "."
    return s
def deidentify(s: str) -> str:
    """Redact common PII (URLs, emails, phone numbers, IPv4 addresses) with tags.

    FIX: URLs are now redacted FIRST. Previously the email/phone patterns ran
    before the URL pattern, so an email or digit run embedded in a URL
    (e.g. "https://site.com/user@x.com") was replaced piecemeal, fragmenting
    the URL redaction into mixed placeholder text.
    """
    s = P_URL.sub("[REDACTED_URL]", s)
    s = P_EMAIL.sub("[REDACTED_EMAIL]", s)
    s = P_PHONE.sub("[REDACTED_PHONE]", s)
    s = P_IP.sub("[REDACTED_IP]", s)
    return s
def lang_is_english(s: str) -> bool:
    """Best-effort English check on the first 2000 chars.

    Any classifier failure defaults to True so detection errors never drop data.
    """
    try:
        sample = (s or "")[:2000]
        detected, _score = langid.classify(sample)
        return detected == "en"
    except Exception:
        return True
def length_cap(s: str, max_chars: int) -> str:
    """Truncate `s` to at most `max_chars`, preferring a sentence boundary.

    Text within the cap is returned unchanged; truncated text gets an
    ellipsis marker appended.
    """
    if len(s) <= max_chars:
        return s
    truncated = s[:max_chars]
    # Prefer ending on a full sentence, but only if that keeps a useful
    # amount of text (avoid aggressively short cuts).
    boundary = truncated.rfind(". ")
    if boundary > 300:
        truncated = truncated[:boundary + 1]
    return truncated + " …"
def fingerprint(instr: str, user: str, out: str) -> str:
    """Cheap dedupe key: lowercase, strip non-alphanumerics, MD5 the joined triple."""
    import hashlib

    def _canon(text: str) -> str:
        # Lowercase, squash every non-alphanumeric run to a single space.
        lowered = text.lower()
        alnum = re.sub(r"[^a-z0-9]+", " ", lowered)
        return re.sub(r"\s+", " ", alnum).strip()

    joined = "||".join((_canon(instr), _canon(user), _canon(out)))
    return hashlib.md5(joined.encode("utf-8")).hexdigest()
def style_standardize_answer(ans: str) -> str:
    """Normalize an answer's voice for training data.

    Strips surrounding whitespace, softens absolute claims ("guarantee",
    "100%", "certainly", "always", "never" -> "likely"), removes trailing
    forum-style sign-offs, and guarantees terminal punctuation.
    Empty/None input is returned unchanged.

    FIX: removed the dead local `prefix = ""` which was assigned but never used.
    """
    if not ans:
        return ans
    ans = ans.strip()
    # Soften absolute guarantees to a neutral hedge.
    ans = re.sub(r"\b(guarantee|100%|certainly|always|never)\b", "likely", ans, flags=re.I)
    # Drop a trailing forum sign-off line ("Thanks, ...", "Regards, ...").
    ans = re.sub(r"\n*(thanks|thank you|regards|cheers)[^\n]*$", "", ans, flags=re.I)
    return ensure_terminal_punct(ans)
def base_cleanup(s: str, max_chars: int, do_deid: bool) -> str:
    """Standard cleanup pipeline.

    Order: unicode repair -> quote canonicalization -> whitespace
    normalization -> optional de-identification -> length cap.
    """
    cleaned = fix_unicode(s)
    cleaned = canonicalize_quotes(cleaned)
    cleaned = normalize_whitespace(cleaned)
    if do_deid:
        cleaned = deidentify(cleaned)
    return length_cap(cleaned, max_chars)
def maybe_paraphrase(text: str, ratio: float, paraphraser, difficulty: str) -> Tuple[str, bool]:
    """With probability `ratio`, paraphrase `text`.

    Returns (possibly-paraphrased text, was_paraphrased). Empty text or a
    non-positive ratio short-circuits without consuming randomness.
    """
    if not text or ratio <= 0:
        return text, False
    if random.random() >= ratio:
        return text, False
    return paraphraser.paraphrase(text, difficulty=difficulty), True
def maybe_backtranslate(text: str, ratio: float, paraphraser) -> Tuple[str, bool]:
    """With probability `ratio`, back-translate `text` via Vietnamese.

    The candidate is rejected (original returned) when the helper yields a
    falsy result, when its length drifts more than 50% from the original, or
    when difflib similarity is below 0.45 (too different) or above 0.98
    (effectively unchanged). Returns (text, was_backtranslated).
    """
    if not text or ratio <= 0:
        return text, False
    if random.random() >= ratio:
        return text, False
    candidate = paraphraser.backtranslate(text, via_lang="vi")
    if not candidate:
        return text, False
    try:
        base_len = max(1, len(text))
        drift = abs(len(candidate) - len(text)) / base_len
        similarity = difflib.SequenceMatcher(None, text, candidate).ratio()
        if drift > 0.5:
            return text, False
        if similarity < 0.45 or similarity > 0.98:
            return text, False
    except Exception:
        # Guardrail computation failed; fall through and accept the candidate.
        pass
    return candidate, True
def consistency_ok(user: str, out: str, ratio: float, paraphraser) -> bool:
    """Spot-check user/output consistency on a random `ratio` fraction of samples.

    Samples that are not selected — or that have an empty side, or a
    non-positive ratio — pass automatically.
    """
    if ratio <= 0 or not user or not out:
        return True
    if random.random() < ratio:
        return paraphraser.consistency_check(user, out)
    return True
def is_invalid_response(text: str) -> bool:
    """Flag refusal/failure-style model responses.

    Non-strings, empty values, answers shorter than 3 characters, and any
    answer containing a known refusal/failure phrase (substring match on the
    lowercased, stripped text) are treated as invalid.
    """
    if not text or not isinstance(text, str):
        return True
    lowered = text.lower().strip()
    if len(lowered) < 3:
        return True
    refusal_markers = (
        "fail", "invalid", "i couldn't", "i can't", "i cannot", "unable to",
        "sorry", "error", "not available", "no answer", "insufficient",
        "don't know", "do not know", "not sure", "cannot determine",
        "unable to provide", "not possible", "not applicable", "n/a",
    )
    return any(marker in lowered for marker in refusal_markers)
def clean_invalid_response(text: str, fallback: str = "") -> str:
    """Return `fallback` when `text` is an invalid/refusal response, else `text`."""
    return fallback if is_invalid_response(text) else text
def retry_invalid_response(text: str, paraphraser, max_retries: int = 3) -> str:
    """Try to salvage an invalid response by re-paraphrasing it.

    Valid input is returned untouched. Otherwise the text is paraphrased up
    to `max_retries` times (difficulty: easy, hard, then easy for all later
    attempts) until a valid variant appears. Returns "" when every attempt
    fails, signalling the caller to drop the sample.

    BUG FIX: the original warning call referenced an undefined name `logger`
    (this module never imports logging nor defines a logger), so any
    exception inside the retry loop raised NameError instead of being
    logged. A function-scope import is used, matching this file's existing
    local-import style (see `fingerprint`).
    """
    import logging

    if not is_invalid_response(text):
        return text
    logger = logging.getLogger(__name__)
    for attempt in range(max_retries):
        # Vary difficulty across attempts: attempt 1 is "hard", others "easy".
        difficulty = "hard" if attempt == 1 else "easy"
        try:
            candidate = paraphraser.paraphrase(text, difficulty=difficulty)
            if candidate and not is_invalid_response(candidate):
                return candidate
        except Exception as exc:
            logger.warning("Retry attempt %d failed: %s", attempt + 1, exc)
    # All retries produced invalid output — return "" to indicate drop.
    return ""
|