Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """ | |
| V4A Patch Format Parser | |
| Parses the V4A patch format used by codex, cline, and other coding agents. | |
| V4A Format: | |
| *** Begin Patch | |
| *** Update File: path/to/file.py | |
| @@ optional context hint @@ | |
| context line (space prefix) | |
| -removed line (minus prefix) | |
| +added line (plus prefix) | |
| *** Add File: path/to/new.py | |
| +new file content | |
| +line 2 | |
| *** Delete File: path/to/old.py | |
| *** Move File: old/path.py -> new/path.py | |
| *** End Patch | |
| Usage: | |
| from tools.patch_parser import parse_v4a_patch, apply_v4a_operations | |
| operations, error = parse_v4a_patch(patch_content) | |
| if error: | |
| print(f"Parse error: {error}") | |
| else: | |
| result = apply_v4a_operations(operations, file_ops) | |
| """ | |
| import difflib | |
| import re | |
| from dataclasses import dataclass, field | |
| from typing import List, Optional, Tuple, Any | |
| from enum import Enum | |
class OperationType(Enum):
    """Kinds of file operation a V4A patch can request.

    Each member's value is the lowercase spelling of its name.
    """

    ADD = "add"        # "*** Add File:" section
    UPDATE = "update"  # "*** Update File:" section
    DELETE = "delete"  # "*** Delete File:" section
    MOVE = "move"      # "*** Move File: src -> dst" section
@dataclass
class HunkLine:
    """A single line in a patch hunk.

    Declared as a dataclass so it can be built positionally, as the parser
    does with ``HunkLine('+', text)``.
    """

    prefix: str   # ' ' (context), '-' (removed), or '+' (added)
    content: str  # the line's text without the prefix character
@dataclass
class Hunk:
    """A group of changes within a file.

    Declared as a dataclass so every instance gets its own ``lines`` list
    (via ``default_factory``) and ``context_hint`` can be passed by keyword.
    """

    # Text taken from the hunk's "@@ ... @@" header, if it had one.
    context_hint: Optional[str] = None
    # Hunk body in patch order.
    lines: List[HunkLine] = field(default_factory=list)
@dataclass
class PatchOperation:
    """A single operation in a V4A patch.

    Declared as a dataclass so it can be built with keyword arguments and so
    every instance gets its own ``hunks`` list.
    """

    operation: OperationType
    file_path: str
    new_path: Optional[str] = None  # For move operations
    hunks: List[Hunk] = field(default_factory=list)
    content: Optional[str] = None  # For add file operations
# Section-header patterns, compiled once instead of on every patch line.
_V4A_UPDATE_RE = re.compile(r'\*\*\*\s*Update\s+File:\s*(.+)')
_V4A_ADD_RE = re.compile(r'\*\*\*\s*Add\s+File:\s*(.+)')
_V4A_DELETE_RE = re.compile(r'\*\*\*\s*Delete\s+File:\s*(.+)')
_V4A_MOVE_RE = re.compile(r'\*\*\*\s*Move\s+File:\s*(.+?)\s*->\s*(.+)')
_V4A_HINT_RE = re.compile(r'@@\s*(.+?)\s*@@')


def _parse_context_hint(line: str) -> Optional[str]:
    """Return the hint carried by an ``@@`` header line, or None if it has none."""
    closed = _V4A_HINT_RE.match(line)
    if closed:
        return closed.group(1)
    # Unterminated form such as "@@ def foo():" — the text after the marker
    # is the hint. A header made only of '@' and spaces carries no hint.
    hint = line[2:].strip()
    return hint if hint.strip('@ ') else None


