File size: 6,442 Bytes
532d883
 
 
526b256
 
 
 
e4ab884
526b256
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
532d883
 
 
 
 
 
 
526b256
 
 
 
 
 
 
 
532d883
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
526b256
532d883
 
 
 
 
 
 
 
 
 
 
 
526b256
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, TextStreamer
import torch
import warnings
import re
import logging
import argparse
from znum import Znum, Topsis, Promethee, Beast
from helpers.utils import SYSTEM_PROMPT, DEFAULT_QUERY

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Z-number mappings: value/confidence (1-5) to fuzzy trapezoidal numbers
A_MAP = {
    1: [2, 3, 3, 4],
    2: [4, 5, 5, 6],
    3: [6, 7, 7, 8],
    4: [8, 9, 9, 10],
    5: [10, 11, 11, 12],
}

B_MAP = {
    1: [0.2, 0.3, 0.3, 0.4],
    2: [0.3, 0.4, 0.4, 0.5],
    3: [0.4, 0.5, 0.5, 0.6],
    4: [0.5, 0.6, 0.6, 0.7],
    5: [0.6, 0.7, 0.7, 0.8],
}


def parse_znum_pair(pair_str: str) -> Znum | None:
    """Convert 'N:M' string to Znum object using A_MAP and B_MAP (abs value for A)."""
    try:
        parts = pair_str.strip().split(':')
        if len(parts) != 2:
            return None
        a_val = abs(int(parts[0]))
        b_val = int(parts[1])
        if a_val not in A_MAP or b_val not in B_MAP:
            logger.warning(f"Invalid Z-number pair: {pair_str}")
            return None
        return Znum(A_MAP[a_val], B_MAP[b_val])
    except (ValueError, KeyError) as e:
        logger.warning(f"Failed to parse Z-number pair '{pair_str}': {e}")
        return None


def parse_markdown_table(text: str) -> dict:
    """Parse markdown table from model output into structured dict."""
    lines = [l.strip() for l in text.strip().split('\n') if l.strip() and '|' in l]
    lines = [l for l in lines if not re.match(r'^\|[-:\s|]+\|$', l)]

    if len(lines) < 4:
        logger.warning("Table has fewer than expected rows")
        return {}

    def split_row(row: str) -> list:
        cells = [c.strip() for c in row.split('|')]
        return [c for c in cells if c]

    headers = split_row(lines[0])
    criteria = headers[1:] if headers else []

    types_row = split_row(lines[1])
    types = types_row[1:] if len(types_row) > 1 else []

    weights_row = split_row(lines[-1])
    weights = weights_row[1:] if len(weights_row) > 1 else []

    alternatives = {}
    for line in lines[2:-1]:
        row = split_row(line)
        if row:
            alt_name = row[0]
            values = row[1:]
            alternatives[alt_name] = values

    result = {
        'criteria': criteria,
        'types': types,
        'alternatives': alternatives,
        'weights': weights
    }
    return result

warnings.filterwarnings(
    "ignore",
    message="Chat template .*",
    category=UserWarning,
)

# Parse command line arguments
parser = argparse.ArgumentParser(description='Z-number decision matrix extraction and MCDM')
parser.add_argument('--method', type=str, choices=['topsis', 'promethee'], default='topsis',
                    help='MCDM method to use (default: topsis)')
parser.add_argument('--query', '-q', type=str, default=DEFAULT_QUERY,
                    help='Decision query to process')
args = parser.parse_args()

qconfig = BitsAndBytesConfig(
    load_in_8bit=True,
)

model_name = "nuriyev/Qwen3-4B-znum-decision-matrix"  # or your preferred model
print(f"Loading model: {model_name}...")
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
    dtype=torch.bfloat16,
    # quantization_config=qconfig,
)
print("Model loaded successfully!\n")

messages = [
    {"role": "system", "content": SYSTEM_PROMPT},
    {"role": "user", "content": args.query},
]

# Prepare the prompt using the chat template
prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True, enable_thinking=False)

# Tokenize the input
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

# Create streamer for real-time token output
streamer = TextStreamer(tokenizer, skip_special_tokens=True)

# Generate with streaming
output_ids = model.generate(**inputs, max_length=8192, streamer=streamer, temperature=1)

# Decode the generated output (excluding input tokens)
generated_ids = output_ids[0][inputs['input_ids'].shape[1]:]
generated_text = tokenizer.decode(generated_ids, skip_special_tokens=True)

# Parse and log the decision matrix
logger.info("Parsing decision matrix from model output...")
matrix = parse_markdown_table(generated_text)

if matrix:
    logger.info(f"Criteria: {matrix['criteria']}")
    logger.info(f"Types: {matrix['types']}")

    # Convert weights to Znum
    znum_weights = [parse_znum_pair(w) for w in matrix['weights']]
    logger.info("Weights as Znum:")
    for i, (name, zw) in enumerate(zip(matrix['criteria'], znum_weights)):
        logger.info(f"  {name}: {zw}")

    # Convert alternatives to Znum
    znum_alternatives = {}
    for alt_name, values in matrix['alternatives'].items():
        znum_values = [parse_znum_pair(v) for v in values]
        znum_alternatives[alt_name] = znum_values
        logger.info(f"Alternative '{alt_name}' as Znum:")
        for i, (crit, zv) in enumerate(zip(matrix['criteria'], znum_values)):
            logger.info(f"  {crit}: {zv}")

    # Store converted data
    matrix['znum_weights'] = znum_weights
    matrix['znum_alternatives'] = znum_alternatives

    # Build criteria types for MCDM
    criteria_types = [
        Beast.CriteriaType.BENEFIT if t.lower() == 'benefit' else Beast.CriteriaType.COST
        for t in matrix['types']
    ]

    # Build decision table: [weights, *alternatives, criteria_types]
    alt_names = list(znum_alternatives.keys())
    alt_rows = [znum_alternatives[name] for name in alt_names]
    table = [znum_weights] + alt_rows + [criteria_types]

    # Apply MCDM method
    logger.info(f"\nApplying {args.method.upper()} method...")

    if args.method == 'topsis':
        solver = Topsis(table)
    else:
        solver = Promethee(table)

    solver.solve()

    # Log results
    logger.info(f"\n{'='*50}")
    logger.info(f"RESULTS ({args.method.upper()})")
    logger.info(f"{'='*50}")
    logger.info(f"Best alternative: {alt_names[solver.index_of_best_alternative]}")
    logger.info(f"Worst alternative: {alt_names[solver.index_of_worst_alternative]}")
    logger.info(f"\nRanking (best to worst):")
    for rank, idx in enumerate(solver.ordered_indices, 1):
        logger.info(f"  {rank}. {alt_names[idx]}")
else:
    logger.error("Failed to parse decision matrix")