# d365stuff-chat / main.py

import hashlib
import json
import logging
import os
import xml.etree.ElementTree as ET
from io import BytesIO
from urllib.parse import urlparse

import html2text
import requests
from bs4 import BeautifulSoup
from markitdown import MarkItDown

# Configure logging to both a file and the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('d365stuff_downloader.log'),
        logging.StreamHandler()
    ]
)

def create_posts_directory():
    """Create the output directories if they don't already exist."""
    directories = ['posts', 'posts_markitdown', 'html_pages']
    for directory in directories:
        if not os.path.exists(directory):
            os.makedirs(directory)
            logging.info(f"Created directory: {directory}")

def get_file_hash(content):
    """Generate an MD5 hash of the content for change detection (not security)."""
    return hashlib.md5(content.encode('utf-8')).hexdigest()

def download_page(url):
    """Download a webpage and return its HTML, caching the raw page on disk."""
    filename = get_filename_from_url(url)
    html_path = os.path.join('html_pages', f"{filename}.html")

    # Serve from the on-disk cache if the page was already downloaded
    if os.path.exists(html_path):
        logging.info(f"HTML file already exists for {url}, reading from disk")
        with open(html_path, 'r', encoding='utf-8') as f:
            return f.read()

    try:
        logging.info(f"Downloading page: {url}")
        response = requests.get(url, timeout=30)  # bound the wait so a dead host can't hang the run
        response.raise_for_status()
        content = response.text

        # Cache the raw HTML for future runs
        with open(html_path, 'w', encoding='utf-8') as f:
            f.write(content)
        logging.info(f"Saved HTML content to: {html_path}")

        return content
    except requests.RequestException as e:
        logging.error(f"Error downloading {url}: {e}")
        return None

def extract_main_content(html_content):
    """Extract the main <article> element, falling back to the full page."""
    soup = BeautifulSoup(html_content, 'html.parser')
    article = soup.find('article')
    if article:
        return str(article)
    return html_content

def convert_to_markdown_html2text(html_content):
    """Convert HTML content to markdown using html2text."""
    h = html2text.HTML2Text()
    h.ignore_links = False
    h.ignore_images = False
    h.ignore_emphasis = False
    return h.handle(html_content)

def convert_to_markdown_markitdown(html_content):
    """Convert HTML content to markdown using MarkItDown."""
    md = MarkItDown()
    # Wrap the HTML in a binary stream for MarkItDown to consume
    html_content_io = BytesIO(html_content.encode('utf-8'))
    result = md.convert(html_content_io)
    return result.markdown
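
# Note: stream input to convert() is assumed to be supported by the installed
# markitdown version; some releases expect convert_stream() for file-like objects.
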
def get_filename_from_url(url):
    """Derive a filesystem-safe filename from a URL."""
    parsed = urlparse(url)
    path = parsed.path.strip('/')
    # Take the last path segment
    filename = path.split('/')[-1]
    # Strip any query-string remnants
    filename = filename.split('?')[0]
    # Keep only alphanumerics, hyphens, underscores, and dots
    filename = ''.join(c for c in filename if c.isalnum() or c in ('-', '_', '.'))
    # Optionally cap the filename length (currently disabled):
    # if len(filename) > 50:
    #     filename = filename[:50]
    return filename
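
# For example (hypothetical URL):
#   "https://www.d365stuff.co/my-first-post/?utm_source=x" -> "my-first-post"
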
def extract_json_ld_metadata(html_content):
    """Extract metadata from the page's JSON-LD script tag."""
    soup = BeautifulSoup(html_content, 'html.parser')
    script_tag = soup.find('script', {'type': 'application/ld+json'})
    if not script_tag or not script_tag.string:
        logging.warning("No JSON-LD metadata found in the page")
        return None
    try:
        metadata = json.loads(script_tag.string)
        return metadata
    except json.JSONDecodeError as e:
        logging.error(f"Error parsing JSON-LD metadata: {e}")
        return None
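
# The markup this expects looks roughly like the following (keys vary by page;
# shape assumed from the fields read in create_frontmatter below):
# <script type="application/ld+json">
#   {"headline": "...", "datePublished": "...", "dateModified": "...",
#    "description": "...", "author": {"name": "...", "url": "..."}}
# </script>
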
def create_frontmatter(metadata):
    """Create markdown frontmatter from metadata.

    Values are written unquoted, so fields containing YAML-special characters
    (e.g. a ':' in a title) may need quoting downstream.
    """
    if not metadata:
        return ""

    frontmatter = "---\n"

    # Add basic metadata
    if 'headline' in metadata:
        frontmatter += f"title: {metadata['headline']}\n"
    if 'datePublished' in metadata:
        frontmatter += f"date: {metadata['datePublished']}\n"
    if 'dateModified' in metadata:
        frontmatter += f"lastmod: {metadata['dateModified']}\n"
    if 'description' in metadata:
        frontmatter += f"description: {metadata['description']}\n"
    if 'keywords' in metadata:
        frontmatter += f"tags: {metadata['keywords']}\n"

    # Add author information
    if 'author' in metadata:
        author = metadata['author']
        if isinstance(author, dict):
            frontmatter += f"author: {author.get('name', '')}\n"
            if 'url' in author:
                frontmatter += f"author_url: {author['url']}\n"

    # Add publisher information
    if 'publisher' in metadata:
        publisher = metadata['publisher']
        if isinstance(publisher, dict):
            frontmatter += f"publisher: {publisher.get('name', '')}\n"
            if 'url' in publisher:
                frontmatter += f"publisher_url: {publisher['url']}\n"

    frontmatter += "---\n\n"
    return frontmatter
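
# Example output for hypothetical metadata
# {"headline": "My Post", "datePublished": "2024-01-01", "author": {"name": "A. Author"}}:
#
#   ---
#   title: My Post
#   date: 2024-01-01
#   author: A. Author
#   ---
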
def save_markdown(content, directory, filename, metadata=None):
    """Save markdown content to a file, prepending frontmatter when available."""
    if not filename.endswith('.md'):
        filename += '.md'
    output_path = os.path.join(directory, filename)

    # Prepend frontmatter if metadata is available
    if metadata:
        frontmatter = create_frontmatter(metadata)
        content = frontmatter + content

    # Skip the write when an existing file already has identical content
    if os.path.exists(output_path):
        with open(output_path, 'r', encoding='utf-8') as f:
            existing_content = f.read()
        if get_file_hash(existing_content) == get_file_hash(content):
            logging.info(f"Content unchanged for {output_path}, skipping save")
            return

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(content)
    logging.info(f"Saved to: {output_path}")

def process_sitemap(sitemap_url):
    """Process the sitemap and download all pages."""
    try:
        logging.info(f"Processing sitemap: {sitemap_url}")
        response = requests.get(sitemap_url, timeout=30)  # bound the wait on the sitemap fetch
        response.raise_for_status()

        # Parse the sitemap XML
        root = ET.fromstring(response.content)

        # All sitemap elements live in this namespace
        ns = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}

        # Create the output directories
        create_posts_directory()

        # Process each URL in the sitemap
        for url in root.findall('.//ns:url/ns:loc', ns):
            page_url = url.text
            logging.info(f"Processing: {page_url}")

            # Download the page (or read it from the on-disk cache)
            html_content = download_page(page_url)
            if html_content:
                # Extract metadata and the main article content
                metadata = extract_json_ld_metadata(html_content)
                main_content = extract_main_content(html_content)
                filename = get_filename_from_url(page_url)

                # Save both converter outputs, each with the same metadata
                try:
                    markdown_content_html2text = convert_to_markdown_html2text(main_content)
                    save_markdown(markdown_content_html2text, 'posts', filename, metadata)
                except Exception as e:
                    logging.error(f"Error saving html2text markdown: {e}", exc_info=True)

                try:
                    markdown_content_markitdown = convert_to_markdown_markitdown(main_content)
                    save_markdown(markdown_content_markitdown, 'posts_markitdown', filename, metadata)
                except Exception as e:
                    logging.error(f"Error saving MarkItDown markdown: {e}", exc_info=True)
    except Exception as e:
        logging.error(f"Error processing sitemap: {e}", exc_info=True)
def main():
    sitemap_url = "https://www.d365stuff.co/sitemap-posts.xml"
    logging.info("Starting download process...")
    process_sitemap(sitemap_url)
    logging.info("Download process completed!")


if __name__ == "__main__":
    main()
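
# Usage (assumes the dependencies above are installed, e.g.
# `pip install requests beautifulsoup4 html2text markitdown`):
#
#   python main.py
#
# Downloads land in ./html_pages (raw HTML cache), with converted markdown in
# ./posts (html2text) and ./posts_markitdown (MarkItDown).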