| | import os |
| | import json |
| | from pathlib import Path |
| | from langchain_ollama import OllamaLLM |
| |
|
| | |
# --- Configuration ---
INPUT_DIR = "synthetic_content"  # root folder containing the source .txt documents
OUTPUT_ROOT = "synthetic_dataset"  # root folder where the generated dataset is written
HISTORY_FILE = "processed_synthetic_scans_contents.txt"  # resume log: one processed relative path per line
MODEL_NAME = "llama3"  # Ollama model identifier passed to OllamaLLM
| |
|
| | |
# ISO 639-1 code -> language name used in translation prompts.
# "en" is special-cased downstream: English text is taken as-is, not translated.
TARGET_LANGUAGES = {
    "pl": "Polish",
    "en": "English",
    "de": "German",
    "fr": "French",
    "es": "Spanish",
    "it": "Italian",
    "uk": "Ukrainian"
}
| |
|
| | |
| | llm = OllamaLLM(model=MODEL_NAME, temperature=0) |
| |
|
| | |
def load_history():
    """Return the set of relative paths already processed, per HISTORY_FILE.

    An absent history file simply means nothing was processed yet.
    """
    history = set()
    if os.path.exists(HISTORY_FILE):
        with open(HISTORY_FILE, 'r', encoding='utf-8') as handle:
            for raw_line in handle:
                entry = raw_line.strip()
                if entry:  # skip blank lines so they never count as paths
                    history.add(entry)
    return history
| |
|
def mark_as_done(rel_path):
    """Append *rel_path* to the resume log so it is skipped on the next run."""
    with open(HISTORY_FILE, 'a', encoding='utf-8') as log:
        # print() appends the trailing newline for us.
        print(rel_path, file=log)
| |
|
| | |
def ask_llm_json(prompt):
    """Invoke the LLM in JSON mode and safely parse the result.

    Returns the decoded Python object, or None when the model emits
    malformed JSON or the call itself fails.
    """
    try:
        return json.loads(llm.invoke(prompt, format="json"))
    except json.JSONDecodeError as err:
        print(f"\n ⚠️ Błąd składni JSON od AI: {err}")
    except Exception as err:
        print(f"\n ⚠️ Błąd komunikacji z LLM: {err}")
    return None
| |
|
def ask_llm_text(prompt):
    """Invoke the LLM and return a cleaned plain-text answer.

    Surrounding whitespace and wrapping quotes are removed; any failure
    yields the sentinel string "Translation Error".
    """
    try:
        cleaned = llm.invoke(prompt).strip()
        # Same order as before: whitespace, then double quotes, then single.
        for quote_char in ('"', "'"):
            cleaned = cleaned.strip(quote_char)
        return cleaned
    except Exception:
        return "Translation Error"
| |
|
def get_metadata(text, hinted_type):
    """Ask the LLM for structured metadata about a document.

    Returns a dict with keys title_base, summary_base, category and info
    (or None on failure — see ask_llm_json).
    """
    # Cap the excerpt so the prompt stays within the model's context window.
    snippet = text[:3500]
    prompt = f"""
    Analyze this document text.
    Folder hint: {hinted_type}

    Return ONLY a JSON object with these keys:
    - "title_base": Factual title in ENGLISH (format: "[Type] - [Entity] - [Date]")
    - "summary_base": Factual summary in ENGLISH (exactly 5 sentences)
    - "category": One of: financial, legal, personal, health, property, other
    - "info": Key details (e.g. document ID or service name)

    Ensure all quotes inside the text are properly escaped.

    TEXT:
    {snippet}
    """
    return ask_llm_json(prompt)
| |
|
def translate_section(text, target_lang, content_type="text"):
    """Translate *text* into *target_lang* via the LLM.

    content_type (e.g. "title", "summary") is only used to phrase the prompt.
    Returns the translation, or "Translation Error" on failure (see ask_llm_text).
    """
    prompt = f"""
    Translate the following {content_type} into {target_lang}.
    Output ONLY the translation. No conversational text or markdown.

    TEXT TO TRANSLATE:
    {text}
    """
    return ask_llm_text(prompt)
