"""
Sync Engine - SQLite ↔ Cloudflare R2 / HuggingFace Dataset Sync (v7.1)

PRIMARY:  Cloudflare R2 (S3-compatible, zero egress, 10GB free)
FALLBACK: HuggingFace Datasets (original v7.0 behavior)

R2 advantages over HF Datasets:
  - Faster uploads/downloads (S3 API, global CDN vs Git LFS)
  - No HF rate limits or intermittent 503s
  - Zero egress fees (data out is free)
  - 10GB free storage (our .db is ~10-50MB)
  - S3 API = industry standard, battle-tested boto3 client

Architecture:

  Streamlit App (local SQLite)  ←──sync──→  R2 Bucket (backup .db)
                                            HF Space (reads same .db from R2)

Lifecycle:
  1. On startup: download mnemo.db from R2 → local path
  2. If no .db exists, check for legacy mnemo_db.json (HF) and flag migration
  3. Background thread: every SYNC_INTERVAL seconds, if dirty:
     a. WAL checkpoint (flush pending writes to main .db)
     b. Upload .db to R2
  4. On demand: force_sync() for immediate upload

Conflict resolution: last-write-wins (single user, safe for Tina's setup).

Required Streamlit secrets (for R2 primary):
  R2_ACCOUNT_ID       = "your-cloudflare-account-id"
  R2_ACCESS_KEY_ID    = "your-r2-access-key"
  R2_SECRET_ACCESS_KEY = "your-r2-secret-key"
  R2_BUCKET_NAME      = "mnemo"        # optional, defaults to "mnemo"

Fallback HF secrets (existing):
  HF_TOKEN            = "hf_..."
  DATASET_REPO_ID     = "AthelaPerk/Private"

Thread-safe. Periodic syncs run in a daemon thread and never block the app;
force_sync() and stop() block until the final upload completes.
"""

import os
import time
import shutil
import sqlite3
import logging
import threading
from typing import Optional

log = logging.getLogger("mnemo.sync")

# Sync interval in seconds
SYNC_INTERVAL = 30

# R2 config defaults
DEFAULT_R2_BUCKET = "mnemo"
DB_KEY_IN_R2 = "mnemo.db"

# HF Dataset config (fallback)
DEFAULT_DATASET_REPO = "AthelaPerk/Private"
DB_FILENAME_IN_REPO = "mnemo.db"
LEGACY_JSON_FILENAME = "mnemo_db.json"


# =============================================================================
# R2 CLIENT (lazy, lightweight S3-compatible)
# =============================================================================

class R2Client:
    """Minimal S3-compatible client for Cloudflare R2.

    Uses boto3 under the hood; the boto3 package must be installed separately.
    Falls back gracefully if credentials are missing.
    """

    def __init__(self, account_id: Optional[str] = None,
                 access_key_id: Optional[str] = None,
                 secret_access_key: Optional[str] = None,
                 bucket_name: Optional[str] = None):
        self.account_id = account_id or os.environ.get("R2_ACCOUNT_ID", "")
        self.access_key_id = access_key_id or os.environ.get("R2_ACCESS_KEY_ID", "")
        self.secret_access_key = secret_access_key or os.environ.get("R2_SECRET_ACCESS_KEY", "")
        self.bucket_name = bucket_name or os.environ.get("R2_BUCKET_NAME", DEFAULT_R2_BUCKET)
        self._client = None

    @property
    def available(self) -> bool:
        return bool(self.account_id and self.access_key_id and self.secret_access_key)

    @property
    def client(self):
        """Lazy-init boto3 S3 client pointing at R2 endpoint."""
        if self._client is None and self.available:
            import boto3
            self._client = boto3.client(
                "s3",
                endpoint_url=f"https://{self.account_id}.r2.cloudflarestorage.com",
                aws_access_key_id=self.access_key_id,
                aws_secret_access_key=self.secret_access_key,
                region_name="auto",
            )
        return self._client

    def download(self, key: str, local_path: str) -> bool:
        """Download object from R2 to local file."""
        if not self.available:
            return False
        try:
            os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
            self.client.download_file(self.bucket_name, key, local_path)
            return True
        except Exception as e:
            error_code = getattr(e, "response", {}).get("Error", {}).get("Code", "")
            if error_code in ("404", "NoSuchKey"):
                return False  # Object doesn't exist yet β€” normal on first run
            log.warning(f"R2 download failed ({key}): {type(e).__name__}: {e}")
            return False

    def upload(self, local_path: str, key: str) -> bool:
        """Upload local file to R2."""
        if not self.available:
            return False
        try:
            self.client.upload_file(local_path, self.bucket_name, key)
            return True
        except Exception as e:
            log.error(f"R2 upload failed ({key}): {type(e).__name__}: {e}")
            return False

    def exists(self, key: str) -> bool:
        """Check if object exists in R2."""
        if not self.available:
            return False
        try:
            self.client.head_object(Bucket=self.bucket_name, Key=key)
            return True
        except Exception:
            return False
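
# Round-trip sketch for R2Client (illustration only; assumes the R2_* env
# vars are set and the bucket already exists):
#
#     r2 = R2Client()
#     if r2.available:
#         r2.upload("/tmp/mnemo.db", DB_KEY_IN_R2)       # push a local copy
#         assert r2.exists(DB_KEY_IN_R2)                 # now in the bucket
#         r2.download(DB_KEY_IN_R2, "/tmp/restored.db")  # pull it back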


# =============================================================================
# SYNC ENGINE (R2 primary, HF fallback)
# =============================================================================

class SyncEngine:
    """Bidirectional sync between local SQLite and cloud storage.

    Priority: R2 (if credentials set) → HF Datasets (fallback) → disabled.

    Usage:
        sync = SyncEngine(
            db_path="/home/user/.mnemo/mnemo.db",
            hf_token="hf_...",
            dataset_repo_id="AthelaPerk/Private",
        )
        sync.download()          # On startup - get latest .db
        sync.start_background()  # Start daemon sync thread

        # ... app runs, writes to SQLite ...

        sync.mark_dirty()        # After writes, signal upload needed
        sync.force_sync()        # Immediate upload (e.g., before shutdown)
        sync.stop()              # Clean shutdown
    """

    def __init__(self, db_path: str, hf_token: Optional[str] = None,
                 dataset_repo_id: Optional[str] = None,
                 sync_interval: int = SYNC_INTERVAL):
        self.db_path = db_path
        self.hf_token = hf_token or os.environ.get("HF_TOKEN", "")
        self.dataset_repo_id = dataset_repo_id or os.environ.get(
            "DATASET_REPO_ID", DEFAULT_DATASET_REPO)
        self.sync_interval = sync_interval

        self._dirty = False
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._worker: Optional[threading.Thread] = None
        self._last_sync: float = 0
        self._sync_count: int = 0
        self._sync_errors: int = 0

        # R2 client (primary)
        self._r2 = R2Client()

        # HF API (fallback, lazy init)
        self._hf_api_instance = None

        # Determine backend
        if self._r2.available:
            self._backend = "r2"
            log.info("[SYNC] βœ… Using Cloudflare R2 (primary)")
            print("[SYNC] βœ… Using Cloudflare R2")
        elif self.hf_token:
            self._backend = "hf"
            log.info("[SYNC] Using HuggingFace Datasets (fallback)")
            print("[SYNC] Using HuggingFace Datasets (fallback)")
        else:
            self._backend = "none"
            log.warning("[SYNC] No sync credentials β€” running offline")
            print("[SYNC] ⚠️  No sync credentials β€” running offline")

    @property
    def backend(self) -> str:
        """Current sync backend: 'r2', 'hf', or 'none'."""
        return self._backend

    @property
    def _hf_api(self):
        """Lazy-init HfApi for fallback."""
        if self._hf_api_instance is None and self.hf_token:
            from huggingface_hub import HfApi
            self._hf_api_instance = HfApi(token=self.hf_token)
        return self._hf_api_instance

    @property
    def has_credentials(self) -> bool:
        return self._backend != "none"

    # =========================================================================
    # DOWNLOAD (startup)
    # =========================================================================

    def download(self) -> bool:
        """Download .db from cloud storage. Returns True if downloaded.

        Tries R2 first, falls back to HF Datasets.
        If neither has a .db, checks HF for legacy JSON for migration.
        """
        if not self.has_credentials:
            log.warning("No sync credentials β€” skipping download.")
            return False

        os.makedirs(os.path.dirname(self.db_path) or ".", exist_ok=True)

        # === R2 PRIMARY ===
        if self._backend == "r2":
            if self._r2.download(DB_KEY_IN_R2, self.db_path):
                size_mb = os.path.getsize(self.db_path) / 1_048_576
                log.info(f"Downloaded {DB_KEY_IN_R2} from R2 ({size_mb:.1f} MB)")
                print(f"[SYNC] Downloaded mnemo.db from R2 ({size_mb:.1f} MB)")
                return True
            log.info("No .db in R2 β€” checking HF for legacy data...")
            # Fall through to HF for legacy migration check

        # === HF DATASETS (primary if no R2, or fallback for legacy) ===
        if self.hf_token:
            # Try .db from HF
            if self._backend == "hf":
                if self._download_hf_file(DB_FILENAME_IN_REPO, self.db_path):
                    log.info(f"Downloaded {DB_FILENAME_IN_REPO} from HF Datasets.")
                    return True

            # Try legacy JSON for migration
            legacy_path = self._legacy_json_path()
            if self._download_hf_file(LEGACY_JSON_FILENAME, legacy_path):
                log.info(f"Downloaded legacy {LEGACY_JSON_FILENAME} for migration.")
                return False  # Signal caller to run migration

        log.info("No existing data found β€” starting fresh.")
        return False

    def _download_hf_file(self, filename: str, local_path: str) -> bool:
        """Download a single file from HF Datasets repo."""
        try:
            import concurrent.futures
            from huggingface_hub import hf_hub_download

            # Run the download in a worker thread so a hard timeout can be
            # enforced. A `with ThreadPoolExecutor()` block would join the
            # worker on exit, blocking on a hung download and defeating the
            # timeout, so shut the executor down without waiting instead.
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            future = executor.submit(
                hf_hub_download,
                repo_id=self.dataset_repo_id,
                filename=filename,
                repo_type="dataset",
                token=self.hf_token,
                force_download=True,
            )
            try:
                downloaded = future.result(timeout=60)
            except concurrent.futures.TimeoutError:
                log.warning(f"HF download timed out: {filename}")
                return False
            finally:
                executor.shutdown(wait=False)

            os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
            shutil.copy2(downloaded, local_path)
            return True

        except Exception as e:
            if "EntryNotFoundError" in type(e).__name__ or "404" in str(e):
                return False
            log.warning(f"HF download failed ({filename}): {type(e).__name__}: {e}")
            return False

    def _legacy_json_path(self) -> str:
        return os.path.join(
            os.path.dirname(self.db_path) or ".",
            "mnemo_legacy.json"
        )

    def get_legacy_json_path(self) -> Optional[str]:
        path = self._legacy_json_path()
        return path if os.path.exists(path) else None

    # =========================================================================
    # UPLOAD (background sync)
    # =========================================================================

    def upload(self) -> bool:
        """Upload local .db to cloud storage.

        Performs WAL checkpoint first to ensure .db is self-contained.
        Uses R2 if available, falls back to HF Datasets.
        """
        if not self.has_credentials:
            return False

        if not os.path.exists(self.db_path):
            log.warning("No local .db file to upload.")
            return False

        try:
            # WAL checkpoint - merge journal into main .db
            self._wal_checkpoint()

            success = False

            # === R2 PRIMARY ===
            if self._backend == "r2":
                success = self._r2.upload(self.db_path, DB_KEY_IN_R2)
                if success:
                    size_mb = os.path.getsize(self.db_path) / 1_048_576
                    log.info(f"Uploaded to R2 ({size_mb:.1f} MB)")

            # === HF FALLBACK ===
            elif self._backend == "hf":
                self._hf_api.upload_file(
                    path_or_fileobj=self.db_path,
                    path_in_repo=DB_FILENAME_IN_REPO,
                    repo_id=self.dataset_repo_id,
                    repo_type="dataset",
                    commit_message="Auto-backup mnemo v7 database",
                )
                success = True

            if success:
                with self._lock:
                    self._dirty = False
                    self._last_sync = time.time()
                    self._sync_count += 1
                log.info(f"Sync #{self._sync_count} complete ({self._backend}).")
                return True
            else:
                raise RuntimeError("Upload returned False")

        except Exception as e:
            with self._lock:
                self._sync_errors += 1
            log.error(f"Upload failed ({self._backend}): {type(e).__name__}: {e}")
            return False

    def _wal_checkpoint(self):
        """Flush WAL journal into the main .db file."""
        try:
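            # TRUNCATE both flushes every pending WAL frame into the main .db
            # and truncates the -wal file, so the uploaded .db stands alone.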
            conn = sqlite3.connect(self.db_path, timeout=10)
            try:
                conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
            finally:
                conn.close()
        except Exception as e:
            log.warning(f"WAL checkpoint failed: {e}")

    # =========================================================================
    # DIRTY TRACKING
    # =========================================================================

    def mark_dirty(self):
        """Signal that local .db has changed and needs uploading."""
        with self._lock:
            self._dirty = True

    @property
    def is_dirty(self) -> bool:
        with self._lock:
            return self._dirty

    # =========================================================================
    # BACKGROUND SYNC THREAD
    # =========================================================================

    def start_background(self):
        """Start background sync daemon thread."""
        if not self.has_credentials:
            log.warning("No credentials β€” background sync disabled.")
            return

        if self._worker and self._worker.is_alive():
            return

        self._stop_event.clear()
        self._worker = threading.Thread(
            target=self._sync_loop,
            daemon=True,
            name="mnemo-sync",
        )
        self._worker.start()
        log.info(f"Background sync started (every {self.sync_interval}s, backend={self._backend}).")

    def stop(self):
        """Stop background sync. Does a final upload if dirty."""
        self._stop_event.set()
        if self._worker and self._worker.is_alive():
            self._worker.join(timeout=5)

        if self.is_dirty:
            self.upload()

    def force_sync(self):
        """Force an immediate sync (blocking)."""
        if self.is_dirty:
            self.upload()

    def _sync_loop(self):
        """Background sync loop. Runs in daemon thread."""
        while not self._stop_event.is_set():
            self._stop_event.wait(timeout=self.sync_interval)
            if self._stop_event.is_set():
                break
            if self.is_dirty:
                self.upload()

    # =========================================================================
    # STATS
    # =========================================================================

    def get_stats(self) -> dict:
        with self._lock:
            return {
                "backend": self._backend,
                "db_path": self.db_path,
                "r2_bucket": self._r2.bucket_name if self._r2.available else None,
                "dataset_repo": self.dataset_repo_id if self._backend == "hf" else None,
                "has_credentials": self.has_credentials,
                "is_dirty": self._dirty,
                "last_sync": self._last_sync,
                "last_sync_ago": round(time.time() - self._last_sync, 1) if self._last_sync else None,
                "sync_count": self._sync_count,
                "sync_errors": self._sync_errors,
                "sync_interval": self.sync_interval,
                "background_running": self._worker.is_alive() if self._worker else False,
                "db_exists": os.path.exists(self.db_path),
                "db_size_mb": round(os.path.getsize(self.db_path) / 1_048_576, 2)
                              if os.path.exists(self.db_path) else 0,
            }