Spaces:
Running
Running
| """CLI subcommand: ``dataforge repair <path> [--dry-run | --apply]``.""" | |
| from __future__ import annotations | |
| import hashlib | |
| from datetime import UTC, datetime | |
| from pathlib import Path | |
| from typing import Annotated | |
| import pandas as pd | |
| import typer | |
| from rich.console import Console | |
| from rich.panel import Panel | |
| from dataforge.cli.common import load_schema, read_csv | |
| from dataforge.detectors import run_all_detectors | |
| from dataforge.detectors.base import Issue, Schema | |
| from dataforge.repairers import build_repairers | |
| from dataforge.repairers.base import ProposedFix, RepairAttempt, RetryContext | |
| from dataforge.safety import SafetyContext, SafetyFilter, SafetyResult, SafetyVerdict | |
| from dataforge.transactions.log import ( | |
| append_applied_event, | |
| append_created_transaction, | |
| cache_dir_for, | |
| sha256_bytes, | |
| snapshot_path_for, | |
| ) | |
| from dataforge.transactions.txn import CellFix, RepairTransaction, generate_txn_id | |
| from dataforge.ui.repair_diff import render_repair_diff | |
| from dataforge.verifier import SMTVerifier, VerificationVerdict | |
# Module-level console used by _print_error; constructed with stderr=True so
# error panels are written to stderr rather than stdout.
_console = Console(stderr=True)
def apply_fixes_to_csv(path: Path, fixes: list[CellFix]) -> str:
    """Rewrite a CSV in place with the given cell fixes and hash the result.

    The file is loaded with ``read_csv``; each fix is validated and applied
    to the in-memory frame in order, and the file is written only after
    every fix has passed validation.

    Args:
        path: CSV file to read and overwrite.
        fixes: Cell fixes to apply, in order.

    Returns:
        Hex SHA-256 digest of the file bytes after writing.

    Raises:
        ValueError: If a fix is not an ``update`` operation, references a
            column or row that does not exist, or carries an ``old_value``
            that no longer matches the current cell content.
    """
    frame = read_csv(path)

    for cell in fixes:
        if cell.operation != "update":
            raise ValueError(f"Unsupported repair operation '{cell.operation}' for row {cell.row}.")
        if cell.column not in frame.columns:
            raise ValueError(f"Column '{cell.column}' not found in '{path}'.")
        if not 0 <= cell.row < len(frame.index):
            raise ValueError(f"Row {cell.row} is out of bounds for '{path}'.")

        # Compare as text so a fix computed against an older file state is refused.
        found = str(frame.at[cell.row, cell.column])
        if found != cell.old_value:
            raise ValueError(
                f"Refusing to apply stale fix for row {cell.row}, column '{cell.column}': "
                f"expected '{cell.old_value}', found '{found}'."
            )
        frame.at[cell.row, cell.column] = cell.new_value

    frame.to_csv(path, index=False, lineterminator="\n")

    # Hash what actually landed on disk, not the in-memory representation.
    written = path.read_bytes()
    return hashlib.sha256(written).hexdigest()
| def _resolve_schema(schema_path: Path | None) -> Schema | None: | |
| """Resolve an optional schema path into a parsed Schema.""" | |
| if schema_path is None: | |
| return None | |
| return load_schema(schema_path) | |
def _print_error(message: str, *, hint: str | None = None) -> None:
    """Render a rich-formatted CLI error panel on the module's stderr console.

    Args:
        message: Error text, shown in bold red.
        hint: Optional follow-up text, shown dimmed below the message.
    """
    # Local import keeps this block self-contained; rich is already a
    # dependency of this module.
    from rich.markup import escape

    # Callers pass exception text and safety reasons that may embed CSV cell
    # values. Escaping stops bracketed text from being parsed as Rich markup,
    # which would otherwise drop it from the output or raise MarkupError on a
    # stray closing tag while we are trying to report the real error.
    body = f"[bold red]{escape(message)}[/bold red]"
    if hint:
        body = f"{body}\n\n[dim]{escape(hint)}[/dim]"
    _console.print(Panel(body, title="Repair Error", style="red"))
