Spaces:
Running
Running
| # admin_patterns.py | |
| import os | |
| import json | |
| import re | |
| import shutil | |
| import pandas as pd | |
| from datetime import datetime | |
# Directory containing this module; patterns.json is expected next to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
PATTERN_FILE = os.path.join(BASE_DIR, "patterns.json")

# In-memory cache of the most recently loaded/saved config (None until the
# first load).
pattern_config = None

# Shape used when no patterns file can be loaded: a list of global pattern
# entries plus a mapping of state key -> list of pattern entries.
DEFAULT_PATTERN_CONFIG = {
    "global": [],
    "states": {},
}
def _make_backup_of_patterns():
    """Copy the current patterns file to a timestamped backup next to it.

    The backup is written into ``BASE_DIR`` and named
    ``patterns_backup_{YYYYmmddTHHMMSSZ}.json`` from the current UTC time.

    Returns:
        The backup path on success, or ``None`` when ``PATTERN_FILE`` does
        not exist or the copy fails (the failure is printed, not raised).
    """
    # Local import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    if not os.path.exists(PATTERN_FILE):
        return None

    # datetime.utcnow() is deprecated and returns a naive value; an aware UTC
    # timestamp renders to the same string with this format.
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    backup_path = os.path.join(BASE_DIR, f"patterns_backup_{ts}.json")
    try:
        shutil.copyfile(PATTERN_FILE, backup_path)
    except OSError as e:
        # Best effort: a failed backup must not prevent the caller from saving.
        print("ERROR creating patterns.json backup:", e)
        return None
    return backup_path
def load_pattern_config(debug: bool = False):
    """Load the pattern configuration and cache it in ``pattern_config``.

    Candidate files are tried in priority order: ``PATTERN_FILE`` (next to
    this module), ``/app/patterns.json`` and ``patterns.json`` in the current
    working directory. The first file that parses as a JSON object containing
    a ``"global"`` or ``"states"`` key wins.

    Args:
        debug: When true, print which file was loaded or why one was skipped.

    Returns:
        The loaded config dict, or a fresh copy of ``DEFAULT_PATTERN_CONFIG``
        when no candidate file is usable.
    """
    # Local import: ``copy`` is not among the module-level imports.
    import copy

    global pattern_config

    search_paths = [
        PATTERN_FILE,  # Same dir as module
        "/app/patterns.json",  # HF Spaces mount point
        os.path.join(os.getcwd(), "patterns.json"),  # Current directory
    ]

    for path in search_paths:
        if not os.path.exists(path):
            continue
        try:
            with open(path, "r", encoding="utf-8") as f:
                cfg = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError,
            # so a badly encoded file is skipped instead of aborting the search.
            if debug:
                print(f"Failed to load {path}: {e}")
            continue

        if isinstance(cfg, dict) and ("global" in cfg or "states" in cfg):
            pattern_config = cfg
            if debug:
                print(f"Loaded config from: {path}")
            return cfg
        if debug:
            print(f"Ignoring {path}: missing 'global'/'states' keys")

    # Deep copy: callers mutate the nested "states" dict in place, which with
    # a shallow copy would leak into DEFAULT_PATTERN_CONFIG itself.
    pattern_config = copy.deepcopy(DEFAULT_PATTERN_CONFIG)
    if debug:
        print("No valid config found, using defaults")
    return pattern_config
def save_pattern_config(cfg: dict):
    """Persist ``cfg`` to ``PATTERN_FILE`` and refresh the in-memory cache.

    The config is serialised before anything on disk is touched, so an
    unserialisable value raises without creating a backup or damaging the
    existing file. The previous file is then backed up, and the new content
    is written to a temporary sibling and moved over ``PATTERN_FILE``.

    Args:
        cfg: JSON-serialisable config dict.

    Returns:
        ``True`` once the file has been written.

    Raises:
        TypeError, ValueError: If ``cfg`` cannot be serialised to JSON.
        OSError: If the file cannot be written.
    """
    global pattern_config

    # Serialise first: json.dump() straight into the live file would leave it
    # truncated if serialisation failed half-way.
    payload = json.dumps(cfg, indent=2, ensure_ascii=False)

    backup_path = _make_backup_of_patterns()
    if backup_path:
        print(f"INFO: patterns.json backed up to {backup_path}")

    tmp_path = PATTERN_FILE + ".tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        # Swap the finished file into place rather than rewriting in place.
        os.replace(tmp_path, PATTERN_FILE)
    except OSError:
        # Do not leave a stale temp file behind; re-raise the original error.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise

    # Cache an independent copy that matches exactly what is now on disk.
    pattern_config = json.loads(payload)
    return True
