#!/usr/bin/env python3
"""
V4A Patch Format Parser

Parses the V4A patch format used by codex, cline, and other coding agents.

V4A Format:
    *** Begin Patch
    *** Update File: path/to/file.py
    @@ optional context hint @@
     context line (space prefix)
    -removed line (minus prefix)
    +added line (plus prefix)
    *** Add File: path/to/new.py
    +new file content
    +line 2
    *** Delete File: path/to/old.py
    *** Move File: old/path.py -> new/path.py
    *** End Patch

The closing ``@@`` of a context hint is optional (``@@ def foo():`` is
accepted), and a ``*** End of File`` marker inside a hunk is ignored.

Usage:
    from tools.patch_parser import parse_v4a_patch, apply_v4a_operations

    operations, error = parse_v4a_patch(patch_content)
    if error:
        print(f"Parse error: {error}")
    else:
        result = apply_v4a_operations(operations, file_ops)
"""

import re
from dataclasses import dataclass, field
from typing import List, Optional, Tuple, Any
from enum import Enum


# Patch boundary markers (with and without the space after the asterisks).
_BEGIN_MARKERS = ('*** Begin Patch', '***Begin Patch')
_END_MARKERS = ('*** End Patch', '***End Patch')

# File operation headers. Compiled once because they are tried on every line.
_UPDATE_RE = re.compile(r'\*\*\*\s*Update\s+File:\s*(.+)')
_ADD_RE = re.compile(r'\*\*\*\s*Add\s+File:\s*(.+)')
_DELETE_RE = re.compile(r'\*\*\*\s*Delete\s+File:\s*(.+)')
_MOVE_RE = re.compile(r'\*\*\*\s*Move\s+File:\s*(.+?)\s*->\s*(.+)')

# "@@ hint @@" form of a context hint (closing marker present).
_CLOSED_HINT_RE = re.compile(r'@@\s*(.+?)\s*@@')

# "*** End of File" marker that may terminate a hunk; carries no content.
_END_OF_FILE_RE = re.compile(r'\*\*\*\s*End\s+of\s+File\s*$')

# Line-number prefix ("   123|") stripped from content returned by read_file.
_LINE_NUMBER_RE = re.compile(r'^\s*\d+\|')


class OperationType(Enum):
    """Kinds of file operation a V4A patch can contain."""

    ADD = "add"
    UPDATE = "update"
    DELETE = "delete"
    MOVE = "move"


@dataclass
class HunkLine:
    """A single line in a patch hunk."""

    prefix: str  # ' ', '-', or '+'
    content: str


@dataclass
class Hunk:
    """A group of changes within a file."""

    context_hint: Optional[str] = None
    lines: List[HunkLine] = field(default_factory=list)


@dataclass
class PatchOperation:
    """A single operation in a V4A patch."""

    operation: OperationType
    file_path: str
    new_path: Optional[str] = None  # For move operations
    hunks: List[Hunk] = field(default_factory=list)
    content: Optional[str] = None  # For add file operations


def _parse_file_header(line: str) -> Optional[PatchOperation]:
    """Return a new PatchOperation if *line* is a file header, else None."""
    match = _UPDATE_RE.match(line)
    if match:
        return PatchOperation(
            operation=OperationType.UPDATE,
            file_path=match.group(1).strip(),
        )
    match = _ADD_RE.match(line)
    if match:
        return PatchOperation(
            operation=OperationType.ADD,
            file_path=match.group(1).strip(),
        )
    match = _DELETE_RE.match(line)
    if match:
        return PatchOperation(
            operation=OperationType.DELETE,
            file_path=match.group(1).strip(),
        )
    match = _MOVE_RE.match(line)
    if match:
        return PatchOperation(
            operation=OperationType.MOVE,
            file_path=match.group(1).strip(),
            new_path=match.group(2).strip(),
        )
    return None


def _finalize_operation(
    operations: List[PatchOperation],
    current_op: Optional[PatchOperation],
    current_hunk: Optional[Hunk],
) -> None:
    """Attach the pending non-empty hunk to *current_op* and record the op."""
    if current_op is None:
        return
    if current_hunk is not None and current_hunk.lines:
        current_op.hunks.append(current_hunk)
    operations.append(current_op)


