# text2mcdm / validate_znum.py
# nuriyev's picture
# move helper modules to helpers/
# e4ab884
import json
import re
from dataclasses import dataclass
from typing import Optional
@dataclass
class ValidationError:
    """A single problem found while validating a decision matrix.

    This is a plain data record, not an exception class (it does not derive
    from ``Exception``); instances are collected into lists and reported.
    """
    # Label of the table row the problem belongs to ("N/A" for table-level problems).
    row_name: str
    # Criterion (column header) of the offending cell, "row_label" when the
    # row label itself is wrong, or "N/A" for table-level problems.
    column: str
    # The offending value as reported by the validator, e.g. the cell text or
    # "MISSING" when the cell is absent.
    value: str
    # Human-readable description of what was expected instead.
    expected: str
    # Short explanation of the problem.
    message: str
def parse_markdown_table(table_str: str) -> tuple[list[str], list[list[str]]]:
    """Parse a markdown table into its header cells and data rows.

    Line breaks may be real newlines or the literal two-character sequence
    backslash + ``n``. Delimiter rows (``|---|---|``), including the alignment
    forms ``|:---|---:|:---:|``, are dropped, as are blank lines.

    Args:
        table_str: The markdown table text.

    Returns:
        A ``(headers, data_rows)`` tuple where ``headers`` is the list of cell
        texts of the first remaining row and ``data_rows`` the cell lists of
        all following rows. ``([], [])`` is returned when fewer than two rows
        remain (i.e. there is no header or no data row).
    """
    # First alternative: the historical filter (only dashes, pipes, whitespace).
    # Second alternative: delimiter rows with alignment colons; a dash is
    # required in every cell so that a data cell such as ":" is not dropped.
    separator = re.compile(
        r'^\|[\s\-|]+\|$'
        r'|^\|\s*:?-+:?\s*(?:\|\s*:?-+:?\s*)*\|$'
    )

    # Some inputs carry their line breaks as an escaped "\n" string.
    normalized = table_str.replace('\\n', '\n')

    rows = []
    for raw_line in normalized.split('\n'):
        line = raw_line.strip()
        if not line or separator.match(line):
            continue

        cells = [cell.strip() for cell in line.split('|')]
        # A leading/trailing pipe yields exactly one empty string at that end;
        # remove only that one so genuinely empty edge cells are preserved.
        if cells and cells[0] == '':
            cells.pop(0)
        if cells and cells[-1] == '':
            cells.pop()
        if cells:
            rows.append(cells)

    if len(rows) < 2:
        return [], []
    return rows[0], rows[1:]
def parse_z_number(value: str) -> tuple[Optional[int], Optional[int]]:
    """Split a Z-number written as ``A:B`` into its two integer parts.

    Surrounding whitespace is ignored. ``A`` may carry a leading minus sign,
    ``B`` may not.

    Args:
        value: Cell text such as ``'4:3'`` or ``'-3:4'``.

    Returns:
        ``(A, B)`` as integers, or ``(None, None)`` when the text does not
        have the expected form.
    """
    parsed = re.match(r'^(-?\d+):(\d+)$', value.strip())
    if parsed is None:
        return None, None
    a_text, b_text = parsed.groups()
    return int(a_text), int(b_text)
def validate_decision_matrix(matrix_str: str, entry_id: int) -> list[ValidationError]:
    """Validate a single Z-number decision matrix given as a markdown table.

    Expected layout: a header row whose first cell is the label column and
    whose remaining cells name the criteria; a first data row labelled
    ``type`` holding ``benefit`` or ``cost`` per criterion; one or more
    alternative rows; and a last row labelled ``weight``. Value cells are
    Z-numbers ``A:B``.

    Checks performed, in reporting order:
      1. the table parses and has at least three data rows;
      2. the ``type`` row label and its per-criterion values;
      3. the ``weight`` row label and, if the label is correct, its values
         (both parts in 1-5);
      4. every alternative cell: ``B`` in 1-5, ``A`` in 1-5 for benefit
         criteria and -5 to -1 for cost criteria.

    A cell that is absent for a criterion is reported as a "Missing value"
    error in every row kind. Cells beyond the last criterion are ignored.

    Args:
        matrix_str: Markdown table text.
        entry_id: Identifier of the entry being validated. Not used by the
            validation itself; kept so existing callers keep working.

    Returns:
        A list of ``ValidationError`` records; empty when the matrix is valid.
    """
    headers, rows = parse_markdown_table(matrix_str)
    if not headers or not rows:
        return [ValidationError(
            row_name="N/A",
            column="N/A",
            value="N/A",
            expected="Valid markdown table",
            message="Could not parse markdown table",
        )]

    if len(rows) < 3:
        return [ValidationError(
            row_name="N/A",
            column="N/A",
            value=f"{len(rows)} rows",
            expected="At least 3 rows (type, alternatives, weight)",
            message="Insufficient rows in table",
        )]

    # First header cell belongs to the row-label column; the rest are criteria.
    criteria = headers[1:]
    type_row, alternative_rows, weight_row = rows[0], rows[1:-1], rows[-1]

    criterion_types, errors = _validate_type_row(type_row, criteria)
    errors.extend(_validate_weight_row(weight_row, criteria))
    for alt_row in alternative_rows:
        errors.extend(_validate_alternative_row(alt_row, criteria, criterion_types))
    return errors


