"""Utilities for scraping fimohealth.com content and turning it into LangChain documents."""
import glob
import requests
from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain_community.document_loaders import UnstructuredFileLoader
import json
import pandas as pd
from langchain_text_splitters import HTMLHeaderTextSplitter
def retrieve_sources():
    """Collect all blog-article URLs from the Long COVID category page."""
    base_url = 'https://www.fimohealth.com'
    url = 'https://www.fimohealth.com/categories/long-covid/'
    # Send the request to the page
    response = requests.get(url)
    urls = []
    # Make sure the request was successful (status code 200)
    if response.status_code == 200:
        # Parse the HTML content of the page
        soup = BeautifulSoup(response.text, 'html.parser')
        # Collect every <a> tag that links to a blog article
        links = soup.find_all('a')
        for link in links:
            href = link.get('href')
            if href and "/gesundheitsblog/" in href:
                urls.append(base_url + href)
    else:
        print('Failed to retrieve the webpage:', response.status_code)
    return urls
def html_to_chunks():
    """Fetch each blog article and split it into header-based chunks."""
    urls = retrieve_sources()
    docs = []

    def clean_article(text):
        # Keep only the text that follows the "Zurück" ("Back") link, if present
        index = text.find("Zurück")
        if index != -1:
            text = text[index + len("Zurück"):].strip()
        # Replace the gender colon ":in" with "*in"
        return text.replace(":in", "*in")

    for url in urls:
        response = requests.get(url)
        # Try a few encodings until one decodes without errors;
        # fall back to the raw bytes, which BeautifulSoup can also handle
        content = response.content
        encodings_to_try = ['utf-8', 'latin-1', 'ISO-8859-1']
        for encoding in encodings_to_try:
            try:
                content = response.content.decode(encoding)
                break
            except UnicodeDecodeError:
                continue
        # Parse the decoded content and extract the article section
        soup = BeautifulSoup(content, 'html.parser')
        html_string = str(soup.find_all('section', {"class": "section-blog-template-article"})[0])
        html_string = clean_article(html_string)
        headers_to_split_on = [
            ("h1", "Header 1"),
            ("h2", "Header 2"),
            ("h3", "Header 3"),
            ("h4", "Header 4"),
            ("h5", "Header 5"),
        ]
        html_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
        chunks = html_splitter.split_text(html_string)
        # Record which page each chunk came from
        for chunk in chunks:
            chunk.metadata["source"] = url
            docs.append(chunk)
    return docs
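
# Illustrative sketch (not in the original file) of what html_to_chunks()
# returns, assuming HTMLHeaderTextSplitter's usual behavior: each chunk is a
# Document whose metadata maps the header names configured above to the
# headers over that passage, plus the "source" URL added in the loop, e.g.
#   Document(page_content="...",
#            metadata={"Header 1": "<article title>",
#                      "source": "https://www.fimohealth.com/gesundheitsblog/..."})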
def retrieve_content(url):
    """Return the cleaned text content of a page's <main> elements."""

    def clean_article(text):
        # Keep only the text that follows the "Zurück" ("Back") link, if present
        index = text.find("Zurück")
        if index != -1:
            text = text[index + len("Zurück"):].strip()
        # Replace the gender colon ":in" with "*in"
        return text.replace(":in", "*in")

    # Send a GET request to the webpage
    response = requests.get(url)
    # Check if the request was successful
    if response.status_code == 200:
        # Parse the HTML content of the webpage
        soup = BeautifulSoup(response.content, 'html.parser')
        # Concatenate the text content of all <main> elements
        main_elements = soup.find_all('main')
        page_content = ""
        for element in main_elements:
            page_content += element.text
        return clean_article(page_content)
    print('Failed to retrieve the webpage:', response.status_code)
    return None
def create_documents():
    """Build Documents from the scraped pages and local text files."""
    urls = retrieve_sources()
    # Manually added pages that are not listed in the blog category
    urls.append("https://www.fimohealth.com/patienten")
    urls.append("https://www.fimohealth.com/patienten/long-covid")
    documents = []
    for url in urls:
        content = retrieve_content(url)
        if content:
            documents.append(Document(page_content=content, metadata={"source": url}))
    # Get all the filenames from the docs folder
    files = glob.glob("./docs/*.txt")
    # Load files into readable documents
    for file in files:
        loader = UnstructuredFileLoader(file)
        documents.append(loader.load()[0])
    if not documents:
        raise ValueError("No documents could be created.")
    return documents
def create_faq_documents():
    """Build one Document per FAQ row and save them all as JSON."""
    documents = []
    df = pd.read_csv('./docs/faq.csv', sep=",")
    for _, row in df.iterrows():
        documents.append(Document(
            page_content=f"{row['Title']} \n {row['Text - de']}",
            metadata={"source": f"FAQ - {row['Bereich']}"},
        ))
    store_documents(documents, path='./docs/faq_docs.json')
def store_documents(documents, path="./docs/langchain_documents.json"):
    """Serialize a list of Documents to a single JSON file."""
    # Convert each Document object to a dictionary
    document_dicts = [doc.__dict__ for doc in documents]
    # Write all documents to a single JSON file
    with open(path, "w") as file:
        json.dump(document_dicts, file, indent=4)
    print(f"All {len(documents)} langchain documents have been saved to '{path}'.")
def read_documents_from_file(file_path="./docs/langchain_documents.json"):
    """Load Documents back from a JSON file; return an empty list on failure."""
    documents = []
    try:
        with open(file_path, "r") as file:
            document_dicts = json.load(file)
        for doc_dict in document_dicts:
            documents.append(Document(**doc_dict))
        print(f"Successfully read {len(documents)} documents from '{file_path}'.")
        return documents
    except FileNotFoundError:
        print(f"File '{file_path}' not found.")
        return []
    except Exception as e:
        print(f"Error reading documents from '{file_path}': {e}")
        return []
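
# A minimal usage sketch (an assumption, not part of the original Space):
# scrape the site, persist the documents, and verify they load back.
if __name__ == "__main__":
    documents = create_documents()
    store_documents(documents)
    loaded = read_documents_from_file()
    print(f"Round-trip loaded {len(loaded)} documents.")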