import asyncio
import datetime
import os
import random
import re
import sqlite3
import time
import xml.etree.ElementTree as ET
from contextlib import closing
from urllib.parse import urljoin

from bs4 import BeautifulSoup, NavigableString, Tag
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from playwright.async_api import async_playwright

# --- Configuration ---
DATABASE_FILE = "scraped.db"
MARKDOWN_OUTPUT_DIR = "scraped_md"
URLS_FILE = "urls.txt"
DELAY_MIN_SECONDS = 0.5
DELAY_MAX_SECONDS = 1.0
NAVIGATION_TIMEOUT_SECONDS = 60  # Explicit timeout for page navigation (seconds)

# Realistic browser headers applied to every context so requests look like a
# normal desktop Chrome session.
_REQUEST_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "Accept-Language": "en,en-AU;q=0.9,sr;q=0.8,sr-RS;q=0.7,en-GB;q=0.6,en-US;q=0.5,hr;q=0.4",
    "Cache-Control": "max-age=0",
    "Sec-Ch-Ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Windows"',
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1",
    "Upgrade-Insecure-Requests": "1",
}


# --- Database Functions ---
def init_db():
    """Initialize the SQLite database and ensure the markdown output directory exists."""
    os.makedirs(MARKDOWN_OUTPUT_DIR, exist_ok=True)
    # closing() guarantees the connection is released even if execute() raises.
    with closing(sqlite3.connect(DATABASE_FILE)) as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS scraped_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT NOT NULL,
                title TEXT,
                full_markdown_content TEXT,
                status_code INTEGER,
                error_message TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
    print(f"[*] Database '{DATABASE_FILE}' initialized and '{MARKDOWN_OUTPUT_DIR}' directory ensured.")


def insert_scraped_data(url, title, markdown_content, status_code, error_message=None):
    """Insert one scrape result row into the scraped_data table.

    Args:
        url: The URL that was scraped.
        title: Extracted page title (or an error placeholder).
        markdown_content: Full Markdown rendering of the page ("" on failure).
        status_code: HTTP status (0 for non-HTTP failures, 408 for timeouts).
        error_message: Human-readable error description, or None on success.
    """
    with closing(sqlite3.connect(DATABASE_FILE)) as conn:
        conn.execute('''
            INSERT INTO scraped_data (url, title, full_markdown_content, status_code, error_message)
            VALUES (?, ?, ?, ?, ?)
        ''', (url, title, markdown_content, status_code, error_message))
        conn.commit()


# --- Core Scraping Function for a single page's content extraction ---
async def process_page_content(page, url):
    """Extract text and inline Markdown links from <p> tags on a given page.

    Args:
        page: A Playwright page whose content has already been loaded.
        url: The page URL, used as the base for resolving relative hrefs.

    Returns:
        dict with keys "title", "markdown_content", and "error_message"
        (error_message is None on success).
    """
    try:
        html_content = await page.content()
        soup = BeautifulSoup(html_content, 'html.parser')

        # Single lookup instead of calling soup.find('title') twice.
        title_tag = soup.find('title')
        page_title = title_tag.text if title_tag else 'Untitled_Page_No_JS'

        markdown_paragraphs = []
        for p_tag in soup.find_all('p'):
            paragraph_markdown = []
            for content in p_tag.contents:
                if isinstance(content, NavigableString):
                    paragraph_markdown.append(str(content).strip())
                elif isinstance(content, Tag) and content.name == 'a':
                    link_text = content.get_text(strip=True)
                    link_href = content.get('href')
                    if link_href:
                        # Resolve relative links against the page URL; leave
                        # absolute, mailto:, tel:, and fragment links alone.
                        if not link_href.startswith(('http://', 'https://', 'mailto:', 'tel:', '#')):
                            link_href = urljoin(url, link_href)
                        paragraph_markdown.append(f"[{link_text}]({link_href})")
                    else:
                        paragraph_markdown.append(link_text)
                else:
                    paragraph_markdown.append(content.get_text(strip=True))

            # Keep the paragraph only if at least one fragment is non-empty.
            if any(paragraph_markdown):
                joined_paragraph = " ".join(paragraph_markdown).strip()
                joined_paragraph = re.sub(r'\s+', ' ', joined_paragraph)  # Clean multiple spaces
                markdown_paragraphs.append(joined_paragraph)

        full_markdown_content = f"# {page_title}\n\n"
        full_markdown_content += "\n\n".join(markdown_paragraphs)

        return {
            "title": page_title,
            "markdown_content": full_markdown_content,
            "error_message": None,
        }
    except Exception as e:
        return {
            "title": "Error Processing Content",
            "markdown_content": "",
            "error_message": f"Error during content parsing: {e}",
        }


