File size: 5,848 Bytes
80cb919
 
a7fd3ba
80cb919
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19d62ff
a7fd3ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
80cb919
 
 
 
 
 
 
 
5dcfc82
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# augmentation utility agent
import re
import difflib
import random
from typing import Dict, Tuple
import ftfy
import langid

# Patterns used by deidentify() to locate personally identifying tokens.
# E-mail address: local part, "@", dotted domain, and a TLD of 2+ letters.
P_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# Phone number: optional country code and optional (parenthesised) area code,
# followed by two groups of 3-4 digits, each optionally separated by
# whitespace or a hyphen. Because every prefix is optional, any run of six or
# more digits also matches.
P_PHONE = re.compile(r"(?:(?:\+?\d{1,3})?[\s-]?)?(?:\(?\d{2,4}\)?[\s-]?)?\d{3,4}[\s-]?\d{3,4}")
# URL: "http://" / "https://" or a bare "www." prefix, then everything up to
# the next whitespace character.
P_URL   = re.compile(r"https?://\S+|www\.\S+")
# IPv4-shaped token: four dot-separated groups of 1-3 digits. The 0-255 range
# of each group is not validated.
P_IP    = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")

def fix_unicode(s: str) -> str:
    """Pass ``s`` through ``ftfy.fix_text`` and return the result.

    A falsy input (``None`` or the empty string) is treated as ``""``.
    """
    raw = s if s else ""
    return ftfy.fix_text(raw)

def normalize_whitespace(s: str) -> str:
    """Collapse redundant whitespace while keeping paragraph breaks.

    Steps, applied in order:
      1. non-breaking spaces become regular spaces;
      2. runs of spaces/tabs collapse to a single space;
      3. whitespace at the end of a line (including carriage returns) is
         removed;
      4. three or more consecutive newlines collapse to one blank line;
      5. leading and trailing whitespace is stripped.

    Args:
        s: Text to normalize.

    Returns:
        The normalized text.
    """
    s = s.replace("\u00A0", " ")
    s = re.sub(r"[ \t]+", " ", s)
    # Trim only non-newline whitespace before a line break. Matching ``\s+``
    # here would also swallow the preceding newlines, flattening every blank
    # line to a single newline and leaving the next step with nothing to do.
    s = re.sub(r"[^\S\n]+\n", "\n", s)
    s = re.sub(r"\n{3,}", "\n\n", s)
    return s.strip()

def canonicalize_quotes(s: str) -> str:
    """Replace curly double and single quotation marks with ASCII quotes."""
    straight_for = {"“": '"', "”": '"', "’": "'", "‘": "'"}
    return s.translate(str.maketrans(straight_for))

def ensure_terminal_punct(s: str) -> str:
    """Append a period unless ``s`` already ends with ``.``, ``!`` or ``?``.

    Falsy input (``None`` or the empty string) is returned unchanged.
    """
    if not s or s.endswith((".", "!", "?")):
        return s
    return s + "."

def deidentify(s: str) -> str:
    """Mask e-mail addresses, phone numbers, URLs and IP addresses in ``s``.

    The patterns are applied in that fixed order, and every match is replaced
    by the corresponding bracketed ``[REDACTED_*]`` placeholder.
    """
    substitutions = (
        (P_EMAIL, "[REDACTED_EMAIL]"),
        (P_PHONE, "[REDACTED_PHONE]"),
        (P_URL, "[REDACTED_URL]"),
        (P_IP, "[REDACTED_IP]"),
    )
    redacted = s
    for pattern, placeholder in substitutions:
        redacted = pattern.sub(placeholder, redacted)
    return redacted

def lang_is_english(s: str) -> bool:
    """Report whether ``langid`` classifies ``s`` as English.

    Only the first 2000 characters are classified. If classification raises
    for any reason the function fails open and returns ``True``.
    """
    try:
        sample = (s or "")[:2000]
        detected, _ = langid.classify(sample)
    except Exception:
        return True
    return detected == "en"

