| |
| |
|
|
| import os |
| import fcntl |
| import logging |
| from datetime import datetime |
| from huggingface_hub import hf_hub_download, HfApi |
| from apscheduler.schedulers.background import BackgroundScheduler |
| from app.core.config import settings |
|
|
# Module-level logger used by every sync/backup routine in this file.
logger = logging.getLogger("romeo_sync")
# Hugging Face Hub client; backup_db_to_hf uses it to upload the database.
api = HfApi()
# Background scheduler that hosts the periodic backup job.
scheduler = BackgroundScheduler()


# Hub credentials and target repository, read from the application settings.
# The sync functions treat a falsy value in either one as "local-only mode".
HF_TOKEN = settings.HF_TOKEN
REPO_ID = settings.HF_DATASET_REPO
# Database file name; used both for the local file and as the path in the repo.
DB_NAME = "romeo_research.db"
# Directory holding the local copy (relative to the process working directory).
LOCAL_DATA_DIR = "./data"
# Full local path of the database file.
LOCAL_PATH = os.path.join(LOCAL_DATA_DIR, DB_NAME)
|
|
def download_db_from_hf():
    """Fetch the database file from the Hugging Face dataset repo at startup.

    The local data directory is always created first. When REPO_ID or
    HF_TOKEN is unset the function returns without contacting the Hub
    (local-only mode). Otherwise DB_NAME is downloaded into LOCAL_DATA_DIR.

    Download errors are logged as a warning and swallowed so that startup
    can continue with whatever local file is present. Nothing is returned.
    """
    os.makedirs(LOCAL_DATA_DIR, exist_ok=True)

    if not REPO_ID or not HF_TOKEN:
        logger.info("Running in local-only mode (no HF sync variables found)")
        return

    logger.info(f"Downloading {DB_NAME} from {REPO_ID}...")
    try:
        hf_hub_download(
            repo_id=REPO_ID,
            filename=DB_NAME,
            repo_type="dataset",
            token=HF_TOKEN,
            local_dir=LOCAL_DATA_DIR,
        )
    except Exception as e:
        # Every failure lands here (missing file, bad token, wrong repo,
        # network outage), so the message must not claim a first run as fact.
        # NOTE(review): after a transient failure the app may start on a
        # fresh local DB that a later backup could upload over the remote
        # copy -- confirm whether that needs guarding.
        logger.warning(
            "Could not download %s from %s (expected on first run; otherwise "
            "check token, repo name and network): %s",
            DB_NAME,
            REPO_ID,
            e,
        )
    else:
        logger.info("Database successfully synchronized.")
|
|
def backup_db_to_hf():
    """Upload the local database file to the Hugging Face dataset repo.

    Returns immediately when REPO_ID or HF_TOKEN is unset or the local file
    does not exist. Otherwise a shared ``flock`` is taken on the file, the
    file is uploaded as DB_NAME with a timestamped commit message, and the
    lock is released again -- also when the upload fails.

    Any exception is logged together with its traceback and swallowed, so a
    scheduled run never propagates an error to its caller.
    """
    if not REPO_ID or not HF_TOKEN or not os.path.exists(LOCAL_PATH):
        return

    commit_message = f"Romeo AI Backup: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"

    try:
        with open(LOCAL_PATH, 'rb') as f:
            # flock is advisory: it only excludes other processes that also
            # flock this file.
            # NOTE(review): confirm the database writer takes a compatible
            # lock; otherwise this does not protect against concurrent writes.
            fcntl.flock(f, fcntl.LOCK_SH)
            try:
                api.upload_file(
                    path_or_fileobj=LOCAL_PATH,
                    path_in_repo=DB_NAME,
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN,
                    commit_message=commit_message,
                )
            finally:
                # Release explicitly on both success and failure rather than
                # relying on the implicit release when the file is closed.
                fcntl.flock(f, fcntl.LOCK_UN)
        logger.info("HF Backup completed successfully.")
    except Exception as e:
        # logger.exception keeps the original message and adds the traceback.
        logger.exception("Backup failed: %s", e)
|
|
def start_backup_scheduler():
    """Register the 5-minute backup job and start the scheduler.

    Does nothing when HF_TOKEN or REPO_ID is unset. The call is idempotent:
    if the scheduler is already running it returns without touching it, and
    the job is registered under a fixed id with ``replace_existing=True`` so
    a start/stop/start cycle never leaves duplicate backup jobs behind.
    """
    if not (HF_TOKEN and REPO_ID):
        return

    # Starting an already-running scheduler raises, and re-adding the job
    # would schedule a second backup; mirror the guard used on shutdown.
    if scheduler.running:
        return

    scheduler.add_job(
        backup_db_to_hf,
        'interval',
        minutes=5,
        id="hf_backup",
        replace_existing=True,
    )
    scheduler.start()
    logger.info("HF backup scheduler started (5min interval)")
|
|
def stop_backup_scheduler():
    """Shut the backup scheduler down if it is currently running.

    A scheduler that was never started (or is already stopped) is left
    untouched; otherwise ``shutdown()`` is called with its default arguments.
    """
    if not scheduler.running:
        return
    scheduler.shutdown()
|
|