# text2mcdm.py — Z-number decision-matrix extraction and MCDM
# (author: nuriyev; commit e4ab884 "move helper modules to helpers/")
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, TextStreamer
import torch
import warnings
import re
import logging
import argparse
from znum import Znum, Topsis, Promethee, Beast
from helpers.utils import SYSTEM_PROMPT, DEFAULT_QUERY
# Module-wide logging: timestamped INFO-level messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Z-number mappings: value/confidence (1-5) to fuzzy trapezoidal numbers
# A_MAP: linguistic value rating (1-5) -> trapezoidal fuzzy number
# [a1, a2, a3, a4] for the A (restriction) part of a Z-number.
A_MAP = {
    1: [2, 3, 3, 4],
    2: [4, 5, 5, 6],
    3: [6, 7, 7, 8],
    4: [8, 9, 9, 10],
    5: [10, 11, 11, 12],
}
# B_MAP: confidence rating (1-5) -> trapezoidal fuzzy number on [0, 1]
# for the B (reliability) part of a Z-number.
B_MAP = {
    1: [0.2, 0.3, 0.3, 0.4],
    2: [0.3, 0.4, 0.4, 0.5],
    3: [0.4, 0.5, 0.5, 0.6],
    4: [0.5, 0.6, 0.6, 0.7],
    5: [0.6, 0.7, 0.7, 0.8],
}
def parse_znum_pair(pair_str: str) -> Znum | None:
    """Convert an 'N:M' rating string into a Znum, or None if malformed.

    N (value; absolute value is taken) indexes A_MAP and M (confidence)
    indexes B_MAP; both must be keys in their respective maps (1-5).
    Returns None and logs a warning on any parse/validation failure.
    """
    try:
        left, sep, right = pair_str.strip().partition(':')
        # Exactly one ':' is required — reject 'N' and 'N:M:K' forms silently,
        # matching the original split-based length check.
        if not sep or ':' in right:
            return None
        value_key = abs(int(left))
        confidence_key = int(right)
    except ValueError as exc:
        logger.warning(f"Failed to parse Z-number pair '{pair_str}': {exc}")
        return None
    if value_key not in A_MAP or confidence_key not in B_MAP:
        logger.warning(f"Invalid Z-number pair: {pair_str}")
        return None
    return Znum(A_MAP[value_key], B_MAP[confidence_key])
def parse_markdown_table(text: str) -> dict:
    """Parse a markdown decision-matrix table into a structured dict.

    Expected layout (after dropping `|---|` separator rows):
    row 0 = header (criteria names), row 1 = criteria types,
    last row = weights, everything in between = alternatives.
    Returns {} (with a warning) when fewer than 4 usable rows exist.
    """
    separator = re.compile(r'^\|[-:\s|]+\|$')
    rows = []
    for raw in text.strip().split('\n'):
        candidate = raw.strip()
        # Keep only real table rows: non-empty, pipe-delimited, not a separator.
        if candidate and '|' in candidate and not separator.match(candidate):
            rows.append(candidate)
    if len(rows) < 4:
        logger.warning("Table has fewer than expected rows")
        return {}

    def cells_of(row: str) -> list:
        # Split on '|' and drop empty cells produced by leading/trailing pipes.
        return [cell for cell in (c.strip() for c in row.split('|')) if cell]

    header_cells = cells_of(rows[0])
    type_cells = cells_of(rows[1])
    weight_cells = cells_of(rows[-1])
    # First cell of each alternative row is its name; the rest are values.
    alternatives = {
        cells[0]: cells[1:]
        for cells in map(cells_of, rows[2:-1])
        if cells
    }
    return {
        'criteria': header_cells[1:] if header_cells else [],
        'types': type_cells[1:] if len(type_cells) > 1 else [],
        'alternatives': alternatives,
        'weights': weight_cells[1:] if len(weight_cells) > 1 else [],
    }
# Silence transformers' noisy chat-template warning emitted during generation.
warnings.filterwarnings(
    "ignore",
    message="Chat template .*",
    category=UserWarning,
)

# Parse command line arguments
parser = argparse.ArgumentParser(description='Z-number decision matrix extraction and MCDM')
parser.add_argument('--method', type=str, choices=['topsis', 'promethee'], default='topsis',
                    help='MCDM method to use (default: topsis)')
parser.add_argument('--query', '-q', type=str, default=DEFAULT_QUERY,
                    help='Decision query to process')