def _propose_repairs(
    issues: list[Issue],
    path: Path,
    working_df: pd.DataFrame,
    schema: Schema | None,
    *,
    allow_llm: bool,
    model: str,
    allow_pii: bool,
    confirm_pii: bool,
    confirm_escalations: bool,
    interactive: bool,
) -> tuple[list[ProposedFix], list[list[RepairAttempt]]]:
    """Propose, safety-gate, and verify a fix for each issue in turn.

    Every issue gets up to three proposals from the repairer registered for
    its issue type. A proposal is first evaluated by the safety filter and
    then by the SMT verifier. An accepted fix is written into ``working_df``
    immediately, so later issues are evaluated against the updated frame.

    Args:
        issues: Detected issues, processed in order.
        path: Source CSV path; used to derive the repairer cache directory.
        working_df: Dataframe mutated in place as fixes are accepted.
        schema: Optional parsed schema passed to repairers and gates.
        allow_llm: Forwarded to ``build_repairers``.
        model: Forwarded to ``build_repairers``.
        allow_pii: Initial ``SafetyContext.allow_pii`` value.
        confirm_pii: Initial ``SafetyContext.confirm_pii`` value.
        confirm_escalations: Initial ``SafetyContext.confirm_escalations`` value.
        interactive: When true, safety escalations are resolved by prompting.
            The safety context returned by the prompt replaces the current
            one and stays in effect for the remaining issues.

    Returns:
        The accepted fixes in acceptance order, and one list of attempts per
        issue in issue order.
    """
    repairer_by_type = build_repairers(
        cache_dir=cache_dir_for(path),
        allow_llm=allow_llm,
        model=model,
    )
    gate = SafetyFilter()
    smt = SMTVerifier()
    gate_context = SafetyContext(
        allow_pii=allow_pii,
        confirm_pii=confirm_pii,
        confirm_escalations=confirm_escalations,
    )

    accepted_fixes: list[ProposedFix] = []
    attempt_groups: list[list[RepairAttempt]] = []

    for issue in issues:
        repairer = repairer_by_type.get(issue.issue_type)
        if repairer is None:
            unsupported = RepairAttempt(
                issue=issue,
                attempt_number=1,
                status="attempted_not_fixed",
                reason="No repairer is registered for this issue type.",
            )
            attempt_groups.append([unsupported])
            continue

        history: list[RepairAttempt] = []
        fixed = False
        hints = RetryContext(issue=issue)

        for number in range(1, 4):
            proposal = repairer.propose(issue, working_df, schema, retry_context=hints)
            if proposal is None:
                # Nothing left to try for this issue.
                history.append(
                    RepairAttempt(
                        issue=issue,
                        attempt_number=number,
                        status="attempted_not_fixed",
                        reason="No repair proposal was available for this issue.",
                    )
                )
                break

            chosen = gate.choose_preferred([proposal], schema, gate_context)
            gate_result = gate.evaluate(chosen, schema, gate_context)
            if interactive and gate_result.verdict == SafetyVerdict.ESCALATE:
                gate_context, gate_result = _resolve_escalation(
                    chosen,
                    schema,
                    gate_context,
                    gate,
                    gate_result,
                )

            if gate_result.verdict == SafetyVerdict.DENY:
                # A denial is retryable: feed it back to the repairer.
                history.append(
                    RepairAttempt(
                        issue=issue,
                        attempt_number=number,
                        fix=chosen,
                        status="denied",
                        reason=gate_result.reason,
                    )
                )
                hints = _build_retry_context(issue, history)
                continue

            if gate_result.verdict == SafetyVerdict.ESCALATE:
                # An unresolved escalation ends the attempts for this issue.
                history.append(
                    RepairAttempt(
                        issue=issue,
                        attempt_number=number,
                        fix=chosen,
                        status="escalated",
                        reason=gate_result.reason,
                    )
                )
                break

            check = smt.verify(working_df, [chosen], schema)
            if check.verdict == VerificationVerdict.ACCEPT:
                accepted_fixes.append(chosen)
                working_df.at[chosen.fix.row, chosen.fix.column] = chosen.fix.new_value
                history.append(
                    RepairAttempt(
                        issue=issue,
                        attempt_number=number,
                        fix=chosen,
                        status="accepted",
                        reason=check.reason,
                    )
                )
                fixed = True
                break

            if check.verdict == VerificationVerdict.REJECT:
                failure_status = "rejected"
            else:
                failure_status = "unknown"
            history.append(
                RepairAttempt(
                    issue=issue,
                    attempt_number=number,
                    fix=chosen,
                    status=failure_status,
                    reason=check.reason,
                    unsat_core=check.unsat_core,
                )
            )
            hints = _build_retry_context(issue, history)

        if not fixed and history:
            final = history[-1]
            if final.status not in {"attempted_not_fixed", "escalated"}:
                # Attempts ran out on a denied/rejected/unknown result: relabel
                # the last attempt while keeping its failure reason.
                history[-1] = final.model_copy(
                    update={
                        "status": "attempted_not_fixed",
                        "reason": (
                            f"Issue was attempted but not fixed after {len(history)} attempt(s). "
                            f"Last failure: {final.reason}"
                        ),
                    }
                )
        attempt_groups.append(history)

    return accepted_fixes, attempt_groups