| |
|
def save_output(root, kind, lang, subdir, filename, content):
    """Write str(content) to root/kind[/lang]/subdir/filename.

    The optional language segment is inserted only when *lang* is truthy.
    Intermediate directories are created on demand; existing files are
    overwritten.
    """
    target_dir = Path(root) / kind
    if lang:
        target_dir = target_dir / lang
    target_dir = target_dir / subdir
    target_dir.mkdir(parents=True, exist_ok=True)
    (target_dir / filename).write_text(str(content), encoding="utf-8")
| |
|
| | |
def process_file(file_path, input_root):
    """Produce all dataset artifacts for a single source document.

    Writes language-independent outputs (content, category, type, info),
    then a title and summary per target language, and finally records the
    file in the resume log. Read or metadata failures abort quietly after
    printing a message.
    """
    rel_path = file_path.relative_to(input_root)
    base_filename = rel_path.name
    sub_dir = rel_path.parent
    doc_type = sub_dir.name  # the immediate folder name doubles as a type hint

    try:
        raw_text = file_path.read_text(encoding='utf-8')
    except Exception as e:
        print(f" ❌ Błąd odczytu pliku: {e}")
        return

    meta = get_metadata(raw_text, doc_type)
    if not isinstance(meta, dict) or not meta:
        print(" ❌ Błąd AI: Nie udało się wygenerować poprawnego JSONa.")
        return

    # Language-independent artifacts.
    save_output(OUTPUT_ROOT, "content", None, sub_dir, base_filename, raw_text)
    save_output(OUTPUT_ROOT, "category", None, sub_dir, base_filename, meta.get("category", "other"))
    save_output(OUTPUT_ROOT, "type", None, sub_dir, base_filename, doc_type)
    save_output(OUTPUT_ROOT, "info", None, sub_dir, base_filename, meta.get("info", "none"))

    base_title = meta.get("title_base", "Document")
    base_summary = meta.get("summary_base", "No summary available.")

    print(f" 🌍 Tłumaczenie na {len(TARGET_LANGUAGES)} języków...", end="", flush=True)

    for code, lang_name in TARGET_LANGUAGES.items():
        # English is the base language — reuse the original text verbatim.
        title = base_title if code == "en" else translate_section(base_title, lang_name, "title")
        save_output(OUTPUT_ROOT, "titles", code, sub_dir, base_filename, title)

        summary = base_summary if code == "en" else translate_section(base_summary, lang_name, "summary")
        save_output(OUTPUT_ROOT, "summary", code, sub_dir, base_filename, summary)

        print(".", end="", flush=True)

    print(" OK")
    mark_as_done(str(rel_path))
| |
|
def main():
    """Walk INPUT_DIR recursively and process every .txt file not yet logged.

    Ctrl+C stops the run cleanly (progress is already persisted per file);
    any other per-file error is reported and the loop continues.
    """
    input_path = Path(INPUT_DIR)
    if not input_path.exists():
        print(f"❌ Brak folderu wejściowego: {INPUT_DIR}")
        return

    processed = load_history()
    print(f"📂 Historia: {len(processed)} plików już przetworzonych.")

    files = list(input_path.rglob("*.txt"))
    print(f"🚀 Start: {len(files)} plików do analizy.")

    for candidate in files:
        rel_path = str(candidate.relative_to(input_path))
        if rel_path in processed:
            continue

        print(f"📄 Przetwarzam: {rel_path}")
        try:
            process_file(candidate, input_path)
        except KeyboardInterrupt:
            print("\n🛑 Przerwano ręcznie. Postęp zapisany.")
            break
        except Exception as e:
            print(f"\n❌ Błąd krytyczny przy pliku {rel_path}: {e}")
| |
|
# Script entry point: run the pipeline only when executed directly, not on import.
if __name__ == "__main__":
    main()