# mnemo/sync_engine.py
"""
Sync Engine — SQLite ↔ Cloudflare R2 / HuggingFace Dataset Sync (v7.1)
PRIMARY: Cloudflare R2 (S3-compatible, zero egress, 10GB free)
FALLBACK: HuggingFace Datasets (original v7.0 behavior)
R2 advantages over HF Datasets:
- Faster uploads/downloads (S3 API, global CDN vs Git LFS)
- No HF rate limits or intermittent 503s
- Zero egress fees (data out is free)
- 10GB free storage (our .db is ~10-50MB)
- S3 API = industry standard, battle-tested boto3 client
Architecture:
    Streamlit App (local SQLite) ←──sync──→ R2 Bucket (backup .db)
    HF Space (reads the same .db from R2)
Lifecycle:
1. On startup: download mnemo.db from R2 → local path
2. If no .db exists, check for legacy mnemo_db.json (HF) and flag migration
3. Background thread: every SYNC_INTERVAL seconds, if dirty:
a. WAL checkpoint (flush pending writes to main .db)
b. Upload .db to R2
4. On demand: force_sync() for immediate upload
Conflict resolution: last-write-wins (single user, safe for Tina's setup).
Required Streamlit secrets (for R2 primary):
R2_ACCOUNT_ID = "your-cloudflare-account-id"
R2_ACCESS_KEY_ID = "your-r2-access-key"
R2_SECRET_ACCESS_KEY = "your-r2-secret-key"
R2_BUCKET_NAME = "mnemo" # optional, defaults to "mnemo"
Fallback HF secrets (existing):
HF_TOKEN = "hf_..."
DATASET_REPO_ID = "AthelaPerk/Private"
Thread-safe. Non-blocking. All sync operations run in a daemon thread.
"""
import os
import time
import shutil
import sqlite3
import logging
import threading
from typing import Optional
log = logging.getLogger("mnemo.sync")
# Sync interval in seconds
SYNC_INTERVAL = 30
# R2 config defaults
DEFAULT_R2_BUCKET = "mnemo"
DB_KEY_IN_R2 = "mnemo.db"
# HF Dataset config (fallback)
DEFAULT_DATASET_REPO = "AthelaPerk/Private"
DB_FILENAME_IN_REPO = "mnemo.db"
LEGACY_JSON_FILENAME = "mnemo_db.json"
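# Note: credentials are read from os.environ below. When they live in Streamlit
# secrets instead, the app must bridge them into the environment first. A
# minimal sketch (hedged; assumes the secret keys documented in the module
# docstring):
#
#     import streamlit as st
#     for _k in ("R2_ACCOUNT_ID", "R2_ACCESS_KEY_ID", "R2_SECRET_ACCESS_KEY",
#                "R2_BUCKET_NAME", "HF_TOKEN", "DATASET_REPO_ID"):
#         if _k in st.secrets:
#             os.environ[_k] = str(st.secrets[_k])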
# =============================================================================
# R2 CLIENT (lazy, lightweight S3-compatible)
# =============================================================================
class R2Client:
"""Minimal S3-compatible client for Cloudflare R2.
Uses boto3 under the hood — already in most Python environments.
Falls back gracefully if credentials are missing.
"""
    def __init__(self, account_id: Optional[str] = None, access_key_id: Optional[str] = None,
                 secret_access_key: Optional[str] = None, bucket_name: Optional[str] = None):
self.account_id = account_id or os.environ.get("R2_ACCOUNT_ID", "")
self.access_key_id = access_key_id or os.environ.get("R2_ACCESS_KEY_ID", "")
self.secret_access_key = secret_access_key or os.environ.get("R2_SECRET_ACCESS_KEY", "")
self.bucket_name = bucket_name or os.environ.get("R2_BUCKET_NAME", DEFAULT_R2_BUCKET)
self._client = None
@property
def available(self) -> bool:
return bool(self.account_id and self.access_key_id and self.secret_access_key)
@property
def client(self):
"""Lazy-init boto3 S3 client pointing at R2 endpoint."""
if self._client is None and self.available:
import boto3
self._client = boto3.client(
"s3",
endpoint_url=f"https://{self.account_id}.r2.cloudflarestorage.com",
aws_access_key_id=self.access_key_id,
aws_secret_access_key=self.secret_access_key,
region_name="auto",
)
return self._client
def download(self, key: str, local_path: str) -> bool:
"""Download object from R2 to local file."""
if not self.available:
return False
try:
os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
self.client.download_file(self.bucket_name, key, local_path)
return True
        except Exception as e:
            # botocore's ClientError carries a response dict, but other exceptions
            # may expose a non-dict `response` attribute, so guard before .get().
            response = getattr(e, "response", None)
            error_code = response.get("Error", {}).get("Code", "") if isinstance(response, dict) else ""
            if error_code in ("404", "NoSuchKey"):
                return False  # Object doesn't exist yet — normal on first run
            log.warning(f"R2 download failed ({key}): {type(e).__name__}: {e}")
            return False
def upload(self, local_path: str, key: str) -> bool:
"""Upload local file to R2."""
if not self.available:
return False
try:
self.client.upload_file(local_path, self.bucket_name, key)
return True
except Exception as e:
log.error(f"R2 upload failed ({key}): {type(e).__name__}: {e}")
return False
def exists(self, key: str) -> bool:
"""Check if object exists in R2."""
if not self.available:
return False
try:
self.client.head_object(Bucket=self.bucket_name, Key=key)
return True
except Exception:
return False
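# Standalone round-trip example (a sketch, assuming R2_* env vars are set and
# the bucket exists; "scratch/ping.db" is a hypothetical key used only here):
#
#     r2 = R2Client()
#     if r2.available:
#         r2.upload("/tmp/ping.db", "scratch/ping.db")
#         assert r2.exists("scratch/ping.db")
#         r2.download("scratch/ping.db", "/tmp/ping_copy.db")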
# =============================================================================
# SYNC ENGINE (R2 primary, HF fallback)
# =============================================================================
class SyncEngine:
"""Bidirectional sync between local SQLite and cloud storage.
Priority: R2 (if credentials set) → HF Datasets (fallback) → disabled.
Usage:
sync = SyncEngine(
db_path="/home/user/.mnemo/mnemo.db",
hf_token="hf_...",
dataset_repo_id="AthelaPerk/Private",
)
sync.download() # On startup — get latest .db
sync.start_background() # Start daemon sync thread
# ... app runs, writes to SQLite ...
sync.mark_dirty() # After writes, signal upload needed
sync.force_sync() # Immediate upload (e.g., before shutdown)
sync.stop() # Clean shutdown
"""
    def __init__(self, db_path: str, hf_token: Optional[str] = None,
                 dataset_repo_id: Optional[str] = None,
                 sync_interval: int = SYNC_INTERVAL):
self.db_path = db_path
self.hf_token = hf_token or os.environ.get("HF_TOKEN", "")
self.dataset_repo_id = dataset_repo_id or os.environ.get(
"DATASET_REPO_ID", DEFAULT_DATASET_REPO)
self.sync_interval = sync_interval
self._dirty = False
self._lock = threading.Lock()
self._stop_event = threading.Event()
self._worker: Optional[threading.Thread] = None
self._last_sync: float = 0
self._sync_count: int = 0
self._sync_errors: int = 0
# R2 client (primary)
self._r2 = R2Client()
# HF API (fallback, lazy init)
self._hf_api_instance = None
# Determine backend
if self._r2.available:
self._backend = "r2"
log.info("[SYNC] ✅ Using Cloudflare R2 (primary)")
print("[SYNC] ✅ Using Cloudflare R2")
elif self.hf_token:
self._backend = "hf"
log.info("[SYNC] Using HuggingFace Datasets (fallback)")
print("[SYNC] Using HuggingFace Datasets (fallback)")
else:
self._backend = "none"
log.warning("[SYNC] No sync credentials — running offline")
print("[SYNC] ⚠️ No sync credentials — running offline")
@property
def backend(self) -> str:
"""Current sync backend: 'r2', 'hf', or 'none'."""
return self._backend
@property
def _hf_api(self):
"""Lazy-init HfApi for fallback."""
if self._hf_api_instance is None and self.hf_token:
from huggingface_hub import HfApi
self._hf_api_instance = HfApi(token=self.hf_token)
return self._hf_api_instance
@property
def has_credentials(self) -> bool:
return self._backend != "none"
# =========================================================================
# DOWNLOAD (startup)
# =========================================================================
def download(self) -> bool:
"""Download .db from cloud storage. Returns True if downloaded.
Tries R2 first, falls back to HF Datasets.
If neither has a .db, checks HF for legacy JSON for migration.
"""
if not self.has_credentials:
log.warning("No sync credentials — skipping download.")
return False
os.makedirs(os.path.dirname(self.db_path) or ".", exist_ok=True)
# === R2 PRIMARY ===
if self._backend == "r2":
if self._r2.download(DB_KEY_IN_R2, self.db_path):
size_mb = os.path.getsize(self.db_path) / 1_048_576
log.info(f"Downloaded {DB_KEY_IN_R2} from R2 ({size_mb:.1f} MB)")
print(f"[SYNC] Downloaded mnemo.db from R2 ({size_mb:.1f} MB)")
return True
log.info("No .db in R2 — checking HF for legacy data...")
# Fall through to HF for legacy migration check
# === HF DATASETS (primary if no R2, or fallback for legacy) ===
if self.hf_token:
# Try .db from HF
if self._backend == "hf":
if self._download_hf_file(DB_FILENAME_IN_REPO, self.db_path):
log.info(f"Downloaded {DB_FILENAME_IN_REPO} from HF Datasets.")
return True
# Try legacy JSON for migration
legacy_path = self._legacy_json_path()
if self._download_hf_file(LEGACY_JSON_FILENAME, legacy_path):
log.info(f"Downloaded legacy {LEGACY_JSON_FILENAME} for migration.")
return False # Signal caller to run migration
log.info("No existing data found — starting fresh.")
return False
    def _download_hf_file(self, filename: str, local_path: str) -> bool:
        """Download a single file from the HF Datasets repo, with a hard timeout."""
        try:
            import concurrent.futures
            from huggingface_hub import hf_hub_download
            # Run the download in a worker thread so we can enforce a timeout.
            # NOTE: a `with` block would call shutdown(wait=True) and block on a
            # hung download, defeating the timeout, so shut down without waiting.
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            future = executor.submit(
                hf_hub_download,
                repo_id=self.dataset_repo_id,
                filename=filename,
                repo_type="dataset",
                token=self.hf_token,
                force_download=True,
            )
            try:
                downloaded = future.result(timeout=60)
            except concurrent.futures.TimeoutError:
                log.warning(f"HF download timed out: {filename}")
                return False
            finally:
                executor.shutdown(wait=False)
            os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
            shutil.copy2(downloaded, local_path)
            return True
        except Exception as e:
            if "EntryNotFoundError" in type(e).__name__ or "404" in str(e):
                return False
            log.warning(f"HF download failed ({filename}): {type(e).__name__}: {e}")
            return False
def _legacy_json_path(self) -> str:
return os.path.join(
os.path.dirname(self.db_path) or ".",
"mnemo_legacy.json"
)
def get_legacy_json_path(self) -> Optional[str]:
path = self._legacy_json_path()
return path if os.path.exists(path) else None
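    # Migration handshake implied by download() (a sketch; `migrate_from_json`
    # is a hypothetical helper living elsewhere in the app):
    #
    #     sync = SyncEngine(db_path="/home/user/.mnemo/mnemo.db")
    #     if not sync.download():
    #         legacy = sync.get_legacy_json_path()
    #         if legacy:
    #             migrate_from_json(legacy, sync.db_path)
    #             sync.mark_dirty()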
# =========================================================================
# UPLOAD (background sync)
# =========================================================================
def upload(self) -> bool:
"""Upload local .db to cloud storage.
Performs WAL checkpoint first to ensure .db is self-contained.
Uses R2 if available, falls back to HF Datasets.
"""
if not self.has_credentials:
return False
if not os.path.exists(self.db_path):
log.warning("No local .db file to upload.")
return False
try:
# WAL checkpoint — merge journal into main .db
self._wal_checkpoint()
success = False
# === R2 PRIMARY ===
if self._backend == "r2":
success = self._r2.upload(self.db_path, DB_KEY_IN_R2)
if success:
size_mb = os.path.getsize(self.db_path) / 1_048_576
log.info(f"Uploaded to R2 ({size_mb:.1f} MB)")
# === HF FALLBACK ===
elif self._backend == "hf":
self._hf_api.upload_file(
path_or_fileobj=self.db_path,
path_in_repo=DB_FILENAME_IN_REPO,
repo_id=self.dataset_repo_id,
repo_type="dataset",
commit_message="Auto-backup mnemo v7 database",
)
success = True
if success:
with self._lock:
self._dirty = False
self._last_sync = time.time()
self._sync_count += 1
log.info(f"Sync #{self._sync_count} complete ({self._backend}).")
return True
else:
raise RuntimeError("Upload returned False")
except Exception as e:
with self._lock:
self._sync_errors += 1
log.error(f"Upload failed ({self._backend}): {type(e).__name__}: {e}")
return False
    def _wal_checkpoint(self):
        """Flush the WAL journal into the main .db file.

        Without this, recent writes sit in the -wal sidecar file and would be
        missing from the uploaded .db. TRUNCATE checkpoints every frame and
        resets the -wal file to zero length.
        """
        try:
            conn = sqlite3.connect(self.db_path, timeout=10)
            try:
                conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
            finally:
                conn.close()  # close even if the checkpoint raises
        except Exception as e:
            log.warning(f"WAL checkpoint failed: {e}")
# =========================================================================
# DIRTY TRACKING
# =========================================================================
def mark_dirty(self):
"""Signal that local .db has changed and needs uploading."""
with self._lock:
self._dirty = True
@property
def is_dirty(self) -> bool:
with self._lock:
return self._dirty
# =========================================================================
# BACKGROUND SYNC THREAD
# =========================================================================
def start_background(self):
"""Start background sync daemon thread."""
if not self.has_credentials:
log.warning("No credentials — background sync disabled.")
return
if self._worker and self._worker.is_alive():
return
self._stop_event.clear()
self._worker = threading.Thread(
target=self._sync_loop,
daemon=True,
name="mnemo-sync",
)
self._worker.start()
log.info(f"Background sync started (every {self.sync_interval}s, backend={self._backend}).")
def stop(self):
"""Stop background sync. Does a final upload if dirty."""
self._stop_event.set()
if self._worker and self._worker.is_alive():
self._worker.join(timeout=5)
if self.is_dirty:
self.upload()
    def force_sync(self):
        """Upload pending changes immediately (blocking). No-op when clean."""
        if self.is_dirty:
            self.upload()
def _sync_loop(self):
"""Background sync loop. Runs in daemon thread."""
while not self._stop_event.is_set():
self._stop_event.wait(timeout=self.sync_interval)
if self._stop_event.is_set():
break
if self.is_dirty:
self.upload()
# =========================================================================
# STATS
# =========================================================================
def get_stats(self) -> dict:
with self._lock:
return {
"backend": self._backend,
"db_path": self.db_path,
"r2_bucket": self._r2.bucket_name if self._r2.available else None,
"dataset_repo": self.dataset_repo_id if self._backend == "hf" else None,
"has_credentials": self.has_credentials,
"is_dirty": self._dirty,
"last_sync": self._last_sync,
"last_sync_ago": round(time.time() - self._last_sync, 1) if self._last_sync else None,
"sync_count": self._sync_count,
"sync_errors": self._sync_errors,
"sync_interval": self.sync_interval,
"background_running": self._worker.is_alive() if self._worker else False,
"db_exists": os.path.exists(self.db_path),
"db_size_mb": round(os.path.getsize(self.db_path) / 1_048_576, 2)
if os.path.exists(self.db_path) else 0,
}
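# Minimal smoke test (a sketch, not part of the app's runtime path; assumes
# credentials, if any, come from the environment as documented above).
if __name__ == "__main__":
    import tempfile

    logging.basicConfig(level=logging.INFO)
    with tempfile.TemporaryDirectory() as tmp:
        db = os.path.join(tmp, "mnemo.db")
        engine = SyncEngine(db_path=db)
        engine.download()  # pull the latest .db if one exists in the cloud
        conn = sqlite3.connect(db)
        conn.execute("CREATE TABLE IF NOT EXISTS ping (ts REAL)")
        conn.commit()
        conn.close()
        engine.mark_dirty()
        engine.force_sync()  # blocking upload; no-op when running offline
        print(engine.get_stats())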