|
|
import json |
|
|
import re |
|
|
from dataclasses import dataclass |
|
|
from typing import Optional |
|
|
|
|
|
|
|
|
@dataclass
class ValidationError:
    """One problem found while validating a decision matrix.

    All fields are plain strings so an error can be printed directly in a
    report without further formatting.
    """

    # Label of the table row the problem was found in ("N/A" for table-level
    # problems).
    row_name: str
    # Name of the affected column/criterion ("N/A" for table-level problems).
    column: str
    # The offending cell content as it appeared in the table.
    value: str
    # Human-readable description of what would have been accepted.
    expected: str
    # Short explanation of the problem.
    message: str
|
|
|
|
|
|
|
|
# Matches a table delimiter row. The first alternative is the historical
# pattern (only pipes, whitespace and dashes between the outer pipes); the
# second additionally accepts per-cell alignment colons such as
# `|:---|---:|` or `| :-: |`.
_SEPARATOR_ROW_RE = re.compile(r'^\|(?:[\s\-|]+\||(?:\s*:?-+:?\s*\|)+)$')


def parse_markdown_table(table_str: str) -> tuple[list[str], list[list[str]]]:
    """Parse a markdown table into headers and rows.

    Args:
        table_str: The table text. Both real newlines and the literal
            two-character sequence backslash + ``n`` are treated as line
            breaks.

    Returns:
        A ``(headers, data_rows)`` tuple. ``headers`` is the list of cells of
        the first non-separator line and ``data_rows`` holds the cells of
        every following non-separator line. Cell text is stripped of
        surrounding whitespace. ``([], [])`` is returned when fewer than two
        rows (header plus at least one data row) are found.
    """
    # Escaped newlines are expanded first so both encodings split the same way.
    normalized = table_str.replace('\\n', '\n')
    stripped_lines = (line.strip() for line in normalized.split('\n'))

    rows = []
    for line in stripped_lines:
        # Skip blank lines and delimiter rows (with or without alignment
        # colons); they carry no cell data.
        if not line or _SEPARATOR_ROW_RE.match(line):
            continue

        cells = [cell.strip() for cell in line.split('|')]

        # The outer pipes of `| a | b |` produce an empty first and last
        # element after splitting; drop them, but keep empty inner cells.
        if cells and cells[0] == '':
            cells.pop(0)
        if cells and cells[-1] == '':
            cells.pop()

        if cells:
            rows.append(cells)

    if len(rows) < 2:
        return [], []

    return rows[0], rows[1:]
|
|
|
|
|
|
|
|
def parse_z_number(value: str) -> tuple[Optional[int], Optional[int]]:
    """Split a Z-number written as 'A:B' into its two integer parts.

    Surrounding whitespace is ignored. The A-part may carry a leading minus
    sign; the B-part may not.

    Returns:
        ``(A, B)`` as integers, or ``(None, None)`` when the text does not
        have the 'A:B' shape.
    """
    parsed = re.match(r'^(-?\d+):(\d+)$', value.strip())
    if parsed is None:
        return None, None

    a_text, b_text = parsed.groups()
    return int(a_text), int(b_text)