def parse_v4a_patch(patch_content: str) -> Tuple[List[PatchOperation], Optional[str]]:
    """
    Parse a V4A format patch.

    Lines between the "*** Begin Patch" and "*** End Patch" markers are
    scanned; when a marker is missing, the start or end of the text is used
    instead. Each "*** <Op> File:" header starts a new operation. Inside an
    Update/Add section, "@@" lines start a new hunk and the other non-empty
    lines become hunk lines ('+', '-' or ' ' prefixed; unprefixed lines are
    treated as context, and "\\ ..." marker lines are skipped).

    Args:
        patch_content: The patch text in V4A format

    Returns:
        Tuple of (operations, error_message)
        - If successful: (list_of_operations, None); an empty patch yields
          ([], None)
        - If failed: ([], error_description)
    """
    lines = patch_content.split('\n')

    # Find patch boundaries
    start_idx = None
    end_idx = None
    for idx, line in enumerate(lines):
        if '*** Begin Patch' in line or '***Begin Patch' in line:
            start_idx = idx
        elif '*** End Patch' in line or '***End Patch' in line:
            end_idx = idx
            break
    if start_idx is None:
        # Try to parse without explicit begin marker
        start_idx = -1
    if end_idx is None:
        end_idx = len(lines)

    operations: List[PatchOperation] = []
    current_op: Optional[PatchOperation] = None
    current_hunk: Optional[Hunk] = None

    def flush() -> None:
        """Store the operation being built (with its open hunk) and reset state."""
        nonlocal current_op, current_hunk
        if current_op is not None:
            if current_hunk is not None and current_hunk.lines:
                current_op.hunks.append(current_hunk)
            operations.append(current_op)
        current_op = None
        current_hunk = None

    for line in lines[start_idx + 1:end_idx]:
        # --- file operation headers ---
        match = _V4A_UPDATE_RE.match(line)
        if match:
            flush()
            current_op = PatchOperation(
                operation=OperationType.UPDATE,
                file_path=match.group(1).strip(),
            )
            continue

        match = _V4A_ADD_RE.match(line)
        if match:
            flush()
            current_op = PatchOperation(
                operation=OperationType.ADD,
                file_path=match.group(1).strip(),
            )
            current_hunk = Hunk()
            continue

        match = _V4A_DELETE_RE.match(line)
        if match:
            flush()
            # Delete has no body, so it is complete as soon as it is seen.
            operations.append(PatchOperation(
                operation=OperationType.DELETE,
                file_path=match.group(1).strip(),
            ))
            continue

        match = _V4A_MOVE_RE.match(line)
        if match:
            flush()
            # Move has no body either.
            operations.append(PatchOperation(
                operation=OperationType.MOVE,
                file_path=match.group(1).strip(),
                new_path=match.group(2).strip(),
            ))
            continue

        # --- hunk header ---
        if line.startswith('@@'):
            if current_op is not None:
                if current_hunk is not None and current_hunk.lines:
                    current_op.hunks.append(current_hunk)
                current_hunk = Hunk(context_hint=_parse_context_hint(line))
            continue

        # --- hunk body ---
        if current_op is None or not line:
            continue
        if current_hunk is None:
            current_hunk = Hunk()
        prefix = line[0]
        if prefix in ('+', '-', ' '):
            current_hunk.lines.append(HunkLine(prefix, line[1:]))
        elif prefix != '\\':
            # Treat as context line (implicit space prefix).
            # Lines starting with '\' ("\ No newline at end of file") are skipped.
            current_hunk.lines.append(HunkLine(' ', line))

    # Don't forget the last operation
    flush()

    # Validate the parsed result
    if not operations:
        # Empty patch is not an error — callers get [] and can decide
        return operations, None

    parse_errors: List[str] = []
    for op in operations:
        if not op.file_path:
            parse_errors.append("Operation with empty file path")
        if op.operation == OperationType.UPDATE and not op.hunks:
            parse_errors.append(f"UPDATE {op.file_path!r}: no hunks found")
        if op.operation == OperationType.MOVE and not op.new_path:
            parse_errors.append(
                f"MOVE {op.file_path!r}: missing destination path (expected 'src -> dst')"
            )
    if parse_errors:
        return [], "Parse error: " + "; ".join(parse_errors)
    return operations, None
def _count_occurrences(text: str, pattern: str) -> int:
    """Count non-overlapping occurrences of *pattern* in *text*.

    Delegates to ``str.count`` so the result matches the documented
    non-overlapping contract (``"aa"`` occurs once in ``"aaa"``).
    """
    return text.count(pattern)
