|
|
import gradio as gr |
|
|
import os |
|
|
import csv |
|
|
|
|
|
|
|
|
# Import the project's HR assistant lazily-guarded: instead of crashing at
# startup, remember the error text so the UI can show install instructions.
try:
    from hr_assistant import HRAssistant

    hr_import_error = None
except ModuleNotFoundError as e:
    # Typically raised when sentence-transformers (a transitive dependency)
    # is not installed; surfaced later by the UI and respond_to_query.
    hr_import_error = str(e)
|
|
|
|
|
|
|
|
def load_bibliography(file_path="bibliografia.csv"):
    """Load the bibliography mapping from a semicolon-separated CSV file.

    Each row is expected to hold exactly two cells:
    ``display name;file stem``. Rows with any other cell count are skipped.

    Args:
        file_path: Path to the CSV file (defaults to "bibliografia.csv").

    Returns:
        dict mapping file stem (column 2) -> display name (column 1).
        Empty when the file is missing or unreadable (errors are printed,
        never raised, so startup is best-effort).
    """
    bibliography = {}
    try:
        # newline='' is required by the csv module so that embedded
        # newlines inside quoted fields are parsed correctly.
        with open(file_path, mode='r', encoding='utf-8', newline='') as csvfile:
            reader = csv.reader(csvfile, delimiter=';')
            for row in reader:
                if len(row) == 2:
                    # Keyed by file stem so PDF sources can later be
                    # resolved to their human-readable citation.
                    bibliography[row[1].strip()] = row[0].strip()
    except FileNotFoundError:
        print(f"Plik {file_path} nie został znaleziony.")
    except Exception as e:
        print(f"Błąd podczas wczytywania pliku {file_path}: {e}")
    return bibliography
|
|
|
|
|
bibliography_data = load_bibliography() |
|
|
|
|
|
|
|
|
def initialize_assistant():
    """Build the HRAssistant instance, or return None when unavailable.

    Returns None in three situations: the hr_assistant module failed to
    import, the OPENAI_API_KEY environment variable is unset, or the
    assistant constructor raised. Failures are printed, never raised.
    """
    if hr_import_error:
        return None

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("Brak klucza OPENAI_API_KEY w zmiennych środowiskowych.")
        return None

    try:
        return HRAssistant(openai_api_key=api_key, pdf_directory="pdfs")
    except Exception as e:
        print(f"Błąd podczas inicjalizacji asystenta HR: {e}")
        return None
|
|
|
|
|
assistant = initialize_assistant() |
|
|
|
|
|
|
|
|
def respond_to_query(message, history):
    """Answer *message* via the HR assistant and append the exchange to *history*.

    Args:
        message: The user's question (plain text).
        history: Chat history in gradio "messages" format — a list of
            {"role": ..., "content": ...} dicts. Mutated in place.

    Returns:
        The updated history list. When the assistant is unavailable, an
        explanatory error message is appended instead of an answer.
    """
    if not assistant:
        if hr_import_error:
            error_message = f"Błąd importu: {hr_import_error}\n\nAby uruchomić aplikację, zainstaluj brakujący moduł:\n\npip install sentence-transformers"
        else:
            error_message = "Asystent HR nie jest dostępny. Sprawdź, czy klucz API został poprawnie skonfigurowany."
        _append_exchange(history, message, error_message)
        return history

    response = assistant.ask(message)
    answer = response.get("answer", "Przepraszam, wystąpił błąd.")

    if response.get("sources"):
        answer += _format_sources(response["sources"])

    _append_exchange(history, message, answer)
    return history


def _append_exchange(history, message, answer):
    """Append one user/assistant message pair to *history* (in place)."""
    history.append({"role": "user", "content": message})
    history.append({"role": "assistant", "content": answer})


def _format_sources(sources):
    """Render source metadata as a Markdown bullet list with a header.

    Entries sharing the same 'source' key are grouped, so a PDF cited on
    several pages appears once with all of its page numbers. URL sources
    become Markdown links with an optional "added" date.

    Args:
        sources: iterable of metadata dicts; each may carry 'source',
            'page', 'file_stem', 'title' and 'added_date' keys.

    Returns:
        A string starting with a blank line and the "**Źródła:**" header.
    """
    grouped_sources = {}
    for source_meta in sources:
        source_key = source_meta.get('source')
        if not source_key:
            continue

        entry = grouped_sources.setdefault(source_key, {
            'type': 'url' if source_key.startswith('http') else 'pdf',
            'meta': source_meta,
            'pages': set(),
        })
        if source_meta.get('page') is not None:
            entry['pages'].add(source_meta['page'])

    text = "\n\n**Źródła:**"
    for key, data in grouped_sources.items():
        if data['type'] == 'pdf':
            # Prefer the human-readable citation from the bibliography;
            # fall back to the bare file name.
            file_stem = data['meta'].get('file_stem', os.path.splitext(os.path.basename(key))[0])
            display_name = bibliography_data.get(file_stem, os.path.basename(key))

            pages = sorted(data['pages'])
            if pages:
                pages_str = "str. " + ", ".join(map(str, pages))
                text += f"\n- {display_name} ({pages_str})"
            else:
                text += f"\n- {display_name}"
        else:
            title = data['meta'].get('title', key)
            date = data['meta'].get('added_date', '')
            date_str = f" (dodano: {date})" if date else ""
            text += f"\n- [{title}]({key}){date_str}"
    return text
