Spaces:
Sleeping
Sleeping
File size: 5,897 Bytes
dc06d4c 5c074ff dc06d4c c6a3f44 dc06d4c c6a3f44 dc06d4c c6a3f44 dc06d4c c6a3f44 dc06d4c c6a3f44 dc06d4c c6a3f44 dc06d4c c6a3f44 dc06d4c c6a3f44 dc06d4c c6a3f44 5c074ff dc06d4c 5c074ff dc06d4c 5c074ff dc06d4c 5c074ff dc06d4c c6a3f44 dc06d4c 5c074ff dc06d4c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 | from pathlib import Path
import re
import unicodedata
from src.config import HF_TOKEN, SPACE_ID
def strip_degrees_for_search(text):
    """Remove common degree words before matching institution names.

    Args:
        text: Raw free-text value. Non-string inputs are returned unchanged.

    Returns:
        The text with degree keywords removed (case-insensitively),
        whitespace collapsed and leading/trailing separator characters
        trimmed. If nothing is left after the removal, the original text
        stripped of surrounding whitespace is returned instead.
    """
    if not isinstance(text, str):
        return text
    # "Ph.D." has its own branch: the word boundary is checked right after the
    # "D" and the optional final dot is consumed afterwards. With a trailing
    # \b after the dot, the dot could never match before a space or at the end
    # of the string, leaving a stray "." behind in the middle of the text.
    degree_pattern = (
        r'\b(?:Ph\.?D\b\.?'
        r'|(?:MSc|MBA|BBA|BSc|BA|MA|BS|MS|EMBA|Master|Bachelor|Masters|Bachelors|Licence)\b)'
    )
    cleaned = re.sub(degree_pattern, '', text, flags=re.IGNORECASE)
    cleaned = re.sub(r'\s+', ' ', cleaned)
    # Drop separators left dangling at either end once the degree is gone.
    cleaned = cleaned.strip(' -.,&/|')
    if not cleaned:
        # The whole value was a degree word; keep it rather than return "".
        return text.strip()
    return cleaned
def smart_format(text):
    """Title-case free text while preserving common academic/business acronyms.

    Args:
        text: Free text to format. Non-string inputs are returned unchanged.

    Returns:
        The title-cased text with known acronyms upper-cased ("MBA", "HR"),
        mixed-case degrees restored ("PhD", "BSc", "MSc"), possessive "'s"
        kept lower-case, and the letter following an elided French article
        (L', D', Qu') lower-cased. Surrounding whitespace is stripped.
    """
    if not isinstance(text, str):
        return text
    res = text.title()
    # str.title() treats an apostrophe as a word boundary, so a possessive
    # comes out as "Master'S"; put the lone trailing "S" back in lower case.
    res = re.sub(r"(?<=\w)'S\b", "'s", res)
    acronyms = ['Ma', 'Ba', 'Mba', 'Bba', 'Hr', 'It', 'Bs', 'Ms', 'Phd', 'Bsc', 'Msc', 'Llm', 'Pge', 'Cems']
    for ac in acronyms:
        # Only whole words are upper-cased, never fragments of longer words.
        res = re.sub(rf'\b{ac}\b', lambda m: m.group(0).upper(), res)
    # These three degrees are conventionally written in mixed case.
    res = res.replace("PHD", "PhD").replace("BSC", "BSc").replace("MSC", "MSc")
    # Undo the capital that title() puts after an elided article: L'Entreprise -> L'entreprise.
    res = re.sub(r"\b(L|D|Qu)'([A-Z])", lambda m: f"{m.group(1)}'{m.group(2).lower()}", res)
    return res.strip()
def clean_degree_text(text):
    """Normalize a degree title before within-school clustering.

    The conjunctions "and" / "et" become "&", every character that is not a
    word character, whitespace, "-", "&", "+" or an apostrophe is replaced by
    a space, whitespace runs are collapsed, and the result is passed through
    ``smart_format``. Non-string input yields an empty string.
    """
    if not isinstance(text, str):
        return ""
    # Applied strictly in this order: (pattern, replacement, regex flags).
    substitutions = (
        (r'\band\b', '&', re.IGNORECASE),
        (r'\bet\b', '&', re.IGNORECASE),
        (r'[^\w\s\-&\+\']', ' ', 0),
        (r'\s+', ' ', 0),
    )
    cleaned = text
    for pattern, replacement, flags in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned, flags=flags)
    return smart_format(cleaned.strip())
def normalize_text(text):
    """Return *text* prepared for accent- and case-insensitive comparison.

    The string is decomposed (NFD), characters in Unicode category ``Mn``
    (non-spacing marks such as accents) are dropped, then the result is
    stripped of surrounding whitespace and lower-cased. Non-string input
    yields an empty string.
    """
    if not isinstance(text, str):
        return ""
    decomposed = unicodedata.normalize('NFD', text)
    base_chars = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
    return ''.join(base_chars).strip().lower()
def normalize_ref(value):
    """Return the lookup key for a reference value or alias.

    The value is converted with ``str()`` first, so non-string values are
    keyed by their string representation, then run through
    ``normalize_text``.
    """
    as_text = str(value)
    return normalize_text(as_text)
def iter_ref_values(ref_data):
    """Yield every searchable string held by a reference bucket.

    For a dict, all string keys are yielded first, followed by all string
    values. For a list, its string items are yielded in order. Non-string
    entries are skipped, and any other container type yields nothing.
    """
    if isinstance(ref_data, dict):
        sources = (ref_data.keys(), ref_data.values())
    elif isinstance(ref_data, list):
        sources = (ref_data,)
    else:
        sources = ()
    for source in sources:
        for entry in source:
            if isinstance(entry, str):
                yield entry
