Spaces:
Sleeping
Sleeping
import json
import logging
import time
from typing import Any, Dict, List, Optional

from sqlalchemy.orm import Session

from .models import ConfigItem, SensitiveWord, WhitelistItem, SessionLocal

class ConfigManager:
    """Singleton that serves sensitive words, whitelist and config values.

    Data is read from the database through ``SessionLocal`` and kept in an
    in-memory cache that is reloaded when it is older than ``CACHE_TTL``
    seconds. The CRUD helpers write straight to the database and invalidate
    the cache so the next read reloads it.
    """

    _instance: Optional["ConfigManager"] = None
    # Class-level fallback; refresh_cache() rebinds ``_cache`` on the instance.
    _cache: Dict[str, Any] = {}
    _last_refresh: float = 0
    CACHE_TTL = 60  # seconds (1 minute cache)
    _logger = logging.getLogger(__name__)

    def __new__(cls):
        """Return the shared instance, creating and priming it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._refresh_cache_if_needed(force=True)
        return cls._instance

    def get_db(self):
        """Yield a new session and close it once the generator is finished."""
        db = SessionLocal()
        try:
            yield db
        finally:
            db.close()

    # ------------------------------------------------------------------
    # Cache handling
    # ------------------------------------------------------------------
    def _refresh_cache_if_needed(self, force=False):
        """Reload the cache when ``force`` is set or the TTL has elapsed."""
        if force or time.time() - self._last_refresh > self.CACHE_TTL:
            self.refresh_cache()

    def _invalidate_cache(self) -> None:
        """Mark the cache as stale so the next read reloads it."""
        self._last_refresh = 0

    def refresh_cache(self):
        """Rebuild the whole cache from the database.

        The new snapshot replaces the old one only when all three queries
        succeed; on any error the previous cache (and its timestamp) is kept,
        so the next read will retry.
        """
        db = SessionLocal()
        try:
            snapshot = {
                "sensitive_words": self._load_sensitive_words(db),
                "whitelist": self._load_whitelist(db),
                "configs": self._load_configs(db),
            }
            self._cache = snapshot
            self._last_refresh = time.time()
        except Exception as e:
            # "no such table" is expected before the schema exists (first
            # startup); per the original author's note the tables are created
            # later in main.py's lifespan hook, so that case is not an error.
            if "no such table" in str(e):
                self._logger.debug("Config tables not created yet; cache not refreshed")
            else:
                self._logger.exception("Error refreshing cache: %s", e)
        finally:
            db.close()

    @staticmethod
    def _load_sensitive_words(db) -> Dict[str, Dict[str, List[str]]]:
        """Return active words grouped as category -> subcategory -> [word]."""
        rows = db.query(SensitiveWord).filter(SensitiveWord.is_active == True).all()  # noqa: E712
        grouped: Dict[str, Dict[str, List[str]]] = {}
        for row in rows:
            # Rows without a subcategory are filed under "general".
            sub = row.subcategory if row.subcategory else "general"
            grouped.setdefault(row.category, {}).setdefault(sub, []).append(row.word)
        return grouped

    @staticmethod
    def _load_whitelist(db) -> List[str]:
        """Return the words of all active whitelist rows."""
        rows = db.query(WhitelistItem).filter(WhitelistItem.is_active == True).all()  # noqa: E712
        return [row.word for row in rows]

    def _load_configs(self, db) -> Dict[str, Any]:
        """Return active config rows as ``{key: parsed value}``."""
        rows = db.query(ConfigItem).filter(ConfigItem.is_active == True).all()  # noqa: E712
        parsed: Dict[str, Any] = {}
        for item in rows:
            try:
                parsed[item.key] = self._parse_config_value(item.value, item.type)
            except Exception as e:
                # Best effort: one malformed row must not block the refresh,
                # so fall back to the raw stored value.
                self._logger.warning("Error parsing config %s: %s", item.key, e)
                parsed[item.key] = item.value
        return parsed

    @staticmethod
    def _parse_config_value(raw: Any, type_: str) -> Any:
        """Convert a stored config string to the Python value for ``type_``.

        ``bool`` accepts ``true``/``false`` in any letter case (so a stored
        ``"False"`` is not returned as a truthy string); every other ``bool``
        spelling goes through JSON like ``json``/``list``/``dict``. Unknown
        types return ``raw`` unchanged. Conversion errors propagate to the
        caller.
        """
        if type_ == 'bool' and isinstance(raw, str):
            lowered = raw.strip().lower()
            if lowered in ('true', 'false'):
                return lowered == 'true'
        if type_ in ('json', 'list', 'dict', 'bool'):
            return json.loads(raw)
        if type_ == 'int':
            return int(raw)
        if type_ == 'float':
            return float(raw)
        return raw

    @staticmethod
    def _serialize_config_value(value: Any, type_: str) -> Any:
        """Convert ``value`` to the form stored in the database for ``type_``.

        JSON-like types are dumped with ``ensure_ascii=False``, numbers are
        stringified, and any other type is stored as given.
        """
        if type_ in ('json', 'list', 'dict', 'bool'):
            return json.dumps(value, ensure_ascii=False)
        if type_ in ('int', 'float'):
            return str(value)
        return value

    # ------------------------------------------------------------------
    # Cached reads
    # ------------------------------------------------------------------
    def get_sensitive_words(self) -> Dict:
        """Return the cached category -> subcategory -> words mapping."""
        self._refresh_cache_if_needed()
        return self._cache.get("sensitive_words", {})

    def get_whitelist(self) -> List[str]:
        """Return the cached list of whitelisted words."""
        self._refresh_cache_if_needed()
        return self._cache.get("whitelist", [])

    def get_config(self, key: str, default: Any = None) -> Any:
        """Return one cached config value, or ``default`` when it is missing.

        The special key ``"all"`` returns the whole config mapping.
        """
        self._refresh_cache_if_needed()
        configs = self._cache.get("configs", {})
        if key == "all":
            return configs
        return configs.get(key, default)

    def get_all_configs(self) -> Dict:
        """Return the whole cached config mapping."""
        self._refresh_cache_if_needed()
        return self._cache.get("configs", {})

    # ------------------------------------------------------------------
    # CRUD operations (direct DB access, invalidate the cache on success)
    # ------------------------------------------------------------------
    def add_sensitive_word(self, word: str, category: str, subcategory: Optional[str] = None, severity: str = 'medium') -> bool:
        """Insert a sensitive word, or update and reactivate an existing row.

        A row is considered the same word when word, category and subcategory
        all match. Returns ``True`` on success and ``False`` on any error.
        """
        db = SessionLocal()
        try:
            existing = db.query(SensitiveWord).filter(
                SensitiveWord.word == word,
                SensitiveWord.category == category,
                SensitiveWord.subcategory == subcategory,
            ).first()
            if existing:
                existing.severity = severity
                existing.is_active = True
            else:
                db.add(SensitiveWord(word=word, category=category, subcategory=subcategory, severity=severity))
            db.commit()
            self._invalidate_cache()
            return True
        except Exception as e:
            db.rollback()
            self._logger.exception("Error adding sensitive word: %s", e)
            return False
        finally:
            db.close()

    def add_whitelist_item(self, word: str, category: str = 'general') -> bool:
        """Insert a whitelist word, or reactivate the existing row for it.

        ``category`` is only used when a new row is created. Returns ``True``
        on success and ``False`` on any error.
        """
        db = SessionLocal()
        try:
            existing = db.query(WhitelistItem).filter(WhitelistItem.word == word).first()
            if existing:
                existing.is_active = True
            else:
                db.add(WhitelistItem(word=word, category=category))
            db.commit()
            self._invalidate_cache()
            return True
        except Exception as e:
            db.rollback()
            self._logger.exception("Error adding whitelist item: %s", e)
            return False
        finally:
            db.close()

    def remove_sensitive_word(self, word: str) -> bool:
        """Soft-delete every row stored for ``word``.

        The same word can exist under several category/subcategory pairs, so
        all matching rows are deactivated rather than just the first one.
        Returns ``False`` when no row matches or on any error.
        """
        db = SessionLocal()
        try:
            rows = db.query(SensitiveWord).filter(SensitiveWord.word == word).all()
            if not rows:
                return False
            for row in rows:
                row.is_active = False  # Soft delete
            db.commit()
            self._invalidate_cache()
            return True
        except Exception as e:
            db.rollback()
            self._logger.exception("Error removing sensitive word: %s", e)
            return False
        finally:
            db.close()

    def update_config(self, key: str, value: Any, type_: str = 'string', group: str = 'general') -> bool:
        """Create or update the config row for ``key``.

        ``value`` is serialized according to ``type_`` before it is stored.
        ``group`` is only applied when a new row is created; an existing
        row keeps its group. Returns ``True`` on success and ``False`` on any
        error (including a value that cannot be serialized).
        """
        db = SessionLocal()
        try:
            str_val = self._serialize_config_value(value, type_)
            item = db.query(ConfigItem).filter(ConfigItem.key == key).first()
            if item:
                item.value = str_val
                item.type = type_
            else:
                db.add(ConfigItem(key=key, value=str_val, type=type_, group_name=group))
            db.commit()
            self._invalidate_cache()
            return True
        except Exception as e:
            db.rollback()
            self._logger.exception("Error updating config: %s", e)
            return False
        finally:
            db.close()


# Shared module-level instance. ConfigManager is a singleton, so constructing
# it here primes the cache (one database read attempt) at import time.
config_manager = ConfigManager()