KorChat / chatbot.py
jaczad's picture
Bibliografia już się wyświetla prawidłowo
6805005
import gradio as gr
import os
import csv
# --- Próba importu HRAssistant ---
try:
from hr_assistant import HRAssistant
hr_import_error = None
except ModuleNotFoundError as e:
hr_import_error = str(e)
# --- Wczytywanie bibliografii ---
def load_bibliography(file_path="bibliografia.csv"):
bibliography = {}
try:
with open(file_path, mode='r', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile, delimiter=';')
for row in reader:
if len(row) == 2:
# Klucz to nazwa pliku bez rozszerzenia, wartość to pełny opis
bibliography[row[1].strip()] = row[0].strip()
except FileNotFoundError:
print(f"Plik {file_path} nie został znaleziony.")
except Exception as e:
print(f"Błąd podczas wczytywania pliku {file_path}: {e}")
return bibliography
bibliography_data = load_bibliography()
# --- Inicjalizacja asystenta ---
def initialize_assistant():
if hr_import_error:
return None
try:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
print("Brak klucza OPENAI_API_KEY w zmiennych środowiskowych.")
return None
hr_assistant = HRAssistant(openai_api_key=api_key, pdf_directory="pdfs")
return hr_assistant
except Exception as e:
print(f"Błąd podczas inicjalizacji asystenta HR: {e}")
return None
assistant = initialize_assistant()
# --- Funkcja obsługująca zapytania ---
def respond_to_query(message, history):
if not assistant:
if hr_import_error:
error_message = f"Błąd importu: {hr_import_error}\n\nAby uruchomić aplikację, zainstaluj brakujący moduł:\n\npip install sentence-transformers"
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": error_message})
return history
else:
error_message = "Asystent HR nie jest dostępny. Sprawdź, czy klucz API został poprawnie skonfigurowany."
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": error_message})
return history
response = assistant.ask(message)
answer = response.get("answer", "Przepraszam, wystąpił błąd.")
if response.get("sources"):
answer += "\n\n**Źródła:**"
grouped_sources = {}
for source_meta in response["sources"]:
source_key = source_meta.get('source')
if not source_key:
continue
if source_key not in grouped_sources:
grouped_sources[source_key] = {
'type': 'url' if source_key.startswith('http') else 'pdf',
'meta': source_meta,
'pages': set()
}
if 'page' in source_meta and source_meta['page'] is not None:
grouped_sources[source_key]['pages'].add(source_meta['page'])
for key, data in grouped_sources.items():
if data['type'] == 'pdf':
file_stem = data['meta'].get('file_stem', os.path.splitext(os.path.basename(key))[0])
display_name = bibliography_data.get(file_stem, os.path.basename(key))
pages = sorted(list(data['pages']))
pages_str = ""
if pages:
if len(pages) == 1:
pages_str = f"str. {pages[0]}"
else:
pages_str = "str. " + ", ".join(map(str, pages))
answer += f"\n- {display_name} ({pages_str})" if pages_str else f"\n- {display_name}"
elif data['type'] == 'url':
title = data['meta'].get('title', key)
url = key
date = data['meta'].get('added_date', '')
date_str = f" (dodano: {date})" if date else ""
answer += f"\n- [{title}]({url}){date_str}"
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": answer})
return history
# --- Funkcja do czyszczenia pola tekstowego ---
def clear_textbox():
return gr.update(value="")
# --- Funkcja do flagowania ---
def flag_callback(history):
if not history:
gr.Warning("Brak wiadomości do oznaczenia.")
return
# Find the last user message and assistant response
last_user_message = None
last_assistant_message = None
for i in range(len(history) - 1, -1, -1):
msg = history[i]
if msg.get("role") == "assistant" and last_assistant_message is None:
last_assistant_message = msg.get("content", "")
elif msg.get("role") == "user" and last_user_message is None:
last_user_message = msg.get("content", "")
break
if not last_user_message or not last_assistant_message:
gr.Warning("Nie można znaleźć ostatniej wymiany wiadomości.")
return
with open("flagged_responses.txt", "a", encoding="utf-8") as f:
f.write(f"Zapytanie: {last_user_message}\nOdpowiedź: {last_assistant_message}\n")
f.write("-" * 50 + "\n")
gr.Info("Odpowiedź została oznaczona. Dziękujemy za feedback!")
# --- Konfiguracja interfejsu Gradio ---
with gr.Blocks() as demo:
gr.HTML("""
<div style="display: flex; align-items: center; margin-bottom: 20px;">
<img src="file/logo-korektor.png" alt="Logo" style="width: 150px; margin-right: 20px;" />
<div>
<h1>KoREKtor - Asystent HR</h1>
<p>Twój inteligentny partner w zatrudnianiu osób z niepełnosprawnościami.</p>
</div>
</div>
<img src="file/belka.png" alt="Belka" style="width: 100%; margin-bottom: 20px;" />
""")
if hr_import_error:
gr.Markdown(f"**Błąd importu: {hr_import_error}**\n\nAby uruchomić aplikację, zainstaluj brakujący moduł:\n\n`pip install sentence-transformers`")
chatbot = gr.Chatbot(
[],
elem_id="chatbot",
height=500,
show_label=False,
type="messages"
)
with gr.Row():
msg = gr.Textbox(
placeholder="Zadaj pytanie...",
container=False,
scale=9
)
submit = gr.Button("Wyślij", scale=1)
flag = gr.Button("🚩 Oznacz odpowiedź", scale=1)
# Po wysłaniu wiadomości, wywołaj chatbota, a następnie wyczyść pole tekstowe
msg.submit(respond_to_query, [msg, chatbot], chatbot).then(clear_textbox, [], [msg])
submit.click(respond_to_query, [msg, chatbot], chatbot).then(clear_textbox, [], [msg])
# Flagowanie nie wymaga dodatkowych akcji po kliknięciu
flag.click(flag_callback, [chatbot], None)
# --- Uruchomienie aplikacji ---
if __name__ == "__main__":
demo.launch()