"""HuggingFace Hub storage integration for persistent ledger management."""

import logging
import os
import tempfile
import time
from pathlib import Path
from typing import Optional

import pandas as pd

logger = logging.getLogger(__name__)


class HFHubLedger:
    """Manages ledger CSV persistence using HuggingFace Hub storage.

    The ledger is a DataFrame with the columns ``Date``, ``Description``,
    ``Category`` and ``Amount``. It is always written to a local CSV file and,
    when both a token and a repo ID are configured, additionally uploaded to
    a HuggingFace Hub repository.
    """

    # Column layout used whenever a fresh, empty ledger has to be created.
    _COLUMNS = ("Date", "Description", "Category", "Amount")

    def __init__(
        self,
        hf_token: Optional[str] = None,
        repo_id: Optional[str] = None,
        repo_type: str = "dataset",
        csv_filename: str = "ledger.csv",
        local_cache_dir: str = "./cache",
        max_retries: int = 3,
        retry_delay: float = 1.0,
    ):
        """
        Initialize HuggingFace Hub ledger storage.

        Creates ``local_cache_dir`` if needed and loads the ledger, from the
        Hub when enabled and otherwise from the local CSV (or empty).

        Args:
            hf_token: HuggingFace API token (falls back to the HF_TOKEN, then
                HUGGINGFACEHUB_API_TOKEN environment variables)
            repo_id: Repository ID in format "username/repo-name" (falls back
                to the HF_REPO_ID environment variable)
            repo_type: Type of repo ("dataset", "model", "space")
            csv_filename: Name of the CSV file in the repo
            local_cache_dir: Local directory for caching
            max_retries: Maximum number of upload attempts
            retry_delay: Initial delay between retries (exponential backoff)
        """
        self.hf_token = hf_token or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN")
        self.repo_id = repo_id or os.getenv("HF_REPO_ID")
        self.repo_type = repo_type
        self.csv_filename = csv_filename
        self.local_cache_dir = local_cache_dir
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        # Coerce to a real bool: `a and b` would otherwise store the repo id
        # string (or None) and leak it through is_enabled().
        self.enabled: bool = bool(self.hf_token and self.repo_id)
        self.df: Optional[pd.DataFrame] = None

        # Create local cache directory
        cache_dir = Path(self.local_cache_dir)
        cache_dir.mkdir(parents=True, exist_ok=True)
        self.local_csv_path = cache_dir / self.csv_filename

        if self.enabled:
            logger.info("HF Hub storage enabled: %s", self.repo_id)
            self._ensure_repo_exists()
            self._load_from_hub()
        else:
            logger.warning("HF Hub storage disabled. Set HF_TOKEN and HF_REPO_ID to enable.")
            self._load_local_or_create()

    @classmethod
    def _empty_ledger(cls) -> pd.DataFrame:
        """Return an empty ledger with explicit dtypes for Date and Amount."""
        dtypes = {
            "Date": "datetime64[ns]",
            "Description": "object",
            "Category": "object",
            "Amount": "float64",
        }
        return pd.DataFrame({name: pd.Series(dtype=dtypes[name]) for name in cls._COLUMNS})

    @staticmethod
    def _read_ledger_csv(path) -> pd.DataFrame:
        """Read a ledger CSV, parse Date/Amount, and sort newest first."""
        frame = pd.read_csv(path)
        frame["Date"] = pd.to_datetime(frame["Date"])
        frame["Amount"] = pd.to_numeric(frame["Amount"])
        return frame.sort_values("Date", ascending=False, kind="stable").reset_index(drop=True)

    def _ensure_repo_exists(self) -> bool:
        """
        Ensure the HuggingFace Hub repository exists.

        Any failure (including a missing ``huggingface_hub`` package) is
        logged and reported as False rather than raised.

        Returns:
            True if repo exists or was created, False otherwise
        """
        try:
            from huggingface_hub import create_repo, repo_exists

            if repo_exists(self.repo_id, repo_type=self.repo_type, token=self.hf_token):
                logger.info("Repository %s exists", self.repo_id)
                return True

            # Create repo if it doesn't exist
            repo_url = create_repo(
                self.repo_id,
                repo_type=self.repo_type,
                private=True,
                exist_ok=True,
                token=self.hf_token,
            )
            logger.info("Created repository: %s", repo_url)
            return True
        except Exception as e:
            logger.error("Failed to ensure repo exists: %s", e)
            return False

    def _load_from_hub(self) -> bool:
        """
        Download and load CSV from HuggingFace Hub.

        On any failure the ledger is loaded from the local CSV instead (or
        created empty if that is unavailable too).

        Returns:
            True if loaded from the Hub, False otherwise
        """
        try:
            from huggingface_hub import hf_hub_download

            logger.info("Attempting to download %s from %s", self.csv_filename, self.repo_id)

            file_path = hf_hub_download(
                repo_id=self.repo_id,
                filename=self.csv_filename,
                repo_type=self.repo_type,
                token=self.hf_token,
                cache_dir=self.local_cache_dir,
            )

            self.df = self._read_ledger_csv(file_path)
            logger.info("Loaded %d entries from HF Hub", len(self.df))
            return True
        except Exception as e:
            # The fallback reads the local CSV first, so the message says so.
            logger.warning("Could not load from Hub: %s. Falling back to local cache.", e)
            self._load_local_or_create()
            return False

    def _load_local_or_create(self) -> bool:
        """
        Load CSV from local cache or create new DataFrame.

        Returns:
            True if loaded, False if created new
        """
        if self.local_csv_path.exists():
            try:
                self.df = self._read_ledger_csv(self.local_csv_path)
                logger.info("Loaded %d entries from local cache", len(self.df))
                return True
            except Exception as e:
                logger.warning("Failed to load local CSV: %s", e)

        # Create new empty DataFrame
        self.df = self._empty_ledger()
        logger.info("Created new empty ledger")
        return False

    def save(self, df: pd.DataFrame) -> bool:
        """
        Save DataFrame to local cache and optionally to HF Hub.

        The in-memory ledger is replaced only after the local CSV has been
        written. A failed Hub upload is logged but does not change the return
        value, because the data is already stored locally.

        Args:
            df: DataFrame to save; its ``Date`` column may hold datetimes or
                anything ``pd.to_datetime`` accepts

        Returns:
            True if the local save succeeded, False otherwise
        """
        try:
            # Work on a copy so the caller's frame is never modified and the
            # stored ledger does not alias it.
            snapshot = df.copy()
            snapshot["Date"] = pd.to_datetime(snapshot["Date"])

            # Save locally first, with dates rendered as YYYY-MM-DD.
            on_disk = snapshot.copy()
            on_disk["Date"] = on_disk["Date"].dt.strftime("%Y-%m-%d")
            on_disk.to_csv(self.local_csv_path, index=False)
            self.df = snapshot

            # Upload to Hub if enabled
            if self.enabled and not self._upload_to_hub_with_retry():
                logger.warning("Ledger saved locally but not uploaded to HF Hub")

            return True
        except Exception as e:
            logger.error("Failed to save ledger: %s", e)
            return False

    def _upload_to_hub_with_retry(self) -> bool:
        """
        Upload CSV to HuggingFace Hub with exponential backoff retry.

        Returns:
            True if successful, False otherwise
        """
        # Import once, outside the loop: a missing package cannot be fixed by
        # retrying, so fail immediately instead of sleeping between attempts.
        try:
            from huggingface_hub import upload_file
        except ImportError as e:
            logger.error("Cannot upload to HF Hub: %s", e)
            return False

        for attempt in range(self.max_retries):
            try:
                logger.info("Uploading to HF Hub (attempt %d/%d)", attempt + 1, self.max_retries)

                upload_file(
                    path_or_fileobj=str(self.local_csv_path),
                    path_in_repo=self.csv_filename,
                    repo_id=self.repo_id,
                    repo_type=self.repo_type,
                    token=self.hf_token,
                    commit_message=f"Auto-save ledger at {pd.Timestamp.now()}",
                )

                logger.info("Successfully uploaded to HF Hub")
                return True
            except Exception as e:
                logger.warning("Upload failed (attempt %d): %s", attempt + 1, e)

                if attempt >= self.max_retries - 1:
                    logger.error("Failed to upload after %d attempts", self.max_retries)
                    return False

                wait_time = self.retry_delay * (2 ** attempt)  # Exponential backoff
                logger.info("Retrying in %.1fs...", wait_time)
                time.sleep(wait_time)

        # Reached only when max_retries is zero or negative.
        return False

    def get_dataframe(self) -> pd.DataFrame:
        """Return a copy of the current DataFrame (empty ledger if none is loaded)."""
        if self.df is None:
            return self._empty_ledger()
        return self.df.copy()

    def add_entry(self, date: str, description: str, category: str, amount: float) -> bool:
        """
        Add a new entry and save.

        The in-memory ledger is only replaced when the save succeeds, so a
        failed write leaves the previous state untouched.

        Args:
            date: Date in YYYY-MM-DD format
            description: Expense description
            category: Expense category
            amount: Amount in dollars

        Returns:
            True if successful, False otherwise
        """
        try:
            new_entry = pd.DataFrame({
                "Date": [pd.to_datetime(date)],
                "Description": [description],
                "Category": [category],
                "Amount": [float(amount)],
            })

            current = self.df
            if current is None or current.empty:
                # Skip concatenating with an empty frame, which pandas
                # deprecates and which can change the resulting dtypes.
                combined = new_entry
            else:
                combined = pd.concat([current, new_entry], ignore_index=True)

            combined = combined.sort_values("Date", ascending=False, kind="stable").reset_index(drop=True)

            # Save immediately; save() commits the new frame to self.df.
            return self.save(combined)
        except Exception as e:
            logger.error("Failed to add entry: %s", e)
            return False

    def get_total_spending(self) -> float:
        """Calculate and return total spending (0.0 for an empty ledger)."""
        if self.df is None or self.df.empty:
            return 0.0
        return float(self.df["Amount"].sum())

    def get_category_summary(self) -> dict:
        """Get spending summary by category as a ``{category: total}`` dict."""
        if self.df is None or self.df.empty:
            return {}
        return self.df.groupby("Category")["Amount"].sum().to_dict()

    def is_enabled(self) -> bool:
        """Check if HF Hub storage is enabled."""
        return self.enabled

    def get_status(self) -> str:
        """Get human-readable status string."""
        if self.enabled:
            return f"✅ HF Hub: {self.repo_id}"
        return "⚠️ Local cache only (HF Hub disabled)"