def _build_retry_context(issue: Issue, attempts: list[RepairAttempt]) -> RetryContext:
    """Summarise earlier attempts into a ``RetryContext`` for the next proposal.

    Args:
        issue: The issue being repaired.
        attempts: Attempts made so far for this issue, oldest first.

    Returns:
        A context carrying the attempts, the new values of fixes whose
        attempt was denied, rejected, or unknown, and a flat tuple of hints
        (each attempt's reason followed by its unsat-core entries).
    """
    failed_statuses = {"denied", "rejected", "unknown"}
    values_to_avoid = frozenset(
        past.fix.fix.new_value
        for past in attempts
        if past.fix is not None and past.status in failed_statuses
    )
    flattened_hints = tuple(
        hint for past in attempts for hint in (past.reason, *past.unsat_core)
    )
    return RetryContext(
        issue=issue,
        previous_attempts=tuple(attempts),
        rejected_values=values_to_avoid,
        hints=flattened_hints,
    )
def _resolve_escalation(
    candidate: ProposedFix,
    schema: Schema | None,
    context: SafetyContext,
    safety_filter: SafetyFilter,
    safety_result: SafetyResult,
) -> tuple[SafetyContext, SafetyResult]:
    """Ask the user to confirm an escalated fix and re-evaluate it if they agree.

    Args:
        candidate: The fix that triggered the escalation.
        schema: Optional parsed schema passed back to the safety filter.
        context: Safety context in effect when the escalation was raised.
        safety_filter: Filter used to re-evaluate the candidate.
        safety_result: The escalation result being resolved.

    Returns:
        ``(context, safety_result)`` unchanged when the user declines;
        otherwise a copy of the context with the matching confirmation flag
        set, together with the re-evaluated safety result.
    """
    # The PII rule sets its own flag; every other escalation is treated as an
    # aggregate-sensitive edit.
    if "NO_PII_OVERWRITE" in safety_result.rule_ids:
        question_tail = "touches PII. Confirm this edit?"
        flag_name = "confirm_pii"
    else:
        question_tail = "touches an aggregate-sensitive column. Confirm this edit?"
        flag_name = "confirm_escalations"

    question = (
        f"Candidate fix for row {candidate.fix.row}, column '{candidate.fix.column}' "
        + question_tail
    )
    if not typer.confirm(question, default=False):
        return context, safety_result

    confirmed_context = context.model_copy(update={flag_name: True})
    return confirmed_context, safety_filter.evaluate(candidate, schema, confirmed_context)
| def _render_attempt_summary( | |
| attempt_groups: list[list[RepairAttempt]], | |
| console: Console, | |
| ) -> int: | |
| """Render a summary for issues that were not accepted.""" | |
| failed_groups = [ | |
| attempts for attempts in attempt_groups if attempts and attempts[-1].status != "accepted" | |
| ] | |
| if not failed_groups: | |
| return 0 | |
| lines: list[str] = [] | |
| for attempts in failed_groups: | |
| final_attempt = attempts[-1] | |
| issue = final_attempt.issue | |
| prefix = "" | |
| if any(label.startswith("fd::") for label in final_attempt.unsat_core): | |
| prefix = "functional dependency rejection - " | |
| elif any(label.startswith("domain::") for label in final_attempt.unsat_core): | |
| prefix = "domain bound rejection - " | |
| lines.append( | |
| f"{issue.issue_type} at {issue.row}:{issue.column} " | |
| f"after {len(attempts)} attempt(s): {prefix}{final_attempt.reason}" | |
| ) | |
| console.print("[bold yellow]Attempted But Not Fixed[/bold yellow]") | |
| for line in lines: | |
| console.print(line, overflow="fold") | |
| return len(failed_groups) | |
def _apply_transaction(
    path: Path,
    fixes: list[ProposedFix],
    source_bytes: bytes,
) -> str:
    """Snapshot the source, log a transaction, apply the fixes, and log the result.

    Args:
        path: CSV file to repair.
        fixes: Accepted proposals whose cell fixes are applied in order.
        source_bytes: Pre-apply bytes of ``path``; written to the snapshot and
            used to restore the file if applying fails.

    Returns:
        The ID of the newly created transaction.

    Raises:
        Exception: Anything raised while applying the fixes or appending the
            applied event is re-raised after ``path`` has been rewritten with
            ``source_bytes``.
    """
    target = path.resolve()
    txn_id = generate_txn_id()

    # Persist the original bytes before anything is modified.
    snapshot = snapshot_path_for(target, txn_id)
    snapshot.parent.mkdir(parents=True, exist_ok=True)
    snapshot.write_bytes(source_bytes)

    # The transaction is logged as not-yet-applied before the file is touched.
    pending = RepairTransaction(
        txn_id=txn_id,
        created_at=datetime.now(UTC),
        source_path=str(target),
        source_sha256=sha256_bytes(source_bytes),
        source_snapshot_path=str(snapshot.resolve()),
        fixes=[accepted.fix for accepted in fixes],
        applied=False,
    )
    log_path = append_created_transaction(pending)

    try:
        digest_after = apply_fixes_to_csv(path, [accepted.fix for accepted in fixes])
        append_applied_event(log_path, txn_id, post_sha256=digest_after)
    except Exception:
        # Roll the file back to its pre-apply bytes, then surface the failure.
        path.write_bytes(source_bytes)
        raise
    return txn_id