def _extract_context_hint(line: str) -> Optional[str]:
    """Extract the hint text from a line that starts with ``@@``.

    The closed form ``@@ hint @@`` is tried first. Otherwise everything
    after the opening ``@@`` is the hint, so ``@@ def foo():`` is not lost.
    Returns None when no hint text is present.
    """
    closed = _CLOSED_HINT_RE.match(line)
    if closed:
        return closed.group(1)
    return line[2:].strip() or None


def parse_v4a_patch(patch_content: str) -> Tuple[List[PatchOperation], Optional[str]]:
    """
    Parse a V4A format patch.

    Args:
        patch_content: The patch text in V4A format

    Returns:
        Tuple of (operations, error_message)
        - If successful: (list_of_operations, None)
        - If failed: ([], error_description)

    Note:
        The parser is lenient: this implementation always returns a None
        error. A missing Begin/End marker means "start of text"/"end of
        text", and lines outside any Update/Add section are ignored.
    """
    lines = patch_content.split('\n')
    operations: List[PatchOperation] = []

    # Find patch boundaries; scanning stops at the first End marker.
    start_idx = None
    end_idx = None
    for i, line in enumerate(lines):
        if any(marker in line for marker in _BEGIN_MARKERS):
            start_idx = i
        elif any(marker in line for marker in _END_MARKERS):
            end_idx = i
            break

    if start_idx is None:
        # Try to parse without explicit begin marker
        start_idx = -1
    if end_idx is None:
        end_idx = len(lines)

    current_op: Optional[PatchOperation] = None
    current_hunk: Optional[Hunk] = None

    for line in lines[start_idx + 1:end_idx]:
        header = _parse_file_header(line)
        if header is not None:
            # A new file header closes whatever operation was in progress.
            _finalize_operation(operations, current_op, current_hunk)
            if header.operation in (OperationType.DELETE, OperationType.MOVE):
                # Delete/Move have no body: record immediately.
                operations.append(header)
                current_op = None
                current_hunk = None
            else:
                current_op = header
                # Add-file content lines go straight into a single hunk.
                current_hunk = Hunk() if header.operation == OperationType.ADD else None
            continue

        if line.startswith('@@'):
            # Context hint / hunk marker: starts a new hunk.
            if current_op is not None:
                if current_hunk is not None and current_hunk.lines:
                    current_op.hunks.append(current_hunk)
                current_hunk = Hunk(context_hint=_extract_context_hint(line))
            continue

        if current_op is None or not line:
            # Outside any Update/Add section, or an empty line: ignore.
            continue

        if _END_OF_FILE_RE.match(line):
            # Format marker, not file content; must not become a context line.
            continue

        if current_hunk is None:
            current_hunk = Hunk()

        prefix = line[0]
        if prefix in ('+', '-', ' '):
            current_hunk.lines.append(HunkLine(prefix, line[1:]))
        elif prefix == '\\':
            # "\ No newline at end of file" marker - skip
            pass
        else:
            # Treat as context line (implicit space prefix)
            current_hunk.lines.append(HunkLine(' ', line))

    # Don't forget the last operation
    _finalize_operation(operations, current_op, current_hunk)

    return operations, None


