# backendv1.py # A RAG rendszer motorja: adatfeldolgozás, keresés, generálás és tanulás. # Végleges, refaktorált verzió. Gyors, egylépcsős generálással. import os import time import datetime import json import re from collections import defaultdict from together import Together from elasticsearch import Elasticsearch, exceptions as es_exceptions import torch from sentence_transformers import SentenceTransformer from sentence_transformers.cross_encoder import CrossEncoder from spellchecker import SpellChecker import warnings from dotenv import load_dotenv import sys import nltk from concurrent.futures import ThreadPoolExecutor # === ANSI Színkódok (konzol loggoláshoz) === GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' RESET = '\033[0m' BLUE = '\033[94m' CYAN = '\033[96m' MAGENTA = '\033[95m' # --- Konfiguráció --- CONFIG = { "ELASTIC_PASSWORD": os.environ.get("ES_PASSWORD", "T8xEbqQ4GAPkr73s2knN"), "ELASTIC_HOST": "https://localhost:9200", "VECTOR_INDEX_NAMES": ["duna", "dunawebindexai"], "FEEDBACK_INDEX_NAME": "feedback_index", "ES_CLIENT_TIMEOUT": 90, "EMBEDDING_MODEL_NAME": 'sentence-transformers/paraphrase-multilingual-mpnet-base-v2', "CROSS_ENCODER_MODEL_NAME": 'cross-encoder/mmarco-mMiniLMv2-L12-H384-v1', "TOGETHER_MODEL_NAME": "meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", "QUERY_EXPANSION_MODEL": "mistralai/Mixtral-8x7B-Instruct-v0.1", "LLM_CLIENT_TIMEOUT": 120, "NUM_CONTEXT_RESULTS": 5, "RE_RANK_CANDIDATE_COUNT": 50, "RRF_RANK_CONSTANT": 60, "INITIAL_SEARCH_SIZE": 150, "KNN_NUM_CANDIDATES": 200, "MAX_GENERATION_TOKENS": 1024, "GENERATION_TEMPERATURE": 0.6, "USE_QUERY_EXPANSION": True, "SPELLCHECK_LANG": 'hu', "MAX_HISTORY_TURNS": 3, "HUNGARIAN_STOP_WORDS": set( ["a", "az", "egy", "és", "hogy", "ha", "is", "itt", "ki", "mi", "mit", "mikor", "hol", "hogyan", "nem", "ne", "de", "csak", "meg", "megint", "már", "mint", "még", "vagy", "valamint", "van", "volt", "lesz", "kell", "kellett", "lehet", "tud", "tudott", "fog", "fogja", "azt", "ezt", "ott", "ő", "ők", "én", "te", "mi", "ti", "ön", "önök", "maga", "maguk", "ilyen", "olyan", "amely", "amelyek", "aki", "akik", "ahol", "amikor", "mert", "ezért", "akkor", "így", "úgy", "pedig", "illetve", "továbbá", "azonban", "hanem", "viszont", "nélkül", "alatt", "felett", "között", "előtt", "után", "mellett", "bele", "be", "fel", "le", "át", "szembe", "együtt", "mindig", "soha", "gyakran", "néha", "talán", "esetleg", "biztosan", "nagyon", "kicsit", "éppen", "most", "majd", "azután", "először", "utoljára", "igen", "sem", "túl", "kivéve", "szerint"]) } # --- Segédfüggvények --- def correct_spellings(text, spell_checker_instance): """ Kijavítja a helyesírási hibákat a szövegben. """ if not spell_checker_instance: return text try: words = re.findall(r'\b\w+\b', text.lower()) misspelled = spell_checker_instance.unknown(words) if not misspelled: return text corrected_text = text for word in misspelled: correction = spell_checker_instance.correction(word) if correction and correction != word: corrected_text = re.sub(r'\b' + re.escape(word) + r'\b', correction, corrected_text, flags=re.IGNORECASE) return corrected_text except Exception as e: print(f"{RED}Hiba a helyesírás javítása közben: {e}{RESET}") return text def get_query_category_with_llm(client, query): """ LLM-et használ a felhasználói kérdés kategorizálására, előre definiált listából választva. """ if not client: return None print(f" {CYAN}-> Lekérdezés kategorizálása LLM-mel...{RESET}") category_list = ['IT biztonsági szolgáltatások', 'szolgáltatások', 'hardver', 'szoftver', 'hírek', 'audiovizuális konferenciatechnika'] categories_text = ", ".join([f"'{cat}'" for cat in category_list]) prompt = f"""Adott egy felhasználói kérdés. Adj meg egyetlen, rövid kategóriát a következő listából, ami a legjobban jellemzi a kérdést. A válaszodban csak a kategória szerepeljen, más szöveg, magyarázat, vagy írásjelek nélkül. Lehetséges kategóriák: {categories_text} Kérdés: '{query}' Kategória:""" messages = [{"role": "user", "content": prompt}] try: response = client.chat.completions.create(model=CONFIG["QUERY_EXPANSION_MODEL"], messages=messages, temperature=0.1, max_tokens=30) if response and response.choices: category = response.choices[0].message.content.strip() category = re.sub(r'\(.*?\)', '', category).strip() category = re.sub(r'["\']', '', category).strip() for cat in category_list: if cat.lower() in category.lower(): print(f" {GREEN}-> A kérdés LLM által generált kategóriája: '{cat}'{RESET}") return cat.lower() print(f" {YELLOW}-> Az LLM nem talált megfelelő kategóriát, 'egyéb' kategória használata.{RESET}") return 'egyéb' except Exception as e: print(f"{RED}Hiba LLM kategorizáláskor: {e}{RESET}") return 'egyéb' def expand_or_rewrite_query(original_query, client): """ Bővíti a felhasználói lekérdezést, hogy több releváns találat legyen. """ final_queries = [original_query] if not CONFIG["USE_QUERY_EXPANSION"]: return final_queries print(f" {BLUE}-> Lekérdezés bővítése/átírása...{RESET}") # JAVÍTOTT PROMPT: csak kulcsszavakat kérünk, magyarázat nélkül prompt = f"Adott egy magyar nyelvű felhasználói kérdés: '{original_query}'. Generálj 2 db alternatív, releváns keresőkifejezést. A válaszodban csak ezeket add vissza, vesszővel (,) elválasztva, minden más szöveg nélkül." messages = [{"role": "user", "content": prompt}] try: response = client.chat.completions.create(model=CONFIG["QUERY_EXPANSION_MODEL"], messages=messages, temperature=0.5, max_tokens=100) if response and response.choices: generated_text = response.choices[0].message.content.strip() # Módosítva: eltávolítjuk a felesleges karaktereket és magyarázó szöveget alternatives = [q.strip().replace('"', '').replace("'", '').replace('.', '') for q in generated_text.split(',') if q.strip() and q.strip() != original_query] final_queries.extend(alternatives) print(f" {GREEN}-> Bővített lekérdezések: {final_queries}{RESET}") except Exception as e: print(f"{RED}Hiba a lekérdezés bővítése során: {e}{RESET}") return final_queries def run_separate_searches(es_client, query_text, embedding_model, expanded_queries, query_category=None): """ Párhuzamosan futtatja a kulcsszavas és a kNN kereséseket. """ results = {'knn': {}, 'keyword': {}} es_client_with_timeout = es_client.options(request_timeout=CONFIG["ES_CLIENT_TIMEOUT"]) source_fields = ["text_content", "source_url", "summary", "category"] filters = [] # DRASZTIKUS VÁLTOZTATÁS: # A kategóriaszűrés logikája kikapcsolva. A lekérdezés a teljes indexben fut. # Ha a probléma a szűrésben van, ezzel a lépéssel azonosítható. # A felhasználó igénye szerint vissza lehet kapcsolni, de először a teljes működését kell biztosítani. # if query_category and query_category != 'egyéb': # print(f" {MAGENTA}-> Kategória-alapú szűrés hozzáadása a kereséshez: '{query_category}'{RESET}") # filters.append({"match": {"category": query_category}}) def knn_search(index, query_vector): try: knn_query = {"field": "embedding", "query_vector": query_vector, "k": CONFIG["INITIAL_SEARCH_SIZE"], "num_candidates": CONFIG["KNN_NUM_CANDIDATES"], "filter": filters} response = es_client_with_timeout.search(index=index, knn=knn_query, _source=source_fields, size=CONFIG["INITIAL_SEARCH_SIZE"]) return index, response.get('hits', {}).get('hits', []) except Exception as e: print(f"{RED}Hiba kNN keresés során ({index}): {e}{RESET}") return index, [] def keyword_search(index, expanded_queries): try: should_clauses = [] for q in expanded_queries: should_clauses.append({"match": {"text_content": {"query": q, "operator": "OR", "fuzziness": "AUTO"}}}) query_body = {"query": {"bool": {"should": should_clauses, "minimum_should_match": 1, "filter": filters}}} response = es_client_with_timeout.search(index=index, query=query_body['query'], _source=source_fields, size=CONFIG["INITIAL_SEARCH_SIZE"]) return index, response.get('hits', {}).get('hits', []) except Exception as e: print(f"{RED}Hiba kulcsszavas keresés során ({index}): {e}{RESET}") return index, [] query_vector = None try: query_vector = embedding_model.encode(query_text, normalize_embeddings=True).tolist() except Exception as e: print(f"{RED}Hiba az embedding generálásakor: {e}{RESET}") with ThreadPoolExecutor(max_workers=len(CONFIG["VECTOR_INDEX_NAMES"]) * 2) as executor: knn_futures = {executor.submit(knn_search, index, query_vector) for index in CONFIG["VECTOR_INDEX_NAMES"] if query_vector} keyword_futures = {executor.submit(keyword_search, index, expanded_queries) for index in CONFIG["VECTOR_INDEX_NAMES"]} for future in knn_futures: index, hits = future.result() results['knn'][index] = [(rank + 1, hit) for rank, hit in enumerate(hits)] for future in keyword_futures: index, hits = future.result() results['keyword'][index] = [(rank + 1, hit) for rank, hit in enumerate(hits)] # ÚJ LOGOLÁS: Kiírjuk a keresési találatok számát total_knn_hits = sum(len(h) for h in results['knn'].values()) total_keyword_hits = sum(len(h) for h in results['keyword'].values()) print(f"{CYAN}Vektorkeresési találatok száma: {total_knn_hits}{RESET}") print(f"{CYAN}Kulcsszavas keresési találatok száma: {total_keyword_hits}{RESET}") return results def merge_results_rrf(search_results): """ Egyesíti a keresési eredményeket az RRF algoritmussal. """ rrf_scores = defaultdict(float) all_hits_data = {} for search_type in search_results: for index_name in search_results[search_type]: for rank, hit in search_results[search_type][index_name]: doc_id = hit['_id'] rrf_scores[doc_id] += 1.0 / (CONFIG["RRF_RANK_CONSTANT"] + rank) if doc_id not in all_hits_data: all_hits_data[doc_id] = hit combined_results = [(doc_id, score, all_hits_data[doc_id]) for doc_id, score in rrf_scores.items()] combined_results.sort(key=lambda item: item[1], reverse=True) # ÚJ LOGOLÁS: Kiírjuk az RRF által rangsorolt top 5 pontszámot print( f"{CYAN}RRF által rangsorolt Top 5 pontszám: {[f'{score:.4f}' for doc_id, score, hit in combined_results[:5]]}{RESET}") return combined_results def retrieve_context_reranked(backend, query_text, confidence_threshold, fallback_message, query_category): """ Lekéri a kontextust a rangsorolás után. """ es_client = backend["es_client"] embedding_model = backend["embedding_model"] cross_encoder = backend["cross_encoder"] llm_client = backend["llm_client"] # DRASZTIKUS VÁLTOZTATÁS: A kategória-alapú szűrés kikapcsolva. expanded_queries = expand_or_rewrite_query(query_text, llm_client) search_results = run_separate_searches(es_client, query_text, embedding_model, expanded_queries) merged_results = merge_results_rrf(search_results) top_score = None if not merged_results: print(f"{YELLOW}A keresés nem hozott eredményt.{RESET}") return fallback_message, [], top_score candidates_to_rerank = merged_results[:CONFIG["RE_RANK_CANDIDATE_COUNT"]] hits_data_for_reranking = [hit for _, _, hit in candidates_to_rerank] query_chunk_pairs = [[query_text, hit['_source'].get('summary', hit['_source'].get('text_content'))] for hit in hits_data_for_reranking if hit and '_source' in hit] ranked_by_ce = [] if cross_encoder and query_chunk_pairs: try: ce_scores = cross_encoder.predict(query_chunk_pairs, show_progress_bar=False) ranked_by_ce = sorted(zip(ce_scores, hits_data_for_reranking), key=lambda x: x[0], reverse=True) print(f"{CYAN}Cross-Encoder pontszámok (Top 5):{RESET} {[f'{score:.4f}' for score, _ in ranked_by_ce[:5]]}") except Exception as e: print(f"{RED}Hiba a Cross-Encoder során: {e}{RESET}") ranked_by_ce = [] if not ranked_by_ce and candidates_to_rerank: print(f"{YELLOW}[INFO] Cross-Encoder nem futott, RRF sorrend használata.{RESET}") ranked_by_ce = sorted([(score, hit) for _, score, hit in candidates_to_rerank], key=lambda x: x[0], reverse=True) if not ranked_by_ce: return fallback_message, [], top_score top_score = float(ranked_by_ce[0][0]) print(f"{GREEN}Legjobb találat pontszáma: {top_score:.4f}{RESET}") if top_score < confidence_threshold: print( f"{YELLOW}A legjobb találat pontszáma ({top_score:.4f}) nem érte el a beállított küszöböt ({confidence_threshold}). A folyamat leáll.{RESET}") dynamic_fallback = ( f"{fallback_message}\n\n" f"A '{query_text}' kérdésre a legjobb találat megbízhatósági pontszáma ({top_score:.2f}) " f"nem érte el a beállított küszöböt ({confidence_threshold:.2f})." ) return dynamic_fallback, [], top_score print(f"{GREEN}A Cross-Encoder magabiztos (legjobb score: {top_score:.4f}). A rangsorát használjuk.{RESET}") final_hits_for_context = [hit for _, hit in ranked_by_ce[:CONFIG["NUM_CONTEXT_RESULTS"]]] context_parts = [hit['_source'].get('summary', hit['_source'].get('text_content')) for hit in final_hits_for_context if hit and '_source' in hit and (hit['_source'].get('summary') or hit['_source'].get('text_content'))] context_string = "\n\n---\n\n".join(context_parts) sources = [] for hit_data in final_hits_for_context: if hit_data and '_source' in hit_data: source_info = { "url": hit_data['_source'].get('source_url', hit_data.get('_index', '?')), "content": hit_data['_source'].get('text_content', 'N/A') } if source_info not in sources: sources.append(source_info) return context_string, sources, top_score def generate_answer_with_history(client, model_name, messages, temperature): """ Válasz generálása LLM-mel, figyelembe véve az előzményeket. """ try: response = client.chat.completions.create( model=model_name, messages=messages, temperature=temperature, max_tokens=CONFIG["MAX_GENERATION_TOKENS"], timeout=CONFIG["LLM_CLIENT_TIMEOUT"] ) if response and response.choices: return response.choices[0].message.content.strip() return "Hiba: Nem érkezett érvényes válasz az AI modelltől." except Exception as e: error_message = str(e) if "429" in error_message: wait_time = 100 print(f"{YELLOW}Rate limit elérve. A program vár {wait_time} másodpercet...{RESET}") time.sleep(wait_time) return generate_answer_with_history(client, model_name, messages, temperature) print(f"{RED}Hiba a válasz generálásakor: {e}{RESET}") return "Hiba történt az AI modell hívásakor." def search_in_feedback_index(es_client, embedding_model, question, min_score=0.75): """ Keres a visszajelzési adatbázisban a hasonló kérdésekre. """ try: embedding = embedding_model.encode(question, normalize_embeddings=True).tolist() knn_query = {"field": "embedding", "query_vector": embedding, "k": 1, "num_candidates": 10} response = es_client.search(index=CONFIG["FEEDBACK_INDEX_NAME"], knn=knn_query, _source=["question_text", "correction_text"]) hits = response.get('hits', {}).get('hits', []) if hits and hits[0]['_score'] >= min_score: top_hit = hits[0] source = top_hit['_source'] score = top_hit['_score'] if score > 0.98: return "direct_answer", source['correction_text'] instruction = f"Egy nagyon hasonló kérdésre ('{source['question_text']}') korábban a következő javítást/iránymutatást adtad: '{source['correction_text']}'. A válaszodat elsősorban ez alapján alkosd meg, még akkor is, ha a talált kontextus mást sugall!" return "instruction", instruction return None, None except es_exceptions.NotFoundError: return None, None except Exception: return None, None def index_feedback(es_client, embedding_model, question, correction): """ Indexeli a visszajelzést. """ try: embedding = embedding_model.encode(question, normalize_embeddings=True).tolist() doc = {"question_text": question, "correction_text": correction, "embedding": embedding, "timestamp": datetime.datetime.now()} es_client.index(index=CONFIG["FEEDBACK_INDEX_NAME"], document=doc) print(f"Visszajelzés sikeresen indexelve a '{CONFIG['FEEDBACK_INDEX_NAME']}' indexbe.") return True except Exception as e: print(f"{RED}Hiba a visszajelzés indexelése során: {e}{RESET}") return False def get_all_feedback(es_client, index_name): """ Lekéri az összes visszajelzést. """ try: response = es_client.search(index=index_name, query={"match_all": {}}, size=1000, sort=[{"timestamp": {"order": "desc"}}]) return response.get('hits', {}).get('hits', []) except es_exceptions.NotFoundError: return [] except Exception as e: print(f"{RED}Hiba a visszajelzések listázása során: {e}{RESET}") return [] def delete_feedback_by_id(es_client, index_name, doc_id): """ Töröl egy visszajelzést ID alapján. """ try: es_client.delete(index=index_name, id=doc_id) return True except Exception as e: print(f"{RED}Hiba a visszajelzés törlése során (ID: {doc_id}): {e}{RESET}") return False def update_feedback_comment(es_client, index_name, doc_id, new_comment): """ Frissít egy visszajelzést ID alapján. """ try: es_client.update(index=index_name, id=doc_id, doc={"correction_text": new_comment}) return True except Exception as e: print(f"{RED}Hiba a visszajelzés szerkesztése során (ID: {doc_id}): {e}{RESET}") return False def initialize_backend(): """ Inicializálja a backend komponenseit. """ print("----- Backend Motor Inicializálása -----") load_dotenv() try: nltk.data.find('tokenizers/punkt') except LookupError: nltk.download('punkt', quiet=True) warnings.filterwarnings("ignore", message=".*verify_certs=False.*") spell_checker = None try: spell_checker = SpellChecker(language=CONFIG["SPELLCHECK_LANG"]) custom_words = ["dunaelektronika", "kft", "outsourcing", "dell", "lenovo", "nis2", "szerver", "kliens", "hálózati", "hpe"] spell_checker.word_frequency.load_words(custom_words) except Exception as e: print(f"{RED}Helyesírás-ellenőrző hiba: {e}{RESET}") backend_objects = { "es_client": Elasticsearch(CONFIG["ELASTIC_HOST"], basic_auth=("elastic", CONFIG["ELASTIC_PASSWORD"]), verify_certs=False), "embedding_model": SentenceTransformer(CONFIG["EMBEDDING_MODEL_NAME"], device='cuda' if torch.cuda.is_available() else 'cpu'), "cross_encoder": CrossEncoder(CONFIG["CROSS_ENCODER_MODEL_NAME"], device='cuda' if torch.cuda.is_available() else 'cpu'), "llm_client": Together(api_key=os.getenv("TOGETHER_API_KEY")), "spell_checker": spell_checker } print(f"{GREEN}----- Backend Motor Készen Áll -----{RESET}") return backend_objects def process_query(user_question, chat_history, backend, confidence_threshold, fallback_message): """ A teljes lekérdezés-feldolgozási munkafolyamatot vezérli. """ print(f"\n{BLUE}----- Új lekérdezés feldolgozása ----{RESET}") print(f"{BLUE}Kérdés: {user_question}{RESET}") corrected_question = correct_spellings(user_question, backend["spell_checker"]) print(f"{BLUE}Javított kérdés: {corrected_question}{RESET}") feedback_type, feedback_content = search_in_feedback_index( backend["es_client"], backend["embedding_model"], corrected_question ) if feedback_type == "direct_answer": print(f"{GREEN}Direkt válasz a visszajelzési adatbázisból.{RESET}") return { "answer": feedback_content, "sources": [ {"url": "Személyes visszajelzés alapján", "content": "Ez egy korábban megadott, pontosított válasz."}], "corrected_question": corrected_question, "confidence_score": 10.0 } feedback_instructions = feedback_content if feedback_type == "instruction" else None query_category = get_query_category_with_llm(backend["llm_client"], corrected_question) retrieved_context, sources, confidence_score = retrieve_context_reranked(backend, corrected_question, confidence_threshold, fallback_message, query_category) if not sources and not feedback_instructions: return { "answer": retrieved_context, "sources": [], "corrected_question": corrected_question, "confidence_score": confidence_score } prompt_instructions = "" if feedback_instructions: prompt_instructions = f""" KÜLÖNLEGESEN FONTOS FEJLESZTŐI UTASÍTÁS (ezt vedd figyelembe a leginkább!): --- {feedback_instructions} --- """ system_prompt = f"""Te egy professzionális, segítőkész AI asszisztens vagy. A feladatod, hogy a KONTEXTUS-ból és a FEJLESZTŐI UTASÍTÁSOKBÓL származó információkat egyetlen, jól strukturált és ismétlés-mentes válasszá szintetizálld. {prompt_instructions} KRITIKUS SZABÁLY: Értékeld a kapott KONTEXTUS relevanciáját a felhasználó kérdéséhez képest. Ha egy kontextus-részlet nem kapcsolódik szorosan a kérdéshez, azt hagyd figyelmen kívül! FIGYELEM: Szigorúan csak a megadott KONTEXTUS-ra és a fejlesztői utasításokra támaszkodj. Ha a releváns információk alapján nem tudsz válaszolni, add ezt a választ: '{fallback_message}' KONTEXTUS: --- {retrieved_context if sources else "A tudásbázisban nem található releváns információ."} --- ELŐZMÉNYEK (ha releváns): Lásd a korábbi üzeneteket. """ messages_for_llm = [] if chat_history: messages_for_llm.extend(chat_history[-(CONFIG["MAX_HISTORY_TURNS"] * 2):]) messages_for_llm.append({"role": "system", "content": system_prompt}) messages_for_llm.append({"role": "user", "content": corrected_question}) answer = generate_answer_with_history( backend["llm_client"], CONFIG["TOGETHER_MODEL_NAME"], messages_for_llm, CONFIG["GENERATION_TEMPERATURE"] ) return { "answer": answer, "sources": sources, "corrected_question": corrected_question, "confidence_score": confidence_score }