def _validate_operations(
    operations: List[PatchOperation],
    file_ops: Any,
) -> List[str]:
    """Validate all operations without writing any files.

    Returns a list of error strings; an empty list means all operations
    are valid and the apply phase can proceed safely.

    For UPDATE operations, hunks are simulated in order so that later
    hunks validate against post-earlier-hunk content (matching apply order).
    This includes addition-only hunks, whose lines are inserted into the
    simulated content after the hint's line, or at end of file when the
    hunk has no hint.

    Args:
        operations: Parsed patch operations to check.
        file_ops: Object providing ``read_file_raw(path)``; its result is
            read through ``.error`` and ``.content``.
    """
    # Deferred import: breaks the patch_parser ↔ fuzzy_match circular dependency
    from tools.fuzzy_match import fuzzy_find_and_replace

    errors: List[str] = []

    for op in operations:
        if op.operation == OperationType.UPDATE:
            read_result = file_ops.read_file_raw(op.file_path)
            if read_result.error:
                errors.append(f"{op.file_path}: {read_result.error}")
                continue

            simulated = read_result.content
            for hunk in op.hunks:
                search_lines = [ln.content for ln in hunk.lines if ln.prefix in (' ', '-')]
                replace_lines = [ln.content for ln in hunk.lines if ln.prefix in (' ', '+')]

                if not search_lines:
                    # Addition-only hunk: validate context hint uniqueness,
                    # then simulate the insertion so later hunks see it.
                    insert_text = '\n'.join(replace_lines)
                    if hunk.context_hint:
                        occurrences = _count_occurrences(simulated, hunk.context_hint)
                        if occurrences == 0:
                            errors.append(
                                f"{op.file_path}: addition-only hunk context hint "
                                f"'{hunk.context_hint}' not found"
                            )
                            continue
                        if occurrences > 1:
                            errors.append(
                                f"{op.file_path}: addition-only hunk context hint "
                                f"'{hunk.context_hint}' is ambiguous "
                                f"({occurrences} occurrences)"
                            )
                            continue
                        # Insert after the line containing the hint.
                        eol = simulated.find('\n', simulated.find(hunk.context_hint))
                        if eol != -1:
                            simulated = (
                                simulated[:eol + 1] + insert_text + '\n' + simulated[eol + 1:]
                            )
                        else:
                            simulated = simulated + '\n' + insert_text
                    else:
                        # No hint: the lines go at the end of the file.
                        base = simulated.rstrip('\n')
                        simulated = (base + '\n' if base else '') + insert_text + '\n'
                    continue

                search_pattern = '\n'.join(search_lines)
                replacement = '\n'.join(replace_lines)
                new_simulated, count, _strategy, match_error = fuzzy_find_and_replace(
                    simulated, search_pattern, replacement, replace_all=False
                )
                if count == 0:
                    label = f"'{hunk.context_hint}'" if hunk.context_hint else "(no hint)"
                    errors.append(
                        f"{op.file_path}: hunk {label} not found"
                        + (f" — {match_error}" if match_error else "")
                    )
                else:
                    # Advance simulation so subsequent hunks validate correctly.
                    # Reuse the result from the call above — no second fuzzy run.
                    simulated = new_simulated

        elif op.operation == OperationType.DELETE:
            read_result = file_ops.read_file_raw(op.file_path)
            if read_result.error:
                errors.append(f"{op.file_path}: file not found for deletion")

        elif op.operation == OperationType.MOVE:
            if not op.new_path:
                errors.append(f"{op.file_path}: MOVE operation missing destination path")
                continue
            src_result = file_ops.read_file_raw(op.file_path)
            if src_result.error:
                errors.append(f"{op.file_path}: source file not found for move")
            # A readable destination means the move would clobber an existing file.
            dst_result = file_ops.read_file_raw(op.new_path)
            if not dst_result.error:
                errors.append(
                    f"{op.new_path}: destination already exists — move would overwrite"
                )

        # ADD: parent directory creation handled by write_file; no pre-check needed.

    return errors