def _validate_type_row(
    type_row: list[str], criteria: list[str]
) -> tuple[list[Optional[str]], list[ValidationError]]:
    """Check the 'type' row; return (types aligned with ``criteria``, errors).

    A type entry is ``None`` when the row has no cell for that criterion.
    Types are tracked by position rather than by header text so that two
    criteria sharing a name cannot overwrite each other's type.
    """
    errors = []
    label = type_row[0]
    if label.lower() != 'type':
        errors.append(ValidationError(
            row_name=label,
            column="row_label",
            value=label,
            expected="type",
            message="First row should be 'type' row",
        ))

    criterion_types: list[Optional[str]] = []
    for index, criterion in enumerate(criteria, start=1):
        if index >= len(type_row):
            errors.append(ValidationError(
                row_name="type",
                column=criterion,
                value="MISSING",
                expected="'benefit' or 'cost'",
                message="Missing value",
            ))
            criterion_types.append(None)
            continue

        ctype = type_row[index].lower().strip()
        if ctype not in ('benefit', 'cost'):
            errors.append(ValidationError(
                row_name="type",
                column=criterion,
                value=ctype,
                expected="'benefit' or 'cost'",
                message="Invalid criterion type",
            ))
        criterion_types.append(ctype)
    return criterion_types, errors


def _validate_weight_row(
    weight_row: list[str], criteria: list[str]
) -> list[ValidationError]:
    """Check the 'weight' row: label, then both Z-number parts in 1-5 per criterion."""
    label = weight_row[0]
    if label.lower() != 'weight':
        # The values are only meaningful as weights if this really is the
        # weight row, so nothing beyond the label is checked here.
        return [ValidationError(
            row_name=label,
            column="row_label",
            value=label,
            expected="weight",
            message="Last row should be 'weight' row",
        )]

    errors = []
    for index, criterion in enumerate(criteria, start=1):
        if index >= len(weight_row):
            errors.append(ValidationError(
                row_name="weight",
                column=criterion,
                value="MISSING",
                expected="Z-number value",
                message="Missing value",
            ))
            continue

        value = weight_row[index]
        a_part, b_part = parse_z_number(value)
        if a_part is None or b_part is None:
            errors.append(ValidationError(
                row_name="weight",
                column=criterion,
                value=value,
                expected="Format 'A:B' (e.g., '5:4')",
                message="Invalid Z-number format",
            ))
            continue

        if not (1 <= a_part <= 5):
            errors.append(ValidationError(
                row_name="weight",
                column=criterion,
                value=value,
                expected="A-part: 1-5",
                message=f"Weight A-part {a_part} out of range",
            ))
        if not (1 <= b_part <= 5):
            errors.append(ValidationError(
                row_name="weight",
                column=criterion,
                value=value,
                expected="B-part: 1-5",
                message=f"Weight B-part {b_part} out of range",
            ))
    return errors


def _validate_alternative_row(
    alt_row: list[str],
    criteria: list[str],
    criterion_types: list[Optional[str]],
) -> list[ValidationError]:
    """Check one alternative row: Z-number format, confidence range, typed A-part range."""
    errors = []
    alt_name = alt_row[0]
    for index, (criterion, ctype) in enumerate(zip(criteria, criterion_types), start=1):
        if index >= len(alt_row):
            errors.append(ValidationError(
                row_name=alt_name,
                column=criterion,
                value="MISSING",
                expected="Z-number value",
                message="Missing value",
            ))
            continue

        value = alt_row[index]
        a_part, b_part = parse_z_number(value)
        if a_part is None or b_part is None:
            errors.append(ValidationError(
                row_name=alt_name,
                column=criterion,
                value=value,
                expected="Format 'A:B' (e.g., '4:3' or '-3:4')",
                message="Invalid Z-number format",
            ))
            continue

        # B-part (confidence) has the same range regardless of criterion type.
        if not (1 <= b_part <= 5):
            errors.append(ValidationError(
                row_name=alt_name,
                column=criterion,
                value=value,
                expected="B-part (confidence): 1-5",
                message=f"Confidence {b_part} out of range",
            ))

        # The A-part's sign convention depends on the criterion type. When the
        # type is missing or invalid (already reported on the type row) the
        # A-part range is not checked.
        if ctype == 'benefit':
            if not (1 <= a_part <= 5):
                errors.append(ValidationError(
                    row_name=alt_name,
                    column=criterion,
                    value=value,
                    expected="Benefit A-part: 1-5",
                    message=f"Benefit value {a_part} out of range",
                ))
        elif ctype == 'cost':
            if not (-5 <= a_part <= -1):
                errors.append(ValidationError(
                    row_name=alt_name,
                    column=criterion,
                    value=value,
                    expected="Cost A-part: -5 to -1",
                    message=f"Cost value {a_part} out of range",
                ))
    return errors