|
|
|
|
|
|
|
|
def validate_decision_matrix(matrix_str: str, entry_id: int) -> list[ValidationError]:
    """Validate a single decision matrix given as a markdown table.

    Layout checked by this function:

    * header row: first cell labels the row-name column, every following
      cell names a criterion;
    * first data row: labelled ``type``; each criterion cell must be
      ``benefit`` or ``cost`` (case-insensitive);
    * last data row: labelled ``weight``; each cell must be a Z-number
      ``A:B`` with both parts in 1-5;
    * rows in between: alternatives; each cell must be a Z-number ``A:B``
      with B in 1-5 and A in 1-5 for benefit criteria or -5 to -1 for cost
      criteria. A is not range-checked when the criterion type is missing
      or invalid.

    A cell that is absent because a row is shorter than the header is
    reported as a "Missing value" error for type, weight and alternative
    rows alike.

    Args:
        matrix_str: Markdown table text. A non-string value is reported as
            an unparseable table rather than raising.
        entry_id: Identifier of the entry being validated. Not used by the
            checks; kept so existing callers keep working.

    Returns:
        The list of problems found, in the order type row, weight row,
        alternative rows. Empty when the matrix is valid.
    """
    errors: list[ValidationError] = []

    def report(row_name: str, column: str, value: str, expected: str, message: str) -> None:
        # Single place where errors are built keeps the checks below short.
        errors.append(ValidationError(
            row_name=row_name,
            column=column,
            value=value,
            expected=expected,
            message=message,
        ))

    # Guard against non-string payloads so one malformed record cannot
    # abort a whole validation run with an AttributeError.
    if isinstance(matrix_str, str):
        headers, rows = parse_markdown_table(matrix_str)
    else:
        headers, rows = [], []

    if not headers or not rows:
        report("N/A", "N/A", "N/A", "Valid markdown table", "Could not parse markdown table")
        return errors

    if len(rows) < 3:
        report(
            "N/A",
            "N/A",
            f"{len(rows)} rows",
            "At least 3 rows (type, alternatives, weight)",
            "Insufficient rows in table",
        )
        return errors

    # (column index, criterion name) pairs; index 0 is the row-label column.
    criteria = list(enumerate(headers[1:], start=1))

    # --- type row -----------------------------------------------------
    type_row = rows[0]
    if type_row[0].lower() != 'type':
        report(type_row[0], "row_label", type_row[0], "type", "First row should be 'type' row")

    # Keyed by column index rather than criterion name so duplicate header
    # names cannot overwrite each other's type.
    criterion_types: dict[int, str] = {}
    for col, criterion in criteria:
        if col >= len(type_row):
            report("type", criterion, "MISSING", "'benefit' or 'cost'", "Missing value")
            continue

        ctype = type_row[col].lower().strip()
        if ctype not in ('benefit', 'cost'):
            report("type", criterion, ctype, "'benefit' or 'cost'", "Invalid criterion type")
        # Stored even when invalid; an unrecognised type simply disables
        # the A-part range check for that column below.
        criterion_types[col] = ctype

    # --- weight row ---------------------------------------------------
    weight_row = rows[-1]
    if weight_row[0].lower() != 'weight':
        report(weight_row[0], "row_label", weight_row[0], "weight", "Last row should be 'weight' row")
    else:
        for col, criterion in criteria:
            if col >= len(weight_row):
                report("weight", criterion, "MISSING", "Format 'A:B' (e.g., '5:4')", "Missing value")
                continue

            value = weight_row[col]
            a_part, b_part = parse_z_number(value)

            if a_part is None or b_part is None:
                report("weight", criterion, value, "Format 'A:B' (e.g., '5:4')", "Invalid Z-number format")
                continue

            if not (1 <= a_part <= 5):
                report("weight", criterion, value, "A-part: 1-5", f"Weight A-part {a_part} out of range")
            if not (1 <= b_part <= 5):
                report("weight", criterion, value, "B-part: 1-5", f"Weight B-part {b_part} out of range")

    # --- alternative rows ---------------------------------------------
    # Everything between the first (type) and last (weight) row.
    for alt_row in rows[1:-1]:
        alt_name = alt_row[0]

        for col, criterion in criteria:
            if col >= len(alt_row):
                report(alt_name, criterion, "MISSING", "Z-number value", "Missing value")
                continue

            value = alt_row[col]
            a_part, b_part = parse_z_number(value)

            if a_part is None or b_part is None:
                report(
                    alt_name,
                    criterion,
                    value,
                    "Format 'A:B' (e.g., '4:3' or '-3:4')",
                    "Invalid Z-number format",
                )
                continue

            if not (1 <= b_part <= 5):
                report(alt_name, criterion, value, "B-part (confidence): 1-5", f"Confidence {b_part} out of range")

            ctype = criterion_types.get(col, 'unknown')
            if ctype == 'benefit':
                if not (1 <= a_part <= 5):
                    report(alt_name, criterion, value, "Benefit A-part: 1-5", f"Benefit value {a_part} out of range")
            elif ctype == 'cost':
                if not (-5 <= a_part <= -1):
                    report(alt_name, criterion, value, "Cost A-part: -5 to -1", f"Cost value {a_part} out of range")

    return errors