def apply_v4a_operations(operations: List[PatchOperation],
                         file_ops: Any) -> 'PatchResult':
    """
    Apply V4A patch operations using a file operations interface.

    Every operation is attempted even if an earlier one failed; failures are
    collected and reported together in the result's ``error`` field.

    Args:
        operations: List of PatchOperation from parse_v4a_patch
        file_ops: Object with read_file, write_file methods. Delete and move
            additionally use its ``_exec`` and ``_escape_shell_arg``; an
            optional ``_check_lint`` is run on modified/created files.

    Returns:
        PatchResult with results of all operations
    """
    # Import here to avoid circular imports
    from tools.file_operations import PatchResult

    handlers = {
        OperationType.ADD: (_apply_add, "add"),
        OperationType.DELETE: (_apply_delete, "delete"),
        OperationType.MOVE: (_apply_move, "move"),
        OperationType.UPDATE: (_apply_update, "update"),
    }

    files_modified = []
    files_created = []
    files_deleted = []
    all_diffs = []
    errors = []
    # Real on-disk paths to lint. Kept separate from files_modified because a
    # move is reported there as the display string "old -> new", which is
    # not a path that can be linted.
    modified_lint_paths = []

    for op in operations:
        try:
            entry = handlers.get(op.operation)
            if entry is None:
                continue
            handler, verb = entry
            ok, detail = handler(op, file_ops)
            if not ok:
                errors.append(f"Failed to {verb} {op.file_path}: {detail}")
                continue

            all_diffs.append(detail)
            if op.operation == OperationType.ADD:
                files_created.append(op.file_path)
            elif op.operation == OperationType.DELETE:
                files_deleted.append(op.file_path)
            elif op.operation == OperationType.MOVE:
                files_modified.append(f"{op.file_path} -> {op.new_path}")
                modified_lint_paths.append(op.new_path)
            else:
                files_modified.append(op.file_path)
                modified_lint_paths.append(op.file_path)
        except Exception as e:
            # Boundary handler: one failing operation must not abort the rest.
            errors.append(f"Error processing {op.file_path}: {str(e)}")

    # Run lint on all modified/created files
    lint_results = {}
    if hasattr(file_ops, '_check_lint'):
        for path in modified_lint_paths + files_created:
            lint_results[path] = file_ops._check_lint(path).to_dict()

    result_kwargs = dict(
        success=not errors,
        diff='\n'.join(all_diffs),
        files_modified=files_modified,
        files_created=files_created,
        files_deleted=files_deleted,
        lint=lint_results if lint_results else None,
    )
    if errors:
        result_kwargs['error'] = '; '.join(errors)
    return PatchResult(**result_kwargs)


