""" Git-based Memory Bridge for HuggingFace Spaces. Allows the AI to persist memory to the Dataset repository. Enhanced with timeout controls and retry mechanisms for reliability. """ import logging import subprocess import json import time from typing import Optional, Dict, Any, List logger = logging.getLogger(__name__) # Timeout values for different git operations (in seconds) TIMEOUT_CONFIG = { "fetch": 60, # Network operation, can be slow "pull": 90, # Pull includes merge, may take time "push": 120, # Push is network-heavy "commit": 30, # Local operation, should be fast "status": 15, # Quick local check "add": 20, # Local operation "reset": 30, # Local operation "default": 45, # Fallback for unknown operations } class GitOperationError(Exception): """Custom exception for git operation failures.""" pass class GitTimeoutError(GitOperationError): """Exception raised when git operation times out.""" pass class GitMemoryBridge: """Git-based memory persistence with timeout and retry support.""" def __init__(self, repo_path: str = "/data", max_retries: int = 3, base_timeout: int = None): """ Initialize the Git memory bridge. Args: repo_path: Path to the git repository max_retries: Maximum number of retry attempts for failed operations base_timeout: Override default timeout (in seconds). None uses operation-specific timeouts. """ self.repo_path = repo_path self.max_retries = max_retries self.base_timeout = base_timeout self.git = self._get_git_command() def _get_git_command(self) -> Optional[str]: """Check if git is available.""" try: result = subprocess.run( ["git", "--version"], capture_output=True, check=True, timeout=10 ) logger.info(f"Git version: {result.stdout.strip()}") return "git" except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired) as e: logger.warning(f"System git not found or timed out: {e}") return None def _get_timeout_for_command(self, command: str) -> int: """Get appropriate timeout based on git command.""" cmd = command.lower() if self.base_timeout: return self.base_timeout return TIMEOUT_CONFIG.get(cmd, TIMEOUT_CONFIG["default"]) def _run_git( self, args: List[str], check: bool = True, timeout: Optional[int] = None, retry_count: int = 0 ) -> subprocess.CompletedProcess: """ Run a git command with timeout and retry logic. Args: args: Git command arguments (e.g., ['push', 'origin', 'main']) check: Whether to raise an error on non-zero exit code timeout: Override timeout for this specific call retry_count: Current retry attempt (internal use) Returns: CompletedProcess result Raises: GitTimeoutError: If operation times out after all retries GitOperationError: If operation fails after all retries RuntimeError: If git is not available """ if not self.git: raise RuntimeError("Git is not available in this environment.") # Determine timeout if timeout is None and args: timeout = self._get_timeout_for_command(args[0]) elif timeout is None: timeout = TIMEOUT_CONFIG["default"] full_cmd = [self.git] + args cmd_str = ' '.join(full_cmd) logger.debug(f"Running git command: {cmd_str} (timeout: {timeout}s, attempt: {retry_count + 1}/{self.max_retries + 1})") attempt = 0 last_error = None while attempt <= self.max_retries: try: start_time = time.time() result = subprocess.run( full_cmd, cwd=self.repo_path, capture_output=True, text=True, check=check, timeout=timeout ) elapsed = time.time() - start_time if result.stdout: logger.debug(f"Git stdout: {result.stdout.strip()[:200]}") logger.info(f"Git command completed successfully in {elapsed:.2f}s: {args[0]}") return result except subprocess.TimeoutExpired as e: last_error = e elapsed = time.time() - start_time logger.warning( f"Git command timed out after {elapsed:.2f}s: {args[0]} " f"(attempt {attempt + 1}/{self.max_retries + 1})" ) if attempt < self.max_retries: # Exponential backoff: 2^attempt seconds, max 30s backoff = min(2 ** attempt, 30) logger.info(f"Retrying in {backoff}s...") time.sleep(backoff) else: error_msg = f"Git operation timed out after {self.max_retries + 1} attempts: {args[0]}" logger.error(error_msg) raise GitTimeoutError(error_msg) from e except subprocess.CalledProcessError as e: last_error = e logger.warning( f"Git command failed with exit code {e.returncode}: {args[0]} " f"(attempt {attempt + 1}/{self.max_retries + 1})" ) if e.stderr: logger.debug(f"Git stderr: {e.stderr.strip()[:200]}") # Don't retry on certain errors if any(msg in (e.stderr or "") for msg in ["fatal:", "error:"]): # Permanent error, don't retry raise GitOperationError(f"Git command failed: {e.stderr.strip()}") from e if attempt < self.max_retries: backoff = min(2 ** attempt, 30) logger.info(f"Retrying in {backoff}s...") time.sleep(backoff) else: error_msg = f"Git operation failed after {self.max_retries + 1} attempts: {args[0]}" logger.error(error_msg) raise GitOperationError(error_msg) from e attempt += 1 # Should not reach here, but just in case if last_error: raise GitOperationError(f"Git operation failed: {last_error}") from last_error def sync_memory(self) -> bool: """ Pulls latest changes from the remote dataset repository. Returns True if successful, False otherwise. Uses timeout-controlled git operations with retry logic. """ try: logger.info("Syncing memory from remote dataset...") # First, fetch to get latest remote state with timeout fetch_result = self._run_git( ["fetch", "origin", "main"], check=False, timeout=TIMEOUT_CONFIG["fetch"] ) if fetch_result.returncode != 0: logger.warning(f"Fetch failed: {fetch_result.stderr}") return False # Pull with strategy X (theirs) to prefer remote state (survival priority) pull_result = self._run_git( ["pull", "origin", "main", "--strategy-option=theirs"], check=False, timeout=TIMEOUT_CONFIG["pull"] ) if pull_result.returncode == 0: logger.info(f"Memory sync successful: {pull_result.stdout.strip()}") return True else: # If it fails due to divergent branches or history, try a harder reset if "divergent" in pull_result.stderr or "refusing to merge" in pull_result.stderr: logger.warning("Git history diverged. Performing hard reset to remote origin/main.") self._run_git(["reset", "--hard", "origin/main"], timeout=TIMEOUT_CONFIG["reset"]) return True else: logger.error(f"Memory sync failed: {pull_result.stderr}") return False except GitTimeoutError as e: logger.error(f"Memory sync timed out: {e}") return False except GitOperationError as e: logger.error(f"Memory sync operation failed: {e}") return False except Exception as e: logger.error(f"Unexpected exception during memory sync: {e}") return False def save_memory(self, commit_message: str = "Update memory state") -> bool: """ Commits and pushes local changes to the remote dataset repository. Uses timeout-controlled git operations with retry logic. Args: commit_message: Message for the git commit Returns: True if successful, False otherwise """ try: # 1. Check status status = self._run_git(["status", "--porcelain"], timeout=TIMEOUT_CONFIG["status"]) if not status.stdout.strip(): logger.info("No new memories to save.") return True logger.info(f"Saving memory changes: {status.stdout}") # 2. Add changes self._run_git(["add", "-A"], timeout=TIMEOUT_CONFIG["add"]) # 3. Commit # Use --no-verify to skip pre-commit hooks that might fail self._run_git( ["commit", "-m", commit_message, "--no-verify"], timeout=TIMEOUT_CONFIG["commit"] ) # 4. Push logger.info("Pushing memory to remote...") push_result = self._run_git( ["push", "origin", "main"], check=False, timeout=TIMEOUT_CONFIG["push"] ) if push_result.returncode == 0: logger.info("Memory saved successfully.") return True else: # Handle race conditions (others pushed while we were working) if "rejected" in push_result.stderr: logger.warning("Remote updated during save. Pulling and retrying...") self.sync_memory() # Retry push once retry_result = self._run_git( ["push", "origin", "main"], check=False, timeout=TIMEOUT_CONFIG["push"] ) return retry_result.returncode == 0 logger.error(f"Push failed: {push_result.stderr}") return False except GitTimeoutError as e: logger.error(f"Memory save timed out: {e}") return False except GitOperationError as e: logger.error(f"Memory save operation failed: {e}") return False except Exception as e: logger.error(f"Unexpected exception during memory save: {e}") return False def read_file(self, path: str) -> Optional[str]: """Read a file from the dataset.""" try: import os full_path = os.path.join(self.repo_path, path) with open(full_path, 'r') as f: return f.read() except Exception as e: logger.error(f"Failed to read file {path}: {e}") return None def write_file(self, path: str, content: str) -> bool: """Write a file to the dataset. Does NOT auto-commit.""" try: import os full_path = os.path.join(self.repo_path, path) os.makedirs(os.path.dirname(full_path), exist_ok=True) with open(full_path, 'w') as f: f.write(content) return True except Exception as e: logger.error(f"Failed to write file {path}: {e}") return False