# Playwright-based web scraper: extracts <p> text as Markdown and stores results in SQLite.
import asyncio
from playwright.async_api import async_playwright, Playwright, TimeoutError as PlaywrightTimeoutError
from bs4 import BeautifulSoup, NavigableString, Tag
import re
import os
from urllib.parse import urljoin
import sqlite3
import datetime
import time
import random
import xml.etree.ElementTree as ET
# --- Configuration ---
DATABASE_FILE = "scraped.db"
MARKDOWN_OUTPUT_DIR = "scraped_md"
URLS_FILE = "urls.txt"
DELAY_MIN_SECONDS = 0.5
DELAY_MAX_SECONDS = 1.0
NAVIGATION_TIMEOUT_SECONDS = 60 # Set an explicit timeout for page navigation
# --- Database Functions ---
def init_db():
    """Initialize the SQLite database and ensure the markdown output directory exists.

    Creates the ``scraped_data`` table if it is missing. Idempotent: both the
    directory creation and the CREATE TABLE IF NOT EXISTS are safe to repeat.
    """
    os.makedirs(MARKDOWN_OUTPUT_DIR, exist_ok=True)
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        # "with conn" commits on success / rolls back on error, but does NOT
        # close the connection — hence the explicit finally-close below.
        with conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS scraped_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    url TEXT NOT NULL,
                    title TEXT,
                    full_markdown_content TEXT,
                    status_code INTEGER,
                    error_message TEXT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
    finally:
        conn.close()
    print(f"[*] Database '{DATABASE_FILE}' initialized and '{MARKDOWN_OUTPUT_DIR}' directory ensured.")
def insert_scraped_data(url, title, markdown_content, status_code, error_message=None):
    """Insert one scrape result row into the scraped_data table.

    Args:
        url: The URL that was scraped.
        title: Extracted page title (or an error placeholder).
        markdown_content: Full markdown body ("" when scraping failed).
        status_code: HTTP status (0 for non-HTTP failures, 408 for timeouts).
        error_message: Human-readable failure description, or None on success.
    """
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        # Transaction via "with conn" (commit/rollback); close in finally so the
        # connection is never leaked even if the INSERT raises.
        with conn:
            conn.execute('''
                INSERT INTO scraped_data (url, title, full_markdown_content, status_code, error_message)
                VALUES (?, ?, ?, ?, ?)
            ''', (url, title, markdown_content, status_code, error_message))
    finally:
        conn.close()
# --- Core Scraping Function for a single page's content extraction ---
async def process_page_content(page, url):
    """Extract <p> text with inline Markdown links from the currently loaded page.

    Args:
        page: A Playwright Page whose navigation has already completed.
        url: The page's URL, used to resolve relative link hrefs.

    Returns:
        dict with keys:
            title: stripped <title> text, or 'Untitled_Page_No_JS' when absent.
            markdown_content: "# <title>" heading plus one markdown paragraph
                per non-empty <p> tag ("" when parsing failed).
            error_message: None on success, else a description of the failure.
    """
    try:
        html_content = await page.content()
        soup = BeautifulSoup(html_content, 'html.parser')
        # Look up <title> once; strip it so embedded newlines cannot break
        # the single-line "# <title>" markdown heading below.
        title_tag = soup.find('title')
        page_title = title_tag.text.strip() if title_tag else 'Untitled_Page_No_JS'
        markdown_paragraphs = []
        for p_tag in soup.find_all('p'):
            paragraph_markdown = []
            for content in p_tag.contents:
                if isinstance(content, NavigableString):
                    paragraph_markdown.append(str(content).strip())
                elif isinstance(content, Tag) and content.name == 'a':
                    link_text = content.get_text(strip=True)
                    link_href = content.get('href')
                    if link_href:
                        # Resolve site-relative hrefs; absolute, mailto:, tel:
                        # and in-page fragment links pass through unchanged.
                        if not link_href.startswith(('http://', 'https://', 'mailto:', 'tel:', '#')):
                            link_href = urljoin(url, link_href)
                        paragraph_markdown.append(f"[{link_text}]({link_href})")
                    else:
                        # Anchor without an href: keep only its text.
                        paragraph_markdown.append(link_text)
                else:
                    # Any other inline tag (<b>, <span>, ...) contributes its plain text.
                    paragraph_markdown.append(content.get_text(strip=True))
            if any(paragraph_markdown):
                joined_paragraph = " ".join(paragraph_markdown).strip()
                joined_paragraph = re.sub(r'\s+', ' ', joined_paragraph)  # collapse whitespace runs
                markdown_paragraphs.append(joined_paragraph)
        full_markdown_content = f"# {page_title}\n\n"
        full_markdown_content += "\n\n".join(markdown_paragraphs)
        return {
            "title": page_title,
            "markdown_content": full_markdown_content,
            "error_message": None
        }
    except Exception as e:
        # Never let a parse failure propagate; report it in the result dict instead.
        return {
            "title": "Error Processing Content",
            "markdown_content": "",
            "error_message": f"Error during content parsing: {e}"
        }