args = parser.parse_args()

# 8-bit quantization config; currently unused (see the commented-out
# quantization_config argument below) but kept for easy opt-in on low-VRAM GPUs.
qconfig = BitsAndBytesConfig(
    load_in_8bit=True,
)

model_name = "nuriyev/Qwen3-4B-znum-decision-matrix"  # or your preferred model
print(f"Loading model: {model_name}...")
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    # NOTE(review): `dtype=` requires a recent transformers release (it was
    # `torch_dtype=` in older versions) — confirm against pinned dependency.
    device_map="auto",
    dtype=torch.bfloat16,
    # quantization_config=qconfig,
)
print("Model loaded successfully!\n")
# Build the chat history: system prompt steering the model toward emitting
# a markdown decision matrix, plus the user's query.
messages = [
    {"role": "system", "content": SYSTEM_PROMPT},
    {"role": "user", "content": args.query},
]
# Render the conversation with the model's chat template; enable_thinking=False
# suppresses the Qwen3 <think> block so the output is just the table.
prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True, enable_thinking=False)
# Tokenize the input and move it to the model's device.
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
# Create streamer for real-time token output
streamer = TextStreamer(tokenizer, skip_special_tokens=True)
# Greedy decoding. The original passed temperature=1, which is a no-op
# without do_sample=True and only triggers a transformers UserWarning, so
# it is dropped. NOTE: max_length caps prompt + generated tokens combined.
output_ids = model.generate(**inputs, max_length=8192, streamer=streamer)
# Decode only the newly generated tokens (slice off the prompt).
generated_ids = output_ids[0][inputs['input_ids'].shape[1]:]
generated_text = tokenizer.decode(generated_ids, skip_special_tokens=True)
# Parse and log the decision matrix
logger.info("Parsing decision matrix from model output...")
matrix = parse_markdown_table(generated_text)
if matrix:
    logger.info(f"Criteria: {matrix['criteria']}")
    logger.info(f"Types: {matrix['types']}")

    # Convert weight cells ('N:M' strings) to Znum objects.
    znum_weights = [parse_znum_pair(w) for w in matrix['weights']]
    logger.info("Weights as Znum:")
    # (unused enumerate index removed — zip alone is sufficient)
    for name, zw in zip(matrix['criteria'], znum_weights):
        logger.info(f" {name}: {zw}")

    # Convert each alternative's value cells to Znum objects.
    znum_alternatives = {}
    for alt_name, values in matrix['alternatives'].items():
        znum_values = [parse_znum_pair(v) for v in values]
        znum_alternatives[alt_name] = znum_values
        logger.info(f"Alternative '{alt_name}' as Znum:")
        for crit, zv in zip(matrix['criteria'], znum_values):
            logger.info(f" {crit}: {zv}")

    # Surface parse failures early: a None entry would otherwise make the
    # solver fail later with a far less obvious error.
    if None in znum_weights or any(None in vals for vals in znum_alternatives.values()):
        logger.warning("Some Z-number cells failed to parse (None values present)")

    # Store converted data
    matrix['znum_weights'] = znum_weights
    matrix['znum_alternatives'] = znum_alternatives

    # Build criteria types for MCDM (anything not 'benefit' is treated as cost).
    criteria_types = [
        Beast.CriteriaType.BENEFIT if t.lower() == 'benefit' else Beast.CriteriaType.COST
        for t in matrix['types']
    ]

    # Build decision table: [weights, *alternatives, criteria_types]
    alt_names = list(znum_alternatives.keys())
    alt_rows = [znum_alternatives[name] for name in alt_names]
    table = [znum_weights] + alt_rows + [criteria_types]

    # Apply the MCDM method selected on the command line.
    logger.info(f"\nApplying {args.method.upper()} method...")
    solver = Topsis(table) if args.method == 'topsis' else Promethee(table)
    solver.solve()

    # Log results
    logger.info(f"\n{'='*50}")
    logger.info(f"RESULTS ({args.method.upper()})")
    logger.info(f"{'='*50}")
    logger.info(f"Best alternative: {alt_names[solver.index_of_best_alternative]}")
    logger.info(f"Worst alternative: {alt_names[solver.index_of_worst_alternative]}")
    logger.info("\nRanking (best to worst):")
    for rank, idx in enumerate(solver.ordered_indices, 1):
        logger.info(f" {rank}. {alt_names[idx]}")
else:
    logger.error("Failed to parse decision matrix")