def _apply_add(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Apply an add file operation.

    The new file's content is the '+' lines of all hunks joined with
    newlines. Returns (True, diff) on success or (False, error) if the
    write fails.
    """
    # Extract content from hunks (all + lines)
    content_lines = [
        line.content
        for hunk in op.hunks
        for line in hunk.lines
        if line.prefix == '+'
    ]
    content = '\n'.join(content_lines)

    result = file_ops.write_file(op.file_path, content)
    if result.error:
        return False, result.error

    diff = f"--- /dev/null\n+++ b/{op.file_path}\n"
    diff += '\n'.join(f"+{line}" for line in content_lines)
    return True, diff


def _apply_delete(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Apply a delete file operation.

    A file reported as "not found" counts as already deleted (success).
    Otherwise the file is removed with ``rm -f`` through ``file_ops._exec``.
    """
    # Read first so a missing file can be reported as a no-op.
    read_result = file_ops.read_file(op.file_path)
    if read_result.error and "not found" in read_result.error.lower():
        # File doesn't exist, nothing to delete
        return True, f"# {op.file_path} already deleted or doesn't exist"

    # Delete directly via shell command using the underlying environment
    rm_result = file_ops._exec(f"rm -f {file_ops._escape_shell_arg(op.file_path)}")
    if rm_result.exit_code != 0:
        return False, rm_result.stdout

    diff = f"--- a/{op.file_path}\n+++ /dev/null\n# File deleted"
    return True, diff


def _apply_move(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Apply a move file operation via a shell ``mv`` with escaped paths."""
    mv_result = file_ops._exec(
        f"mv {file_ops._escape_shell_arg(op.file_path)} {file_ops._escape_shell_arg(op.new_path)}"
    )
    if mv_result.exit_code != 0:
        return False, mv_result.stdout

    diff = f"# Moved: {op.file_path} -> {op.new_path}"
    return True, diff


def _strip_line_numbers(raw_content: str) -> str:
    """Remove a leading "  123|" line-number prefix from each line, if present."""
    return '\n'.join(
        _LINE_NUMBER_RE.sub('', line, count=1)
        for line in raw_content.split('\n')
    )


def _split_hunk(hunk: Hunk) -> Tuple[List[str], List[str]]:
    """Split a hunk into (search_lines, replace_lines).

    Context lines appear in both lists, removed lines only in the search
    list, and added lines only in the replacement list.
    """
    search_lines = []
    replace_lines = []
    for line in hunk.lines:
        if line.prefix == ' ':
            search_lines.append(line.content)
            replace_lines.append(line.content)
        elif line.prefix == '-':
            search_lines.append(line.content)
        elif line.prefix == '+':
            replace_lines.append(line.content)
    return search_lines, replace_lines


def _insert_addition_only(content: str, context_hint: Optional[str],
                          added_lines: List[str]) -> str:
    """Insert *added_lines* for a hunk that has nothing to search for.

    If the context hint occurs in *content*, the lines go right after the
    line containing it. Otherwise they are appended at the end, keeping the
    file's existing trailing-newline convention.
    """
    insert_text = '\n'.join(added_lines)

    if context_hint:
        hint_pos = content.find(context_hint)
        if hint_pos != -1:
            eol = content.find('\n', hint_pos)
            if eol == -1:
                # The hint is on the last line, which has no newline.
                return content + '\n' + insert_text
            return content[:eol + 1] + insert_text + '\n' + content[eol + 1:]

    if not content:
        return insert_text
    if content.endswith('\n'):
        return content + insert_text + '\n'
    return content + '\n' + insert_text


def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Apply an update file operation.

    Each hunk becomes a search block (context + removed lines) and a
    replacement block (context + added lines), applied with fuzzy matching.
    If that fails and the hunk has a context hint, the match is retried in
    a window around the hint. Hunks that only add lines are inserted after
    the hint line, or appended at the end of the file.

    Returns (True, unified_diff) on success, or (False, error) if the file
    cannot be read, a hunk cannot be applied, or the write fails.
    """
    # NOTE(review): the read is capped at limit=10000 and the result is
    # written back in full. If read_file truncates longer files, this would
    # drop the tail of the file - confirm against read_file's contract.
    read_result = file_ops.read_file(op.file_path, limit=10000)
    if read_result.error:
        return False, f"Cannot read file: {read_result.error}"

    current_content = _strip_line_numbers(read_result.content)

    new_content = current_content
    for hunk in op.hunks:
        search_lines, replace_lines = _split_hunk(hunk)

        if not search_lines:
            # Addition-only hunk: nothing to match, so insert instead of
            # silently dropping it.
            if replace_lines:
                new_content = _insert_addition_only(
                    new_content, hunk.context_hint, replace_lines
                )
            continue

        search_pattern = '\n'.join(search_lines)
        replacement = '\n'.join(replace_lines)

        # Imported lazily so parsing works without the tools package.
        from tools.fuzzy_match import fuzzy_find_and_replace
        new_content, count, error = fuzzy_find_and_replace(
            new_content, search_pattern, replacement, replace_all=False
        )

        if error and count == 0 and hunk.context_hint:
            # Retry in a window around the context hint.
            hint_pos = new_content.find(hunk.context_hint)
            if hint_pos != -1:
                window_start = max(0, hint_pos - 500)
                window_end = min(len(new_content), hint_pos + 2000)
                window = new_content[window_start:window_end]

                window_new, count, error = fuzzy_find_and_replace(
                    window, search_pattern, replacement, replace_all=False
                )

                if count > 0:
                    new_content = (
                        new_content[:window_start]
                        + window_new
                        + new_content[window_end:]
                    )
                    error = None

        if error:
            return False, f"Could not apply hunk: {error}"

    # Write new content
    write_result = file_ops.write_file(op.file_path, new_content)
    if write_result.error:
        return False, write_result.error

    # Generate diff
    import difflib
    diff_lines = difflib.unified_diff(
        current_content.splitlines(keepends=True),
        new_content.splitlines(keepends=True),
        fromfile=f"a/{op.file_path}",
        tofile=f"b/{op.file_path}"
    )
    diff = ''.join(diff_lines)

    return True, diff