"""Validate Z-number decision matrices stored as markdown tables in a JSONL file.

Each JSONL entry is expected to carry a ``decision_matrix`` field holding a
markdown table laid out as::

    | <label>  | <criterion 1> | <criterion 2> | ...
    |----------|---------------|---------------|
    | type     | benefit       | cost          |
    | <alt 1>  | 4:3           | -2:5          |
    | ...      | ...           | ...           |
    | weight   | 5:4           | 3:3           |

Cell values are Z-numbers written ``A:B`` where B is the confidence part.
"""

import argparse
import json
import re
import sys
from dataclasses import dataclass
from typing import Optional

# Markdown separator row, e.g. |---|---| or, with alignment colons, |:---|:---:|.
_SEPARATOR_RE = re.compile(r'\|[\s\-:|]+\|')

# Z-number 'A:B': A may be negative, B is an unsigned integer.
_Z_NUMBER_RE = re.compile(r'(-?\d+):(\d+)')

_VALID_CRITERION_TYPES = ('benefit', 'cost')


@dataclass
class ValidationError:
    """One problem found in a decision matrix."""

    row_name: str   # label of the offending row, or "N/A" for table-level errors
    column: str     # criterion (header) name, "row_label", or "N/A"
    value: str      # offending cell text, or "MISSING" when the cell is absent
    expected: str   # human-readable description of what was expected
    message: str    # short description of the problem


def parse_markdown_table(table_str: str) -> tuple[list[str], list[list[str]]]:
    """Parse a markdown table into headers and rows.

    Literal ``\\n`` sequences (backslash followed by ``n``) are treated as line
    breaks. Separator rows (``|---|---|``, including GFM alignment markers
    such as ``|:---|---:|``) and blank lines are dropped.

    Returns:
        ``(headers, data_rows)``; ``([], [])`` when fewer than two non-separator
        rows are present.
    """
    # Handle escaped newlines (\\n literal string).
    normalized = table_str.replace('\\n', '\n')

    rows = []
    for raw_line in normalized.split('\n'):
        line = raw_line.strip()
        if not line or _SEPARATOR_RE.fullmatch(line):
            continue
        cells = [cell.strip() for cell in line.split('|')]
        # Remove exactly one empty string from each end (caused by the
        # leading/trailing '|'); interior empty cells are kept.
        if cells and cells[0] == '':
            cells.pop(0)
        if cells and cells[-1] == '':
            cells.pop()
        if cells:
            rows.append(cells)

    if len(rows) < 2:
        return [], []
    return rows[0], rows[1:]


def parse_z_number(value: str) -> tuple[Optional[int], Optional[int]]:
    """Parse a Z-number in format 'A:B' and return (A, B).

    Surrounding whitespace is ignored. Returns ``(None, None)`` when the text
    is not of the form ``[-]digits:digits``.
    """
    match = _Z_NUMBER_RE.fullmatch(value.strip())
    if match is None:
        return None, None
    return int(match.group(1)), int(match.group(2))


def _collect_criterion_types(
    type_row: list[str], criteria: list[str]
) -> tuple[list[ValidationError], list[Optional[str]]]:
    """Read the per-criterion type cells; return (errors, types aligned with criteria)."""
    errors: list[ValidationError] = []
    types: list[Optional[str]] = []
    for index, criterion in enumerate(criteria, start=1):
        if index >= len(type_row):
            # A silently missing type would disable A-part range checks below.
            errors.append(ValidationError(
                row_name="type",
                column=criterion,
                value="MISSING",
                expected="'benefit' or 'cost'",
                message="Missing criterion type",
            ))
            types.append(None)
            continue
        ctype = type_row[index].lower().strip()
        if ctype not in _VALID_CRITERION_TYPES:
            errors.append(ValidationError(
                row_name="type",
                column=criterion,
                value=ctype,
                expected="'benefit' or 'cost'",
                message="Invalid criterion type",
            ))
        types.append(ctype)
    return errors, types


