KorChat / hr_assistant.py
jaczad's picture
Bibliografia już się wyświetla prawidłowo
6805005
# --- Funkcje pomocnicze do obsługi źródeł bibliograficznych ---
import os
import csv
def load_bibliography(file_path="bibliografia.csv"):
"""
Wczytuje dane bibliograficzne z pliku CSV i zwraca słownik: {file_stem: opis}
"""
bibliography = {}
if os.path.exists(file_path):
with open(file_path, mode='r', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile, delimiter=';')
for row in reader:
if len(row) == 2:
# Usuwanie cudzysłowów z obu kolumn
key = row[1].strip().replace('"', '')
value = row[0].strip().replace('"', '')
bibliography[key] = value
return bibliography
bibliography_data = load_bibliography()
def print_unique_sources(sources: list):
"""
Wypisuje unikalne źródła na podstawie filename, page, section, zamieniając nazwę pliku na opis bibliograficzny jeśli to możliwe.
Jeśli źródło to URL lub hardcoded, wypisuje tytuł lub URL.
"""
unique_sources = []
seen = set()
for source in sources:
key = (source.get('filename'), source.get('page'), source.get('section'))
if key not in seen:
seen.add(key)
unique_sources.append(source)
for i, source in enumerate(unique_sources, 1):
opis = None
filename = source.get('filename')
if filename:
file_stem = os.path.splitext(filename)[0]
# Szukaj opisu w bibliografii wg różnych wariantów
opis = bibliography_data.get(filename)
if not opis:
opis = bibliography_data.get(file_stem + '.pdf')
if not opis:
opis = bibliography_data.get(file_stem)
if not opis:
opis = filename # fallback: sama nazwa pliku
else:
# Jeśli nie ma filename, spróbuj użyć tytułu lub źródła (np. URL)
opis = source.get('title') or source.get('source') or "nieznane źródło"
page = source.get('page', '')
section = source.get('section', '')
print(f"{i}. {opis} (str. {page}) - {section}")
"""
Asystent HR dla pracodawców zatrudniających osoby z niepełnosprawnościami.
Funkcjonalności:
- Wykorzystuje dokumenty PDF jako bazę wiedzy, przetwarza je na wektorową bazę danych (FAISS) w pamięci.
- Pozwala na zadawanie pytań w języku polskim z konwersacyjną pamięcią kontekstu (ChatOpenAI, model GPT-4o-mini).
- Odpowiedzi generowane są wyłącznie na podstawie treści dokumentów PDF, wybranych stron internetowych (z pliku urls.txt) oraz hardkodowanych fragmentów (np. wysokości dofinansowań PFRON).
- Każda odpowiedź zawiera źródło informacji (nazwa pliku PDF, strona, sekcja lub URL).
- Obsługuje interaktywny tryb konsolowy z komendami: stats, clear, quit/exit/q.
- Przetwarza dokumenty PDF z zachowaniem struktury (chunkowanie sekcji, nagłówków, stron).
- Pobiera i przetwarza treści z wybranych stron internetowych (BeautifulSoup, requests).
- Loguje przebieg działania i błędy (logging).
Źródła wiedzy:
- Pliki PDF z katalogu "pdfs/"
- Adresy URL z pliku "urls.txt"
- Hardkodowane fragmenty (np. wysokość dofinansowań PFRON)
Wymagania:
- Python 3.10+
- Klucz API OpenAI (zmienna środowiskowa OPENAI_API_KEY)
- Zainstalowane pakiety: langchain, langchain_openai, langchain_community, fitz (PyMuPDF), requests, beautifulsoup4
Autor: Jacek (2024-2025)
"""
import os
import logging
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
import re
import csv
# LangChain imports (aktualne na 2024-06)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores.faiss import FAISS
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.documents import Document
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferWindowMemory
from langchain_core.prompts import PromptTemplate
# Web scraping
import requests
from bs4 import BeautifulSoup
from datetime import datetime
# PDF processing
import fitz # PyMuPDF
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class IntelligentPDFChunker:
"""
Inteligentny chunker dla dokumentów PDF, który respektuje strukturę dokumentu.
"""
def __init__(self, chunk_size: int = 800, chunk_overlap: int = 150): # Zmniejszone dla lepszej wydajności
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def _detect_structure_markers(self, text: str) -> List[Tuple[int, str, int]]:
"""
Wykrywa markery struktury w tekście (nagłówki, punkty, etc.)
Zwraca listę tupli (pozycja, typ, poziom)
"""
markers = []
# Wzorce dla różnych typów struktury
patterns = [
(r'^#{1,6}\s+(.+)', 'header', lambda m: len(m.group(0).split()[0])),
(r'^##\s+(.+)', 'header', 2),
(r'^###\s+(.+)', 'header', 3),
(r'^####\s+(.+)', 'header', 4),
(r'^#####\s+(.+)', 'header', 5),
(r'^######\s+(.+)', 'header', 6),
(r'^\d+\.\s+(.+)', 'numbered_list', 1),
(r'^•\s+(.+)', 'bullet_list', 1),
(r'^-\s+(.+)', 'bullet_list', 1),
(r'^\*\s+(.+)', 'bullet_list', 1),
(r'^[A-ZĄĆĘŁŃÓŚŹŻ][A-ZĄĆĘŁŃÓŚŹŻ\s]+:$', 'section_title', 1),
(r'^Krok\s+\d+', 'step', 1),
(r'^Rozdział\s+\d+', 'chapter', 1),
(r'^Część\s+\d+', 'part', 1),
]
lines = text.split('\n')
for i, line in enumerate(lines):
line = line.strip()
if not line:
continue
for pattern, marker_type, level_func in patterns:
match = re.match(pattern, line, re.IGNORECASE)
if match:
level = level_func(match) if callable(level_func) else level_func
markers.append((i, marker_type, level))
break
return markers
def _extract_pdf_structure(self, pdf_path: str) -> List[Document]:
"""
Ekstraktuje tekst z PDF z zachowaniem struktury dokumentu.
"""
doc = fitz.open(pdf_path)
documents = []
for page_num in range(len(doc)):
page = doc[page_num]
# Pobierz tekst z metadanymi o czcionce
blocks = page.get_text("dict")["blocks"]
page_text = ""
current_section = ""
for block in blocks:
if block.get("type") == 0: # text block
for line in block["lines"]:
line_text = ""
max_size = 0
for span in line["spans"]:
text = span.get("text", "").strip()
if text:
line_text += text + " "
max_size = max(max_size, span.get("size", 0))
if line_text.strip():
# Wykryj nagłówki na podstawie rozmiaru czcionki
if max_size > 12: # Większa czcionka = prawdopodobnie nagłówek
if current_section:
# Zapisz poprzednią sekcję
documents.append(Document(
page_content=page_text.strip(),
metadata={
"source": pdf_path,
"page": page_num + 1,
"section": current_section,
"type": "section"
}
))
page_text = ""
current_section = line_text.strip()
page_text += f"\n## {line_text.strip()}\n"
else:
page_text += line_text.strip() + " "
# Dodaj ostatnią sekcję ze strony
if page_text.strip():
documents.append(Document(
page_content=page_text.strip(),
metadata={
"source": pdf_path,
"page": page_num + 1,
"section": current_section or f"Strona {page_num + 1}",
"type": "section"
}
))
doc.close()
return documents
def chunk_documents(self, documents: List[Document]) -> List[Document]:
"""
Dzieli dokumenty na chunki z zachowaniem struktury.
"""
chunked_docs = []
for doc in documents:
# Jeśli dokument jest mały, zostaw go bez dzielenia
if len(doc.page_content) <= self.chunk_size:
chunked_docs.append(doc)
continue
# Użyj RecursiveCharacterTextSplitter z separatorami strukturalnymi
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
separators=[
"\n## ", # Nagłówki H2
"\n### ", # Nagłówki H3
"\n#### ", # Nagłówki H4
"\n\n", # Podwójny enter
"\n", # Pojedynczy enter
". ", # Koniec zdania
", ", # Przecinek
" ", # Spacja
""
]
)
texts = text_splitter.split_text(doc.page_content)
for i, text in enumerate(texts):
chunk_metadata = doc.metadata.copy()
chunk_metadata["chunk_id"] = i
chunk_metadata["total_chunks"] = len(texts)
chunked_docs.append(Document(
page_content=text,
metadata=chunk_metadata
))
return chunked_docs
class HRAssistant:
"""
Asystent HR dla pracodawców zatrudniających osoby z niepełnosprawnościami.
Wykorzystuje dokumenty PDF jako bazę wiedzy, przetwarza je na wektorową bazę danych (FAISS),
a następnie umożliwia zadawanie pytań w języku polskim z konwersacyjną pamięcią kontekstu.
Odpowiedzi generowane są przez model OpenAI GPT na podstawie treści dokumentów.
Parametry:
openai_api_key (str): Klucz API do OpenAI.
pdf_directory (str): Ścieżka do katalogu z plikami PDF.
"""
def __init__(self, openai_api_key: str, pdf_directory: str = "pdfs"):
self.openai_api_key = openai_api_key
self.pdf_directory = Path(pdf_directory)
# Inicjalizuj komponenty
self.embeddings = OpenAIEmbeddings(
api_key=openai_api_key,
model="text-embedding-3-small",
chunk_size=1000 # Przetwarzanie wsadowe dla osadzeń
)
self.llm = ChatOpenAI(
api_key=openai_api_key,
model="gpt-4o-mini",
temperature=0.3
)
self.chunker = IntelligentPDFChunker(
chunk_size=800,
chunk_overlap=150
)
self.vectorstore = None
self.qa_chain = None
self.memory = ConversationBufferWindowMemory(
k=5,
memory_key="chat_history",
return_messages=True,
output_key="answer",
input_key="question"
)
self._load_and_process_documents()
self._setup_qa_chain()
def _list_pdf_files(self) -> List[Path]:
"""
Listuje pliki PDF w katalogu.
"""
return list(self.pdf_directory.glob("*.pdf"))
def _setup_qa_chain(self):
"""
Tworzy i konfiguruje łańcuch pytań i odpowiedzi (ConversationalRetrievalChain) dla asystenta HR.
Łańcuch ten korzysta z modelu LLM, bazy wektorowej oraz pamięci konwersacyjnej.
Prompt jest zoptymalizowany pod polskie realia HR i bazuje wyłącznie na wiedzy z dokumentów PDF.
Raises:
ValueError: Jeśli baza wektorowa nie została zainicjalizowana.
"""
if not self.vectorstore:
raise ValueError("Baza wektorowa nie została zainicjalizowana.")
prompt_template = (
"Jesteś ekspertem HR specjalizującym się w zatrudnianiu osób z niepełnosprawnościami w Polsce.\n"
"Twoja wiedza opiera się na dostarczonych dokumentach, które mogą zawierać oficjalne poradniki i treści ze stron internetowych.\n\n"
"Kontekst z dokumentów:\n{context}\n\n"
"Historia rozmowy:\n{chat_history}\n\n"
"Pytanie: {question}\n\n"
"Instrukcje:\n"
"1. Odpowiadaj w języku polskim.\n"
"2. Bazuj wyłącznie na informacjach z dostarczonego kontekstu.\n"
"3. Jeśli nie masz informacji w kontekście, powiedz to wprost.\n"
"4. Podawaj konkretne i praktyczne porady.\n"
"5. Bądź pomocny i profesjonalny.\n"
"6. Zawsze podawaj źródło informacji (URL lub nazwa dokumentu PDF), jeśli jest dostępne w metadanych kontekstu.\n\n"
"Odpowiedź:"
)
custom_prompt = PromptTemplate(
template=prompt_template,
input_variables=["context", "chat_history", "question"]
)
self.qa_chain = ConversationalRetrievalChain.from_llm(
llm=self.llm,
retriever=self.vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 8}
),
memory=self.memory,
combine_docs_chain_kwargs={"prompt": custom_prompt},
return_source_documents=True,
output_key="answer"
)
def _load_and_process_documents(self):
"""
Ładuje i przetwarza wszystkie dokumenty (PDF, URL, hardcoded) i tworzy jedną bazę wektorową.
"""
logger.info("Rozpoczynam ładowanie i przetwarzanie wszystkich dokumentów...")
all_documents = []
# 1. Przetwarzanie plików PDF
logger.info("Ładowanie dokumentów PDF...")
pdf_files = self._list_pdf_files()
if not pdf_files:
logger.warning(f"Nie znaleziono plików PDF w katalogu: {self.pdf_directory}. Kontynuuję bez nich.")
else:
logger.info(f"Znaleziono {len(pdf_files)} plików PDF")
for pdf_file in pdf_files:
logger.info(f"Przetwarzanie: {pdf_file.name}")
try:
documents = self.chunker._extract_pdf_structure(str(pdf_file))
for doc in documents:
doc.metadata["filename"] = pdf_file.name
doc.metadata["file_stem"] = pdf_file.stem
all_documents.extend(documents)
except Exception as e:
logger.error(f"Błąd podczas przetwarzania pliku PDF {pdf_file.name}: {e}")
logger.info(f"Wyekstraktowano {len(all_documents)} sekcji z plików PDF.")
# 2. Dodawanie treści z URLi
url_docs = self._get_url_documents()
if url_docs:
all_documents.extend(url_docs)
logger.info(f"Dodano {len(url_docs)} dokumentów z adresów URL.")
# 3. Dodawanie dokumentów hardcoded
hardcoded_docs = self._get_hardcoded_documents()
all_documents.extend(hardcoded_docs)
logger.info(f"Dodano {len(hardcoded_docs)} dokumentów hardcoded.")
if not all_documents:
raise ValueError("Nie znaleziono żadnych dokumentów do przetworzenia. Baza wektorowa nie może zostać utworzona.")
# 4. Chunkowanie wszystkich dokumentów razem
logger.info(f"Rozpoczynam chunkowanie {len(all_documents)} wszystkich zebranych dokumentów...")
chunked_documents = self.chunker.chunk_documents(all_documents)
logger.info(f"Utworzono {len(chunked_documents)} chunków.")
# 5. Tworzenie bazy wektorowej z jednego, dużego wywołania wsadowego
logger.info("Tworzenie bazy wektorowej FAISS...")
self.vectorstore = FAISS.from_documents(
chunked_documents,
self.embeddings
)
logger.info("Baza wektorowa została pomyślnie utworzona.")
def _get_hardcoded_documents(self) -> List[Document]:
"""
Zwraca listę hardkodowanych dokumentów z kluczowymi danymi finansowymi.
"""
hardcoded_financial_info = """
WYSOKOŚĆ DOFINANSOWANIA DO WYNAGRODZEŃ PRACOWNIKÓW NIEPEŁNOSPRAWNYCH Z PFRON
Kwoty miesięcznego dofinansowania do wynagrodzenia pracowników niepełnosprawnych:
1) 2300 zł – w przypadku osób niepełnosprawnych zaliczonych do znacznego stopnia niepełnosprawności;
2) 1350 zł – w przypadku osób niepełnosprawnych zaliczonych do umiarkowanego stopnia niepełnosprawności;
3) 500 zł – w przypadku osób niepełnosprawnych zaliczonych do lekkiego stopnia niepełnosprawności.
Kwoty, o których mowa w ust. 1, zwiększa się o 1050 zł w przypadku osób niepełnosprawnych, w odniesieniu do których orzeczono chorobę psychiczną, upośledzenie umysłowe, całościowe zaburzenia rozwojowe lub epilepsję oraz niewidomych.
Miesięczne dofinansowanie do wynagrodzenia pracownika niepełnosprawnego, zwane dalej „miesięcznym dofinansowaniem", przysługuje w kwocie:
1) 2300 zł – w przypadku osób niepełnosprawnych zaliczonych do znacznego stopnia niepełnosprawności;
2) 1350 zł – w przypadku osób niepełnosprawnych zaliczonych do umiarkowanego stopnia niepełnosprawności;
3) 500 zł – w przypadku osób niepełnosprawnych zaliczonych do lekkiego stopnia niepełnosprawności.
Kwoty, o których mowa powyżej, zwiększa się o 1050 zł w przypadku osób niepełnosprawnych, w odniesieniu do których orzeczono chorobę psychiczną, upośledzenie umysłowe, całościowe zaburzenia rozwojowe lub epilepsję oraz niewidomych.
Wysokość miesięcznego dofinansowania nie może przekroczyć 90% faktycznie poniesionych miesięcznych kosztów płacy, a w przypadku pracodawcy wykonującego działalność gospodarczą, w rozumieniu przepisów o postępowaniu w sprawach dotyczących pomocy publicznej, zwanym dalej "pracodawcą wykonującym działalność gospodarczą", 75% tych kosztów.
"""
hardcoded_doc = Document(
page_content=hardcoded_financial_info,
metadata={
'source': 'https://www.pfron.org.pl/pracodawcy/dofinansowanie-wynagrodzen/wysokosc-dofinansowania-do-wynagrodzen-pracownikow-niepelnosprawnych/',
'title': 'Wysokość dofinansowania do wynagrodzeń pracowników niepełnosprawnych',
'added_date': datetime.now().strftime("%Y-%m-%d"),
'contains_financial_data': True,
'is_hardcoded_financial': True
}
)
common_questions_doc = Document(
page_content="""
NAJCZĘŚCIEJ ZADAWANE PYTANIA O KWOTY DOFINANSOWAŃ Z PFRON
Pytanie: Jaka jest kwota dofinansowania do wynagrodzenia pracownika ze znacznym stopniem niepełnosprawności?
Odpowiedź: 2300 zł miesięcznie.
Pytanie: Jaka jest kwota dofinansowania do wynagrodzenia pracownika z umiarkowanym stopniem niepełnosprawności?
Odpowiedź: 1350 zł miesięcznie.
Pytanie: Jaka jest kwota dofinansowania do wynagrodzenia pracownika z lekkim stopniem niepełnosprawności?
Odpowiedź: 500 zł miesięcznie.
Pytanie: O ile zwiększa się dofinansowanie dla pracowników ze schorzeniami szczególnymi?
Odpowiedź: O 1050 zł miesięcznie w przypadku osób, w odniesieniu do których orzeczono chorobę psychiczną, upośledzenie umysłowe, całościowe zaburzenia rozwojowe lub epilepsję oraz niewidomych.
Pytanie: Jaki jest maksymalny poziom dofinansowania do wynagrodzenia pracownika niepełnosprawnego?
Odpowiedź: Wysokość miesięcznego dofinansowania nie może przekroczyć 90% faktycznie poniesionych miesięcznych kosztów płacy, a w przypadku pracodawcy wykonującego działalność gospodarczą 75% tych kosztów.
""",
metadata={
'source': 'https://www.pfron.org.pl/pracodawcy/dofinansowanie-wynagrodzen/wysokosc-dofinansowania-do-wynagrodzen-pracownikow-niepelnosprawnych/',
'title': 'Najczęściej zadawane pytania o kwoty dofinansowań PFRON',
'added_date': datetime.now().strftime("%Y-%m-%d"),
'contains_financial_data': True,
'is_hardcoded_financial': True
}
)
return [hardcoded_doc, common_questions_doc]
def _get_url_documents(self) -> List[Document]:
"""
Pobiera i przetwarza treści z URLi z pliku urls.txt.
"""
urls_file = 'urls.txt'
if not os.path.exists(urls_file):
logger.warning(f"Plik '{urls_file}' nie został znaleziony. Pomijanie dodawania treści z URLi.")
return []
try:
with open(urls_file, 'r', encoding='utf-8') as f:
urls = [line.strip() for line in f if line.strip()]
except Exception as e:
logger.error(f"Błąd podczas odczytu pliku '{urls_file}': {e}")
return []
if not urls:
logger.warning("Brak URLi do przetworzenia w pliku urls.txt.")
return []
logger.info(f"Znaleziono {len(urls)} adresów URL do przetworzenia.")
url_documents = []
for i, url in enumerate(urls, 1):
try:
logger.info(f"[{i}/{len(urls)}] Przetwarzanie URL: {url}")
response = requests.get(url, timeout=15, headers={'User-Agent': 'Mozilla/5.0'})
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
title = soup.find('title').get_text().strip() if soup.find('title') else 'Brak tytułu'
content = self._extract_url_content(soup)
if content:
logger.info(f" - Znaleziono treść (rozmiar: {len(content)} znaków).")
metadata = {
'source': url,
'title': title,
'added_date': datetime.now().strftime("%Y-%m-%d")
}
if re.search(r'\d+(?:[.,]\d+)?\s*(?:zł|PLN|złot)', content, re.IGNORECASE):
metadata["contains_financial_data"] = True
url_documents.append(Document(page_content=content, metadata=metadata))
else:
logger.warning(f" - Nie znaleziono treści na stronie {url}.")
except requests.RequestException as e:
logger.error(f" - Błąd podczas pobierania {url}: {e}")
except Exception as e:
logger.error(f" - Nieoczekiwany błąd podczas przetwarzania {url}: {e}")
return url_documents
def _extract_url_content(self, soup: BeautifulSoup) -> str:
"""
Wyodrębnia główną treść tekstową ze strony internetowej (obiektu BeautifulSoup).
Próbuje znaleźć kontenery z główną treścią, a jeśli to się nie uda, pobiera cały tekst.
"""
main_content = []
# Priorytetowe selektory dla stron PFRON i podobnych
priority_selectors = [
'.frame.default',
'.csc-default',
'#c101710',
'.content-main',
'article',
'main',
'.content',
'.main-content',
'.post-content',
'#content',
'#main'
]
found_content = False
for selector in priority_selectors:
elements = soup.select(selector)
if elements:
for element in elements:
main_content.append(element.get_text(separator='\n', strip=True))
found_content = True
break # Znaleziono treść, więc przerwij pętlę
# Jeśli nie znaleziono specyficznych kontenerów, pobierz cały tekst z body
if not found_content:
if soup.body:
main_content.append(soup.body.get_text(separator='\n', strip=True))
return "\n\n".join(main_content)
def ask(self, question: str) -> Dict[str, Any]:
"""
Zadaje pytanie asystentowi.
"""
logger.info(f"Otrzymano pytanie: {question}")
try:
response = self.qa_chain.invoke({
"question": question,
"chat_history": self.memory.chat_memory.messages
})
answer = response["answer"]
# Przygotuj odpowiedź z odpowiednimi źródłami
result = {
"answer": answer,
"sources": [],
"confidence": "medium" # Domyślna pewność, można dostosować
}
if "source_documents" in response:
for doc in response["source_documents"]:
source_info = {
"filename": doc.metadata.get("filename", ""),
"page": doc.metadata.get("page", ""),
"section": doc.metadata.get("section", ""),
"title": doc.metadata.get("title", ""),
"source": doc.metadata.get("source", ""),
"added_date": doc.metadata.get("added_date", ""),
"snippet": doc.page_content[:200] + "..." if len(doc.page_content) > 200 else doc.page_content
}
result["sources"].append(source_info)
return result
except Exception as e:
logger.error(f"Błąd podczas przetwarzania pytania: {e}")
return {
"answer": "Przepraszam, wystąpił błąd podczas przetwarzania Twojego pytania.",
"sources": [],
"confidence": "low",
"error": str(e)
}
def get_stats(self) -> Dict[str, Any]:
"""
Zwraca statystyki asystenta.
"""
return {
"total_documents": self.vectorstore.index.ntotal if self.vectorstore else 0,
"pdf_files": len(self._list_pdf_files()),
"memory_messages": len(self.memory.chat_memory.messages),
"model": "gpt-4o-mini",
"embedding_model": "text-embedding-3-small"
}
def clear_memory(self):
"""
Czyści pamięć konwersacji.
"""
self.memory.clear()
logger.info("Pamięć konwersacji została wyczyszczona")
def print_unique_sources(sources: list):
"""
Wypisuje unikalne źródła na podstawie filename, page, section, zamieniając nazwę pliku na opis bibliograficzny jeśli to możliwe.
Jeśli źródło to URL lub hardcoded, wypisuje tytuł lub URL.
"""
unique_sources = []
seen = set()
for source in sources:
key = (source.get('filename'), source.get('page'), source.get('section'))
if key not in seen:
seen.add(key)
unique_sources.append(source)
for i, source in enumerate(unique_sources, 1):
opis = None
filename = source.get('filename')
if filename:
file_stem = os.path.splitext(filename)[0]
# Szukaj opisu w bibliografii wg różnych wariantów
opis = bibliography_data.get(filename)
if not opis:
opis = bibliography_data.get(file_stem + '.pdf')
if not opis:
opis = bibliography_data.get(file_stem)
if not opis:
opis = filename # fallback: sama nazwa pliku
else:
# Jeśli nie ma filename, spróbuj użyć tytułu lub źródła (np. URL)
opis = source.get('title') or source.get('source') or "nieznane źródło"
page = source.get('page', '')
section = source.get('section', '')
print(f"{i}. {opis} (str. {page}) - {section}")
def handle_command(command: str, assistant: HRAssistant) -> bool:
"""
Obsługuje polecenia specjalne. Zwraca True jeśli należy kontynuować pętlę.
"""
cmd = command.lower()
if cmd in ['quit', 'exit', 'q']:
return False
if cmd == 'stats':
print(f"Statystyki: {assistant.get_stats()}")
return True
if cmd == 'clear':
assistant.clear_memory()
print("Pamięć konwersacji została wyczyszczona")
return True
return None
def main():
"""
Przykład użycia asystenta HR.
"""
# Sprawdź czy ustawiono klucz API
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError("Ustaw zmienną środowiskową OPENAI_API_KEY")
# Utwórz asystenta
assistant = HRAssistant(
openai_api_key=api_key,
pdf_directory="pdfs"
)
# Przykładowe pytania
test_questions = [
"Jakie są uprawnienia pracownika z niepełnosprawnością?",
"Jak przeprowadzić rekrutację osoby z niepełnosprawnością?",
"Jakie wsparcie może otrzymać pracodawca zatrudniający osoby z niepełnosprawnościami?",
"Czy osoba z orzeczeniem o całkowitej niezdolności do pracy może być zatrudniona?"
]
print("=== Asystent HR - Zatrudnianie osób z niepełnosprawnościami ===\n")
print(f"Statystyki: {assistant.get_stats()}\n")
print("Dostępne komendy:")
print(" stats - wyświetla statystyki bazy wiedzy")
print(" clear - czyści pamięć konwersacji")
print(" quit/exit/q - kończy program\n")
# Interaktywny tryb
while True:
try:
question = input("\nTwoje pytanie (lub 'quit' aby zakończyć): ")
if not question.strip():
continue
cmd_result = handle_command(question, assistant)
if cmd_result is False:
break
if cmd_result is True:
continue
# Uzyskaj odpowiedź
response = assistant.ask(question)
print(f"\n📝 Odpowiedź:")
print(response["answer"])
# Wyświetl unikalne źródła, jeśli są dostępne
if response.get("sources"):
print("\nŹródła:")
print_unique_sources(response["sources"])
if "error" in response:
print(f"\n⚠️ Błąd: {response['error']}")
except KeyboardInterrupt:
print("\n\nDo widzenia!")
break
except Exception as e:
print(f"\n❌ Błąd: {e}")
if __name__ == "__main__":
main()
logger.info("Pamięć konwersacji została wyczyszczona")
# Usunięto metodę reload_knowledge_base, gdyż baza wiedzy jest teraz tylko w pamięci i nie jest aktualizowana
# Wczytywanie danych bibliograficznych z pliku CSV
bibliography_data = {}
file_path = 'bibliography.csv'
if os.path.exists(file_path):
with open(file_path, mode='r', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile, delimiter=';')
for row in reader:
if len(row) == 2:
bibliography_data[row[1].strip()] = row[0].strip()
print(bibliography_data)