Spaces:
Configuration error
Configuration error
| """ | |
Virtual Me — Persistent Memory Sync
===================================

Syncs the conversation history and personality profile to a Hugging Face
Dataset so the clone survives Space restarts and sleeps.

Usage:
    1. Set HF_TOKEN in Space Secrets (auto-provided in HF Spaces).
    2. Memory auto-pushes to the `camdog920/virtual-me-memory` dataset
       (override the target with the VIRTUAL_ME_MEMORY_REPO env var).
    3. On restart, memory auto-pulls from the dataset.
| """ | |
| import os | |
| import json | |
| import time | |
| from pathlib import Path | |
| from typing import Optional | |
| # HuggingFace Hub for dataset push/pull | |
| from huggingface_hub import HfApi, create_repo, hf_hub_download, upload_file | |
# Hub dataset repo that stores the memory files; the VIRTUAL_ME_MEMORY_REPO
# environment variable overrides the built-in default.
DATASET_REPO = os.environ.get("VIRTUAL_ME_MEMORY_REPO", "camdog920/virtual-me-memory")

# Local directory (relative to the working directory) holding the memory files.
MEMORY_DIR = Path("./virtual_me_memory")
def _get_api() -> Optional[HfApi]:
    """Build a Hub client from the token found in the environment.

    ``HF_TOKEN`` is checked first, then ``HUGGINGFACE_HUB_TOKEN``; an unset
    or empty value counts as missing.

    Returns:
        An ``HfApi`` constructed with the token, or ``None`` when neither
        variable holds a non-empty value.
    """
    for env_name in ("HF_TOKEN", "HUGGINGFACE_HUB_TOKEN"):
        candidate = os.getenv(env_name)
        if candidate:
            return HfApi(token=candidate)
    return None
def _ensure_repo_exists(api: HfApi):
    """Create the dataset repo if it doesn't exist.

    Best-effort: any exception raised while creating the repo is printed and
    swallowed, so the caller continues either way.

    Args:
        api: Hub client whose ``token`` attribute authenticates the request.
    """
    # NOTE(review): no `private=` argument is passed, so a newly created repo
    # gets the Hub's default visibility — confirm that is intended for
    # conversation data.
    try:
        repo_options = {
            "repo_type": "dataset",
            "token": api.token,
            "exist_ok": True,  # an already-existing repo is not an error
        }
        create_repo(DATASET_REPO, **repo_options)
    except Exception as err:
        print(f"[MemorySync] Repo ensure failed: {err}")
def push_memory() -> str:
    """Push current memory files to the HF Dataset repo.

    Uploads ``personality_profile.json`` and ``conversation_history.jsonl``
    from ``MEMORY_DIR`` to ``DATASET_REPO``. Files that do not exist locally
    are skipped. An upload error is printed and recorded, and the remaining
    files are still attempted.

    Returns:
        A human-readable status line:

        * a warning when no token is configured,
        * a success message listing the uploaded files, with any files that
          failed appended in parentheses,
        * a failure warning when every attempted upload failed,
        * "Nothing to sync." only when no memory file exists locally.
    """
    api = _get_api()
    if api is None:
        return "⚠️ HF_TOKEN not set — memory is local-only (will be lost on restart)."

    _ensure_repo_exists(api)

    # Compute the label once so every commit of this sync carries the same
    # timestamp.
    commit_message = f"Memory sync @ {time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}"

    pushed = []
    failed = []
    for fname in ("personality_profile.json", "conversation_history.jsonl"):
        fpath = MEMORY_DIR / fname
        if not fpath.exists():
            continue
        try:
            upload_file(
                path_or_fileobj=str(fpath),
                path_in_repo=fname,
                repo_id=DATASET_REPO,
                repo_type="dataset",
                token=api.token,
                commit_message=commit_message,
            )
        except Exception as e:
            # Best-effort sync: keep going, but remember the failure so the
            # returned status does not claim there was nothing to do.
            print(f"[MemorySync] Push failed for {fname}: {e}")
            failed.append(fname)
        else:
            pushed.append(fname)

    if pushed:
        status = f"✅ Memory synced to [{DATASET_REPO}](https://huggingface.co/datasets/{DATASET_REPO}): {', '.join(pushed)}"
        if failed:
            status += f" (⚠️ failed: {', '.join(failed)})"
        return status
    if failed:
        return f"⚠️ Memory sync failed for: {', '.join(failed)} — see logs for details."
    return "ℹ️ Nothing to sync."
def pull_memory() -> str:
    """Restore memory files from the HF Dataset repo into ``MEMORY_DIR``.

    Creates ``MEMORY_DIR`` if needed, then tries to download
    ``personality_profile.json`` and ``conversation_history.jsonl`` from
    ``DATASET_REPO``. A failed download is printed and skipped; the other
    file is still attempted.

    Returns:
        A human-readable status line: a warning when no token is configured,
        a success message listing the restored files, or a "starting fresh"
        note when no file could be downloaded.
    """
    client = _get_api()
    if client is None:
        return "⚠️ HF_TOKEN not set — cannot restore remote memory."

    MEMORY_DIR.mkdir(parents=True, exist_ok=True)

    recovered = []
    for remote_name in ("personality_profile.json", "conversation_history.jsonl"):
        try:
            # The returned local path is not needed: files land in MEMORY_DIR.
            hf_hub_download(
                repo_id=DATASET_REPO,
                filename=remote_name,
                repo_type="dataset",
                token=client.token,
                local_dir=str(MEMORY_DIR),
                local_dir_use_symlinks=False,
            )
            recovered.append(remote_name)
            print(f"[MemorySync] Restored {remote_name} from dataset.")
        except Exception as err:
            print(f"[MemorySync] Pull failed for {remote_name}: {err}")

    if not recovered:
        return "ℹ️ No remote memory found — starting fresh."
    return f"✅ Memory restored from [{DATASET_REPO}](https://huggingface.co/datasets/{DATASET_REPO}): {', '.join(recovered)}"
def download_memory_zip() -> str:
    """Create a ZIP of all memory files for user download.

    Every regular file found recursively under ``MEMORY_DIR`` is added under
    its path relative to ``MEMORY_DIR`` (with forward slashes). Entries are
    DEFLATE-compressed and written in sorted order, so repeated calls over
    the same files produce the same member order. The archive is opened in
    write mode, so a previous archive at the same path is replaced.

    Returns:
        Path of the written archive (``/tmp/virtual_me_memory.zip``).
    """
    import zipfile

    zip_path = "/tmp/virtual_me_memory.zip"
    # Sort so member order does not depend on filesystem enumeration order.
    members = sorted(p for p in MEMORY_DIR.rglob("*") if p.is_file())
    # The memory files are JSON/JSONL text; the default ZIP_STORED would
    # leave them uncompressed.
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for fpath in members:
            zf.write(str(fpath), arcname=fpath.relative_to(MEMORY_DIR).as_posix())
    return zip_path