|
|
|
|
|
|
|
|
def main():
    """Validate every decision matrix in a JSONL file and print a report.

    The file path is taken from the command line (positional, optional,
    default ``train.jsonl``). Each non-blank line is parsed as JSON and its
    ``decision_matrix`` field is passed to ``validate_decision_matrix``.
    Lines that are not valid JSON, or whose JSON value is not an object, are
    reported and skipped; entries without a ``decision_matrix`` produce a
    warning. Neither case counts as a validation error.

    Returns:
        True when no entry produced validation errors, False otherwise.

    Raises:
        OSError: If the file cannot be opened (not caught here).
    """
    import argparse

    parser = argparse.ArgumentParser(description='Validate Z-number decision matrices in JSONL files')
    parser.add_argument('filepath', nargs='?', default='train.jsonl', help='Path to JSONL file (default: train.jsonl)')
    args = parser.parse_args()

    rule = "=" * 70
    total_entries = 0
    # (entry_id, errors) in file order. A list rather than a dict so that
    # duplicate or unhashable ids neither overwrite earlier results nor crash.
    failures = []

    print(rule)
    print("Decision Matrix Validation Report")
    print(rule)

    # Explicit UTF-8 so decoding does not depend on the platform's locale.
    with open(args.filepath, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            # Blank lines (e.g. a trailing newline) are not entries.
            if not line.strip():
                continue

            # Only the JSON decoding is guarded; validation bugs should
            # surface instead of being mistaken for parse errors.
            try:
                entry = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"\n[Line {line_num}] JSON Parse Error: {e}")
                continue

            if not isinstance(entry, dict):
                print(
                    f"\n[Line {line_num}] JSON Parse Error: "
                    f"expected a JSON object, got {type(entry).__name__}"
                )
                continue

            # Fall back to the zero-based line index when the entry has no id.
            entry_id = entry.get('id', line_num - 1)
            total_entries += 1

            matrix_str = entry.get('decision_matrix', '')
            if not matrix_str:
                print(f"\n[Entry {entry_id}] WARNING: No decision_matrix field")
                continue

            errors = validate_decision_matrix(matrix_str, entry_id)
            if errors:
                failures.append((entry_id, errors))

    entries_with_errors = len(failures)
    total_errors = sum(len(errors) for _, errors in failures)

    if failures:
        print(f"\n{rule}")
        print("VALIDATION ERRORS")
        print(rule)

        for entry_id, errors in failures:
            print(f"\n[Entry {entry_id}] - {len(errors)} error(s):")
            for err in errors:
                print(f" • Row '{err.row_name}', Column '{err.column}'")
                print(f" Value: {err.value}")
                print(f" Expected: {err.expected}")
                print(f" Message: {err.message}")

    print(f"\n{rule}")
    print("SUMMARY")
    print(rule)
    print(f"Total entries checked: {total_entries}")
    print(f"Entries with errors: {entries_with_errors}")
    print(f"Entries valid: {total_entries - entries_with_errors}")
    print(f"Total errors found: {total_errors}")

    if entries_with_errors == 0:
        print("\n✓ All decision matrices are valid!")
    else:
        print(f"\n✗ {entries_with_errors}/{total_entries} entries have validation errors")

    return entries_with_errors == 0
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    import sys

    # Exit status 0 when every matrix validated cleanly, 1 otherwise.
    exit_code = 0 if main() else 1
    sys.exit(exit_code)