def length_cap(s: str, max_chars: int) -> str:
    """Truncate ``s`` to roughly ``max_chars`` characters.

    Text that already fits is returned unchanged. Otherwise the text is cut
    at ``max_chars`` and, when the last ``". "`` inside that prefix lies past
    index 300, trimmed back to end on that period. A ``" …"`` marker is
    appended in both cases, so the result can be up to two characters longer
    than ``max_chars``.
    """
    if len(s) <= max_chars:
        return s
    head = s[:max_chars]
    boundary = head.rfind(". ")
    # A sentence boundary at index 300 or earlier would throw away too much
    # text, so fall back to the hard cut in that case.
    kept = head[:boundary + 1] if boundary > 300 else head
    return kept + " …"

def fingerprint(instr: str, user: str, out: str) -> str:
    """Return an MD5 hex digest for an (instruction, input, output) triple.

    Each field is lowercased, every run of characters outside ``a-z0-9`` is
    turned into a single space, and the ends are trimmed before the three
    fields are joined with ``"||"`` and hashed. Texts that differ only in
    case, punctuation or spacing therefore share a fingerprint. Intended for
    de-duplication, not for security.
    """
    import hashlib

    def _canonical(field: str) -> str:
        lowered = field.lower()
        spaced = re.sub(r"[^a-z0-9]+", " ", lowered)
        return re.sub(r"\s+", " ", spaced).strip()

    joined = "||".join(_canonical(part) for part in (instr, user, out))
    return hashlib.md5(joined.encode("utf-8")).hexdigest()

def style_standardize_answer(ans: str) -> str:
    """Nudge an answer toward a neutral, hedged style.

    The answer is stripped, absolute wording ("guarantee", "100%",
    "certainly", "always", "never") is replaced by "likely", a trailing
    forum-style sign-off ("thanks", "thank you", "regards", "cheers" and the
    rest of that final line) is removed, and terminal punctuation is ensured.

    Args:
        ans: Answer text; falsy values are returned unchanged.

    Returns:
        The standardized answer.
    """
    if not ans:
        return ans
    ans = ans.strip()
    # Soften absolute guarantees. "100%" is matched without a trailing ``\b``:
    # "%" is a non-word character, so a boundary after it only exists when a
    # word character follows, and "100% safe" would never match.
    # NOTE(review): a blanket swap to "likely" can invert the meaning of
    # "never ..." sentences - confirm this is acceptable for the dataset.
    ans = re.sub(
        r"\b(?:guarantee|certainly|always|never)\b|\b100%",
        "likely",
        ans,
        flags=re.I,
    )
    # Drop a sign-off on the final line. Word boundaries keep the keywords
    # from firing inside longer words such as "disregards".
    # NOTE(review): a last line like "thanks to ..." is still treated as a
    # sign-off; tighten if that shows up in real data.
    ans = re.sub(
        r"\n*\b(?:thanks|thank you|regards|cheers)\b[^\n]*$",
        "",
        ans,
        flags=re.I,
    )
    # A sign-off removed from the middle of a line leaves trailing spaces;
    # trim them so the appended period does not end up as " .".
    return ensure_terminal_punct(ans.rstrip())

def base_cleanup(s: str, max_chars: int, do_deid: bool) -> str:
    """Run the standard cleanup pipeline over ``s``.

    Order: Unicode repair, quote canonicalization, whitespace normalization,
    optional de-identification (when ``do_deid`` is true), then length
    capping at ``max_chars``.
    """
    cleaned = normalize_whitespace(canonicalize_quotes(fix_unicode(s)))
    if do_deid:
        cleaned = deidentify(cleaned)
    return length_cap(cleaned, max_chars)

def maybe_paraphrase(text: str, ratio: float, paraphraser, difficulty: str) -> Tuple[str, bool]:
    """Paraphrase ``text`` with probability ``ratio``.

    Returns ``(result, applied)``. ``applied`` is ``True`` whenever
    ``paraphraser.paraphrase`` was called, whatever it returned. Nothing is
    sampled when ``ratio`` is non-positive or ``text`` is empty.
    """
    if ratio <= 0 or not text:
        return text, False
    selected = random.random() < ratio
    if not selected:
        return text, False
    rewritten = paraphraser.paraphrase(text, difficulty=difficulty)
    return rewritten, True

