HuggingClaw-Cain / memory /git_repo.py
Claude Code
Claude Code: Review the memory system implementation, specifically focusing on git op
0bbe516
"""
Git-based Memory Bridge for HuggingFace Spaces.
Allows the AI to persist memory to the Dataset repository.
Enhanced with timeout controls and retry mechanisms for reliability.
"""
import logging
import subprocess
import json
import time
from typing import Optional, Dict, Any, List
logger = logging.getLogger(__name__)

# Per-operation git timeouts, in seconds, keyed by git sub-command name.
# Network-bound commands get the most headroom; purely local commands are
# expected to finish quickly. "default" is the fallback for any sub-command
# that has no entry of its own.
TIMEOUT_CONFIG: Dict[str, int] = {
    # Network operations
    "fetch": 60,    # can be slow
    "pull": 90,     # fetch plus merge, may take time
    "push": 120,    # network-heavy
    # Local operations
    "commit": 30,   # should be fast
    "status": 15,   # quick local check
    "add": 20,
    "reset": 30,
    # Fallback for unknown operations
    "default": 45,
}
class GitOperationError(Exception):
    """Base error for a git operation that did not succeed."""
class GitTimeoutError(GitOperationError):
    """Error for a git operation that exceeded its time limit.

    Subclasses GitOperationError, so handlers for the base class also
    catch timeouts.
    """