def apply_v4a_operations(operations: List[PatchOperation],
                         file_ops: Any) -> 'PatchResult':
    """Apply V4A patch operations using a file operations interface.

    Uses a two-phase validate-then-apply approach:
    - Phase 1: validate all operations against current file contents without
      writing anything. If any validation error is found, return immediately
      with no filesystem changes.
    - Phase 2: apply all operations. A failure here (e.g. a race between
      validation and apply) is reported with a note to run ``git diff``.
      Remaining operations are still attempted after a failure.

    After applying, files that were updated, created or moved are linted when
    ``file_ops`` exposes ``_check_lint``. A moved file is linted at its
    destination path; ``files_modified`` still records it as ``"src -> dst"``.

    Args:
        operations: List of PatchOperation from parse_v4a_patch
        file_ops: Object with read_file_raw, write_file methods

    Returns:
        PatchResult with results of all operations
    """
    # Import here to avoid circular imports
    from tools.file_operations import PatchResult

    # ---- Phase 1: validate ----
    validation_errors = _validate_operations(operations, file_ops)
    if validation_errors:
        return PatchResult(
            success=False,
            error="Patch validation failed (no files were modified):\n"
            + "\n".join(f"  • {e}" for e in validation_errors),
        )

    # ---- Phase 2: apply ----
    files_modified = []
    files_created = []
    files_deleted = []
    all_diffs = []
    errors = []
    # Real paths of modified files; files_modified holds "src -> dst" display
    # strings for moves, which are not lintable paths.
    lint_modified = []

    # operation kind -> (verb used in failure messages, applier function)
    appliers = {
        OperationType.ADD: ("add", _apply_add),
        OperationType.DELETE: ("delete", _apply_delete),
        OperationType.MOVE: ("move", _apply_move),
        OperationType.UPDATE: ("update", _apply_update),
    }

    for op in operations:
        entry = appliers.get(op.operation)
        if entry is None:
            # Unknown operation kinds are ignored.
            continue
        verb, applier = entry

        try:
            ok, detail = applier(op, file_ops)
        except Exception as e:
            # Boundary catch: one failing operation must not abort the report.
            errors.append(f"Error processing {op.file_path}: {str(e)}")
            continue

        if not ok:
            errors.append(f"Failed to {verb} {op.file_path}: {detail}")
            continue

        all_diffs.append(detail)
        if op.operation == OperationType.ADD:
            files_created.append(op.file_path)
        elif op.operation == OperationType.DELETE:
            files_deleted.append(op.file_path)
        elif op.operation == OperationType.MOVE:
            files_modified.append(f"{op.file_path} -> {op.new_path}")
            lint_modified.append(op.new_path)
        else:
            files_modified.append(op.file_path)
            lint_modified.append(op.file_path)

    # Run lint on all modified/created files
    lint_results = {}
    if hasattr(file_ops, '_check_lint'):
        for path in lint_modified + files_created:
            lint_results[path] = file_ops._check_lint(path).to_dict()

    combined_diff = '\n'.join(all_diffs)

    if errors:
        return PatchResult(
            success=False,
            diff=combined_diff,
            files_modified=files_modified,
            files_created=files_created,
            files_deleted=files_deleted,
            lint=lint_results if lint_results else None,
            error="Apply phase failed (state may be inconsistent — run `git diff` to assess):\n"
            + "\n".join(f"  • {e}" for e in errors),
        )

    return PatchResult(
        success=True,
        diff=combined_diff,
        files_modified=files_modified,
        files_created=files_created,
        files_deleted=files_deleted,
        lint=lint_results if lint_results else None,
    )