| def build_patterns_from_config(cfg: dict, state_key: str | None): | |
| global_list = [(p["pattern"], p["replacement"]) for p in cfg.get("global", [])] | |
| state_list = [] | |
| if state_key: | |
| state_key_up = state_key.upper().strip() | |
| state_patterns = cfg.get("states", {}).get(state_key_up, []) | |
| state_list = [(p["pattern"], p["replacement"]) for p in state_patterns] | |
| return global_list, state_list | |
def normalize_with_patterns_dynamic(s: str, state_key: str | None):
    """Upper-case ``s`` and apply the configured regex substitutions.

    Global rules run first, then the rules for ``state_key``. Afterwards
    every run of characters other than A-Z/0-9 becomes a single space and
    the result is stripped. The config is loaded on first use when the
    module cache is still empty.

    Returns:
        The normalised string, or ``""`` when ``s`` is not a string.
    """
    if not isinstance(s, str):
        return ""
    text = s.upper()

    if pattern_config is None:
        load_pattern_config(debug=False)
    active_cfg = pattern_config if pattern_config else DEFAULT_PATTERN_CONFIG

    global_rules, state_rules = build_patterns_from_config(active_cfg, state_key)
    # Global rules are applied before state-specific ones.
    for pattern, replacement in global_rules + state_rules:
        try:
            text = re.sub(pattern, replacement, text)
        except re.error:
            # An invalid regex is skipped; the remaining rules still apply.
            continue

    text = re.sub(r"[^A-Z0-9]+", " ", text)
    return re.sub(r"\s+", " ", text).strip()
| # Admin helpers for the UI | |
def load_global_patterns_for_editor():
    """Return the global pattern entries as a DataFrame for the admin editor.

    The config is reloaded on every call. When there are no global entries,
    an empty frame with ``pattern`` and ``replacement`` columns is returned,
    so the editor still shows its headers.
    """
    cfg = load_pattern_config(debug=False)
    rows = cfg.get("global") or []
    if not rows:
        # A bare pd.DataFrame([]) has no columns at all.
        return pd.DataFrame(columns=["pattern", "replacement"])
    return pd.DataFrame(rows)
def load_state_patterns_for_editor(selected_state: str | None, new_state_name: str | None):
    """Return one state's pattern entries as a DataFrame for the admin editor.

    Args:
        selected_state: State key picked from the existing states.
        new_state_name: Free-text state key; takes precedence over
            ``selected_state`` when it is not blank.

    Returns:
        A DataFrame of the state's entries. When no key is given or the
        state has no entries, an empty frame with ``pattern`` and
        ``replacement`` columns is returned.
    """
    cfg = load_pattern_config(debug=False)

    key = None
    if new_state_name and new_state_name.strip():
        key = new_state_name.strip().upper()
    elif selected_state:
        key = selected_state.strip().upper()

    state_patterns = cfg.get("states", {}).get(key, []) if key else []
    if not state_patterns:
        # Same empty shape for "no key" and "no entries yet"; a bare
        # pd.DataFrame([]) would have no columns.
        return pd.DataFrame(columns=["pattern", "replacement"])
    return pd.DataFrame(state_patterns)
