| """ | |
| Sync Engine — SQLite ↔ Cloudflare R2 / HuggingFace Dataset Sync (v7.1) | |
| PRIMARY: Cloudflare R2 (S3-compatible, zero egress, 10GB free) | |
| FALLBACK: HuggingFace Datasets (original v7.0 behavior) | |
| R2 advantages over HF Datasets: | |
| - Faster uploads/downloads (S3 API, global CDN vs Git LFS) | |
| - No HF rate limits or intermittent 503s | |
| - Zero egress fees (data out is free) | |
| - 10GB free storage (our .db is ~10-50MB) | |
| - S3 API = industry standard, battle-tested boto3 client | |
| Architecture: | |
| Streamlit App (local SQLite) ←──sync──→ R2 Bucket (backup .db) | |
| HF Space (reads same .db from R2) | |
| Lifecycle: | |
| 1. On startup: download mnemo.db from R2 → local path | |
| 2. If no .db exists, check for legacy mnemo_db.json (HF) and flag migration | |
| 3. Background thread: every SYNC_INTERVAL seconds, if dirty: | |
| a. WAL checkpoint (flush pending writes to main .db) | |
| b. Upload .db to R2 | |
| 4. On demand: force_sync() for immediate upload | |
| Conflict resolution: last-write-wins (single user, safe for Tina's setup). | |
| Required Streamlit secrets (for R2 primary): | |
| R2_ACCOUNT_ID = "your-cloudflare-account-id" | |
| R2_ACCESS_KEY_ID = "your-r2-access-key" | |
| R2_SECRET_ACCESS_KEY = "your-r2-secret-key" | |
| R2_BUCKET_NAME = "mnemo" # optional, defaults to "mnemo" | |
| Fallback HF secrets (existing): | |
| HF_TOKEN = "hf_..." | |
| DATASET_REPO_ID = "AthelaPerk/Private" | |
| Thread-safe. Non-blocking. All sync operations run in a daemon thread. | |
| """ | |

import os
import time
import shutil
import sqlite3
import logging
import threading
from typing import Optional
from pathlib import Path

log = logging.getLogger("mnemo.sync")

# Sync interval in seconds
SYNC_INTERVAL = 30

# R2 config defaults
DEFAULT_R2_BUCKET = "mnemo"
DB_KEY_IN_R2 = "mnemo.db"

# HF Dataset config (fallback)
DEFAULT_DATASET_REPO = "AthelaPerk/Private"
DB_FILENAME_IN_REPO = "mnemo.db"
LEGACY_JSON_FILENAME = "mnemo_db.json"

# =============================================================================
# R2 CLIENT (lazy, lightweight S3-compatible)
# =============================================================================

class R2Client:
    """Minimal S3-compatible client for Cloudflare R2.

    Uses boto3 under the hood (imported lazily, so the dependency is only
    loaded when R2 credentials are actually configured).
    Falls back gracefully if credentials are missing.
    """

    def __init__(self, account_id: Optional[str] = None,
                 access_key_id: Optional[str] = None,
                 secret_access_key: Optional[str] = None,
                 bucket_name: Optional[str] = None):
        self.account_id = account_id or os.environ.get("R2_ACCOUNT_ID", "")
        self.access_key_id = access_key_id or os.environ.get("R2_ACCESS_KEY_ID", "")
        self.secret_access_key = secret_access_key or os.environ.get("R2_SECRET_ACCESS_KEY", "")
        self.bucket_name = bucket_name or os.environ.get("R2_BUCKET_NAME", DEFAULT_R2_BUCKET)
        self._client = None

    @property
    def available(self) -> bool:
        """True if all required R2 credentials are present.

        A property (not a plain method), so call sites like `if self.available:`
        test the credentials rather than the truthiness of a bound method.
        """
        return bool(self.account_id and self.access_key_id and self.secret_access_key)

    @property
    def client(self):
        """Lazy-init boto3 S3 client pointing at the R2 endpoint."""
        if self._client is None and self.available:
            import boto3
            self._client = boto3.client(
                "s3",
                endpoint_url=f"https://{self.account_id}.r2.cloudflarestorage.com",
                aws_access_key_id=self.access_key_id,
                aws_secret_access_key=self.secret_access_key,
                region_name="auto",
            )
        return self._client

    def download(self, key: str, local_path: str) -> bool:
        """Download an object from R2 to a local file."""
        if not self.available:
            return False
        try:
            os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
            self.client.download_file(self.bucket_name, key, local_path)
            return True
        except Exception as e:
            error_code = getattr(e, "response", {}).get("Error", {}).get("Code", "")
            if error_code in ("404", "NoSuchKey"):
                return False  # Object doesn't exist yet — normal on first run
            log.warning(f"R2 download failed ({key}): {type(e).__name__}: {e}")
            return False

    def upload(self, local_path: str, key: str) -> bool:
        """Upload a local file to R2."""
        if not self.available:
            return False
        try:
            self.client.upload_file(local_path, self.bucket_name, key)
            return True
        except Exception as e:
            log.error(f"R2 upload failed ({key}): {type(e).__name__}: {e}")
            return False

    def exists(self, key: str) -> bool:
        """Check whether an object exists in R2."""
        if not self.available:
            return False
        try:
            self.client.head_object(Bucket=self.bucket_name, Key=key)
            return True
        except Exception:
            return False
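
# Standalone usage sketch for R2Client (illustrative; assumes the R2_* env
# vars are set and the "mnemo" bucket already exists):
#
#   r2 = R2Client()
#   if r2.available:
#       r2.upload("/tmp/mnemo.db", DB_KEY_IN_R2)
#       assert r2.exists(DB_KEY_IN_R2)
#       r2.download(DB_KEY_IN_R2, "/tmp/restored.db")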

# =============================================================================
# SYNC ENGINE (R2 primary, HF fallback)
# =============================================================================

class SyncEngine:
    """Bidirectional sync between local SQLite and cloud storage.

    Priority: R2 (if credentials set) → HF Datasets (fallback) → disabled.

    Usage:
        sync = SyncEngine(
            db_path="/home/user/.mnemo/mnemo.db",
            hf_token="hf_...",
            dataset_repo_id="AthelaPerk/Private",
        )
        sync.download()          # On startup — get the latest .db
        sync.start_background()  # Start the daemon sync thread
        # ... app runs, writes to SQLite ...
        sync.mark_dirty()        # After writes, signal that an upload is needed
        sync.force_sync()        # Immediate upload (e.g., before shutdown)
        sync.stop()              # Clean shutdown
    """

    def __init__(self, db_path: str, hf_token: Optional[str] = None,
                 dataset_repo_id: Optional[str] = None,
                 sync_interval: int = SYNC_INTERVAL):
        self.db_path = db_path
        self.hf_token = hf_token or os.environ.get("HF_TOKEN", "")
        self.dataset_repo_id = dataset_repo_id or os.environ.get(
            "DATASET_REPO_ID", DEFAULT_DATASET_REPO)
        self.sync_interval = sync_interval

        self._dirty = False
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._worker: Optional[threading.Thread] = None
        self._last_sync: float = 0
        self._sync_count: int = 0
        self._sync_errors: int = 0

        # R2 client (primary)
        self._r2 = R2Client()
        # HF API (fallback, lazy init)
        self._hf_api_instance = None

        # Determine backend
        if self._r2.available:
            self._backend = "r2"
            log.info("[SYNC] ✅ Using Cloudflare R2 (primary)")
            print("[SYNC] ✅ Using Cloudflare R2")
        elif self.hf_token:
            self._backend = "hf"
            log.info("[SYNC] Using HuggingFace Datasets (fallback)")
            print("[SYNC] Using HuggingFace Datasets (fallback)")
        else:
            self._backend = "none"
            log.warning("[SYNC] No sync credentials — running offline")
            print("[SYNC] ⚠️ No sync credentials — running offline")

    def backend(self) -> str:
        """Current sync backend: 'r2', 'hf', or 'none'."""
        return self._backend

    @property
    def _hf_api(self):
        """Lazy-init HfApi for the HF fallback (a property, so call sites can
        use `self._hf_api.upload_file(...)` directly)."""
        if self._hf_api_instance is None and self.hf_token:
            from huggingface_hub import HfApi
            self._hf_api_instance = HfApi(token=self.hf_token)
        return self._hf_api_instance

    @property
    def has_credentials(self) -> bool:
        """True unless the engine is running offline (backend == 'none')."""
        return self._backend != "none"

    # =========================================================================
    # DOWNLOAD (startup)
    # =========================================================================

    def download(self) -> bool:
        """Download the .db from cloud storage. Returns True if downloaded.

        Tries R2 first, then falls back to HF Datasets.
        If neither has a .db, checks HF for a legacy JSON to migrate.
        """
        if not self.has_credentials:
            log.warning("No sync credentials — skipping download.")
            return False

        os.makedirs(os.path.dirname(self.db_path) or ".", exist_ok=True)

        # === R2 PRIMARY ===
        if self._backend == "r2":
            if self._r2.download(DB_KEY_IN_R2, self.db_path):
                size_mb = os.path.getsize(self.db_path) / 1_048_576
                log.info(f"Downloaded {DB_KEY_IN_R2} from R2 ({size_mb:.1f} MB)")
                print(f"[SYNC] Downloaded mnemo.db from R2 ({size_mb:.1f} MB)")
                return True
            log.info("No .db in R2 — checking HF for legacy data...")
            # Fall through to HF for the legacy migration check

        # === HF DATASETS (primary if no R2, or fallback for legacy) ===
        if self.hf_token:
            # Try the .db from HF
            if self._backend == "hf":
                if self._download_hf_file(DB_FILENAME_IN_REPO, self.db_path):
                    log.info(f"Downloaded {DB_FILENAME_IN_REPO} from HF Datasets.")
                    return True
            # Try the legacy JSON for migration
            legacy_path = self._legacy_json_path()
            if self._download_hf_file(LEGACY_JSON_FILENAME, legacy_path):
                log.info(f"Downloaded legacy {LEGACY_JSON_FILENAME} for migration.")
                return False  # Signal the caller to run the migration

        log.info("No existing data found — starting fresh.")
        return False

    def _download_hf_file(self, filename: str, local_path: str) -> bool:
        """Download a single file from the HF Datasets repo (60s timeout)."""
        try:
            import concurrent.futures
            from huggingface_hub import hf_hub_download

            # Run the download in a worker thread so we can enforce a timeout.
            # Created without a `with` block: the context manager would join
            # the worker on exit, blocking until the download finished and
            # defeating the timeout. shutdown(wait=False) returns promptly;
            # a timed-out worker may linger until the transfer aborts.
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            future = executor.submit(
                hf_hub_download,
                repo_id=self.dataset_repo_id,
                filename=filename,
                repo_type="dataset",
                token=self.hf_token,
                force_download=True,
            )
            try:
                downloaded = future.result(timeout=60)
            except concurrent.futures.TimeoutError:
                log.warning(f"HF download timed out: {filename}")
                return False
            finally:
                executor.shutdown(wait=False)
            os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
            shutil.copy2(downloaded, local_path)
            return True
        except Exception as e:
            if "EntryNotFoundError" in type(e).__name__ or "404" in str(e):
                return False
            log.warning(f"HF download failed ({filename}): {type(e).__name__}: {e}")
            return False

    def _legacy_json_path(self) -> str:
        return os.path.join(
            os.path.dirname(self.db_path) or ".",
            "mnemo_legacy.json",
        )

    def get_legacy_json_path(self) -> Optional[str]:
        path = self._legacy_json_path()
        return path if os.path.exists(path) else None
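
    # Startup-flow sketch tying download() to the legacy check (illustrative;
    # `run_json_migration` is a hypothetical helper in the host app, not part
    # of this module):
    #
    #   sync = SyncEngine(db_path=DB_PATH)
    #   if not sync.download():
    #       legacy = sync.get_legacy_json_path()
    #       if legacy:
    #           run_json_migration(legacy, DB_PATH)
    #           sync.mark_dirty()  # upload the freshly migrated .db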

    # =========================================================================
    # UPLOAD (background sync)
    # =========================================================================

    def upload(self) -> bool:
        """Upload the local .db to cloud storage.

        Performs a WAL checkpoint first so the .db file is self-contained.
        Uses R2 if available, falls back to HF Datasets.
        """
        if not self.has_credentials:
            return False
        if not os.path.exists(self.db_path):
            log.warning("No local .db file to upload.")
            return False

        try:
            # WAL checkpoint — merge the journal into the main .db
            self._wal_checkpoint()

            success = False

            # === R2 PRIMARY ===
            if self._backend == "r2":
                success = self._r2.upload(self.db_path, DB_KEY_IN_R2)
                if success:
                    size_mb = os.path.getsize(self.db_path) / 1_048_576
                    log.info(f"Uploaded to R2 ({size_mb:.1f} MB)")

            # === HF FALLBACK ===
            elif self._backend == "hf":
                self._hf_api.upload_file(
                    path_or_fileobj=self.db_path,
                    path_in_repo=DB_FILENAME_IN_REPO,
                    repo_id=self.dataset_repo_id,
                    repo_type="dataset",
                    commit_message="Auto-backup mnemo v7 database",
                )
                success = True

            if success:
                with self._lock:
                    self._dirty = False
                    self._last_sync = time.time()
                    self._sync_count += 1
                log.info(f"Sync #{self._sync_count} complete ({self._backend}).")
                return True
            else:
                raise RuntimeError("Upload returned False")
        except Exception as e:
            with self._lock:
                self._sync_errors += 1
            log.error(f"Upload failed ({self._backend}): {type(e).__name__}: {e}")
            return False

    def _wal_checkpoint(self):
        """Flush the WAL journal into the main .db file."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=10)
            try:
                conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
            finally:
                conn.close()  # always release, even if the checkpoint raises
        except Exception as e:
            log.warning(f"WAL checkpoint failed: {e}")

    # =========================================================================
    # DIRTY TRACKING
    # =========================================================================

    def mark_dirty(self):
        """Signal that the local .db has changed and needs uploading."""
        with self._lock:
            self._dirty = True

    @property
    def is_dirty(self) -> bool:
        """A property, so call sites like `if self.is_dirty:` read the flag
        rather than the truthiness of a bound method."""
        with self._lock:
            return self._dirty

    # =========================================================================
    # BACKGROUND SYNC THREAD
    # =========================================================================

    def start_background(self):
        """Start the background sync daemon thread."""
        if not self.has_credentials:
            log.warning("No credentials — background sync disabled.")
            return
        if self._worker and self._worker.is_alive():
            return
        self._stop_event.clear()
        self._worker = threading.Thread(
            target=self._sync_loop,
            daemon=True,
            name="mnemo-sync",
        )
        self._worker.start()
        log.info(f"Background sync started (every {self.sync_interval}s, backend={self._backend}).")

    def stop(self):
        """Stop background sync. Performs a final upload if dirty."""
        self._stop_event.set()
        if self._worker and self._worker.is_alive():
            self._worker.join(timeout=5)
        if self.is_dirty:
            self.upload()

    def force_sync(self):
        """Force an immediate sync (blocking)."""
        if self.is_dirty:
            self.upload()

    def _sync_loop(self):
        """Background sync loop. Runs in the daemon thread."""
        while not self._stop_event.is_set():
            self._stop_event.wait(timeout=self.sync_interval)
            if self._stop_event.is_set():
                break
            if self.is_dirty:
                self.upload()

    # =========================================================================
    # STATS
    # =========================================================================

    def get_stats(self) -> dict:
        with self._lock:
            return {
                "backend": self._backend,
                "db_path": self.db_path,
                "r2_bucket": self._r2.bucket_name if self._r2.available else None,
                "dataset_repo": self.dataset_repo_id if self._backend == "hf" else None,
                "has_credentials": self.has_credentials,
                "is_dirty": self._dirty,
                "last_sync": self._last_sync,
                "last_sync_ago": round(time.time() - self._last_sync, 1) if self._last_sync else None,
                "sync_count": self._sync_count,
                "sync_errors": self._sync_errors,
                "sync_interval": self.sync_interval,
                "background_running": self._worker.is_alive() if self._worker else False,
                "db_exists": os.path.exists(self.db_path),
                "db_size_mb": round(os.path.getsize(self.db_path) / 1_048_576, 2)
                if os.path.exists(self.db_path) else 0,
            }
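
# -----------------------------------------------------------------------------
# Minimal smoke-test sketch (illustrative; assumes credentials are set in the
# environment and /tmp is writable — not part of the production app):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    os.makedirs("/tmp/mnemo_smoke", exist_ok=True)
    engine = SyncEngine(db_path="/tmp/mnemo_smoke/mnemo.db")
    engine.download()                        # pull the latest .db (no-op offline)
    sqlite3.connect(engine.db_path).close()  # ensure a .db file exists locally
    engine.start_background()                # daemon thread, every sync_interval s
    engine.mark_dirty()                      # flag the .db for upload
    engine.force_sync()                      # blocking upload right now
    print(engine.get_stats())
    engine.stop()                            # joins worker; final upload if dirty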