def _apply_add(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Apply an add file operation.

    The new file's text is the '+' lines of the operation's hunks joined with
    newlines. When the hunks supply no '+' lines and ``op.content`` is set,
    that text is written verbatim instead, so operations built directly with
    ``content=`` are not written as empty files.

    Returns:
        ``(True, diff)`` on success, where ``diff`` is a "/dev/null -> file"
        header followed by one '+' line per added line, or ``(False, error)``
        with the error reported by ``file_ops.write_file``.
    """
    # Extract content from hunks (all + lines)
    content_lines = [
        line.content
        for hunk in op.hunks
        for line in hunk.lines
        if line.prefix == '+'
    ]
    explicit = getattr(op, 'content', None)
    if content_lines or explicit is None:
        content = '\n'.join(content_lines)
    else:
        # Fall back to the operation's own content field.
        content = explicit
        content_lines = content.splitlines()

    result = file_ops.write_file(op.file_path, content)
    if result.error:
        return False, result.error

    diff = f"--- /dev/null\n+++ b/{op.file_path}\n"
    diff += '\n'.join(f"+{line}" for line in content_lines)
    return True, diff
def _apply_delete(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Delete the operation's file and report what was removed.

    Returns:
        ``(True, diff)`` where ``diff`` is a unified diff of the old content
        against ``/dev/null`` (or a "# Deleted: <path>" note when that diff
        is empty), or ``(False, message)`` when the file cannot be read or
        the delete call reports an error.
    """
    path = op.file_path

    # Snapshot the content first: it is the only source for the diff, and a
    # failed read here catches a file that vanished after validation.
    snapshot = file_ops.read_file_raw(path)
    if snapshot.error:
        return False, f"Cannot delete {path}: file not found"

    outcome = file_ops.delete_file(path)
    if outcome.error:
        return False, outcome.error

    old_lines = snapshot.content.splitlines(keepends=True)
    rendered = ''.join(
        difflib.unified_diff(
            old_lines,
            [],
            fromfile=f"a/{path}",
            tofile="/dev/null",
        )
    )
    if rendered:
        return True, rendered
    return True, f"# Deleted: {path}"
def _apply_move(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Move the operation's file to ``op.new_path``.

    Returns:
        ``(True, "# Moved: src -> dst")`` on success, otherwise
        ``(False, error)`` with the error reported by ``file_ops.move_file``.
    """
    outcome = file_ops.move_file(op.file_path, op.new_path)
    failure = outcome.error
    if failure:
        return False, failure
    return True, f"# Moved: {op.file_path} -> {op.new_path}"
# Characters searched before/after a context hint when the whole-file match fails.
_HINT_WINDOW_BEFORE = 500
_HINT_WINDOW_AFTER = 2000


def _append_at_end(content: str, insert_text: str) -> str:
    """Return *content* with *insert_text* as its final, newline-terminated lines."""
    base = content.rstrip('\n')
    if not base:
        # Empty (or blank-only) file: no separator, so no leading blank line.
        return insert_text + '\n'
    return base + '\n' + insert_text + '\n'


def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Apply an update file operation.

    Hunks are applied in order to an in-memory copy of the file:
    - A hunk with context/removed lines is located with
      ``fuzzy_find_and_replace``; if that finds nothing and the hunk has a
      context hint, the search is retried in a window around the hint.
    - An addition-only hunk is inserted after the line containing its
      context hint, or appended at end of file when it has no hint or the
      hint is not found. An ambiguous hint is an error.

    The file is written only after every hunk has applied.

    Returns:
        ``(True, unified_diff)`` on success, or ``(False, message)`` when the
        file cannot be read, a hunk cannot be applied, or the write fails.
    """
    # Read current content — raw so no line-number prefixes or per-line truncation
    read_result = file_ops.read_file_raw(op.file_path)
    if read_result.error:
        return False, f"Cannot read file: {read_result.error}"
    current_content = read_result.content

    # Apply each hunk
    new_content = current_content
    for hunk in op.hunks:
        # Context lines appear on both sides; '-' only in the search text,
        # '+' only in the replacement.
        search_lines = [ln.content for ln in hunk.lines if ln.prefix in (' ', '-')]
        replace_lines = [ln.content for ln in hunk.lines if ln.prefix in (' ', '+')]

        if search_lines:
            # Deferred import: breaks the patch_parser ↔ fuzzy_match circular
            # dependency. Done here so it is only needed when a hunk has
            # something to match.
            from tools.fuzzy_match import fuzzy_find_and_replace

            search_pattern = '\n'.join(search_lines)
            replacement = '\n'.join(replace_lines)
            new_content, count, _strategy, error = fuzzy_find_and_replace(
                new_content, search_pattern, replacement, replace_all=False
            )
            if error and count == 0:
                # Try with context hint if available
                if hunk.context_hint:
                    hint_pos = new_content.find(hunk.context_hint)
                    if hint_pos != -1:
                        # Search in a window around the hint
                        window_start = max(0, hint_pos - _HINT_WINDOW_BEFORE)
                        window_end = min(len(new_content), hint_pos + _HINT_WINDOW_AFTER)
                        window = new_content[window_start:window_end]
                        window_new, count, _strategy, error = fuzzy_find_and_replace(
                            window, search_pattern, replacement, replace_all=False
                        )
                        if count > 0:
                            new_content = (
                                new_content[:window_start] + window_new + new_content[window_end:]
                            )
                            error = None
                if error:
                    return False, f"Could not apply hunk: {error}"
            continue

        # Addition-only hunk (no context or removed lines).
        # Insert at the location indicated by the context hint, or at end of file.
        insert_text = '\n'.join(replace_lines)
        if not hunk.context_hint:
            new_content = _append_at_end(new_content, insert_text)
            continue

        occurrences = _count_occurrences(new_content, hunk.context_hint)
        if occurrences > 1:
            return False, (
                f"Addition-only hunk: context hint '{hunk.context_hint}' is ambiguous "
                f"({occurrences} occurrences) — provide a more unique hint"
            )
        if occurrences == 0:
            # Hint not found — append at end as a safe fallback
            new_content = _append_at_end(new_content, insert_text)
            continue

        # Insert after the line containing the context hint
        eol = new_content.find('\n', new_content.find(hunk.context_hint))
        if eol != -1:
            new_content = new_content[:eol + 1] + insert_text + '\n' + new_content[eol + 1:]
        else:
            # Hint is on the last line and the file has no trailing newline.
            new_content = new_content + '\n' + insert_text

    # Write new content
    write_result = file_ops.write_file(op.file_path, new_content)
    if write_result.error:
        return False, write_result.error

    # Generate diff
    diff_lines = difflib.unified_diff(
        current_content.splitlines(keepends=True),
        new_content.splitlines(keepends=True),
        fromfile=f"a/{op.file_path}",
        tofile=f"b/{op.file_path}",
    )
    return True, ''.join(diff_lines)