| import os |
| import torch |
| from pathlib import Path |
| from datasets import Dataset |
| from transformers import ( |
| AutoTokenizer, |
| AutoModelForSeq2SeqLM, |
| DataCollatorForSeq2Seq, |
| Seq2SeqTrainingArguments, |
| Seq2SeqTrainer |
| ) |
|
|
| |
| |
| BASE_DIR = Path(__file__).resolve().parent.parent |
| DATA_ROOT = BASE_DIR / "content" |
| TITLE_ROOT = BASE_DIR / "titles" |
| SUMMARY_ROOT = BASE_DIR / "summary" |
|
|
| MODEL_ID = "google/flan-t5-small" |
| OUTPUT_MODEL_DIR = BASE_DIR / "summarizer" / "models" / "flan_t5_custom" |
|
|
| MAX_INPUT_LEN = 512 |
| MAX_TARGET_LEN = 128 |
|
|
| def load_data(): |
| """Wczytuje dane i tworzy pary: Instrukcja + Tekst -> Wynik.""" |
| dataset_dict = {"input_text": [], "target_text": []} |
| |
| print(f"📂 Szukam danych w: {DATA_ROOT}") |
| |
| |
| files = list(DATA_ROOT.rglob("*.txt")) |
| for txt_file in files: |
| rel_path = txt_file.relative_to(DATA_ROOT) |
| |
| |
| with open(txt_file, "r", encoding="utf-8") as f: |
| ocr_content = f.read().strip() |
| |
| if not ocr_content: continue |
|
|
| |
| t_file = TITLE_ROOT / rel_path |
| if t_file.exists(): |
| with open(t_file, "r", encoding="utf-8") as f: |
| dataset_dict["input_text"].append(f"headline: {ocr_content}") |
| dataset_dict["target_text"].append(f.read().strip()) |
| |
| |
| s_file = SUMMARY_ROOT / rel_path |
| if s_file.exists(): |
| with open(s_file, "r", encoding="utf-8") as f: |
| dataset_dict["input_text"].append(f"summarize: {ocr_content}") |
| dataset_dict["target_text"].append(f.read().strip()) |
| |
| return Dataset.from_dict(dataset_dict) |
|
|
| def main(): |
| |
| raw_dataset = load_data() |
| if len(raw_dataset) == 0: |
| print("❌ Nie znaleziono plików w content/titles/summary. Sprawdź ścieżki.") |
| return |
| |
| dataset = raw_dataset.train_test_split(test_size=0.1) |
| |
| |
| tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) |
| model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_ID) |
|
|
| def preprocess(examples): |
| inputs = [ex for ex in examples["input_text"]] |
| model_inputs = tokenizer(inputs, max_length=MAX_INPUT_LEN, truncation=True, padding="max_length") |
| |
| labels = tokenizer(text_target=examples["target_text"], max_length=MAX_TARGET_LEN, truncation=True, padding="max_length") |
| model_inputs["labels"] = labels["input_ids"] |
| return model_inputs |
|
|
| tokenized_dataset = dataset.map(preprocess, batched=True) |
|
|
| |
| |
| training_args = Seq2SeqTrainingArguments( |
| output_dir="./tmp_results", |
| eval_strategy="epoch", |
| learning_rate=3e-4, |
| per_device_train_batch_size=8, |
| per_device_eval_batch_size=8, |
| weight_decay=0.01, |
| save_total_limit=2, |
| num_train_epochs=15, |
| predict_with_generate=True, |
| fp16=False, |
| logging_steps=10, |
| |
| generation_max_length=MAX_TARGET_LEN, |
| generation_num_beams=4, |
| ) |
|
|
| |
| trainer = Seq2SeqTrainer( |
| model=model, |
| args=training_args, |
| train_dataset=tokenized_dataset["train"], |
| eval_dataset=tokenized_dataset["test"], |
| tokenizer=tokenizer, |
| data_collator=DataCollatorForSeq2Seq(tokenizer, model=model), |
| ) |
|
|
| print(f"🚀 Rozpoczynam uczenie na {len(raw_dataset)} przykładach...") |
| trainer.train() |
|
|
| |
| os.makedirs(OUTPUT_MODEL_DIR, exist_ok=True) |
| model.save_pretrained(OUTPUT_MODEL_DIR) |
| tokenizer.save_pretrained(OUTPUT_MODEL_DIR) |
| print(f"✨ Model wyuczony i zapisany w: {OUTPUT_MODEL_DIR}") |
|
|
| if __name__ == "__main__": |
| main() |