class GitMemoryBridge:
    """Git-based memory persistence with timeout and retry support.

    Drives the ``git`` command line inside ``repo_path``: ``sync_memory``
    pulls ``origin/main``, ``save_memory`` commits and pushes local changes,
    and ``read_file`` / ``write_file`` access files relative to ``repo_path``.
    """

    # Lower-cased stderr fragments that indicate a transient failure worth
    # retrying. Git prefixes almost every failure with "fatal:" or "error:",
    # so without this list no failed command would ever be retried.
    _TRANSIENT_ERROR_MARKERS = (
        "could not resolve host",
        "temporary failure in name resolution",
        "connection timed out",
        "connection reset",
        "connection refused",
        "the remote end hung up unexpectedly",
        "early eof",
        "returned error: 5",  # HTTP 5xx reported by the remote helper
        "index.lock",  # another git process currently holds the index lock
    )

    def __init__(self, repo_path: str = "/data", max_retries: int = 3, base_timeout: Optional[int] = None):
        """
        Initialize the Git memory bridge.

        Args:
            repo_path: Path to the git repository
            max_retries: Maximum number of retry attempts for failed operations
            base_timeout: Override default timeout (in seconds). None uses
                operation-specific timeouts.
        """
        self.repo_path = repo_path
        self.max_retries = max_retries
        self.base_timeout = base_timeout
        self.git = self._get_git_command()

    def _get_git_command(self) -> Optional[str]:
        """Return ``"git"`` if the system git binary is usable, else None."""
        try:
            result = subprocess.run(
                ["git", "--version"],
                capture_output=True,
                text=True,  # decode output so the version logs as text, not a bytes repr
                check=True,
                timeout=10,
            )
            logger.info(f"Git version: {result.stdout.strip()}")
            return "git"
        except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired) as e:
            logger.warning(f"System git not found or timed out: {e}")
            return None

    def _get_timeout_for_command(self, command: str) -> int:
        """Return the timeout (seconds) to use for a git sub-command.

        A positive ``base_timeout`` overrides everything; otherwise the
        sub-command is looked up case-insensitively in ``TIMEOUT_CONFIG``,
        falling back to its ``"default"`` entry.
        """
        if self.base_timeout is not None and self.base_timeout > 0:
            return self.base_timeout
        return TIMEOUT_CONFIG.get(command.lower(), TIMEOUT_CONFIG["default"])

    @classmethod
    def _is_permanent_failure(cls, stderr: str) -> bool:
        """Return True if git's stderr describes a failure that retrying cannot fix."""
        lowered = stderr.lower()
        if any(marker in lowered for marker in cls._TRANSIENT_ERROR_MARKERS):
            return False
        return "fatal:" in stderr or "error:" in stderr

    def _run_git(
        self,
        args: List[str],
        check: bool = True,
        timeout: Optional[int] = None,
        retry_count: int = 0
    ) -> subprocess.CompletedProcess:
        """
        Run a git command with timeout and retry logic.

        Timeouts are always retried. Non-zero exits (only raised when
        ``check`` is True) are retried only when stderr looks transient;
        anything else is treated as permanent and raised immediately.

        Args:
            args: Git command arguments (e.g., ['push', 'origin', 'main'])
            check: Whether to raise an error on non-zero exit code
            timeout: Override timeout for this specific call
            retry_count: Current retry attempt (only used in the debug log)

        Returns:
            CompletedProcess result

        Raises:
            GitTimeoutError: If operation times out after all retries
            GitOperationError: If operation fails after all retries
            RuntimeError: If git is not available
        """
        if not self.git:
            raise RuntimeError("Git is not available in this environment.")

        # Name used in log/error messages; safe even when args is empty.
        operation = args[0] if args else "<no-args>"

        if timeout is None:
            timeout = self._get_timeout_for_command(args[0]) if args else TIMEOUT_CONFIG["default"]

        full_cmd = [self.git] + list(args)
        cmd_str = ' '.join(full_cmd)
        # Always make at least one attempt, even if max_retries is negative.
        total_attempts = max(0, self.max_retries) + 1
        logger.debug(
            f"Running git command: {cmd_str} "
            f"(timeout: {timeout}s, attempt: {retry_count + 1}/{total_attempts})"
        )

        for attempt in range(total_attempts):
            is_last_attempt = attempt == total_attempts - 1
            start_time = time.time()
            try:
                result = subprocess.run(
                    full_cmd,
                    cwd=self.repo_path,
                    capture_output=True,
                    text=True,
                    check=check,
                    timeout=timeout,
                )
            except subprocess.TimeoutExpired as e:
                elapsed = time.time() - start_time
                logger.warning(
                    f"Git command timed out after {elapsed:.2f}s: {operation} "
                    f"(attempt {attempt + 1}/{total_attempts})"
                )
                if is_last_attempt:
                    error_msg = f"Git operation timed out after {total_attempts} attempts: {operation}"
                    logger.error(error_msg)
                    raise GitTimeoutError(error_msg) from e
            except subprocess.CalledProcessError as e:
                stderr = (e.stderr or "").strip()
                logger.warning(
                    f"Git command failed with exit code {e.returncode}: {operation} "
                    f"(attempt {attempt + 1}/{total_attempts})"
                )
                if stderr:
                    logger.debug(f"Git stderr: {stderr[:200]}")
                if self._is_permanent_failure(stderr):
                    # Retrying cannot help (bad ref, auth failure, not a repo, ...).
                    raise GitOperationError(f"Git command failed: {stderr}") from e
                if is_last_attempt:
                    error_msg = f"Git operation failed after {total_attempts} attempts: {operation}"
                    logger.error(error_msg)
                    raise GitOperationError(error_msg) from e
            else:
                elapsed = time.time() - start_time
                if result.stdout:
                    logger.debug(f"Git stdout: {result.stdout.strip()[:200]}")
                if result.returncode == 0:
                    logger.info(f"Git command completed successfully in {elapsed:.2f}s: {operation}")
                else:
                    # Only reachable with check=False; the caller inspects returncode.
                    logger.info(
                        f"Git command exited with code {result.returncode} in {elapsed:.2f}s: {operation}"
                    )
                    if result.stderr:
                        logger.debug(f"Git stderr: {result.stderr.strip()[:200]}")
                return result

            # Exponential backoff: 2^attempt seconds, max 30s
            backoff = min(2 ** attempt, 30)
            logger.info(f"Retrying in {backoff}s...")
            time.sleep(backoff)

        # The final attempt always returns or raises; kept as a safety net.
        raise GitOperationError(f"Git operation failed: {operation}")

    def sync_memory(self) -> bool:
        """
        Pulls latest changes from the remote dataset repository.

        Fetches and pulls ``origin/main``, preferring the remote side on merge
        conflicts. If the pull reports diverged or unrelated histories, the
        local branch is hard-reset to ``origin/main``, which DISCARDS local
        commits and uncommitted changes.

        Returns:
            True if successful, False otherwise
        """
        try:
            logger.info("Syncing memory from remote dataset...")

            # First, fetch to get latest remote state with timeout
            fetch_result = self._run_git(
                ["fetch", "origin", "main"],
                check=False,
                timeout=TIMEOUT_CONFIG["fetch"],
            )
            if fetch_result.returncode != 0:
                logger.warning(f"Fetch failed: {fetch_result.stderr}")
                return False

            # Pull with strategy option "theirs" to prefer remote state (survival priority)
            pull_result = self._run_git(
                ["pull", "origin", "main", "--strategy-option=theirs"],
                check=False,
                timeout=TIMEOUT_CONFIG["pull"],
            )
            if pull_result.returncode == 0:
                logger.info(f"Memory sync successful: {pull_result.stdout.strip()}")
                return True

            # If it fails due to divergent branches or history, fall back to a hard reset
            pull_stderr = pull_result.stderr or ""
            if "divergent" in pull_stderr or "refusing to merge" in pull_stderr:
                logger.warning("Git history diverged. Performing hard reset to remote origin/main.")
                self._run_git(["reset", "--hard", "origin/main"], timeout=TIMEOUT_CONFIG["reset"])
                return True

            logger.error(f"Memory sync failed: {pull_result.stderr}")
            return False
        except GitTimeoutError as e:
            logger.error(f"Memory sync timed out: {e}")
            return False
        except GitOperationError as e:
            logger.error(f"Memory sync operation failed: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected exception during memory sync: {e}")
            return False

    def _push_main(self) -> subprocess.CompletedProcess:
        """Push the current branch state to ``origin main`` without raising on failure."""
        return self._run_git(
            ["push", "origin", "main"],
            check=False,
            timeout=TIMEOUT_CONFIG["push"],
        )

    def _rebase_onto_remote(self) -> bool:
        """Replay local commits on top of ``origin/main``; abort cleanly on failure."""
        rebase_result = self._run_git(
            ["pull", "--rebase", "origin", "main"],
            check=False,
            timeout=TIMEOUT_CONFIG["pull"],
        )
        if rebase_result.returncode == 0:
            return True
        logger.error(f"Rebase onto remote failed: {rebase_result.stderr}")
        # Return the repository to its pre-rebase state so the local commit
        # stays intact. check=False: there may be no rebase in progress.
        self._run_git(["rebase", "--abort"], check=False, timeout=TIMEOUT_CONFIG["reset"])
        return False

    def save_memory(self, commit_message: str = "Update memory state") -> bool:
        """
        Commits and pushes local changes to the remote dataset repository.

        If the push is rejected because the remote moved, the local commit is
        rebased onto ``origin/main`` and pushed once more. The commit is never
        discarded: a failed rebase is aborted and False is returned.

        NOTE: a clean working tree returns True without pushing, so a commit
        left unpushed by an earlier failed save is only pushed by the next
        save that has new changes.

        Args:
            commit_message: Message for the git commit

        Returns:
            True if successful, False otherwise
        """
        try:
            # 1. Check status
            status = self._run_git(["status", "--porcelain"], timeout=TIMEOUT_CONFIG["status"])
            if not status.stdout.strip():
                logger.info("No new memories to save.")
                return True
            logger.info(f"Saving memory changes: {status.stdout}")

            # 2. Add changes
            self._run_git(["add", "-A"], timeout=TIMEOUT_CONFIG["add"])

            # 3. Commit
            # Use --no-verify to skip pre-commit hooks that might fail
            self._run_git(
                ["commit", "-m", commit_message, "--no-verify"],
                timeout=TIMEOUT_CONFIG["commit"],
            )

            # 4. Push
            logger.info("Pushing memory to remote...")
            push_result = self._push_main()
            if push_result.returncode == 0:
                logger.info("Memory saved successfully.")
                return True

            if "rejected" not in (push_result.stderr or ""):
                logger.error(f"Push failed: {push_result.stderr}")
                return False

            # Handle race conditions (others pushed while we were working).
            # Rebase rather than sync_memory(): its hard-reset fallback would
            # drop the commit made above and the retry push would then
            # "succeed" with nothing to push.
            logger.warning("Remote updated during save. Pulling and retrying...")
            if not self._rebase_onto_remote():
                return False

            # Retry push once
            retry_result = self._push_main()
            if retry_result.returncode != 0:
                logger.error(f"Push failed: {retry_result.stderr}")
                return False
            logger.info("Memory saved successfully.")
            return True
        except GitTimeoutError as e:
            logger.error(f"Memory save timed out: {e}")
            return False
        except GitOperationError as e:
            logger.error(f"Memory save operation failed: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected exception during memory save: {e}")
            return False

    def read_file(self, path: str) -> Optional[str]:
        """Read a UTF-8 text file from the dataset.

        Args:
            path: File path relative to ``repo_path``

        Returns:
            The file content, or None if it could not be read
        """
        try:
            import os
            full_path = os.path.join(self.repo_path, path)
            with open(full_path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            # Best-effort read: report the problem and signal failure with None.
            logger.error(f"Failed to read file {path}: {e}")
            return None

    def write_file(self, path: str, content: str) -> bool:
        """Write a UTF-8 text file to the dataset. Does NOT auto-commit.

        Missing parent directories are created.

        Args:
            path: File path relative to ``repo_path``
            content: Text to write

        Returns:
            True if the file was written, False otherwise
        """
        try:
            import os
            full_path = os.path.join(self.repo_path, path)
            parent_dir = os.path.dirname(full_path)
            # dirname is "" when the target has no directory component;
            # os.makedirs("") would raise.
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            with open(full_path, 'w', encoding='utf-8') as f:
                f.write(content)
            return True
        except Exception as e:
            # Best-effort write: report the problem and signal failure with False.
            logger.error(f"Failed to write file {path}: {e}")
            return False