|
|
|
|
|
import os |
|
|
import csv |
|
|
def load_bibliography(file_path="bibliografia.csv"): |
|
|
""" |
|
|
Wczytuje dane bibliograficzne z pliku CSV i zwraca słownik: {file_stem: opis} |
|
|
""" |
|
|
bibliography = {} |
|
|
if os.path.exists(file_path): |
|
|
with open(file_path, mode='r', encoding='utf-8') as csvfile: |
|
|
reader = csv.reader(csvfile, delimiter=';') |
|
|
for row in reader: |
|
|
if len(row) == 2: |
|
|
|
|
|
key = row[1].strip().replace('"', '') |
|
|
value = row[0].strip().replace('"', '') |
|
|
bibliography[key] = value |
|
|
return bibliography |
|
|
|
|
|
bibliography_data = load_bibliography() |
|
|
|
|
|
def print_unique_sources(sources: list): |
|
|
""" |
|
|
Wypisuje unikalne źródła na podstawie filename, page, section, zamieniając nazwę pliku na opis bibliograficzny jeśli to możliwe. |
|
|
Jeśli źródło to URL lub hardcoded, wypisuje tytuł lub URL. |
|
|
""" |
|
|
unique_sources = [] |
|
|
seen = set() |
|
|
for source in sources: |
|
|
key = (source.get('filename'), source.get('page'), source.get('section')) |
|
|
if key not in seen: |
|
|
seen.add(key) |
|
|
unique_sources.append(source) |
|
|
for i, source in enumerate(unique_sources, 1): |
|
|
opis = None |
|
|
filename = source.get('filename') |
|
|
if filename: |
|
|
file_stem = os.path.splitext(filename)[0] |
|
|
|
|
|
opis = bibliography_data.get(filename) |
|
|
if not opis: |
|
|
opis = bibliography_data.get(file_stem + '.pdf') |
|
|
if not opis: |
|
|
opis = bibliography_data.get(file_stem) |
|
|
if not opis: |
|
|
opis = filename |
|
|
else: |
|
|
|
|
|
opis = source.get('title') or source.get('source') or "nieznane źródło" |
|
|
page = source.get('page', '') |
|
|
section = source.get('section', '') |
|
|
print(f"{i}. {opis} (str. {page}) - {section}") |
|
|
""" |
|
|
Asystent HR dla pracodawców zatrudniających osoby z niepełnosprawnościami. |
|
|
|
|
|
Funkcjonalności: |
|
|
- Wykorzystuje dokumenty PDF jako bazę wiedzy, przetwarza je na wektorową bazę danych (FAISS) w pamięci. |
|
|
- Pozwala na zadawanie pytań w języku polskim z konwersacyjną pamięcią kontekstu (ChatOpenAI, model GPT-4o-mini). |
|
|
- Odpowiedzi generowane są wyłącznie na podstawie treści dokumentów PDF, wybranych stron internetowych (z pliku urls.txt) oraz hardkodowanych fragmentów (np. wysokości dofinansowań PFRON). |
|
|
- Każda odpowiedź zawiera źródło informacji (nazwa pliku PDF, strona, sekcja lub URL). |
|
|
- Obsługuje interaktywny tryb konsolowy z komendami: stats, clear, quit/exit/q. |
|
|
- Przetwarza dokumenty PDF z zachowaniem struktury (chunkowanie sekcji, nagłówków, stron). |
|
|
- Pobiera i przetwarza treści z wybranych stron internetowych (BeautifulSoup, requests). |
|
|
- Loguje przebieg działania i błędy (logging). |
|
|
|
|
|
Źródła wiedzy: |
|
|
- Pliki PDF z katalogu "pdfs/" |
|
|
- Adresy URL z pliku "urls.txt" |
|
|
- Hardkodowane fragmenty (np. wysokość dofinansowań PFRON) |
|
|
|
|
|
Wymagania: |
|
|
- Python 3.10+ |
|
|
- Klucz API OpenAI (zmienna środowiskowa OPENAI_API_KEY) |
|
|
- Zainstalowane pakiety: langchain, langchain_openai, langchain_community, fitz (PyMuPDF), requests, beautifulsoup4 |
|
|
|
|
|
Autor: Jacek (2024-2025) |
|
|
""" |
|
|
|
|
|
import os |
|
|
import logging |
|
|
from typing import List, Dict, Any, Optional, Tuple |
|
|
from pathlib import Path |
|
|
import re |
|
|
import csv |
|
|
|
|
|
|
|
|
from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
|
from langchain_openai import OpenAIEmbeddings, ChatOpenAI |
|
|
from langchain_community.vectorstores.faiss import FAISS |
|
|
from langchain_community.document_loaders import PyPDFLoader |
|
|
from langchain_core.documents import Document |
|
|
from langchain.chains import ConversationalRetrievalChain |
|
|
from langchain.memory import ConversationBufferWindowMemory |
|
|
from langchain_core.prompts import PromptTemplate |
|
|
|
|
|
|
|
|
import requests |
|
|
from bs4 import BeautifulSoup |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
import fitz |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class IntelligentPDFChunker: |
|
|
""" |
|
|
Inteligentny chunker dla dokumentów PDF, który respektuje strukturę dokumentu. |
|
|
""" |
|
|
|
|
|
def __init__(self, chunk_size: int = 800, chunk_overlap: int = 150): |
|
|
self.chunk_size = chunk_size |
|
|
self.chunk_overlap = chunk_overlap |
|
|
|
|
|
def _detect_structure_markers(self, text: str) -> List[Tuple[int, str, int]]: |
|
|
""" |
|
|
Wykrywa markery struktury w tekście (nagłówki, punkty, etc.) |
|
|
Zwraca listę tupli (pozycja, typ, poziom) |
|
|
""" |
|
|
markers = [] |
|
|
|
|
|
|
|
|
patterns = [ |
|
|
(r'^#{1,6}\s+(.+)', 'header', lambda m: len(m.group(0).split()[0])), |
|
|
(r'^##\s+(.+)', 'header', 2), |
|
|
(r'^###\s+(.+)', 'header', 3), |
|
|
(r'^####\s+(.+)', 'header', 4), |
|
|
(r'^#####\s+(.+)', 'header', 5), |
|
|
(r'^######\s+(.+)', 'header', 6), |
|
|
(r'^\d+\.\s+(.+)', 'numbered_list', 1), |
|
|
(r'^•\s+(.+)', 'bullet_list', 1), |
|
|
(r'^-\s+(.+)', 'bullet_list', 1), |
|
|
(r'^\*\s+(.+)', 'bullet_list', 1), |
|
|
(r'^[A-ZĄĆĘŁŃÓŚŹŻ][A-ZĄĆĘŁŃÓŚŹŻ\s]+:$', 'section_title', 1), |
|
|
(r'^Krok\s+\d+', 'step', 1), |
|
|
(r'^Rozdział\s+\d+', 'chapter', 1), |
|
|
(r'^Część\s+\d+', 'part', 1), |
|
|
] |
|
|
|
|
|
lines = text.split('\n') |
|
|
for i, line in enumerate(lines): |
|
|
line = line.strip() |
|
|
if not line: |
|
|
continue |
|
|
|
|
|
for pattern, marker_type, level_func in patterns: |
|
|
match = re.match(pattern, line, re.IGNORECASE) |
|
|
if match: |
|
|
level = level_func(match) if callable(level_func) else level_func |
|
|
markers.append((i, marker_type, level)) |
|
|
break |
|
|
|
|
|
return markers |
|
|
|
|
|
def _extract_pdf_structure(self, pdf_path: str) -> List[Document]: |
|
|
""" |
|
|
Ekstraktuje tekst z PDF z zachowaniem struktury dokumentu. |
|
|
""" |
|
|
doc = fitz.open(pdf_path) |
|
|
documents = [] |
|
|
|
|
|
for page_num in range(len(doc)): |
|
|
page = doc[page_num] |
|
|
|
|
|
|
|
|
blocks = page.get_text("dict")["blocks"] |
|
|
|
|
|
page_text = "" |
|
|
current_section = "" |
|
|
|
|
|
for block in blocks: |
|
|
if block.get("type") == 0: |
|
|
for line in block["lines"]: |
|
|
line_text = "" |
|
|
max_size = 0 |
|
|
|
|
|
for span in line["spans"]: |
|
|
text = span.get("text", "").strip() |
|
|
if text: |
|
|
line_text += text + " " |
|
|
max_size = max(max_size, span.get("size", 0)) |
|
|
|
|
|
if line_text.strip(): |
|
|
|
|
|
if max_size > 12: |
|
|
if current_section: |
|
|
|
|
|
documents.append(Document( |
|
|
page_content=page_text.strip(), |
|
|
metadata={ |
|
|
"source": pdf_path, |
|
|
"page": page_num + 1, |
|
|
"section": current_section, |
|
|
"type": "section" |
|
|
} |
|
|
)) |
|
|
page_text = "" |
|
|
|
|
|
current_section = line_text.strip() |
|
|
page_text += f"\n## {line_text.strip()}\n" |
|
|
else: |
|
|
page_text += line_text.strip() + " " |
|
|
|
|
|
|
|
|
if page_text.strip(): |
|
|
documents.append(Document( |
|
|
page_content=page_text.strip(), |
|
|
metadata={ |
|
|
"source": pdf_path, |
|
|
"page": page_num + 1, |
|
|
"section": current_section or f"Strona {page_num + 1}", |
|
|
"type": "section" |
|
|
} |
|
|
)) |
|
|
|
|
|
doc.close() |
|
|
return documents |
|
|
|
|
|
def chunk_documents(self, documents: List[Document]) -> List[Document]: |
|
|
""" |
|
|
Dzieli dokumenty na chunki z zachowaniem struktury. |
|
|
""" |
|
|
chunked_docs = [] |
|
|
|
|
|
for doc in documents: |
|
|
|
|
|
if len(doc.page_content) <= self.chunk_size: |
|
|
chunked_docs.append(doc) |
|
|
continue |
|
|
|
|
|
|
|
|
text_splitter = RecursiveCharacterTextSplitter( |
|
|
chunk_size=self.chunk_size, |
|
|
chunk_overlap=self.chunk_overlap, |
|
|
separators=[ |
|
|
"\n## ", |
|
|
"\n### ", |
|
|
"\n#### ", |
|
|
"\n\n", |
|
|
"\n", |
|
|
". ", |
|
|
", ", |
|
|
" ", |
|
|
"" |
|
|
] |
|
|
) |
|
|
|
|
|
texts = text_splitter.split_text(doc.page_content) |
|
|
|
|
|
for i, text in enumerate(texts): |
|
|
chunk_metadata = doc.metadata.copy() |
|
|
chunk_metadata["chunk_id"] = i |
|
|
chunk_metadata["total_chunks"] = len(texts) |
|
|
|
|
|
chunked_docs.append(Document( |
|
|
page_content=text, |
|
|
metadata=chunk_metadata |
|
|
)) |
|
|
|
|
|
return chunked_docs |
|
|
|
|
|
|
|
|
class HRAssistant: |
|
|
""" |
|
|
Asystent HR dla pracodawców zatrudniających osoby z niepełnosprawnościami. |
|
|
|
|
|
Wykorzystuje dokumenty PDF jako bazę wiedzy, przetwarza je na wektorową bazę danych (FAISS), |
|
|
a następnie umożliwia zadawanie pytań w języku polskim z konwersacyjną pamięcią kontekstu. |
|
|
Odpowiedzi generowane są przez model OpenAI GPT na podstawie treści dokumentów. |
|
|
|
|
|
Parametry: |
|
|
openai_api_key (str): Klucz API do OpenAI. |
|
|
pdf_directory (str): Ścieżka do katalogu z plikami PDF. |
|
|
""" |
|
|
|
|
|
def __init__(self, openai_api_key: str, pdf_directory: str = "pdfs"): |
|
|
self.openai_api_key = openai_api_key |
|
|
self.pdf_directory = Path(pdf_directory) |
|
|
|
|
|
|
|
|
self.embeddings = OpenAIEmbeddings( |
|
|
api_key=openai_api_key, |
|
|
model="text-embedding-3-small", |
|
|
chunk_size=1000 |
|
|
) |
|
|
self.llm = ChatOpenAI( |
|
|
api_key=openai_api_key, |
|
|
model="gpt-4o-mini", |
|
|
temperature=0.3 |
|
|
) |
|
|
self.chunker = IntelligentPDFChunker( |
|
|
chunk_size=800, |
|
|
chunk_overlap=150 |
|
|
) |
|
|
self.vectorstore = None |
|
|
self.qa_chain = None |
|
|
self.memory = ConversationBufferWindowMemory( |
|
|
k=5, |
|
|
memory_key="chat_history", |
|
|
return_messages=True, |
|
|
output_key="answer", |
|
|
input_key="question" |
|
|
) |
|
|
self._load_and_process_documents() |
|
|
self._setup_qa_chain() |
|
|
|
|
|
def _list_pdf_files(self) -> List[Path]: |
|
|
""" |
|
|
Listuje pliki PDF w katalogu. |
|
|
""" |
|
|
return list(self.pdf_directory.glob("*.pdf")) |
|
|
|
|
|
def _setup_qa_chain(self): |
|
|
""" |
|
|
Tworzy i konfiguruje łańcuch pytań i odpowiedzi (ConversationalRetrievalChain) dla asystenta HR. |
|
|
Łańcuch ten korzysta z modelu LLM, bazy wektorowej oraz pamięci konwersacyjnej. |
|
|
Prompt jest zoptymalizowany pod polskie realia HR i bazuje wyłącznie na wiedzy z dokumentów PDF. |
|
|
|
|
|
Raises: |
|
|
ValueError: Jeśli baza wektorowa nie została zainicjalizowana. |
|
|
""" |
|
|
if not self.vectorstore: |
|
|
raise ValueError("Baza wektorowa nie została zainicjalizowana.") |
|
|
|
|
|
prompt_template = ( |
|
|
"Jesteś ekspertem HR specjalizującym się w zatrudnianiu osób z niepełnosprawnościami w Polsce.\n" |
|
|
"Twoja wiedza opiera się na dostarczonych dokumentach, które mogą zawierać oficjalne poradniki i treści ze stron internetowych.\n\n" |
|
|
"Kontekst z dokumentów:\n{context}\n\n" |
|
|
"Historia rozmowy:\n{chat_history}\n\n" |
|
|
"Pytanie: {question}\n\n" |
|
|
"Instrukcje:\n" |
|
|
"1. Odpowiadaj w języku polskim.\n" |
|
|
"2. Bazuj wyłącznie na informacjach z dostarczonego kontekstu.\n" |
|
|
"3. Jeśli nie masz informacji w kontekście, powiedz to wprost.\n" |
|
|
"4. Podawaj konkretne i praktyczne porady.\n" |
|
|
"5. Bądź pomocny i profesjonalny.\n" |
|
|
"6. Zawsze podawaj źródło informacji (URL lub nazwa dokumentu PDF), jeśli jest dostępne w metadanych kontekstu.\n\n" |
|
|
"Odpowiedź:" |
|
|
) |
|
|
custom_prompt = PromptTemplate( |
|
|
template=prompt_template, |
|
|
input_variables=["context", "chat_history", "question"] |
|
|
) |
|
|
self.qa_chain = ConversationalRetrievalChain.from_llm( |
|
|
llm=self.llm, |
|
|
retriever=self.vectorstore.as_retriever( |
|
|
search_type="similarity", |
|
|
search_kwargs={"k": 8} |
|
|
), |
|
|
memory=self.memory, |
|
|
combine_docs_chain_kwargs={"prompt": custom_prompt}, |
|
|
return_source_documents=True, |
|
|
output_key="answer" |
|
|
) |
|
|
|
|
|
def _load_and_process_documents(self): |
|
|
""" |
|
|
Ładuje i przetwarza wszystkie dokumenty (PDF, URL, hardcoded) i tworzy jedną bazę wektorową. |
|
|
""" |
|
|
logger.info("Rozpoczynam ładowanie i przetwarzanie wszystkich dokumentów...") |
|
|
all_documents = [] |
|
|
|
|
|
|
|
|
logger.info("Ładowanie dokumentów PDF...") |
|
|
pdf_files = self._list_pdf_files() |
|
|
if not pdf_files: |
|
|
logger.warning(f"Nie znaleziono plików PDF w katalogu: {self.pdf_directory}. Kontynuuję bez nich.") |
|
|
else: |
|
|
logger.info(f"Znaleziono {len(pdf_files)} plików PDF") |
|
|
for pdf_file in pdf_files: |
|
|
logger.info(f"Przetwarzanie: {pdf_file.name}") |
|
|
try: |
|
|
documents = self.chunker._extract_pdf_structure(str(pdf_file)) |
|
|
for doc in documents: |
|
|
doc.metadata["filename"] = pdf_file.name |
|
|
doc.metadata["file_stem"] = pdf_file.stem |
|
|
all_documents.extend(documents) |
|
|
except Exception as e: |
|
|
logger.error(f"Błąd podczas przetwarzania pliku PDF {pdf_file.name}: {e}") |
|
|
logger.info(f"Wyekstraktowano {len(all_documents)} sekcji z plików PDF.") |
|
|
|
|
|
|
|
|
url_docs = self._get_url_documents() |
|
|
if url_docs: |
|
|
all_documents.extend(url_docs) |
|
|
logger.info(f"Dodano {len(url_docs)} dokumentów z adresów URL.") |
|
|
|
|
|
|
|
|
hardcoded_docs = self._get_hardcoded_documents() |
|
|
all_documents.extend(hardcoded_docs) |
|
|
logger.info(f"Dodano {len(hardcoded_docs)} dokumentów hardcoded.") |
|
|
|
|
|
if not all_documents: |
|
|
raise ValueError("Nie znaleziono żadnych dokumentów do przetworzenia. Baza wektorowa nie może zostać utworzona.") |
|
|
|
|
|
|
|
|
logger.info(f"Rozpoczynam chunkowanie {len(all_documents)} wszystkich zebranych dokumentów...") |
|
|
chunked_documents = self.chunker.chunk_documents(all_documents) |
|
|
logger.info(f"Utworzono {len(chunked_documents)} chunków.") |
|
|
|
|
|
|
|
|
logger.info("Tworzenie bazy wektorowej FAISS...") |
|
|
self.vectorstore = FAISS.from_documents( |
|
|
chunked_documents, |
|
|
self.embeddings |
|
|
) |
|
|
logger.info("Baza wektorowa została pomyślnie utworzona.") |
|
|
|
|
|
def _get_hardcoded_documents(self) -> List[Document]: |
|
|
""" |
|
|
Zwraca listę hardkodowanych dokumentów z kluczowymi danymi finansowymi. |
|
|
""" |
|
|
hardcoded_financial_info = """ |
|
|
WYSOKOŚĆ DOFINANSOWANIA DO WYNAGRODZEŃ PRACOWNIKÓW NIEPEŁNOSPRAWNYCH Z PFRON |
|
|
|
|
|
Kwoty miesięcznego dofinansowania do wynagrodzenia pracowników niepełnosprawnych: |
|
|
|
|
|
1) 2300 zł – w przypadku osób niepełnosprawnych zaliczonych do znacznego stopnia niepełnosprawności; |
|
|
2) 1350 zł – w przypadku osób niepełnosprawnych zaliczonych do umiarkowanego stopnia niepełnosprawności; |
|
|
3) 500 zł – w przypadku osób niepełnosprawnych zaliczonych do lekkiego stopnia niepełnosprawności. |
|
|
|
|
|
Kwoty, o których mowa w ust. 1, zwiększa się o 1050 zł w przypadku osób niepełnosprawnych, w odniesieniu do których orzeczono chorobę psychiczną, upośledzenie umysłowe, całościowe zaburzenia rozwojowe lub epilepsję oraz niewidomych. |
|
|
|
|
|
Miesięczne dofinansowanie do wynagrodzenia pracownika niepełnosprawnego, zwane dalej „miesięcznym dofinansowaniem", przysługuje w kwocie: |
|
|
1) 2300 zł – w przypadku osób niepełnosprawnych zaliczonych do znacznego stopnia niepełnosprawności; |
|
|
2) 1350 zł – w przypadku osób niepełnosprawnych zaliczonych do umiarkowanego stopnia niepełnosprawności; |
|
|
3) 500 zł – w przypadku osób niepełnosprawnych zaliczonych do lekkiego stopnia niepełnosprawności. |
|
|
|
|
|
Kwoty, o których mowa powyżej, zwiększa się o 1050 zł w przypadku osób niepełnosprawnych, w odniesieniu do których orzeczono chorobę psychiczną, upośledzenie umysłowe, całościowe zaburzenia rozwojowe lub epilepsję oraz niewidomych. |
|
|
|
|
|
Wysokość miesięcznego dofinansowania nie może przekroczyć 90% faktycznie poniesionych miesięcznych kosztów płacy, a w przypadku pracodawcy wykonującego działalność gospodarczą, w rozumieniu przepisów o postępowaniu w sprawach dotyczących pomocy publicznej, zwanym dalej "pracodawcą wykonującym działalność gospodarczą", 75% tych kosztów. |
|
|
""" |
|
|
|
|
|
hardcoded_doc = Document( |
|
|
page_content=hardcoded_financial_info, |
|
|
metadata={ |
|
|
'source': 'https://www.pfron.org.pl/pracodawcy/dofinansowanie-wynagrodzen/wysokosc-dofinansowania-do-wynagrodzen-pracownikow-niepelnosprawnych/', |
|
|
'title': 'Wysokość dofinansowania do wynagrodzeń pracowników niepełnosprawnych', |
|
|
'added_date': datetime.now().strftime("%Y-%m-%d"), |
|
|
'contains_financial_data': True, |
|
|
'is_hardcoded_financial': True |
|
|
} |
|
|
) |
|
|
|
|
|
common_questions_doc = Document( |
|
|
page_content=""" |
|
|
NAJCZĘŚCIEJ ZADAWANE PYTANIA O KWOTY DOFINANSOWAŃ Z PFRON |
|
|
|
|
|
Pytanie: Jaka jest kwota dofinansowania do wynagrodzenia pracownika ze znacznym stopniem niepełnosprawności? |
|
|
Odpowiedź: 2300 zł miesięcznie. |
|
|
|
|
|
Pytanie: Jaka jest kwota dofinansowania do wynagrodzenia pracownika z umiarkowanym stopniem niepełnosprawności? |
|
|
Odpowiedź: 1350 zł miesięcznie. |
|
|
|
|
|
Pytanie: Jaka jest kwota dofinansowania do wynagrodzenia pracownika z lekkim stopniem niepełnosprawności? |
|
|
Odpowiedź: 500 zł miesięcznie. |
|
|
|
|
|
Pytanie: O ile zwiększa się dofinansowanie dla pracowników ze schorzeniami szczególnymi? |
|
|
Odpowiedź: O 1050 zł miesięcznie w przypadku osób, w odniesieniu do których orzeczono chorobę psychiczną, upośledzenie umysłowe, całościowe zaburzenia rozwojowe lub epilepsję oraz niewidomych. |
|
|
|
|
|
Pytanie: Jaki jest maksymalny poziom dofinansowania do wynagrodzenia pracownika niepełnosprawnego? |
|
|
Odpowiedź: Wysokość miesięcznego dofinansowania nie może przekroczyć 90% faktycznie poniesionych miesięcznych kosztów płacy, a w przypadku pracodawcy wykonującego działalność gospodarczą 75% tych kosztów. |
|
|
""", |
|
|
metadata={ |
|
|
'source': 'https://www.pfron.org.pl/pracodawcy/dofinansowanie-wynagrodzen/wysokosc-dofinansowania-do-wynagrodzen-pracownikow-niepelnosprawnych/', |
|
|
'title': 'Najczęściej zadawane pytania o kwoty dofinansowań PFRON', |
|
|
'added_date': datetime.now().strftime("%Y-%m-%d"), |
|
|
'contains_financial_data': True, |
|
|
'is_hardcoded_financial': True |
|
|
} |
|
|
) |
|
|
|
|
|
return [hardcoded_doc, common_questions_doc] |
|
|
|
|
|
def _get_url_documents(self) -> List[Document]: |
|
|
""" |
|
|
Pobiera i przetwarza treści z URLi z pliku urls.txt. |
|
|
""" |
|
|
urls_file = 'urls.txt' |
|
|
if not os.path.exists(urls_file): |
|
|
logger.warning(f"Plik '{urls_file}' nie został znaleziony. Pomijanie dodawania treści z URLi.") |
|
|
return [] |
|
|
|
|
|
try: |
|
|
with open(urls_file, 'r', encoding='utf-8') as f: |
|
|
urls = [line.strip() for line in f if line.strip()] |
|
|
except Exception as e: |
|
|
logger.error(f"Błąd podczas odczytu pliku '{urls_file}': {e}") |
|
|
return [] |
|
|
|
|
|
if not urls: |
|
|
logger.warning("Brak URLi do przetworzenia w pliku urls.txt.") |
|
|
return [] |
|
|
|
|
|
logger.info(f"Znaleziono {len(urls)} adresów URL do przetworzenia.") |
|
|
url_documents = [] |
|
|
for i, url in enumerate(urls, 1): |
|
|
try: |
|
|
logger.info(f"[{i}/{len(urls)}] Przetwarzanie URL: {url}") |
|
|
response = requests.get(url, timeout=15, headers={'User-Agent': 'Mozilla/5.0'}) |
|
|
response.raise_for_status() |
|
|
|
|
|
soup = BeautifulSoup(response.content, 'html.parser') |
|
|
title = soup.find('title').get_text().strip() if soup.find('title') else 'Brak tytułu' |
|
|
content = self._extract_url_content(soup) |
|
|
|
|
|
if content: |
|
|
logger.info(f" - Znaleziono treść (rozmiar: {len(content)} znaków).") |
|
|
metadata = { |
|
|
'source': url, |
|
|
'title': title, |
|
|
'added_date': datetime.now().strftime("%Y-%m-%d") |
|
|
} |
|
|
if re.search(r'\d+(?:[.,]\d+)?\s*(?:zł|PLN|złot)', content, re.IGNORECASE): |
|
|
metadata["contains_financial_data"] = True |
|
|
|
|
|
url_documents.append(Document(page_content=content, metadata=metadata)) |
|
|
else: |
|
|
logger.warning(f" - Nie znaleziono treści na stronie {url}.") |
|
|
|
|
|
except requests.RequestException as e: |
|
|
logger.error(f" - Błąd podczas pobierania {url}: {e}") |
|
|
except Exception as e: |
|
|
logger.error(f" - Nieoczekiwany błąd podczas przetwarzania {url}: {e}") |
|
|
|
|
|
return url_documents |
|
|
|
|
|
def _extract_url_content(self, soup: BeautifulSoup) -> str: |
|
|
""" |
|
|
Wyodrębnia główną treść tekstową ze strony internetowej (obiektu BeautifulSoup). |
|
|
Próbuje znaleźć kontenery z główną treścią, a jeśli to się nie uda, pobiera cały tekst. |
|
|
""" |
|
|
main_content = [] |
|
|
|
|
|
|
|
|
priority_selectors = [ |
|
|
'.frame.default', |
|
|
'.csc-default', |
|
|
'#c101710', |
|
|
'.content-main', |
|
|
'article', |
|
|
'main', |
|
|
'.content', |
|
|
'.main-content', |
|
|
'.post-content', |
|
|
'#content', |
|
|
'#main' |
|
|
] |
|
|
|
|
|
found_content = False |
|
|
for selector in priority_selectors: |
|
|
elements = soup.select(selector) |
|
|
if elements: |
|
|
for element in elements: |
|
|
main_content.append(element.get_text(separator='\n', strip=True)) |
|
|
found_content = True |
|
|
break |
|
|
|
|
|
|
|
|
if not found_content: |
|
|
if soup.body: |
|
|
main_content.append(soup.body.get_text(separator='\n', strip=True)) |
|
|
|
|
|
return "\n\n".join(main_content) |
|
|
|
|
|
def ask(self, question: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Zadaje pytanie asystentowi. |
|
|
""" |
|
|
logger.info(f"Otrzymano pytanie: {question}") |
|
|
|
|
|
try: |
|
|
response = self.qa_chain.invoke({ |
|
|
"question": question, |
|
|
"chat_history": self.memory.chat_memory.messages |
|
|
}) |
|
|
|
|
|
answer = response["answer"] |
|
|
|
|
|
|
|
|
result = { |
|
|
"answer": answer, |
|
|
"sources": [], |
|
|
"confidence": "medium" |
|
|
} |
|
|
|
|
|
if "source_documents" in response: |
|
|
for doc in response["source_documents"]: |
|
|
source_info = { |
|
|
"filename": doc.metadata.get("filename", ""), |
|
|
"page": doc.metadata.get("page", ""), |
|
|
"section": doc.metadata.get("section", ""), |
|
|
"title": doc.metadata.get("title", ""), |
|
|
"source": doc.metadata.get("source", ""), |
|
|
"added_date": doc.metadata.get("added_date", ""), |
|
|
"snippet": doc.page_content[:200] + "..." if len(doc.page_content) > 200 else doc.page_content |
|
|
} |
|
|
result["sources"].append(source_info) |
|
|
|
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Błąd podczas przetwarzania pytania: {e}") |
|
|
return { |
|
|
"answer": "Przepraszam, wystąpił błąd podczas przetwarzania Twojego pytania.", |
|
|
"sources": [], |
|
|
"confidence": "low", |
|
|
"error": str(e) |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
def get_stats(self) -> Dict[str, Any]: |
|
|
""" |
|
|
Zwraca statystyki asystenta. |
|
|
""" |
|
|
return { |
|
|
"total_documents": self.vectorstore.index.ntotal if self.vectorstore else 0, |
|
|
"pdf_files": len(self._list_pdf_files()), |
|
|
"memory_messages": len(self.memory.chat_memory.messages), |
|
|
"model": "gpt-4o-mini", |
|
|
"embedding_model": "text-embedding-3-small" |
|
|
} |
|
|
|
|
|
def clear_memory(self): |
|
|
""" |
|
|
Czyści pamięć konwersacji. |
|
|
""" |
|
|
self.memory.clear() |
|
|
logger.info("Pamięć konwersacji została wyczyszczona") |
|
|
|
|
|
|
|
|
def print_unique_sources(sources: list): |
|
|
""" |
|
|
Wypisuje unikalne źródła na podstawie filename, page, section, zamieniając nazwę pliku na opis bibliograficzny jeśli to możliwe. |
|
|
Jeśli źródło to URL lub hardcoded, wypisuje tytuł lub URL. |
|
|
""" |
|
|
unique_sources = [] |
|
|
seen = set() |
|
|
for source in sources: |
|
|
key = (source.get('filename'), source.get('page'), source.get('section')) |
|
|
if key not in seen: |
|
|
seen.add(key) |
|
|
unique_sources.append(source) |
|
|
for i, source in enumerate(unique_sources, 1): |
|
|
opis = None |
|
|
filename = source.get('filename') |
|
|
if filename: |
|
|
file_stem = os.path.splitext(filename)[0] |
|
|
|
|
|
opis = bibliography_data.get(filename) |
|
|
if not opis: |
|
|
opis = bibliography_data.get(file_stem + '.pdf') |
|
|
if not opis: |
|
|
opis = bibliography_data.get(file_stem) |
|
|
if not opis: |
|
|
opis = filename |
|
|
else: |
|
|
|
|
|
opis = source.get('title') or source.get('source') or "nieznane źródło" |
|
|
page = source.get('page', '') |
|
|
section = source.get('section', '') |
|
|
print(f"{i}. {opis} (str. {page}) - {section}") |
|
|
|
|
|
def handle_command(command: str, assistant: HRAssistant) -> bool: |
|
|
""" |
|
|
Obsługuje polecenia specjalne. Zwraca True jeśli należy kontynuować pętlę. |
|
|
""" |
|
|
cmd = command.lower() |
|
|
if cmd in ['quit', 'exit', 'q']: |
|
|
return False |
|
|
if cmd == 'stats': |
|
|
print(f"Statystyki: {assistant.get_stats()}") |
|
|
return True |
|
|
if cmd == 'clear': |
|
|
assistant.clear_memory() |
|
|
print("Pamięć konwersacji została wyczyszczona") |
|
|
return True |
|
|
return None |
|
|
|
|
|
def main(): |
|
|
""" |
|
|
Przykład użycia asystenta HR. |
|
|
""" |
|
|
|
|
|
api_key = os.getenv("OPENAI_API_KEY") |
|
|
if not api_key: |
|
|
raise ValueError("Ustaw zmienną środowiskową OPENAI_API_KEY") |
|
|
|
|
|
|
|
|
assistant = HRAssistant( |
|
|
openai_api_key=api_key, |
|
|
pdf_directory="pdfs" |
|
|
) |
|
|
|
|
|
|
|
|
test_questions = [ |
|
|
"Jakie są uprawnienia pracownika z niepełnosprawnością?", |
|
|
"Jak przeprowadzić rekrutację osoby z niepełnosprawnością?", |
|
|
"Jakie wsparcie może otrzymać pracodawca zatrudniający osoby z niepełnosprawnościami?", |
|
|
"Czy osoba z orzeczeniem o całkowitej niezdolności do pracy może być zatrudniona?" |
|
|
] |
|
|
|
|
|
print("=== Asystent HR - Zatrudnianie osób z niepełnosprawnościami ===\n") |
|
|
print(f"Statystyki: {assistant.get_stats()}\n") |
|
|
print("Dostępne komendy:") |
|
|
print(" stats - wyświetla statystyki bazy wiedzy") |
|
|
print(" clear - czyści pamięć konwersacji") |
|
|
print(" quit/exit/q - kończy program\n") |
|
|
|
|
|
|
|
|
while True: |
|
|
try: |
|
|
question = input("\nTwoje pytanie (lub 'quit' aby zakończyć): ") |
|
|
if not question.strip(): |
|
|
continue |
|
|
|
|
|
cmd_result = handle_command(question, assistant) |
|
|
if cmd_result is False: |
|
|
break |
|
|
if cmd_result is True: |
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
response = assistant.ask(question) |
|
|
print(f"\n📝 Odpowiedź:") |
|
|
print(response["answer"]) |
|
|
|
|
|
|
|
|
if response.get("sources"): |
|
|
print("\nŹródła:") |
|
|
print_unique_sources(response["sources"]) |
|
|
|
|
|
if "error" in response: |
|
|
print(f"\n⚠️ Błąd: {response['error']}") |
|
|
|
|
|
except KeyboardInterrupt: |
|
|
print("\n\nDo widzenia!") |
|
|
break |
|
|
except Exception as e: |
|
|
print(f"\n❌ Błąd: {e}") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|
|
|
logger.info("Pamięć konwersacji została wyczyszczona") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bibliography_data = {} |
|
|
file_path = 'bibliography.csv' |
|
|
if os.path.exists(file_path): |
|
|
with open(file_path, mode='r', encoding='utf-8') as csvfile: |
|
|
reader = csv.reader(csvfile, delimiter=';') |
|
|
for row in reader: |
|
|
if len(row) == 2: |
|
|
bibliography_data[row[1].strip()] = row[0].strip() |
|
|
|
|
|
print(bibliography_data) |
|
|
|
|
|
|