""" In-Memory Database with GitHub-only backup. All data lives in RAM. NO local file reads or writes. Updated: gevent-safe locks, session cache, zero threading imports. """ import time import gevent from gevent.lock import RLock from datetime import datetime from config import SESSION_CACHE_TTL class MemoryDB: """ Pure in-memory database. GitHub is the ONLY persistence layer. Zero local file I/O. All locks are gevent-native. """ _instance = None _init_lock = RLock() STORES = ['users', 'telegram', 'cards', 'chat_history'] STORE_FILES = { 'users': 'users.json', 'telegram': 'users_db.json', 'cards': 'cards.json', 'chat_history': 'chat_history_db.json', } @classmethod def get_instance(cls): if cls._instance is None: with cls._init_lock: if cls._instance is None: cls._instance = cls() return cls._instance def __init__(self): self._data = {} self._locks = {} self._last_backup = time.time() # Session validation cache: {session_id: (username, expire_time)} self._session_cache = {} self._session_cache_lock = RLock() for store_name in self.STORES: self._locks[store_name] = RLock() self._data[store_name] = {} self._initial_load() print(f" ✅ MemoryDB initialized (gevent-safe, GitHub-only, zero local files)") def _initial_load(self): """Load all data from GitHub on startup.""" try: from github_storage import get_github_storage gh = get_github_storage() if not gh._configured: print(f"\n âš ī¸ GitHub not configured - starting with EMPTY databases") print(f" Set GITHUB_TOKEN and GITHUB_REPO environment variables") return print(f"\n đŸ“Ĩ Loading databases from GitHub...") results, error = gh.pull_all() if results: loaded_any = False for store_name, data in results.items(): if data and isinstance(data, dict) and len(data) > 0: self._data[store_name] = data loaded_any = True if not loaded_any: print(f"\n â„šī¸ All databases empty on GitHub (first run)") total = sum(len(self._data[s]) for s in self.STORES) if total > 0: print(f"\n 📊 Total records loaded: {total}") else: print(f"\n 📊 Starting fresh with 0 records") except ImportError: print(f" ❌ github_storage module not found") print(f" âš ī¸ Starting with empty databases") except Exception as e: print(f" ❌ GitHub load error: {e}") print(f" âš ī¸ Starting with empty databases") # ─── SESSION CACHE ─── def cache_session(self, session_id, username): """Cache a valid session for fast lookup.""" with self._session_cache_lock: self._session_cache[session_id] = (username, time.time() + SESSION_CACHE_TTL) def check_session_cache(self, session_id): """ Check session cache. Returns username if valid, None if expired/missing. """ with self._session_cache_lock: entry = self._session_cache.get(session_id) if entry: username, expire_time = entry if time.time() < expire_time: return username else: del self._session_cache[session_id] return None def invalidate_session_cache(self, username): """Remove all cached sessions for a user (on login/logout).""" with self._session_cache_lock: stale = [sid for sid, (uname, _) in self._session_cache.items() if uname == username] for sid in stale: del self._session_cache[sid] # ─── READ OPERATIONS ─── def read(self, store_name): lock = self._locks.get(store_name) if not lock: return {} with lock: return dict(self._data.get(store_name, {})) def read_key(self, store_name, key, default=None): lock = self._locks.get(store_name) if not lock: return default with lock: return self._data.get(store_name, {}).get(key, default) def read_keys(self, store_name, keys): lock = self._locks.get(store_name) if not lock: return {} with lock: store = self._data.get(store_name, {}) return {k: store[k] for k in keys if k in store} def has_key(self, store_name, key): lock = self._locks.get(store_name) if not lock: return False with lock: return key in self._data.get(store_name, {}) def count(self, store_name): lock = self._locks.get(store_name) if not lock: return 0 with lock: return len(self._data.get(store_name, {})) # ─── WRITE OPERATIONS ─── def write(self, store_name, key, value): lock = self._locks.get(store_name) if not lock: return with lock: if store_name not in self._data: self._data[store_name] = {} self._data[store_name][key] = value def write_many(self, store_name, updates): lock = self._locks.get(store_name) if not lock: return with lock: if store_name not in self._data: self._data[store_name] = {} self._data[store_name].update(updates) def write_full(self, store_name, data): lock = self._locks.get(store_name) if not lock: return with lock: self._data[store_name] = data def update_key(self, store_name, key, update_fn): lock = self._locks.get(store_name) if not lock: return None with lock: current = self._data.get(store_name, {}).get(key, None) new_value = update_fn(current) if new_value is not None: if store_name not in self._data: self._data[store_name] = {} self._data[store_name][key] = new_value return new_value def delete(self, store_name, key): lock = self._locks.get(store_name) if not lock: return False with lock: if key in self._data.get(store_name, {}): del self._data[store_name][key] return True return False def delete_many(self, store_name, keys): lock = self._locks.get(store_name) if not lock: return 0 with lock: count = 0 store = self._data.get(store_name, {}) for key in keys: if key in store: del store[key] count += 1 return count # ─── QUERY OPERATIONS ─── def find(self, store_name, predicate): lock = self._locks.get(store_name) if not lock: return [] with lock: return [(k, v) for k, v in self._data.get(store_name, {}).items() if predicate(k, v)] def find_keys_by_prefix(self, store_name, prefix): lock = self._locks.get(store_name) if not lock: return {} with lock: store = self._data.get(store_name, {}) return {k: v for k, v in store.items() if k.startswith(prefix)} # ─── GITHUB BACKUP OPERATIONS ─── def push_to_github(self): """Push all in-memory data to GitHub.""" try: from github_storage import get_github_storage gh = get_github_storage() data_map = {} for store_name in self.STORES: with self._locks[store_name]: data_map[store_name] = dict(self._data.get(store_name, {})) success, errors = gh.push_all(data_map) if success: self._last_backup = time.time() return success, errors except Exception as e: err = f"GitHub push error: {e}" print(f" ❌ {err}") return False, [err] def pull_from_github(self): """Pull all data from GitHub and replace in-memory data.""" try: from github_storage import get_github_storage gh = get_github_storage() results, error = gh.pull_all() if error: return False, error if not results: return False, "No data returned from GitHub" for store_name, data in results.items(): if data and isinstance(data, dict): lock = self._locks.get(store_name) if lock: with lock: self._data[store_name] = data total = sum(len(v) for v in results.values()) print(f" ✅ Pulled {total} records from GitHub into memory") return True, None except Exception as e: err = f"GitHub pull error: {e}" print(f" ❌ {err}") return False, err def push_single_to_github(self, store_name): """Push a single store to GitHub.""" if store_name not in self.STORE_FILES: return False, f"Unknown store: {store_name}" try: from github_storage import get_github_storage gh = get_github_storage() filename = self.STORE_FILES[store_name] with self._locks[store_name]: data = dict(self._data.get(store_name, {})) success, error = gh.push_file(filename, data) return success, error except Exception as e: return False, f"Error pushing {store_name}: {e}" # ─── STATS ─── def get_stats(self): stats = {} for store_name in self.STORES: with self._locks[store_name]: stats[store_name] = { "records": len(self._data.get(store_name, {})), "github_file": self.STORE_FILES.get(store_name, ""), } stats["_meta"] = { "last_backup": datetime.fromtimestamp(self._last_backup).isoformat(), "mode": "github_only", "local_files": False, "session_cache_size": len(self._session_cache), } return stats def shutdown(self): """Graceful shutdown.""" print(" 🛑 MemoryDB shutting down...") total = sum(len(self._data.get(s, {})) for s in self.STORES) print(f" âš ī¸ {total} records in memory. Push to GitHub if needed: /backup_db") print(" ✅ MemoryDB shutdown complete") def get_db(): return MemoryDB.get_instance()