def _validate_weight_row(
    weight_row: list[str], criteria: list[str]
) -> list[ValidationError]:
    """Check that every weight cell is a Z-number with both parts in 1-5."""
    errors: list[ValidationError] = []
    for index, criterion in enumerate(criteria, start=1):
        if index >= len(weight_row):
            errors.append(ValidationError(
                row_name="weight",
                column=criterion,
                value="MISSING",
                expected="Format 'A:B' (e.g., '5:4')",
                message="Missing value",
            ))
            continue
        value = weight_row[index]
        a_part, b_part = parse_z_number(value)
        if a_part is None or b_part is None:
            errors.append(ValidationError(
                row_name="weight",
                column=criterion,
                value=value,
                expected="Format 'A:B' (e.g., '5:4')",
                message="Invalid Z-number format",
            ))
            continue
        if not (1 <= a_part <= 5):
            errors.append(ValidationError(
                row_name="weight",
                column=criterion,
                value=value,
                expected="A-part: 1-5",
                message=f"Weight A-part {a_part} out of range",
            ))
        if not (1 <= b_part <= 5):
            errors.append(ValidationError(
                row_name="weight",
                column=criterion,
                value=value,
                expected="B-part: 1-5",
                message=f"Weight B-part {b_part} out of range",
            ))
    return errors


def _validate_alternative_row(
    alt_row: list[str],
    criteria: list[str],
    criterion_types: list[Optional[str]],
) -> list[ValidationError]:
    """Check one alternative row: format, confidence range, and A-part range by type."""
    errors: list[ValidationError] = []
    alt_name = alt_row[0]
    for position, criterion in enumerate(criteria):
        index = position + 1
        if index >= len(alt_row):
            errors.append(ValidationError(
                row_name=alt_name,
                column=criterion,
                value="MISSING",
                expected="Z-number value",
                message="Missing value",
            ))
            continue

        value = alt_row[index]
        a_part, b_part = parse_z_number(value)
        if a_part is None or b_part is None:
            errors.append(ValidationError(
                row_name=alt_name,
                column=criterion,
                value=value,
                expected="Format 'A:B' (e.g., '4:3' or '-3:4')",
                message="Invalid Z-number format",
            ))
            continue

        # B-part (confidence) is always 1-5.
        if not (1 <= b_part <= 5):
            errors.append(ValidationError(
                row_name=alt_name,
                column=criterion,
                value=value,
                expected="B-part (confidence): 1-5",
                message=f"Confidence {b_part} out of range",
            ))

        # A-part range depends on the criterion type; unknown or invalid
        # types were already reported on the type row, so skip them here.
        ctype = criterion_types[position]
        if ctype == 'benefit':
            if not (1 <= a_part <= 5):
                errors.append(ValidationError(
                    row_name=alt_name,
                    column=criterion,
                    value=value,
                    expected="Benefit A-part: 1-5",
                    message=f"Benefit value {a_part} out of range",
                ))
        elif ctype == 'cost':
            if not (-5 <= a_part <= -1):
                errors.append(ValidationError(
                    row_name=alt_name,
                    column=criterion,
                    value=value,
                    expected="Cost A-part: -5 to -1",
                    message=f"Cost value {a_part} out of range",
                ))
    return errors


def validate_decision_matrix(matrix_str: str, entry_id: int) -> list[ValidationError]:
    """Validate a single decision matrix.

    Args:
        matrix_str: Markdown table text (see ``parse_markdown_table``).
        entry_id: Identifier of the entry being checked. Currently unused;
            kept so existing callers keep working.

    Returns:
        A list of ``ValidationError``; empty when the matrix is valid. Errors
        are ordered: type row, weight row, then alternative rows top to bottom.
    """
    headers, rows = parse_markdown_table(matrix_str)
    if not headers or not rows:
        return [ValidationError(
            row_name="N/A",
            column="N/A",
            value="N/A",
            expected="Valid markdown table",
            message="Could not parse markdown table",
        )]

    # First column holds the row labels; the rest are criteria.
    criteria = headers[1:]

    if len(rows) < 3:
        return [ValidationError(
            row_name="N/A",
            column="N/A",
            value=f"{len(rows)} rows",
            expected="At least 3 rows (type, alternatives, weight)",
            message="Insufficient rows in table",
        )]

    errors: list[ValidationError] = []

    # First row should be the "type" row.
    type_row = rows[0]
    if type_row[0].lower() != 'type':
        errors.append(ValidationError(
            row_name=type_row[0],
            column="row_label",
            value=type_row[0],
            expected="type",
            message="First row should be 'type' row",
        ))

    # Types are tracked by column position so that duplicate criterion
    # names cannot overwrite each other's type.
    type_errors, criterion_types = _collect_criterion_types(type_row, criteria)
    errors.extend(type_errors)

    # Last row should be the "weight" row; its cells are only checked when
    # the label matches, to avoid validating an alternative row as weights.
    weight_row = rows[-1]
    if weight_row[0].lower() != 'weight':
        errors.append(ValidationError(
            row_name=weight_row[0],
            column="row_label",
            value=weight_row[0],
            expected="weight",
            message="Last row should be 'weight' row",
        ))
    else:
        errors.extend(_validate_weight_row(weight_row, criteria))

    # Alternative rows sit between the type and weight rows.
    for alt_row in rows[1:-1]:
        errors.extend(_validate_alternative_row(alt_row, criteria, criterion_types))

    return errors