def repair(
    path: Annotated[
        Path,
        typer.Argument(
            exists=True,
            readable=True,
            help="Path to the CSV file to repair.",
        ),
    ],
    schema: Annotated[
        Path | None,
        typer.Option(
            "--schema",
            exists=True,
            readable=True,
            help="Path to a YAML schema file with column types and FDs.",
        ),
    ] = None,
    dry_run: Annotated[
        bool,
        typer.Option("--dry-run", help="Show proposed fixes without changing the file."),
    ] = False,
    apply: Annotated[
        bool,
        typer.Option("--apply", help="Apply fixes and record a reversible transaction."),
    ] = False,
    allow_llm: Annotated[
        bool,
        typer.Option(
            "--allow-llm",
            help="Allow fd_violation repair to call the configured LLM provider if needed.",
        ),
    ] = False,
    allow_pii: Annotated[
        bool,
        typer.Option(
            "--allow-pii",
            help="Allow PII-targeting fixes to be considered by the safety layer.",
        ),
    ] = False,
    confirm_pii: Annotated[
        bool,
        typer.Option(
            "--confirm-pii",
            help="Non-interactively confirm any PII-targeting fixes allowed via --allow-pii.",
        ),
    ] = False,
    confirm_escalations: Annotated[
        bool,
        typer.Option(
            "--confirm-escalations",
            help="Non-interactively confirm soft safety escalations such as aggregate-sensitive edits.",
        ),
    ] = False,
    llm_model: Annotated[
        str,
        typer.Option("--llm-model", help="Model name for fd_violation LLM fallback."),
    ] = "gemini-2.0-flash",
) -> None:
    """Detect, propose, and optionally apply reversible repairs to a CSV."""
    # Exit codes used below: 2 for usage or input-loading errors, 1 when no fix
    # was accepted or applying failed, 0 for a dry run that produced fixes.

    # Exactly one mode must be selected; both-or-neither is a usage error.
    if dry_run == apply:
        _print_error(
            "Choose exactly one of --dry-run or --apply.",
            hint="Example: dataforge repair data.csv --dry-run",
        )
        raise typer.Exit(code=2)

    try:
        parsed_schema = _resolve_schema(schema)
        frame = read_csv(path)
    except Exception as exc:
        _print_error(str(exc))
        raise typer.Exit(code=2) from exc

    detected = run_all_detectors(frame, parsed_schema)
    # Repairs are proposed against a deep copy so the loaded frame is left
    # untouched; prompting for escalations only happens in --apply mode.
    accepted_fixes, attempt_groups = _propose_repairs(
        detected,
        path,
        frame.copy(deep=True),
        parsed_schema,
        allow_llm=allow_llm,
        model=llm_model,
        allow_pii=allow_pii,
        confirm_pii=confirm_pii,
        confirm_escalations=confirm_escalations,
        interactive=apply,
    )

    output_console = Console()
    render_repair_diff(accepted_fixes, output_console, file_path=str(path))
    failed_issue_count = _render_attempt_summary(attempt_groups, output_console)

    # Without an accepted fix there is nothing to report as success or to
    # apply, in either mode.
    if not accepted_fixes:
        raise typer.Exit(code=1)
    if dry_run:
        raise typer.Exit(code=0)

    # The accepted fixes are checked once more as a batch before writing.
    batch_safety = SafetyFilter().evaluate_batch(accepted_fixes)
    if batch_safety.verdict != SafetyVerdict.ALLOW:
        _print_error(batch_safety.reason)
        raise typer.Exit(code=1)

    original_bytes = path.read_bytes()
    try:
        txn_id = _apply_transaction(path, accepted_fixes, original_bytes)
    except Exception as exc:
        _print_error(
            f"Failed to apply repairs: {exc}",
            hint="The source file was restored to its pre-apply bytes.",
        )
        raise typer.Exit(code=1) from exc

    applied_message = (
        f"[green]Applied {len(accepted_fixes)} fix(es).[/green]\n"
        f"Transaction ID: [bold]{txn_id}[/bold]"
    )
    output_console.print(Panel(applied_message, title="Repair Applied", style="green"))

    if failed_issue_count:
        leftover_message = (
            f"[yellow]{failed_issue_count} issue(s) were attempted but not fixed.[/yellow]"
        )
        output_console.print(Panel(leftover_message, title="Week 3 Summary", style="yellow"))