app / memory_db.py
Dooratre's picture
Upload 216 files
78c0e6a verified
"""
In-Memory Database with GitHub-only backup.
All data lives in RAM. NO local file reads or writes.
REMOVED: telegram store (using remote HuggingFace API)
REMOVED: cards store (using remote Cards Microservice)
"""
import time
import gevent
from gevent.lock import RLock
from datetime import datetime
from config import SESSION_CACHE_TTL
class MemoryDB:
"""
Pure in-memory database.
GitHub is the ONLY persistence layer.
Zero local file I/O.
Cards and Telegram are handled by external microservices.
"""
_instance = None
_init_lock = RLock()
# REMOVED 'telegram' and 'cards' from STORES
STORES = ['users', 'chat_history', 'board_saves', 'notifications']
STORE_FILES = {
'users': 'users.json',
'chat_history': 'chat_history_db.json',
'board_saves': 'board_saves.json',
'notifications': 'notifications.json',
}
@classmethod
def get_instance(cls):
if cls._instance is None:
with cls._init_lock:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self):
self._data = {}
self._locks = {}
self._last_backup = time.time()
# Session validation cache: {session_id: (username, expire_time)}
self._session_cache = {}
self._session_cache_lock = RLock()
for store_name in self.STORES:
self._locks[store_name] = RLock()
self._data[store_name] = {}
self._initial_load()
print(f" βœ… MemoryDB initialized (no cards, no telegram - external services)")
def _initial_load(self):
"""Load all data from GitHub on startup."""
try:
from github_storage import get_github_storage
gh = get_github_storage()
if not gh._configured:
print(f"\n ⚠️ GitHub not configured - starting with EMPTY databases")
return
print(f"\n πŸ“₯ Loading databases from GitHub...")
results, error = gh.pull_all()
if results:
loaded_any = False
for store_name, data in results.items():
if store_name not in self.STORES:
continue
if data and isinstance(data, dict) and len(data) > 0:
self._data[store_name] = data
loaded_any = True
if not loaded_any:
print(f"\n ℹ️ All databases empty on GitHub (first run)")
total = sum(len(self._data[s]) for s in self.STORES)
if total > 0:
print(f"\n πŸ“Š Total records loaded: {total}")
else:
print(f"\n πŸ“Š Starting fresh with 0 records")
except ImportError:
print(f" ❌ github_storage module not found")
except Exception as e:
print(f" ❌ GitHub load error: {e}")
# ─── SESSION CACHE ───
def cache_session(self, session_id, username):
with self._session_cache_lock:
self._session_cache[session_id] = (username, time.time() + SESSION_CACHE_TTL)
def check_session_cache(self, session_id):
with self._session_cache_lock:
entry = self._session_cache.get(session_id)
if entry:
username, expire_time = entry
if time.time() < expire_time:
return username
else:
del self._session_cache[session_id]
return None
def invalidate_session_cache(self, username):
with self._session_cache_lock:
stale = [sid for sid, (uname, _) in self._session_cache.items() if uname == username]
for sid in stale:
del self._session_cache[sid]
# ─── READ OPERATIONS ───
def read(self, store_name):
lock = self._locks.get(store_name)
if not lock:
return {}
with lock:
return dict(self._data.get(store_name, {}))
def read_key(self, store_name, key, default=None):
lock = self._locks.get(store_name)
if not lock:
return default
with lock:
return self._data.get(store_name, {}).get(key, default)
def read_keys(self, store_name, keys):
lock = self._locks.get(store_name)
if not lock:
return {}
with lock:
store = self._data.get(store_name, {})
return {k: store[k] for k in keys if k in store}
def has_key(self, store_name, key):
lock = self._locks.get(store_name)
if not lock:
return False
with lock:
return key in self._data.get(store_name, {})
def count(self, store_name):
lock = self._locks.get(store_name)
if not lock:
return 0
with lock:
return len(self._data.get(store_name, {}))
# ─── WRITE OPERATIONS ───
def write(self, store_name, key, value):
lock = self._locks.get(store_name)
if not lock:
return
with lock:
if store_name not in self._data:
self._data[store_name] = {}
self._data[store_name][key] = value
def write_many(self, store_name, updates):
lock = self._locks.get(store_name)
if not lock:
return
with lock:
if store_name not in self._data:
self._data[store_name] = {}
self._data[store_name].update(updates)
def write_full(self, store_name, data):
lock = self._locks.get(store_name)
if not lock:
return
with lock:
self._data[store_name] = data
def update_key(self, store_name, key, update_fn):
lock = self._locks.get(store_name)
if not lock:
return None
with lock:
current = self._data.get(store_name, {}).get(key, None)
new_value = update_fn(current)
if new_value is not None:
if store_name not in self._data:
self._data[store_name] = {}
self._data[store_name][key] = new_value
return new_value
def delete(self, store_name, key):
lock = self._locks.get(store_name)
if not lock:
return False
with lock:
if key in self._data.get(store_name, {}):
del self._data[store_name][key]
return True
return False
def delete_many(self, store_name, keys):
lock = self._locks.get(store_name)
if not lock:
return 0
with lock:
count = 0
store = self._data.get(store_name, {})
for key in keys:
if key in store:
del store[key]
count += 1
return count
# ─── QUERY OPERATIONS ───
def find(self, store_name, predicate):
lock = self._locks.get(store_name)
if not lock:
return []
with lock:
return [(k, v) for k, v in self._data.get(store_name, {}).items() if predicate(k, v)]
def find_keys_by_prefix(self, store_name, prefix):
lock = self._locks.get(store_name)
if not lock:
return {}
with lock:
store = self._data.get(store_name, {})
return {k: v for k, v in store.items() if k.startswith(prefix)}
# ─── GITHUB BACKUP OPERATIONS ───
def push_to_github(self):
try:
from github_storage import get_github_storage
gh = get_github_storage()
data_map = {}
for store_name in self.STORES:
with self._locks[store_name]:
data_map[store_name] = dict(self._data.get(store_name, {}))
success, errors = gh.push_all(data_map)
if success:
self._last_backup = time.time()
return success, errors
except Exception as e:
err = f"GitHub push error: {e}"
print(f" ❌ {err}")
return False, [err]
def pull_from_github(self):
try:
from github_storage import get_github_storage
gh = get_github_storage()
results, error = gh.pull_all()
if error:
return False, error
if not results:
return False, "No data returned from GitHub"
for store_name, data in results.items():
if store_name not in self.STORES:
continue
if data and isinstance(data, dict):
lock = self._locks.get(store_name)
if lock:
with lock:
self._data[store_name] = data
total = sum(len(self._data.get(s, {})) for s in self.STORES)
print(f" βœ… Pulled {total} records from GitHub into memory")
return True, None
except Exception as e:
err = f"GitHub pull error: {e}"
print(f" ❌ {err}")
return False, err
def push_single_to_github(self, store_name):
if store_name not in self.STORE_FILES:
return False, f"Unknown store: {store_name}"
try:
from github_storage import get_github_storage
gh = get_github_storage()
filename = self.STORE_FILES[store_name]
with self._locks[store_name]:
data = dict(self._data.get(store_name, {}))
success, error = gh.push_file(filename, data)
return success, error
except Exception as e:
return False, f"Error pushing {store_name}: {e}"
# ─── STATS ───
def get_stats(self):
stats = {}
for store_name in self.STORES:
with self._locks[store_name]:
stats[store_name] = {
"records": len(self._data.get(store_name, {})),
"github_file": self.STORE_FILES.get(store_name, ""),
}
stats["_meta"] = {
"last_backup": datetime.fromtimestamp(self._last_backup).isoformat(),
"mode": "github_only",
"local_files": False,
"cards": "external_microservice",
"telegram": "external_api",
"session_cache_size": len(self._session_cache),
}
return stats
def shutdown(self):
print(" πŸ›‘ MemoryDB shutting down...")
total = sum(len(self._data.get(s, {})) for s in self.STORES)
print(f" ⚠️ {total} records in memory. Push to GitHub if needed: /backup_db")
print(" βœ… MemoryDB shutdown complete")
def get_db():
return MemoryDB.get_instance()