| """
|
| In-Memory Database with GitHub-only backup.
|
| All data lives in RAM. NO local file reads or writes.
|
| Updated: gevent-safe locks, session cache, zero threading imports.
|
| """
|
|
|
| import time
|
| import gevent
|
| from gevent.lock import RLock
|
| from datetime import datetime
|
| from config import SESSION_CACHE_TTL
|
|
|
|
|
class MemoryDB:
    """Process-wide in-memory key/value database whose only backup is GitHub.

    Data is held in ``self._data`` as one dict per store name (see
    ``STORES``); each store has its own re-entrant lock in ``self._locks``.
    Nothing is read from or written to local files: persistence goes
    through the ``github_storage`` module, imported lazily where needed.

    A small session cache (``session_id -> (username, expiry)``) is kept
    alongside the stores, guarded by its own lock.

    Use ``MemoryDB.get_instance()`` (or the module-level ``get_db()``)
    rather than constructing the class directly, so one instance is shared.
    """

    # NOTE(review): the status-message literals below appear mis-encoded
    # (they look like emoji read with the wrong codec, and three of them
    # contain a control character, written here as ``\x85``). They are
    # reproduced as found so runtime output is unchanged -- confirm the
    # intended glyphs and re-save the file as UTF-8.

    _instance = None
    _init_lock = RLock()

    STORES = ['users', 'telegram', 'cards', 'chat_history']

    # Store name -> file name used for that store on GitHub.
    STORE_FILES = {
        'users': 'users.json',
        'telegram': 'users_db.json',
        'cards': 'cards.json',
        'chat_history': 'chat_history_db.json',
    }

    @classmethod
    def get_instance(cls):
        """Return the shared instance, creating it on first call."""
        if cls._instance is None:
            with cls._init_lock:
                # Re-check under the lock: another greenlet may have built
                # the instance while this one was waiting.
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def __init__(self):
        """Create empty stores and locks, then load initial data from GitHub."""
        self._data = {}
        self._locks = {}
        self._last_backup = time.time()

        self._session_cache = {}
        self._session_cache_lock = RLock()

        for store_name in self.STORES:
            self._locks[store_name] = RLock()
            self._data[store_name] = {}

        self._initial_load()

        print(" β\x85MemoryDB initialized (gevent-safe, GitHub-only, zero local files)")

    def _apply_pulled(self, results):
        """Install pulled store contents and return how many records were loaded.

        ``results`` maps store name -> data. An entry is applied only when
        the store is known (has a lock) and the data is a non-empty dict;
        anything else (``None``, a list, an empty dict, an unknown store)
        is skipped and leaves the current in-memory store untouched.
        """
        loaded = 0
        for store_name, data in results.items():
            lock = self._locks.get(store_name)
            if lock is None or not isinstance(data, dict) or not data:
                continue
            with lock:
                self._data[store_name] = data
            loaded += len(data)
        return loaded

    def _initial_load(self):
        """Load all data from GitHub on startup.

        Best-effort: any failure is printed and the database simply starts
        with whatever (possibly empty) stores it already has.
        """
        try:
            from github_storage import get_github_storage
            gh = get_github_storage()

            if not gh._configured:
                print("\n β οΈ GitHub not configured - starting with EMPTY databases")
                print(" Set GITHUB_TOKEN and GITHUB_REPO environment variables")
                return

            print("\n π₯ Loading databases from GitHub...")
            results, error = gh.pull_all()

            # Surface the reported error instead of silently dropping it.
            if error:
                print(f" β GitHub load error: {error}")

            if results and self._apply_pulled(results) == 0:
                print("\n βΉοΈ All databases empty on GitHub (first run)")

            total = sum(len(self._data[s]) for s in self.STORES)
            if total > 0:
                print(f"\n π Total records loaded: {total}")
            else:
                print("\n π Starting fresh with 0 records")

        except ImportError:
            print(" β github_storage module not found")
            print(" β οΈ Starting with empty databases")
        except Exception as e:
            print(f" β GitHub load error: {e}")
            print(" β οΈ Starting with empty databases")

    # ------------------------------------------------------------------
    # Session cache
    # ------------------------------------------------------------------

    def cache_session(self, session_id, username):
        """Cache a valid session; it expires SESSION_CACHE_TTL after now."""
        with self._session_cache_lock:
            self._session_cache[session_id] = (username, time.time() + SESSION_CACHE_TTL)

    def check_session_cache(self, session_id):
        """Return the cached username for ``session_id``, or None.

        An expired entry is removed as a side effect of the lookup.
        """
        with self._session_cache_lock:
            entry = self._session_cache.get(session_id)
            if entry:
                username, expire_time = entry
                if time.time() < expire_time:
                    return username
                del self._session_cache[session_id]
            return None

    def invalidate_session_cache(self, username):
        """Remove all cached sessions for a user (on login/logout)."""
        with self._session_cache_lock:
            # Collect first: the dict must not change size while iterated.
            stale = [sid for sid, (uname, _) in self._session_cache.items() if uname == username]
            for sid in stale:
                del self._session_cache[sid]

    # ------------------------------------------------------------------
    # Reads -- an unknown store name yields the method's "empty" result
    # ------------------------------------------------------------------

    def read(self, store_name):
        """Return a shallow copy of the whole store ({} if unknown)."""
        lock = self._locks.get(store_name)
        if lock is None:
            return {}
        with lock:
            return dict(self._data.get(store_name, {}))

    def read_key(self, store_name, key, default=None):
        """Return the value stored under ``key``, or ``default``."""
        lock = self._locks.get(store_name)
        if lock is None:
            return default
        with lock:
            return self._data.get(store_name, {}).get(key, default)

    def read_keys(self, store_name, keys):
        """Return ``{key: value}`` for those of ``keys`` present in the store."""
        lock = self._locks.get(store_name)
        if lock is None:
            return {}
        with lock:
            store = self._data.get(store_name, {})
            return {k: store[k] for k in keys if k in store}

    def has_key(self, store_name, key):
        """Return True if ``key`` exists in the store."""
        lock = self._locks.get(store_name)
        if lock is None:
            return False
        with lock:
            return key in self._data.get(store_name, {})

    def count(self, store_name):
        """Return the number of records in the store (0 if unknown)."""
        lock = self._locks.get(store_name)
        if lock is None:
            return 0
        with lock:
            return len(self._data.get(store_name, {}))

    # ------------------------------------------------------------------
    # Writes -- silently ignored for unknown store names
    # ------------------------------------------------------------------

    def write(self, store_name, key, value):
        """Set ``key`` to ``value`` in the store."""
        lock = self._locks.get(store_name)
        if lock is None:
            return
        with lock:
            self._data.setdefault(store_name, {})[key] = value

    def write_many(self, store_name, updates):
        """Merge the ``updates`` mapping into the store."""
        lock = self._locks.get(store_name)
        if lock is None:
            return
        with lock:
            self._data.setdefault(store_name, {}).update(updates)

    def write_full(self, store_name, data):
        """Replace the whole store with ``data`` (stored by reference, not copied)."""
        lock = self._locks.get(store_name)
        if lock is None:
            return
        with lock:
            self._data[store_name] = data

    def update_key(self, store_name, key, update_fn):
        """Apply ``update_fn(current_value)`` under the store lock.

        ``update_fn`` receives None when the key is absent. A non-None
        result is stored; a None result leaves the store unchanged.
        Returns whatever ``update_fn`` returned (None for an unknown store).
        """
        lock = self._locks.get(store_name)
        if lock is None:
            return None
        with lock:
            current = self._data.get(store_name, {}).get(key, None)
            new_value = update_fn(current)
            if new_value is not None:
                self._data.setdefault(store_name, {})[key] = new_value
            return new_value

    def delete(self, store_name, key):
        """Delete ``key``; return True if it existed, False otherwise."""
        lock = self._locks.get(store_name)
        if lock is None:
            return False
        with lock:
            if key in self._data.get(store_name, {}):
                del self._data[store_name][key]
                return True
            return False

    def delete_many(self, store_name, keys):
        """Delete every key in ``keys`` that exists; return how many were removed."""
        lock = self._locks.get(store_name)
        if lock is None:
            return 0
        with lock:
            count = 0
            store = self._data.get(store_name, {})
            for key in keys:
                if key in store:
                    del store[key]
                    count += 1
            return count

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def find(self, store_name, predicate):
        """Return ``[(key, value), ...]`` for records where ``predicate(key, value)`` is truthy."""
        lock = self._locks.get(store_name)
        if lock is None:
            return []
        with lock:
            return [(k, v) for k, v in self._data.get(store_name, {}).items() if predicate(k, v)]

    def find_keys_by_prefix(self, store_name, prefix):
        """Return ``{key: value}`` for keys starting with ``prefix``.

        Relies on ``key.startswith``, so keys are assumed to be strings.
        """
        lock = self._locks.get(store_name)
        if lock is None:
            return {}
        with lock:
            store = self._data.get(store_name, {})
            return {k: v for k, v in store.items() if k.startswith(prefix)}

    # ------------------------------------------------------------------
    # GitHub sync
    # ------------------------------------------------------------------

    def push_to_github(self):
        """Push all in-memory data to GitHub.

        Returns ``(success, errors)`` as reported by ``push_all``; on an
        exception returns ``(False, [message])``.
        """
        try:
            from github_storage import get_github_storage
            gh = get_github_storage()

            # Snapshot each store under its lock so the upload works on
            # copies rather than on the live dicts.
            data_map = {}
            for store_name in self.STORES:
                with self._locks[store_name]:
                    data_map[store_name] = dict(self._data.get(store_name, {}))

            success, errors = gh.push_all(data_map)
            if success:
                self._last_backup = time.time()
            return success, errors

        except Exception as e:
            err = f"GitHub push error: {e}"
            print(f" β {err}")
            return False, [err]

    def pull_from_github(self):
        """Pull all data from GitHub and replace in-memory data.

        Only known stores with non-empty dict data are replaced; other
        entries are ignored. Returns ``(True, None)`` on success, otherwise
        ``(False, error)``.
        """
        try:
            from github_storage import get_github_storage
            gh = get_github_storage()

            results, error = gh.pull_all()
            if error:
                return False, error

            if not results:
                return False, "No data returned from GitHub"

            # Count only what was actually installed; a None / non-dict
            # entry must not turn a successful pull into a reported error.
            total = self._apply_pulled(results)
            print(f" β\x85Pulled {total} records from GitHub into memory")
            return True, None

        except Exception as e:
            err = f"GitHub pull error: {e}"
            print(f" β {err}")
            return False, err

    def push_single_to_github(self, store_name):
        """Push a single store to GitHub; returns ``(success, error)``."""
        if store_name not in self.STORE_FILES:
            return False, f"Unknown store: {store_name}"

        try:
            from github_storage import get_github_storage
            gh = get_github_storage()

            filename = self.STORE_FILES[store_name]

            with self._locks[store_name]:
                data = dict(self._data.get(store_name, {}))

            success, error = gh.push_file(filename, data)
            return success, error

        except Exception as e:
            return False, f"Error pushing {store_name}: {e}"

    # ------------------------------------------------------------------
    # Introspection / lifecycle
    # ------------------------------------------------------------------

    def get_stats(self):
        """Return per-store record counts plus a ``_meta`` summary entry."""
        stats = {}
        for store_name in self.STORES:
            with self._locks[store_name]:
                stats[store_name] = {
                    "records": len(self._data.get(store_name, {})),
                    "github_file": self.STORE_FILES.get(store_name, ""),
                }

        stats["_meta"] = {
            "last_backup": datetime.fromtimestamp(self._last_backup).isoformat(),
            "mode": "github_only",
            "local_files": False,
            "session_cache_size": len(self._session_cache),
        }
        return stats

    def shutdown(self):
        """Graceful shutdown: report how many records are in memory.

        Does not push anything; it only prints a reminder.
        """
        print(" π MemoryDB shutting down...")
        total = sum(len(self._data.get(s, {})) for s in self.STORES)
        print(f" β οΈ {total} records in memory. Push to GitHub if needed: /backup_db")
        print(" β\x85MemoryDB shutdown complete")
|
|
|
|
|
def get_db():
    """Return the shared MemoryDB instance (created on first use)."""
    database = MemoryDB.get_instance()
    return database