Spaces:
Sleeping
Sleeping
Claude Code
Claude Code: Review the memory system implementation, specifically focusing on git op
0bbe516 | """ | |
| Git-based Memory Bridge for HuggingFace Spaces. | |
| Allows the AI to persist memory to the Dataset repository. | |
| Enhanced with timeout controls and retry mechanisms for reliability. | |
| """ | |
| import logging | |
| import subprocess | |
| import json | |
| import time | |
| from typing import Optional, Dict, Any, List | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Per-operation timeouts in seconds, keyed by the git sub-command name.
# Network operations get the most headroom; purely local ones the least.
TIMEOUT_CONFIG = dict(
    fetch=60,     # network round-trip, can be slow
    pull=90,      # fetch plus merge
    push=120,     # network-heavy upload
    commit=30,    # local, should be fast
    status=15,    # quick local check
    add=20,       # local
    reset=30,     # local
    default=45,   # fallback for any sub-command not listed above
)
class GitOperationError(Exception):
    """Raised when a git operation fails; base class for git-specific errors in this module."""
class GitTimeoutError(GitOperationError):
    """Raised when a git operation times out; a specialised GitOperationError."""
class GitMemoryBridge:
    """Git-based memory persistence with timeout and retry support.

    Wraps the system ``git`` executable to pull from, commit to, and push to
    ``origin/main`` of the repository checked out at ``repo_path``, and offers
    small helpers to read and write files inside that checkout.
    """

    def __init__(self, repo_path: str = "/data", max_retries: int = 3, base_timeout: Optional[int] = None):
        """
        Initialize the Git memory bridge.

        Args:
            repo_path: Path to the git repository.
            max_retries: Maximum number of retry attempts for failed operations.
            base_timeout: Override timeout (in seconds) applied to every git
                operation. None (or 0) uses the operation-specific timeouts
                from TIMEOUT_CONFIG.
        """
        self.repo_path = repo_path
        self.max_retries = max_retries
        self.base_timeout = base_timeout
        # "git" when the executable is usable, otherwise None.
        self.git = self._get_git_command()

    def _get_git_command(self) -> Optional[str]:
        """Return ``"git"`` if the system git executable runs, else None."""
        try:
            result = subprocess.run(
                ["git", "--version"],
                capture_output=True,
                text=True,  # decode output so the version is logged as str, not bytes
                check=True,
                timeout=10,
            )
        except (subprocess.CalledProcessError, OSError, subprocess.TimeoutExpired) as e:
            # OSError covers FileNotFoundError as well as e.g. PermissionError.
            logger.warning(f"System git not found or timed out: {e}")
            return None
        logger.info(f"Git version: {result.stdout.strip()}")
        return "git"

    def _get_timeout_for_command(self, command: str) -> int:
        """Return the timeout (seconds) to use for the given git sub-command.

        A truthy ``base_timeout`` wins over the per-command table; unknown
        commands fall back to ``TIMEOUT_CONFIG["default"]``.
        """
        if self.base_timeout:
            return self.base_timeout
        return TIMEOUT_CONFIG.get(command.lower(), TIMEOUT_CONFIG["default"])

    @staticmethod
    def _backoff_seconds(attempt: int) -> int:
        """Exponential backoff delay: 2^attempt seconds, capped at 30s."""
        return min(2 ** attempt, 30)

    def _run_git(
        self,
        args: List[str],
        check: bool = True,
        timeout: Optional[int] = None,
        retry_count: int = 0
    ) -> subprocess.CompletedProcess:
        """
        Run a git command with timeout and retry logic.

        Timeouts are retried with exponential backoff. A non-zero exit (only
        raised when ``check`` is True) is retried unless stderr contains
        "fatal:" or "error:", which is treated as permanent.

        Args:
            args: Git command arguments (e.g., ['push', 'origin', 'main']).
            check: Whether to raise an error on non-zero exit code.
            timeout: Override timeout for this specific call. None resolves the
                timeout from ``base_timeout`` / TIMEOUT_CONFIG.
            retry_count: Only used to offset the attempt number in the debug log.

        Returns:
            CompletedProcess result.

        Raises:
            GitTimeoutError: If operation times out after all retries.
            GitOperationError: If operation fails after all retries.
            RuntimeError: If git is not available.
            ValueError: If ``args`` is empty.
        """
        if not self.git:
            raise RuntimeError("Git is not available in this environment.")
        if not args:
            # Previously this surfaced later as an IndexError from a log call.
            raise ValueError("No git command given.")

        command = args[0]
        if timeout is None:
            timeout = self._get_timeout_for_command(command)

        full_cmd = [self.git, *args]
        # Always make at least one attempt, even if max_retries is negative.
        total_attempts = max(self.max_retries, 0) + 1
        logger.debug(
            f"Running git command: {' '.join(full_cmd)} "
            f"(timeout: {timeout}s, attempt: {retry_count + 1}/{total_attempts})"
        )

        for attempt in range(total_attempts):
            is_last_attempt = attempt == total_attempts - 1
            start_time = time.time()
            try:
                result = subprocess.run(
                    full_cmd,
                    cwd=self.repo_path,
                    capture_output=True,
                    text=True,
                    check=check,
                    timeout=timeout,
                )
            except subprocess.TimeoutExpired as e:
                elapsed = time.time() - start_time
                logger.warning(
                    f"Git command timed out after {elapsed:.2f}s: {command} "
                    f"(attempt {attempt + 1}/{total_attempts})"
                )
                if is_last_attempt:
                    error_msg = f"Git operation timed out after {total_attempts} attempts: {command}"
                    logger.error(error_msg)
                    raise GitTimeoutError(error_msg) from e
            except subprocess.CalledProcessError as e:
                logger.warning(
                    f"Git command failed with exit code {e.returncode}: {command} "
                    f"(attempt {attempt + 1}/{total_attempts})"
                )
                stderr = e.stderr or ""
                if stderr:
                    logger.debug(f"Git stderr: {stderr.strip()[:200]}")
                # Git's own "fatal:"/"error:" diagnostics are treated as permanent.
                if "fatal:" in stderr or "error:" in stderr:
                    raise GitOperationError(f"Git command failed: {stderr.strip()}") from e
                if is_last_attempt:
                    error_msg = f"Git operation failed after {total_attempts} attempts: {command}"
                    logger.error(error_msg)
                    raise GitOperationError(error_msg) from e
            else:
                elapsed = time.time() - start_time
                if result.stdout:
                    logger.debug(f"Git stdout: {result.stdout.strip()[:200]}")
                if result.returncode == 0:
                    logger.info(f"Git command completed successfully in {elapsed:.2f}s: {command}")
                else:
                    # Only reachable with check=False; the caller inspects returncode.
                    logger.warning(
                        f"Git command exited with code {result.returncode} in {elapsed:.2f}s: {command}"
                    )
                return result

            backoff = self._backoff_seconds(attempt)
            logger.info(f"Retrying in {backoff}s...")
            time.sleep(backoff)

        # Defensive: every loop path above returns or raises on the last attempt.
        raise GitOperationError(f"Git operation failed: {command}")

    def sync_memory(self) -> bool:
        """
        Pull the latest changes from ``origin/main``.

        Fetches, then pulls with ``--strategy-option=theirs``. If the pull
        reports divergent history, the checkout is hard-reset to
        ``origin/main``, which DISCARDS local commits.

        Returns:
            True if the checkout was synced (by pull or reset), False otherwise.
        """
        try:
            logger.info("Syncing memory from remote dataset...")

            # Timeouts are resolved by _run_git so that base_timeout is honoured.
            fetch_result = self._run_git(["fetch", "origin", "main"], check=False)
            if fetch_result.returncode != 0:
                logger.warning(f"Fetch failed: {fetch_result.stderr}")
                return False

            # Prefer the remote side on conflicting hunks (survival priority).
            pull_result = self._run_git(
                ["pull", "origin", "main", "--strategy-option=theirs"],
                check=False,
            )
            if pull_result.returncode == 0:
                logger.info(f"Memory sync successful: {pull_result.stdout.strip()}")
                return True

            pull_stderr = pull_result.stderr or ""
            if "divergent" in pull_stderr or "refusing to merge" in pull_stderr:
                logger.warning("Git history diverged. Performing hard reset to remote origin/main.")
                self._run_git(["reset", "--hard", "origin/main"])
                return True

            logger.error(f"Memory sync failed: {pull_result.stderr}")
            return False
        except GitTimeoutError as e:
            logger.error(f"Memory sync timed out: {e}")
            return False
        except GitOperationError as e:
            logger.error(f"Memory sync operation failed: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected exception during memory sync: {e}")
            return False

    def save_memory(self, commit_message: str = "Update memory state") -> bool:
        """
        Commit all local changes and push them to ``origin/main``.

        If the push is rejected, syncs with the remote and retries the push
        once. The retry only happens when the local commit survived the sync.

        Args:
            commit_message: Message for the git commit.

        Returns:
            True if there was nothing to save or the push succeeded,
            False otherwise.
        """
        try:
            # 1. Check status
            status = self._run_git(["status", "--porcelain"])
            if not status.stdout.strip():
                logger.info("No new memories to save.")
                return True
            logger.info(f"Saving memory changes: {status.stdout}")

            # 2. Add changes
            self._run_git(["add", "-A"])

            # 3. Commit; --no-verify skips hooks that might fail
            self._run_git(["commit", "-m", commit_message, "--no-verify"])

            # 4. Push
            logger.info("Pushing memory to remote...")
            push_result = self._run_git(["push", "origin", "main"], check=False)
            if push_result.returncode == 0:
                logger.info("Memory saved successfully.")
                return True

            if "rejected" not in (push_result.stderr or ""):
                logger.error(f"Push failed: {push_result.stderr}")
                return False

            # Race: someone else pushed while we were working.
            logger.warning("Remote updated during save. Pulling and retrying...")
            if not self.sync_memory():
                logger.error("Could not sync with remote after rejected push; memory not saved.")
                return False

            # sync_memory may have hard-reset to origin/main, dropping our
            # commit. A push would then succeed as a no-op, so make sure we
            # still have something ahead of the remote before claiming success.
            ahead = self._run_git(
                ["rev-list", "--count", "origin/main..HEAD"],
                check=False,
            )
            if ahead.returncode == 0 and ahead.stdout.strip() == "0":
                logger.error("Local memory commit was discarded while syncing with remote; memory not saved.")
                return False

            # Retry push once
            retry_result = self._run_git(["push", "origin", "main"], check=False)
            if retry_result.returncode != 0:
                logger.error(f"Push failed after sync: {retry_result.stderr}")
                return False
            logger.info("Memory saved successfully.")
            return True
        except GitTimeoutError as e:
            logger.error(f"Memory save timed out: {e}")
            return False
        except GitOperationError as e:
            logger.error(f"Memory save operation failed: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected exception during memory save: {e}")
            return False

    def read_file(self, path: str) -> Optional[str]:
        """Read a UTF-8 text file at ``path`` relative to the repository.

        Returns:
            The file contents, or None if the file could not be read.
        """
        import os

        full_path = os.path.join(self.repo_path, path)
        try:
            # Explicit encoding so results do not depend on the process locale.
            with open(full_path, 'r', encoding="utf-8") as f:
                return f.read()
        except Exception as e:
            logger.error(f"Failed to read file {path}: {e}")
            return None

    def write_file(self, path: str, content: str) -> bool:
        """Write a UTF-8 text file at ``path`` relative to the repository.

        Missing parent directories are created. Does NOT auto-commit.

        Returns:
            True if the file was written, False otherwise.
        """
        import os

        full_path = os.path.join(self.repo_path, path)
        try:
            parent_dir = os.path.dirname(full_path)
            # dirname is "" for a bare filename; os.makedirs("") would raise.
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            with open(full_path, 'w', encoding="utf-8") as f:
                f.write(content)
            return True
        except Exception as e:
            logger.error(f"Failed to write file {path}: {e}")
            return False