|
|
|
|
|
from langchain_community.document_loaders import SitemapLoader |
|
|
from bs4 import BeautifulSoup |
|
|
import re |
|
|
from langchain_chroma import Chroma |
|
|
from langchain_openai import OpenAIEmbeddings |
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
|
from langchain_core.documents import Document |
|
|
import requests |
|
|
from tqdm import tqdm |
|
|
|
|
|
# Last-resort pattern for the publication date: an "Opublikowano" phrase,
# optionally followed by "w dniu", then "<day> <word> <4-digit year>".
# Compiled once here instead of on every document.
_PUBLISHED_TEXT_RE = re.compile(
    r"Opublikowano(?: w dniu)?[:\s]+([0-9]{1,2}\s+\w+\s+\d{4})",
    re.IGNORECASE,
)


def _extract_published_time(soup):
    """Return the publication date string found in the page, or None.

    Sources are tried in order and the first non-empty one wins:
    the ``article:published_time`` meta tag, the ``datetime`` attribute of
    the first ``<time>`` element, the text of that element, and finally an
    "Opublikowano ..." phrase in the text of the whole page.
    """
    meta_tag = soup.find("meta", property="article:published_time")
    if meta_tag and meta_tag.get("content"):
        return meta_tag["content"]

    time_tag = soup.find("time")
    if time_tag:
        stamp = time_tag.get("datetime") or time_tag.get_text(strip=True)
        if stamp:
            return stamp

    match = _PUBLISHED_TEXT_RE.search(soup.get_text(separator="\n", strip=True))
    return match.group(1) if match else None


def _meta_contents(soup, prop):
    """Return the non-empty ``content`` values of all ``<meta property=prop>`` tags."""
    return [
        tag["content"]
        for tag in soup.find_all("meta", property=prop)
        if tag.get("content")
    ]


def process_documents(docs: list[Document]) -> list[Document]:
    """Turn raw HTML documents into plain-text documents with metadata.

    Args:
        docs: Documents whose ``page_content`` is the HTML of a page.

    Returns:
        A new list with one Document per input, in the same order. Its
        ``page_content`` is the text of the page's ``<article>`` element, or
        of the whole page when there is no such element. Its metadata is a
        copy of the input metadata, extended with whichever of ``title``,
        ``published_time``, ``categories`` and ``keywords`` could be found
        in the page. The input documents are not modified.
    """
    processed_docs = []
    for doc in docs:
        soup = BeautifulSoup(doc.page_content, "lxml")

        # Prefer the <article> element; fall back to the text of the full page.
        article = soup.find("article")
        text_source = article if article else soup
        content = text_source.get_text(separator="\n", strip=True)

        # Copy so that the caller's metadata dict is left untouched.
        metadata = doc.metadata.copy()

        if soup.title:
            title_text = soup.title.get_text(strip=True)
            if title_text:
                metadata["title"] = title_text

        published_time = _extract_published_time(soup)
        if published_time:
            metadata["published_time"] = published_time

        # Multi-valued properties are flattened into one comma-separated string.
        categories = _meta_contents(soup, "article:section")
        if categories:
            metadata["categories"] = ", ".join(categories)

        keywords = _meta_contents(soup, "article:tag")
        if keywords:
            metadata["keywords"] = ", ".join(keywords)

        processed_docs.append(Document(page_content=content, metadata=metadata))

    return processed_docs
|
|
|
|
|
|
|
|
# Embedding model and persistent vector store are created up front, before
# any crawling starts.
embedder = OpenAIEmbeddings(model="text-embedding-3-small", show_progress_bar=True)

baza = Chroma(collection_name="szuflada", embedding_function=embedder, persist_directory="./szuflada")


# ---------------------------------------------------------------------------
# 1. Crawl: read the sitemap index, its sub-sitemaps, then every listed page.
# ---------------------------------------------------------------------------
print("Pobieranie i parsowanie mapy strony...")
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
sitemap_url = "https://mojaszuflada.pl/wp-sitemap.xml"
# (connect, read) timeouts in seconds. Without a timeout a stalled server
# would block the run indefinitely; a timeout surfaces as a
# requests.RequestException and is handled like any other failed request.
request_timeout = (10, 60)
docs = []