def maybe_backtranslate(text: str, ratio: float, paraphraser, via_lang: str = "vi") -> Tuple[str, bool]:
    """Back-translate ``text`` with probability ``ratio``, behind guardrails.

    Args:
        text: Source text; empty text is returned unchanged.
        ratio: Probability of attempting a back-translation; non-positive
            values disable it.
        paraphraser: Object exposing ``backtranslate(text, via_lang=...)``.
        via_lang: Pivot language code passed to the paraphraser.

    Returns:
        ``(result, applied)``. The back-translation is accepted only when it
        is non-empty, its length differs from the original by at most 50%,
        and its ``difflib`` similarity ratio lies within ``[0.45, 0.98]``.
        In every other case the original text and ``False`` are returned.
    """
    if ratio <= 0 or not text:
        return text, False
    if not (random.random() < ratio):
        return text, False

    bt = paraphraser.backtranslate(text, via_lang=via_lang)
    if not bt:
        return text, False

    try:
        orig_len = max(1, len(text))
        len_delta = abs(len(bt) - len(text)) / orig_len
        sim = difflib.SequenceMatcher(None, text, bt).ratio()
    except Exception:
        # The guardrails could not be evaluated (e.g. the paraphraser returned
        # a non-string). Keep the original rather than accept an unchecked
        # candidate.
        return text, False

    # Reject results that are much shorter or longer than the source.
    if len_delta > 0.5:
        return text, False
    # Reject excessive drift (too dissimilar) and no-op round trips (too
    # similar).
    if sim < 0.45 or sim > 0.98:
        return text, False
    return bt, True

def consistency_ok(user: str, out: str, ratio: float, paraphraser) -> bool:
    """Spot-check a (user, out) pair for consistency with probability ``ratio``.

    Returns ``True`` without calling the paraphraser when ``ratio`` is
    non-positive, when either text is empty, or when the pair is not sampled.
    Otherwise returns the result of ``paraphraser.consistency_check``.
    """
    nothing_to_check = ratio <= 0 or not user or not out
    if nothing_to_check:
        return True
    skipped = random.random() >= ratio
    return True if skipped else paraphraser.consistency_check(user, out)

def is_invalid_response(text: str) -> bool:
    """Heuristically decide whether a model response should be discarded.

    A response is invalid when it is empty, is not a string, is shorter than
    three characters after stripping, or contains any refusal/failure marker
    as a case-insensitive substring. Because this is a substring match, a
    marker embedded in a longer word (e.g. "fail" in "failure") also counts.
    """
    if not text or not isinstance(text, str):
        return True

    markers = (
        "fail", "invalid", "i couldn't", "i can't", "i cannot", "unable to",
        "sorry", "error", "not available", "no answer", "insufficient",
        "don't know", "do not know", "not sure", "cannot determine",
        "unable to provide", "not possible", "not applicable", "n/a",
    )
    normalized = text.lower().strip()

    # Too short to carry a real answer.
    if len(normalized) < 3:
        return True

    return any(marker in normalized for marker in markers)

def clean_invalid_response(text: str, fallback: str = "") -> str:
    """Return ``text`` when it is a valid response, otherwise ``fallback``."""
    return fallback if is_invalid_response(text) else text

def retry_invalid_response(text: str, paraphraser, max_retries: int = 3) -> str:
    """Try to replace an invalid response with a valid paraphrase.

    Args:
        text: Candidate response. Returned unchanged if it is already valid.
        paraphraser: Object exposing ``paraphrase(text, difficulty=...)``.
        max_retries: Maximum number of paraphrase attempts.

    Returns:
        ``text`` if it was valid, otherwise the first valid paraphrase, or
        ``""`` when every attempt failed (the caller should drop the sample).
    """
    if not is_invalid_response(text):
        return text

    # Imported and bound locally: no module-level ``logger`` is defined in
    # this file, so referring to one from the handler below would raise
    # NameError instead of logging the failed attempt.
    import logging
    log = logging.getLogger(__name__)

    for attempt in range(max_retries):
        # Difficulty schedule: easy, hard, then easy for any further attempt.
        difficulty = "hard" if attempt == 1 else "easy"
        try:
            retry_text = paraphraser.paraphrase(text, difficulty=difficulty)
            if retry_text and not is_invalid_response(retry_text):
                return retry_text
        except Exception as e:
            # Best effort: a failing attempt must not abort the remaining ones.
            log.warning("Retry attempt %d failed: %s", attempt + 1, e)

    # If all retries failed, return empty string to indicate drop.
    return ""