def ref_contains(ref_data, value):
    """Tell whether a reference bucket already holds *value* or an alias of it.

    The comparison is made on ``normalize_ref`` keys, against every string
    produced by ``iter_ref_values`` for the bucket.
    """
    target = normalize_ref(value)
    for candidate in iter_ref_values(ref_data):
        if normalize_ref(candidate) == target:
            return True
    return False
def prune_manual_refs_against_official(manual_refs, official_refs):
    """Remove manual values that are duplicates of official references.

    ``manual_refs`` is modified in place: every list or dict bucket is
    replaced by a pruned copy. Buckets of any other type are left untouched.

    List buckets drop non-string items, items whose normalized key is empty,
    repeats of an earlier item, and items already present in the official
    bucket of the same column.

    Dict buckets are compared on the entry's value when it is a string, and
    on the alias (key) otherwise. Entries with an empty normalized key, a
    key already seen in the bucket, or a key present in the official bucket
    are dropped. Surviving entries are stored under the normalized alias.

    Args:
        manual_refs: Mapping of column name to a manual list/dict bucket.
        official_refs: Mapping of column name to the official bucket.

    Returns:
        The number of entries removed across all buckets.
    """
    removed_count = 0
    for column_name, manual_bucket in list(manual_refs.items()):
        official_bucket = official_refs.get(column_name, [])
        if not isinstance(manual_bucket, (list, dict)):
            continue

        # Normalize the official bucket once per column instead of
        # re-scanning it for every manual entry.
        official_keys = {
            normalize_ref(item) for item in iter_ref_values(official_bucket)
        }

        if isinstance(manual_bucket, list):
            kept = []
            seen = set()
            for value in manual_bucket:
                if not isinstance(value, str):
                    removed_count += 1
                    continue
                key = normalize_ref(value)
                if not key or key in seen or key in official_keys:
                    removed_count += 1
                    continue
                seen.add(key)
                kept.append(value)
            manual_refs[column_name] = kept
        else:
            kept = {}
            seen_values = set()
            for alias, value in manual_bucket.items():
                # Fall back to the alias when the stored value is not text.
                candidate = value if isinstance(value, str) else alias
                key = normalize_ref(candidate)
                if not key or key in seen_values or key in official_keys:
                    removed_count += 1
                    continue
                seen_values.add(key)
                kept[normalize_ref(alias)] = value
            manual_refs[column_name] = kept
    return removed_count
# Location of the manual references JSON file. The same relative path is
# joined to the app root to find the local file and used as the destination
# path inside the repository by save_manual_references_to_hub().
MANUAL_REFERENCES_REPO_PATH = "refdata/manual_references.json"
def reference_sync_status():
    """Describe whether manual references can be committed back to Hugging Face.

    Returns:
        A dict with the keys ``enabled`` (bool), ``space_id`` (str) and
        ``reason`` (str). ``reason`` is empty when sync is enabled; otherwise
        it explains which prerequisite (``SPACE_ID`` or ``HF_TOKEN``) is
        missing.
    """
    if SPACE_ID and HF_TOKEN:
        return {"enabled": True, "space_id": SPACE_ID, "reason": ""}

    if SPACE_ID:
        # The Space is known, so the only missing piece is the token.
        space_id = SPACE_ID
        reason = "HF_TOKEN secret is missing from this Space."
    else:
        space_id = ""
        reason = "Reference sync is only available on Hugging Face Spaces."
    return {"enabled": False, "space_id": space_id, "reason": reason}
def save_manual_references_to_hub(app_root: Path):
    """Commit the current manual references file back to the Space repository.

    Args:
        app_root: Directory that contains the file located at
            ``MANUAL_REFERENCES_REPO_PATH``.

    Returns:
        A dict with ``space_id``, ``path`` (path inside the repository) and
        ``commit_url``.

    Raises:
        RuntimeError: If reference sync is not enabled (the message is the
            reason reported by ``reference_sync_status``) or if
            ``huggingface_hub`` is not installed.
        FileNotFoundError: If the manual references file does not exist.
    """
    status = reference_sync_status()
    if not status["enabled"]:
        raise RuntimeError(status["reason"])
    manual_refs_path = app_root / MANUAL_REFERENCES_REPO_PATH
    if not manual_refs_path.is_file():
        raise FileNotFoundError(f"Manual references file not found: {manual_refs_path}")
    # Imported lazily so the module stays importable without huggingface_hub.
    try:
        from huggingface_hub import HfApi
    except ImportError as exc:
        raise RuntimeError("huggingface_hub is not installed.") from exc
    api = HfApi(token=HF_TOKEN)
    commit_info = api.upload_file(
        path_or_fileobj=str(manual_refs_path),
        path_in_repo=MANUAL_REFERENCES_REPO_PATH,
        repo_id=status["space_id"],
        repo_type="space",
        commit_message="Update manual references",
    )
    # upload_file() returns a CommitInfo in current huggingface_hub releases;
    # its string value is the URL of the uploaded file, while the commit URL
    # lives in the `commit_url` attribute. Fall back to str() for versions
    # that return a plain URL string.
    commit_url = getattr(commit_info, "commit_url", None) or str(commit_info)
    return {
        "space_id": status["space_id"],
        "path": MANUAL_REFERENCES_REPO_PATH,
        "commit_url": commit_url,
    }
|