def main(argv: Optional[list[str]] = None) -> bool:
    """Validate every entry of a JSONL file and print a report.

    Args:
        argv: Command-line arguments (without the program name). ``None``
            means use ``sys.argv[1:]``, which is the original behaviour.

    Returns:
        True when the file could be read, every non-blank line parsed as a
        JSON object, and no decision matrix had validation errors.
    """
    parser = argparse.ArgumentParser(
        description='Validate Z-number decision matrices in JSONL files')
    parser.add_argument('filepath', nargs='?', default='train.jsonl',
                        help='Path to JSONL file (default: train.jsonl)')
    args = parser.parse_args(argv)
    filepath = args.filepath

    total_entries = 0
    entries_with_errors = 0
    total_errors = 0
    unparsable_lines = 0
    # A list (not a dict keyed by id) so duplicate or unhashable ids
    # cannot drop or crash a report.
    all_errors: list[tuple[object, list[ValidationError]]] = []

    print("=" * 70)
    print("Decision Matrix Validation Report")
    print("=" * 70)

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                if not line.strip():
                    # Blank lines (e.g. a trailing newline) are not entries.
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError as e:
                    unparsable_lines += 1
                    print(f"\n[Line {line_num}] JSON Parse Error: {e}")
                    continue
                if not isinstance(entry, dict):
                    unparsable_lines += 1
                    print(f"\n[Line {line_num}] JSON Parse Error: "
                          f"expected a JSON object, got {type(entry).__name__}")
                    continue

                entry_id = entry.get('id', line_num - 1)
                total_entries += 1

                matrix_str = entry.get('decision_matrix', '')
                if not matrix_str:
                    print(f"\n[Entry {entry_id}] WARNING: No decision_matrix field")
                    continue

                if isinstance(matrix_str, str):
                    errors = validate_decision_matrix(matrix_str, entry_id)
                else:
                    errors = [ValidationError(
                        row_name="N/A",
                        column="N/A",
                        value=type(matrix_str).__name__,
                        expected="Markdown table string",
                        message="decision_matrix is not a string",
                    )]

                if errors:
                    entries_with_errors += 1
                    total_errors += len(errors)
                    all_errors.append((entry_id, errors))
    except OSError as e:
        print(f"\nERROR: Could not read '{filepath}': {e}")
        return False

    # Print detailed errors
    if all_errors:
        print(f"\n{'=' * 70}")
        print("VALIDATION ERRORS")
        print('=' * 70)
        for entry_id, errors in all_errors:
            print(f"\n[Entry {entry_id}] - {len(errors)} error(s):")
            for err in errors:
                print(f" • Row '{err.row_name}', Column '{err.column}'")
                print(f" Value: {err.value}")
                print(f" Expected: {err.expected}")
                print(f" Message: {err.message}")

    # Summary
    print(f"\n{'=' * 70}")
    print("SUMMARY")
    print('=' * 70)
    print(f"Total entries checked: {total_entries}")
    print(f"Entries with errors: {entries_with_errors}")
    print(f"Entries valid: {total_entries - entries_with_errors}")
    print(f"Total errors found: {total_errors}")
    if unparsable_lines:
        print(f"Unparsable lines: {unparsable_lines}")

    success = entries_with_errors == 0 and unparsable_lines == 0
    if success:
        print("\n✓ All decision matrices are valid!")
    else:
        if entries_with_errors:
            print(f"\n✗ {entries_with_errors}/{total_entries} entries have validation errors")
        if unparsable_lines:
            print(f"\n✗ {unparsable_lines} line(s) could not be parsed as JSON objects")

    return success


if __name__ == '__main__':
    sys.exit(0 if main() else 1)