# --- Sitemap Loading Helper ---
def load_urls_from_sitemap(sitemap_path):
    """Parse an XML sitemap file and return its URLs as a list of strings.

    Whitespace around each <loc> value is stripped (pretty-printed sitemaps
    put the URL on its own indented line) and empty <loc> elements are
    skipped, so every returned entry is a non-empty URL string.

    Returns [] on any failure (missing file, malformed XML, ...) after
    printing a diagnostic.
    """
    urls = []
    try:
        tree = ET.parse(sitemap_path)
        root = tree.getroot()
        # Namespace for sitemap XML
        namespace = {'sitemap': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
        for url_element in root.findall('sitemap:url', namespace):
            loc_element = url_element.find('sitemap:loc', namespace)
            # <loc> may be absent, and .text is None for an empty <loc/> —
            # guard both, and trim surrounding whitespace/newlines.
            if loc_element is not None and loc_element.text:
                loc_text = loc_element.text.strip()
                if loc_text:
                    urls.append(loc_text)
        print(f"[*] Loaded {len(urls)} URLs from sitemap: '{sitemap_path}'")
        return urls
    except FileNotFoundError:
        print(f"ERROR: Sitemap file '{sitemap_path}' not found.")
        return []
    except ET.ParseError as e:
        print(f"ERROR: Failed to parse sitemap '{sitemap_path}': {e}")
        return []
    except Exception as e:
        print(f"An unexpected error occurred while loading sitemap: {e}")
        return []
async def main():
    """Interactive driver: load URLs, scrape each with a fresh browser, persist results.

    Flow: initialize the DB, ask the user for a URL source (urls.txt or an
    XML sitemap), then for every URL launch a visible Chromium instance,
    navigate with a timeout, extract <p> content as markdown, record the
    outcome in SQLite, optionally write a .md file, and sleep a random
    politeness delay before the next URL.
    """
    init_db()
    urls_to_scrape = []
    # Prompt user for sitemap or urls.txt
    print("\n--- URL Source Selection ---")
    print("1. Load URLs from 'urls.txt' (one URL per line)")
    print("2. Load URLs from an XML sitemap file")
    choice = input("Enter your choice (1 or 2): ").strip()
    if choice == '1':
        try:
            with open(URLS_FILE, 'r', encoding='utf-8') as f:
                # Keep non-blank lines only, trimmed of surrounding whitespace.
                urls_to_scrape = [line.strip() for line in f if line.strip()]
            if not urls_to_scrape:
                print(f"WARNING: '{URLS_FILE}' is empty. No URLs to scrape.")
                return
            print(f"[*] Using URLs from '{URLS_FILE}'.")
        except FileNotFoundError:
            print(f"ERROR: '{URLS_FILE}' not found. Please create the file with URLs, one per line.")
            return
    elif choice == '2':
        sitemap_path = input("Enter the path to the XML sitemap file: ").strip()
        if not sitemap_path:
            print("Sitemap path cannot be empty. Exiting.")
            return
        urls_to_scrape = load_urls_from_sitemap(sitemap_path)
        if not urls_to_scrape:
            print("No URLs loaded from sitemap. Exiting.")
            return
    else:
        print("Invalid choice. Please enter 1 or 2.")
        return
    total_urls = len(urls_to_scrape)
    if total_urls == 0:
        print("No URLs available for scraping. Exiting.")
        return
    start_total_time = time.time()
    print(f"--- Starting automated scraping of {total_urls} URLs ---")
    # This message is crucial for the user to understand what will happen visually
    print(f"[*] Browsers will launch non-headless (visible), process the page, and close automatically for each URL.")
    print(f"[*] No manual input required after starting. It will proceed to the next URL after a short delay.")
    print(f"[*] Delay between requests: {DELAY_MIN_SECONDS:.1f} - {DELAY_MAX_SECONDS:.1f} seconds.")
    print(f"[*] Navigation timeout set to {NAVIGATION_TIMEOUT_SECONDS} seconds per page.")
    # We will launch and close a browser/context for each URL.
    # This ensures a clean state and handles potential hangs more robustly when headless=False.
    for i, url in enumerate(urls_to_scrape):
        current_index = i + 1
        # Simple ETA estimate: average time per completed URL times URLs remaining.
        elapsed_time = time.time() - start_total_time
        avg_time_per_url = elapsed_time / current_index if current_index > 0 else 0
        remaining_urls = total_urls - current_index
        eta_seconds = remaining_urls * avg_time_per_url
        eta_display = str(datetime.timedelta(seconds=int(eta_seconds)))
        print(f"\n--- Progress: {current_index}/{total_urls} --- ETA: {eta_display} ---")
        print(f"[*] Attempting to navigate to: {url}")
        # Per-URL result state; overwritten by whichever outcome path runs below.
        browser = None
        status_code = 0
        scraped_data_title = "Not Scraped"
        scraped_data_markdown = ""
        scraped_data_error = "Unknown error"
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=False) # STILL headless=False
                # JavaScript is disabled for the context: only server-rendered HTML is scraped.
                context = await browser.new_context(java_script_enabled=False)
                # Set realistic headers for the context
                await context.set_extra_http_headers({
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
                    "Accept-Language": "en,en-AU;q=0.9,sr;q=0.8,sr-RS;q=0.7,en-GB;q=0.6,en-US;q=0.5,hr;q=0.4",
                    "Cache-Control": "max-age=0",
                    "Sec-Ch-Ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
                    "Sec-Ch-Ua-Mobile": "?0",
                    "Sec-Ch-Ua-Platform": '"Windows"',
                    "Sec-Fetch-Dest": "document",
                    "Sec-Fetch-Mode": "navigate",
                    "Sec-Fetch-Site": "same-origin",
                    "Sec-Fetch-User": "?1",
                    "Upgrade-Insecure-Requests": "1"
                })
                page = await context.new_page()
                # --- Core Navigation with Timeout ---
                try:
                    response = await page.goto(url, wait_until="load", timeout=NAVIGATION_TIMEOUT_SECONDS * 1000) # Playwright timeout is in ms
                    # page.goto may return None (e.g. same-document navigation) — treat as status 0.
                    status_code = response.status if response else 0
                    print(f"[*] Page loaded. HTTP Status: {status_code}")
                    if 200 <= status_code < 300: # Success codes
                        scraped_content_result = await process_page_content(page, url)
                        scraped_data_title = scraped_content_result["title"]
                        scraped_data_markdown = scraped_content_result["markdown_content"]
                        scraped_data_error = scraped_content_result["error_message"]
                        print(f"[*] Content extraction attempted for: {url}")
                    else:
                        scraped_data_title = f"HTTP Error {status_code}"
                        scraped_data_markdown = ""
                        scraped_data_error = f"Navigation failed with status {status_code}"
                        print(f"[*] WARNING: Non-2xx status code: {status_code}")
                except PlaywrightTimeoutError:
                    status_code = 408 # Request Timeout
                    scraped_data_title = "Navigation Timeout"
                    scraped_data_markdown = ""
                    scraped_data_error = f"Navigation timed out after {NAVIGATION_TIMEOUT_SECONDS} seconds."
                    print(f"[*] ERROR: Navigation timed out for {url}")
                except Exception as nav_error:
                    status_code = 0 # General error
                    scraped_data_title = "Navigation Error"
                    scraped_data_markdown = ""
                    scraped_data_error = f"Error during navigation: {nav_error}"
                    print(f"[*] ERROR during navigation for {url}: {nav_error}")
                # Ensure page and context are closed after each URL
                await page.close()
                await context.close()
        except Exception as browser_launch_error:
            # This catches errors if the browser itself fails to launch or something critical
            scraped_data_title = "Browser Launch Error"
            scraped_data_markdown = ""
            scraped_data_error = f"Browser or context launch failed: {browser_launch_error}"
            print(f"[*] CRITICAL ERROR (Browser/Context Launch) for {url}: {browser_launch_error}")
        finally:
            # NOTE(review): by this point the "async with async_playwright()" block has
            # already exited (stopping Playwright), so this is a best-effort safety net.
            if browser:
                await browser.close() # Ensure the browser instance is closed
        # Store result in DB
        insert_scraped_data(
            url,
            scraped_data_title,
            scraped_data_markdown,
            status_code,
            scraped_data_error
        )
        print(f"[*] Data for {url} saved to '{DATABASE_FILE}'.")
        # Save to MD file if successful content was extracted and no content parsing error
        if scraped_data_markdown and not scraped_data_error:
            # NOTE(review): the filename is derived from the page title, so two pages
            # sharing a title will silently overwrite each other's .md file.
            safe_filename = re.sub(r'[\\/:*?"<>| ]', '_', scraped_data_title)[:100]
            md_filename = os.path.join(MARKDOWN_OUTPUT_DIR, f"{safe_filename}.md")
            try:
                with open(md_filename, 'w', encoding='utf-8') as f:
                    f.write(scraped_data_markdown)
                print(f"[*] Markdown saved to {md_filename}")
            except Exception as file_error:
                print(f"[*] ERROR: Failed to save MD file for {url}: {file_error}")
        elif scraped_data_error:
            print(f"[*] Skipping MD file creation for {url} due to an error.")
        # Add a random delay between requests, except after the very last URL
        if i < total_urls - 1:
            delay = random.uniform(DELAY_MIN_SECONDS, DELAY_MAX_SECONDS)
            print(f"[*] Waiting {delay:.2f} seconds before next URL...")
            await asyncio.sleep(delay)
    end_total_time = time.time()
    total_duration = str(datetime.timedelta(seconds=int(end_total_time - start_total_time)))
    print(f"\n=== Automated scraping process complete! ===")
    print(f"Total URLs processed: {total_urls}")
    print(f"Total duration: {total_duration}")
    print(f"Scraped data saved to '{DATABASE_FILE}' and markdown files in '{MARKDOWN_OUTPUT_DIR}/'.")
# Script entry point: run the async scraper driver on the default event loop.
if __name__ == "__main__":
    asyncio.run(main())