def main():
    """Validate every decision matrix in a JSONL file and print a report.

    The file path comes from the command line (default ``train.jsonl``). The
    file is read as UTF-8. Blank lines are skipped. Every other line must be a
    JSON object; its ``decision_matrix`` field is validated and its ``id``
    field (falling back to the zero-based line index) labels the entry in the
    report. Entries without a ``decision_matrix`` produce a warning and are
    not counted as invalid.

    Returns:
        True when no entry has validation errors and every non-blank line
        could be parsed as a JSON object; False otherwise.

    Raises:
        OSError: If the file cannot be opened.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Validate Z-number decision matrices in JSONL files')
    parser.add_argument('filepath', nargs='?', default='train.jsonl', help='Path to JSONL file (default: train.jsonl)')
    args = parser.parse_args()

    total_entries = 0
    unparseable_lines = 0
    # (entry_id, errors) pairs in file order; a list rather than a dict so
    # entries sharing an id do not overwrite each other's errors.
    failures = []

    print("=" * 70)
    print("Decision Matrix Validation Report")
    print("=" * 70)

    with open(args.filepath, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            if not line.strip():
                continue

            try:
                entry = json.loads(line)
            except json.JSONDecodeError as e:
                unparseable_lines += 1
                print(f"\n[Line {line_num}] JSON Parse Error: {e}")
                continue

            if not isinstance(entry, dict):
                unparseable_lines += 1
                print(f"\n[Line {line_num}] JSON Parse Error: expected an object, got {type(entry).__name__}")
                continue

            entry_id = entry.get('id', line_num - 1)
            total_entries += 1

            matrix_str = entry.get('decision_matrix', '')
            if not matrix_str:
                print(f"\n[Entry {entry_id}] WARNING: No decision_matrix field")
                continue

            if isinstance(matrix_str, str):
                errors = validate_decision_matrix(matrix_str, entry_id)
            else:
                # The table parser works on text only; report instead of crashing.
                errors = [ValidationError(
                    row_name="N/A",
                    column="N/A",
                    value=type(matrix_str).__name__,
                    expected="Markdown table string",
                    message="decision_matrix is not a string",
                )]

            if errors:
                failures.append((entry_id, errors))

    entries_with_errors = len(failures)
    total_errors = sum(len(errors) for _, errors in failures)

    # Print detailed errors
    if failures:
        print(f"\n{'=' * 70}")
        print("VALIDATION ERRORS")
        print('=' * 70)
        for entry_id, errors in failures:
            print(f"\n[Entry {entry_id}] - {len(errors)} error(s):")
            for err in errors:
                print(f" • Row '{err.row_name}', Column '{err.column}'")
                print(f" Value: {err.value}")
                print(f" Expected: {err.expected}")
                print(f" Message: {err.message}")

    # Summary
    print(f"\n{'=' * 70}")
    print("SUMMARY")
    print('=' * 70)
    print(f"Total entries checked: {total_entries}")
    print(f"Entries with errors: {entries_with_errors}")
    print(f"Entries valid: {total_entries - entries_with_errors}")
    print(f"Total errors found: {total_errors}")
    if unparseable_lines:
        print(f"Unparseable lines: {unparseable_lines}")

    # Lines that could not be read at all must not be reported as success.
    success = entries_with_errors == 0 and unparseable_lines == 0
    if success:
        print("\n✓ All decision matrices are valid!")
    else:
        if entries_with_errors:
            print(f"\n✗ {entries_with_errors}/{total_entries} entries have validation errors")
        if unparseable_lines:
            print(f"\n✗ {unparseable_lines} line(s) could not be parsed as JSON objects")
    return success
if __name__ == '__main__':
    import sys

    # Exit status mirrors the validation result: 0 when main() reports
    # success, 1 otherwise.
    sys.exit(0 if main() else 1)