def save_global_patterns_from_editor(df: pd.DataFrame, admin_password: str, expected_password: str):
    """Replace the global patterns with the editor contents after a password check.

    Args:
        df: Editor contents, one pattern entry per row.
        admin_password: Password supplied by the user.
        expected_password: Configured admin password; ``None`` or an empty
            string is treated as "not configured".

    Returns:
        A status message describing success or the reason nothing was saved.
    """
    # Local import: ``hmac`` is not among the module-level imports.
    import hmac

    # An empty expected password must count as unconfigured; otherwise an
    # empty submitted password would authenticate.
    if not isinstance(expected_password, str) or not expected_password:
        return "β ADMIN password not configured in environment."

    supplied = admin_password if isinstance(admin_password, str) else ""
    # compare_digest avoids leaking the match position through timing; bytes
    # are used because the str form only accepts ASCII.
    if not hmac.compare_digest(supplied.encode("utf-8"), expected_password.encode("utf-8")):
        return "β Invalid admin password. Global patterns NOT saved."

    cfg = load_pattern_config(debug=False)
    cfg["global"] = df.fillna("").to_dict(orient="records")
    save_pattern_config(cfg)

    gcount = len(cfg.get("global", []))
    skeys = sorted(cfg.get("states", {}))
    return f"β Global patterns saved β global={gcount}, state_keys={skeys}"
| def save_state_patterns_from_editor(selected_state: str | None, new_state_name: str | None, df: pd.DataFrame, admin_password: str, expected_password: str): | |
| if expected_password is None: | |
| return "β ADMIN password not configured in environment." | |
| if admin_password != expected_password: | |
| return "β Invalid admin password. State patterns NOT saved." | |
| key = None | |
| if new_state_name and new_state_name.strip(): | |
| key = new_state_name.strip().upper() | |
| elif selected_state: | |
| key = selected_state.strip().upper() | |
| if not key: | |
| return "β Please select a state or type a new state key." | |
| cfg = load_pattern_config(debug=False) | |
| cfg.setdefault("states", {})[key] = df.fillna("").to_dict(orient="records") | |
| save_pattern_config(cfg) | |
| gcount = len(cfg.get("global", [])) | |
| skeys = sorted(list(cfg.get("states", {}).keys())) | |
| return f"β Patterns for {key} saved β global={gcount}, state_keys={skeys}" | |
def refresh_pattern_config():
    """Reload the pattern config with debug output and return a one-line summary."""
    reloaded = load_pattern_config(debug=True)
    global_count = len(reloaded.get("global", []))
    state_keys = sorted(reloaded.get("states", {}))
    return f"Refreshed patterns.json β global={global_count}, state_keys={state_keys}"
def show_patterns_file_info():
    """Build a diagnostic report about the patterns file and its directory.

    The report contains the path of ``PATTERN_FILE``, whether it exists, its
    size, a preview of up to 1000 characters, and the names of the files in
    ``BASE_DIR``. Read and listing errors are included in the report instead
    of being raised.

    Returns:
        The report sections joined by blank lines.
    """
    preview_limit = 1000

    # Check once so the "Exists" line and the preview branch cannot disagree.
    exists = os.path.exists(PATTERN_FILE)
    info_lines = [
        f"PATTERN_FILE: {PATTERN_FILE}",
        f"Exists: {exists}",
    ]

    if exists:
        try:
            size = os.path.getsize(PATTERN_FILE)
            info_lines.append(f"Size (bytes): {size}")
            with open(PATTERN_FILE, "r", encoding="utf-8") as f:
                # Read one extra character so a file of exactly 1000
                # characters is not reported as truncated.
                txt = f.read(preview_limit + 1)
            truncated = len(txt) > preview_limit
            info_lines.append("Preview (first 1000 chars):")
            info_lines.append(
                "```json\n"
                + txt[:preview_limit]
                + ("\n... (truncated)" if truncated else "")
                + "\n```"
            )
        except (OSError, ValueError) as e:
            # ValueError covers UnicodeDecodeError from a non-UTF-8 file.
            info_lines.append("ERROR reading file: " + str(e))

    try:
        listing = os.listdir(BASE_DIR)
        info_lines.append("Files in BASE_DIR: " + ", ".join(listing))
    except OSError as e:
        info_lines.append("ERROR listing BASE_DIR: " + str(e))

    return "\n\n".join(info_lines)