|
|
|
|
|
|
|
|
def clear_textbox():
    """Return a gradio update that empties the message textbox."""
    reset = gr.update(value="")
    return reset
|
|
|
|
|
|
|
|
def flag_callback(history):
    """Persist the most recent user/assistant exchange for later review.

    Scans *history* (gradio "messages" format) backwards for the newest
    assistant reply and the user message that preceded it, then appends
    both to flagged_responses.txt. Shows a gradio warning when no complete
    exchange can be found; shows an info toast on success.

    Args:
        history: list of {"role": ..., "content": ...} dicts.
    """
    if not history:
        gr.Warning("Brak wiadomości do oznaczenia.")
        return

    last_user_message = None
    last_assistant_message = None

    # Walk backwards: first grab the newest assistant reply, then stop at
    # the first user message encountered before (or instead of) it.
    for msg in reversed(history):
        role = msg.get("role")
        if role == "assistant" and last_assistant_message is None:
            last_assistant_message = msg.get("content", "")
        elif role == "user" and last_user_message is None:
            last_user_message = msg.get("content", "")
            break

    # `is None` rather than truthiness, so a legitimately empty message
    # body still counts as a found message.
    if last_user_message is None or last_assistant_message is None:
        gr.Warning("Nie można znaleźć ostatniej wymiany wiadomości.")
        return

    with open("flagged_responses.txt", "a", encoding="utf-8") as f:
        f.write(f"Zapytanie: {last_user_message}\nOdpowiedź: {last_assistant_message}\n")
        f.write("-" * 50 + "\n")
    gr.Info("Odpowiedź została oznaczona. Dziękujemy za feedback!")
|
|
|
|
|
|
|
|
# --- UI layout and event wiring --------------------------------------------
with gr.Blocks() as demo:
    # Static header: project logo, title, tagline and a decorative banner.
    gr.HTML("""
    <div style="display: flex; align-items: center; margin-bottom: 20px;">
        <img src="file/logo-korektor.png" alt="Logo" style="width: 150px; margin-right: 20px;" />
        <div>
            <h1>KoREKtor - Asystent HR</h1>
            <p>Twój inteligentny partner w zatrudnianiu osób z niepełnosprawnościami.</p>
        </div>
    </div>
    <img src="file/belka.png" alt="Belka" style="width: 100%; margin-bottom: 20px;" />
    """)

    # Surface the import failure in the UI as well as on stdout.
    if hr_import_error:
        gr.Markdown(f"**Błąd importu: {hr_import_error}**\n\nAby uruchomić aplikację, zainstaluj brakujący moduł:\n\n`pip install sentence-transformers`")

    # Conversation area using gradio's "messages" format (role/content dicts),
    # matching what respond_to_query appends.
    chatbot = gr.Chatbot(
        [],
        elem_id="chatbot",
        height=500,
        show_label=False,
        type="messages"
    )

    with gr.Row():
        msg = gr.Textbox(
            placeholder="Zadaj pytanie...",
            container=False,
            scale=9
        )
        submit = gr.Button("Wyślij", scale=1)

        flag = gr.Button("🚩 Oznacz odpowiedź", scale=1)

    # Pressing Enter in the textbox or clicking "Wyślij" both ask the
    # assistant, then clear the textbox once the reply is rendered.
    msg.submit(respond_to_query, [msg, chatbot], chatbot).then(clear_textbox, [], [msg])
    submit.click(respond_to_query, [msg, chatbot], chatbot).then(clear_textbox, [], [msg])

    # Flagging persists the last exchange to disk; no output component.
    flag.click(flag_callback, [chatbot], None)
|
|
|
|
|
|
|
|
# Start the gradio server only when executed directly as a script.
if __name__ == "__main__":
    demo.launch()
|
|
|
|
|
|
|
|
|