def load_urls_from_sitemap(sitemap_path):
    """Parse an XML sitemap file and return its list of URLs ([] on any failure)."""
    urls = []
    try:
        tree = ET.parse(sitemap_path)
        root = tree.getroot()
        # Namespace for sitemap XML
        namespace = {'sitemap': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
        for url_element in root.findall('sitemap:url', namespace):
            loc_element = url_element.find('sitemap:loc', namespace)
            if loc_element is not None:
                urls.append(loc_element.text)
        print(f"[*] Loaded {len(urls)} URLs from sitemap: '{sitemap_path}'")
        return urls
    except FileNotFoundError:
        print(f"ERROR: Sitemap file '{sitemap_path}' not found.")
        return []
    except ET.ParseError as e:
        print(f"ERROR: Failed to parse sitemap '{sitemap_path}': {e}")
        return []
    except Exception as e:
        print(f"An unexpected error occurred while loading sitemap: {e}")
        return []


def _load_urls_interactively():
    """Prompt the user for a URL source and return the list of URLs.

    Returns an empty list (after printing the reason) when no URLs are
    available, so the caller can simply bail out on an empty result.
    """
    print("\n--- URL Source Selection ---")
    print("1. Load URLs from 'urls.txt' (one URL per line)")
    print("2. Load URLs from an XML sitemap file")
    choice = input("Enter your choice (1 or 2): ").strip()

    if choice == '1':
        try:
            with open(URLS_FILE, 'r', encoding='utf-8') as f:
                urls = [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            print(f"ERROR: '{URLS_FILE}' not found. Please create the file with URLs, one per line.")
            return []
        if not urls:
            print(f"WARNING: '{URLS_FILE}' is empty. No URLs to scrape.")
            return []
        print(f"[*] Using URLs from '{URLS_FILE}'.")
        return urls

    if choice == '2':
        sitemap_path = input("Enter the path to the XML sitemap file: ").strip()
        if not sitemap_path:
            print("Sitemap path cannot be empty. Exiting.")
            return []
        urls = load_urls_from_sitemap(sitemap_path)
        if not urls:
            print("No URLs loaded from sitemap. Exiting.")
        return urls

    print("Invalid choice. Please enter 1 or 2.")
    return []


async def _scrape_url(url):
    """Launch a fresh visible browser, navigate to *url*, and extract content.

    A new browser per URL guarantees a clean state and bounds the damage of a
    hung page when running non-headless.

    Returns:
        (title, markdown_content, status_code, error_message) tuple;
        error_message is None only on a successful extraction.
    """
    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)  # deliberately visible
            try:
                context = await browser.new_context(java_script_enabled=False)
                await context.set_extra_http_headers(_REQUEST_HEADERS)
                page = await context.new_page()
                try:
                    # --- Core Navigation with Timeout ---
                    response = await page.goto(
                        url,
                        wait_until="load",
                        timeout=NAVIGATION_TIMEOUT_SECONDS * 1000,  # Playwright timeout is in ms
                    )
                    status_code = response.status if response else 0
                    print(f"[*] Page loaded. HTTP Status: {status_code}")

                    if 200 <= status_code < 300:  # Success codes
                        result = await process_page_content(page, url)
                        print(f"[*] Content extraction attempted for: {url}")
                        return (result["title"], result["markdown_content"],
                                status_code, result["error_message"])

                    print(f"[*] WARNING: Non-2xx status code: {status_code}")
                    return (f"HTTP Error {status_code}", "", status_code,
                            f"Navigation failed with status {status_code}")
                except PlaywrightTimeoutError:
                    print(f"[*] ERROR: Navigation timed out for {url}")
                    return ("Navigation Timeout", "", 408,  # Request Timeout
                            f"Navigation timed out after {NAVIGATION_TIMEOUT_SECONDS} seconds.")
                except Exception as nav_error:
                    print(f"[*] ERROR during navigation for {url}: {nav_error}")
                    return ("Navigation Error", "", 0,
                            f"Error during navigation: {nav_error}")
                finally:
                    # Close page/context even if an unexpected error escaped.
                    await page.close()
                    await context.close()
            finally:
                # Close the browser INSIDE the async_playwright scope: the
                # original closed it after the scope exited, when the driver
                # was already stopped, which can itself raise.
                await browser.close()
    except Exception as browser_launch_error:
        # Browser failed to launch or some other critical setup failure.
        print(f"[*] CRITICAL ERROR (Browser/Context Launch) for {url}: {browser_launch_error}")
        return ("Browser Launch Error", "", 0,
                f"Browser or context launch failed: {browser_launch_error}")


def _save_markdown(title, markdown_content, url):
    """Write extracted markdown to disk under a filesystem-safe, unique name."""
    safe_filename = re.sub(r'[\\/:*?"<>| ]', '_', title)[:100]
    md_filename = os.path.join(MARKDOWN_OUTPUT_DIR, f"{safe_filename}.md")
    # Distinct pages can share a title; never silently overwrite an earlier one.
    counter = 1
    while os.path.exists(md_filename):
        md_filename = os.path.join(MARKDOWN_OUTPUT_DIR, f"{safe_filename}_{counter}.md")
        counter += 1
    try:
        with open(md_filename, 'w', encoding='utf-8') as f:
            f.write(markdown_content)
        print(f"[*] Markdown saved to {md_filename}")
    except Exception as file_error:
        print(f"[*] ERROR: Failed to save MD file for {url}: {file_error}")


# --- Main execution function ---
async def main():
    """Entry point: gather URLs, scrape each one sequentially, store results."""
    init_db()

    urls_to_scrape = _load_urls_interactively()
    total_urls = len(urls_to_scrape)
    if total_urls == 0:
        # The helper already printed why no URLs are available.
        return

    start_total_time = time.time()
    print(f"--- Starting automated scraping of {total_urls} URLs ---")
    # This message is crucial for the user to understand what will happen visually
    print("[*] Browsers will launch non-headless (visible), process the page, and close automatically for each URL.")
    print("[*] No manual input required after starting. It will proceed to the next URL after a short delay.")
    print(f"[*] Delay between requests: {DELAY_MIN_SECONDS:.1f} - {DELAY_MAX_SECONDS:.1f} seconds.")
    print(f"[*] Navigation timeout set to {NAVIGATION_TIMEOUT_SECONDS} seconds per page.")

    for i, url in enumerate(urls_to_scrape):
        current_index = i + 1

        # ETA from the running average time per URL (current_index >= 1 here).
        elapsed_time = time.time() - start_total_time
        eta_seconds = (total_urls - current_index) * (elapsed_time / current_index)
        eta_display = str(datetime.timedelta(seconds=int(eta_seconds)))

        print(f"\n--- Progress: {current_index}/{total_urls} --- ETA: {eta_display} ---")
        print(f"[*] Attempting to navigate to: {url}")

        title, markdown_content, status_code, error_message = await _scrape_url(url)

        # Store result in DB
        insert_scraped_data(url, title, markdown_content, status_code, error_message)
        print(f"[*] Data for {url} saved to '{DATABASE_FILE}'.")

        # Save to MD file only on a clean extraction.
        if markdown_content and not error_message:
            _save_markdown(title, markdown_content, url)
        elif error_message:
            print(f"[*] Skipping MD file creation for {url} due to an error.")

        # Add a random delay between requests, except after the very last URL
        if i < total_urls - 1:
            delay = random.uniform(DELAY_MIN_SECONDS, DELAY_MAX_SECONDS)
            print(f"[*] Waiting {delay:.2f} seconds before next URL...")
            await asyncio.sleep(delay)

    total_duration = str(datetime.timedelta(seconds=int(time.time() - start_total_time)))
    print("\n=== Automated scraping process complete! ===")
    print(f"Total URLs processed: {total_urls}")
    print(f"Total duration: {total_duration}")
    print(f"Scraped data saved to '{DATABASE_FILE}' and markdown files in '{MARKDOWN_OUTPUT_DIR}/'.")


if __name__ == "__main__":
    asyncio.run(main())