# One session for the whole crawl so connections to the host are reused.
with requests.Session() as session:
    session.headers.update(headers)

    try:
        response = session.get(sitemap_url, timeout=request_timeout)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"Krytyczny błąd: Nie udało się pobrać głównej mapy strony: {e}")
    else:
        sitemap_soup = BeautifulSoup(response.text, "xml")
        # Strip surrounding whitespace so the ".xml" suffix test below is
        # reliable, and drop empty <loc> elements.
        urls = [
            loc_text
            for loc in sitemap_soup.find_all("loc")
            if (loc_text := loc.text.strip())
        ]

        # Entries ending in ".xml" are sub-sitemaps; everything else is a page.
        sitemap_urls = [url for url in urls if url.endswith(".xml")]
        page_urls = [url for url in urls if not url.endswith(".xml")]

        for sub_sitemap_url in tqdm(sitemap_urls, desc="Parsowanie pod-map"):
            try:
                response = session.get(sub_sitemap_url, timeout=request_timeout)
                response.raise_for_status()
            except requests.RequestException as e:
                print(f"Pominięto pod-mapę {sub_sitemap_url}: {e}")
                continue
            sub_sitemap_soup = BeautifulSoup(response.text, "xml")
            page_urls.extend(
                loc_text
                for loc in sub_sitemap_soup.find_all("loc")
                if (loc_text := loc.text.strip())
            )

        # A URL listed in more than one sitemap must be indexed only once;
        # dict.fromkeys keeps the first occurrence and the original order.
        page_urls = list(dict.fromkeys(page_urls))

        print(f"Znaleziono {len(page_urls)} adresów URL do przetworzenia.")

        for url in tqdm(page_urls, desc="Pobieranie stron"):
            try:
                response = session.get(url, timeout=request_timeout)
                response.raise_for_status()
            except requests.RequestException as e:
                print(f"Pominięto stronę {url}: {e}")
                continue
            docs.append(
                Document(
                    page_content=response.text,
                    metadata={"source": url, "loc": url},
                )
            )


if not docs:
    print("Nie załadowano żadnych dokumentów. Zakończenie pracy.")
    # Nothing to index: stop with a failure status. The existing collection
    # has not been touched at this point.
    raise SystemExit(1)


# ---------------------------------------------------------------------------
# 2. Extract text and metadata, then split into overlapping chunks.
# ---------------------------------------------------------------------------
processed_docs = process_documents(docs)

print("\nPrzykładowe metadane przetworzonych dokumentów (pierwsze 5):")
for processed in processed_docs[:5]:
    print(processed.metadata)

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = text_splitter.split_documents(processed_docs)

# Number of chunks sent to the vector store per add_documents call.
batch_size = 1000


# ---------------------------------------------------------------------------
# 3. Report how many chunks lack each metadata key we rely on.
# ---------------------------------------------------------------------------
required_meta_keys = ["source", "title", "published_time"]
missing_counts = {
    key: sum(1 for chunk in chunks if not (chunk.metadata or {}).get(key))
    for key in required_meta_keys
}

print(f"Liczba chunków: {len(chunks)}")
print("Braki metadanych (liczba chunków bez klucza/wartości):", missing_counts)
print("Przykładowe metadane dla pierwszych 5 chunków:")
for sample in chunks[:5]:
    print(sample.metadata)


# ---------------------------------------------------------------------------
# 4. Replace the stored collection with the freshly built chunks.
# ---------------------------------------------------------------------------
# The old collection is dropped only now, once replacement data exists, so a
# failed crawl can no longer leave the store empty.
print("Czyszczenie istniejącej kolekcji w bazie danych...")
try:
    baza.delete_collection()
    print("Kolekcja została wyczyszczona.")
    # Re-open the store so that `baza` points at a fresh, empty collection.
    baza = Chroma(collection_name="szuflada", embedding_function=embedder, persist_directory="./szuflada")
except Exception as e:
    # Best effort: on failure the previously opened `baza` keeps being used.
    print(f"Nie można było wyczyścić kolekcji (może nie istniała): {e}")

for i in range(0, len(chunks), batch_size):
    baza.add_documents(documents=chunks[i:i + batch_size])
|
|
|
|
|
|