import torch
import re
from html import unescape
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from peft import PeftModel
from transformers import StoppingCriteria, StoppingCriteriaList
from difflib import SequenceMatcher
from flask import Flask, request, jsonify

# --------------------------
# Step 1: Device setup
# --------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🚀 Running on device: {device}")

# --------------------------
# Step 2: Load the tokenizer
# --------------------------
model_path = "./"
try:
    tokenizer = GPT2Tokenizer.from_pretrained(model_path)
    tokenizer.pad_token = tokenizer.eos_token
    print("✅ Tokenizer loaded successfully")
except Exception as e:
    print(f"❌ Error loading tokenizer: {e}")
    exit()

# --------------------------
# Step 3: Load the model with a quantization fallback
# --------------------------
quant_config = None
if torch.cuda.is_available():
    try:
        from transformers import BitsAndBytesConfig
        quant_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16
        )
        print("✅ Using 4-bit quantization (GPU mode)")
    except Exception as e:
        print("⚠️ BitsAndBytes not available, continuing without quantization:", e)
else:
    print("💡 CPU mode — quantization disabled")

try:
    base_model = GPT2LMHeadModel.from_pretrained(
        model_path,
        quantization_config=quant_config,
        device_map={"": 0} if torch.cuda.is_available() else None,
        low_cpu_mem_usage=True,
        torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
    )
    # 4-bit (bitsandbytes) models cannot be moved with .to(); they are already
    # placed on the GPU via device_map, so only move the model when unquantized.
    if quant_config is None:
        base_model = base_model.to(device)
    print("✅ Base model loaded successfully")
except Exception as e:
    print(f"❌ Error loading base model: {e}")
    exit()

# --------------------------
# Step 4: Load the PEFT (LoRA) adapter
# --------------------------
try:
    model = PeftModel.from_pretrained(
        base_model,
        model_path,
        is_trainable=False,
        device_map={"": 0} if torch.cuda.is_available() else None
    )
    # As above, skip .to() when the base model is quantized.
    if quant_config is None:
        model = model.to(device)
    print("✅ PEFT model loaded successfully")
except Exception as e:
    print(f"⚠️ Warning: Failed to load PEFT adapter, using base model. ({e})")
    model = base_model

# --------------------------
# Step 5: System prompt
# --------------------------
system_prompt = """You are GPT-A, a friendly AI assistant made by LuxAI.
You must answer very briefly and coherently."""

# --------------------------
# Step 6: Stopping criteria
# --------------------------
class CustomStoppingCriteria(StoppingCriteria):
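    """Stop generation once the last token equals stop_token_id or the sequence exceeds 512 tokens."""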
    def __init__(self, stop_token_id):
        self.stop_token_id = stop_token_id

    def __call__(self, input_ids, scores, **kwargs):
        return input_ids[0][-1] == self.stop_token_id or len(input_ids[0]) > 512

stopping_criteria = StoppingCriteriaList([CustomStoppingCriteria(tokenizer.eos_token_id)])

# --------------------------
# Step 6.5: Utility functions
# --------------------------
def clean_response(text):
    """Odstraní HTML, Markdown a redundantní mezery."""
    original_text = text
    text = re.sub(r"<[^>]+>", " ", text)
    text = unescape(text)
    text = re.sub(r"[*#`_~]+", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    if text != original_text:
        print("🧹 Cleaned response.")
    return text


def remove_repetitions(text, similarity_threshold=0.8):
    """Odstraní opakující se věty."""
    sentences = re.split(r'(?<=[.!?])\s+', text)
    if len(sentences) <= 1:
        return text
    unique_sentences = []
    for sent in sentences:
        sent_clean = sent.strip()
        if not sent_clean:
            continue
        if not unique_sentences or SequenceMatcher(None, sent_clean, unique_sentences[-1]).ratio() < similarity_threshold:
            unique_sentences.append(sent_clean)
    return " ".join(unique_sentences)


def truncate_to_last_sentence(text):
    """Zkrátí text na poslední dokončenou větu."""
    sentences = re.split(r'(?<=[.!?])\s+', text)
    for i in range(len(sentences) - 1, -1, -1):
        if re.search(r'[.!?]$', sentences[i].strip()):
            return " ".join(sentences[:i+1]).strip()
    return text.strip()

# --------------------------
# Step 7: Response generation
# --------------------------
def generate_response(
    user_input,
    max_length=2048,
    temperature=0.7,
    top_k=50,
    top_p=0.7,
    repetition_penalty=10.0,
    num_beams=4,
    early_stopping=True,
    do_sample=True
):
    try:
        prompt = f"{system_prompt}\n\nUser: {user_input}\nAssistant:"
        inputs = tokenizer(prompt, return_tensors="pt").to(device)
        print(f"📥 Input on device: {inputs['input_ids'].device}")

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_length=max_length,
                temperature=temperature if do_sample else 1.0,
                top_k=top_k if do_sample else None,
                top_p=top_p if do_sample else None,
                repetition_penalty=repetition_penalty,
                num_beams=num_beams,
                early_stopping=early_stopping if num_beams > 1 else False,
                num_return_sequences=1,
                pad_token_id=tokenizer.eos_token_id,
                eos_token_id=tokenizer.eos_token_id,
                do_sample=do_sample,
                stopping_criteria=stopping_criteria
            )

        generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
        response = generated_text.split("Assistant:")[-1].strip()

        response = clean_response(response)
        response = remove_repetitions(response)
        response = truncate_to_last_sentence(response)

        return response

    except Exception as e:
        print(f"❌ Error during generation: {e}")
        return None

# --------------------------
# Step 8: Flask API
# --------------------------
app = Flask(__name__)

@app.route('/generate', methods=['POST'])
def generate_text():
    data = request.get_json()
    if not data or 'user_input' not in data:
        return jsonify({'error': 'Missing user_input parameter'}), 400

    user_input = data['user_input']
    generated_response = generate_response(user_input)

    if generated_response is None:
        return jsonify({'error': 'Failed to generate response'}), 500

    return jsonify({'response': generated_response})

# --------------------------
# Step 9: Start the server
# --------------------------
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860)
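
# --------------------------
# Example request (illustrative only; route and port match the app above)
# --------------------------
#   curl -X POST http://localhost:7860/generate \
#        -H "Content-Type: application/json" \
#        -d '{"user_input": "Hello, who are you?"}'
#
# A successful call returns JSON of the form: {"response": "..."}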