# google-links / scrape.py
# (dejanseo's picture)
# Upload 22 files
# f29b6e6 verified
import asyncio
from playwright.async_api import async_playwright, Playwright, TimeoutError as PlaywrightTimeoutError
from bs4 import BeautifulSoup, NavigableString, Tag
import re
import os
from urllib.parse import urljoin
import sqlite3
import datetime
import time
import random
import xml.etree.ElementTree as ET
# --- Configuration ---
DATABASE_FILE = "scraped.db"
MARKDOWN_OUTPUT_DIR = "scraped_md"
URLS_FILE = "urls.txt"
DELAY_MIN_SECONDS = 0.5
DELAY_MAX_SECONDS = 1.0
NAVIGATION_TIMEOUT_SECONDS = 60 # Set an explicit timeout for page navigation
# --- Database Functions ---
def init_db():
    """Initialize the SQLite database and ensure the markdown output directory.

    Creates the ``scraped_data`` table if it does not already exist and makes
    sure ``MARKDOWN_OUTPUT_DIR`` is present on disk. Safe to call repeatedly.
    """
    os.makedirs(MARKDOWN_OUTPUT_DIR, exist_ok=True)
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS scraped_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT NOT NULL,
                title TEXT,
                full_markdown_content TEXT,
                status_code INTEGER,
                error_message TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
    finally:
        # Close the connection even if table creation fails (fixes a
        # connection leak in the original, which only closed on success).
        conn.close()
    print(f"[*] Database '{DATABASE_FILE}' initialized and '{MARKDOWN_OUTPUT_DIR}' directory ensured.")
def insert_scraped_data(url, title, markdown_content, status_code, error_message=None):
    """Insert one scrape result row into the ``scraped_data`` table.

    Parameters:
        url: The URL that was scraped.
        title: Page title (or an error label when scraping failed).
        markdown_content: Extracted Markdown, empty string on failure.
        status_code: HTTP status (0 for non-HTTP failures, 408 for timeouts).
        error_message: Optional error description; None on success.
    """
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        conn.execute(
            '''
            INSERT INTO scraped_data (url, title, full_markdown_content, status_code, error_message)
            VALUES (?, ?, ?, ?, ?)
            ''',
            (url, title, markdown_content, status_code, error_message),
        )
        conn.commit()
    finally:
        # Always release the connection, even when the INSERT raises
        # (the original leaked the connection on failure).
        conn.close()
# --- Core Scraping Function for a single page's content extraction ---
async def process_page_content(page, url):
    """Extract paragraph text and inline Markdown links from the current page.

    Parameters:
        page: Playwright page object already navigated to *url*.
        url: Absolute URL of the page, used to resolve relative hrefs.

    Returns:
        dict with keys ``title``, ``markdown_content`` and ``error_message``
        (None on success). Parsing errors are captured in the dict rather
        than raised, so one bad page cannot abort the whole run.
    """
    try:
        html_content = await page.content()
        soup = BeautifulSoup(html_content, 'html.parser')
        # Look the <title> tag up once (the original called find() twice).
        title_tag = soup.find('title')
        page_title = title_tag.text if title_tag else 'Untitled_Page_No_JS'
        markdown_paragraphs = []
        for p_tag in soup.find_all('p'):
            paragraph_markdown = []
            for content in p_tag.contents:
                if isinstance(content, NavigableString):
                    paragraph_markdown.append(str(content).strip())
                elif isinstance(content, Tag) and content.name == 'a':
                    link_text = content.get_text(strip=True)
                    link_href = content.get('href')
                    if link_href:
                        # Resolve relative links against the page URL; leave
                        # absolute, mailto:, tel: and fragment hrefs untouched.
                        if not link_href.startswith(('http://', 'https://', 'mailto:', 'tel:', '#')):
                            link_href = urljoin(url, link_href)
                        paragraph_markdown.append(f"[{link_text}]({link_href})")
                    else:
                        # Anchor without an href: keep just its text.
                        paragraph_markdown.append(link_text)
                else:
                    # Any other inline tag (b, i, span, ...) contributes plain text.
                    paragraph_markdown.append(content.get_text(strip=True))
            if any(paragraph_markdown):
                joined_paragraph = " ".join(paragraph_markdown).strip()
                joined_paragraph = re.sub(r'\s+', ' ', joined_paragraph)  # collapse whitespace runs
                markdown_paragraphs.append(joined_paragraph)
        full_markdown_content = f"# {page_title}\n\n"
        full_markdown_content += "\n\n".join(markdown_paragraphs)
        return {
            "title": page_title,
            "markdown_content": full_markdown_content,
            "error_message": None
        }
    except Exception as e:
        # Deliberate catch-all: report the failure in the result dict so the
        # caller can record it and continue with the next URL.
        return {
            "title": "Error Processing Content",
            "markdown_content": "",
            "error_message": f"Error during content parsing: {e}"
        }
# --- Main execution function ---
def load_urls_from_sitemap(sitemap_path):
    """Parse an XML sitemap file and return its <loc> URLs as a list of strings.

    Parameters:
        sitemap_path: Filesystem path to a sitemap XML file using the
            standard http://www.sitemaps.org/schemas/sitemap/0.9 namespace.

    Returns:
        List of URL strings (whitespace-trimmed). Returns an empty list,
        after printing a diagnostic, when the file is missing, is not valid
        XML, or any other error occurs.
    """
    urls = []
    try:
        tree = ET.parse(sitemap_path)
        root = tree.getroot()
        # Namespace for sitemap XML
        namespace = {'sitemap': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
        for url_element in root.findall('sitemap:url', namespace):
            loc_element = url_element.find('sitemap:loc', namespace)
            # Guard against empty <loc/> elements (whose .text is None, which
            # the original appended verbatim) and strip surrounding
            # whitespace, which the sitemap protocol permits inside <loc>.
            if loc_element is not None and loc_element.text:
                loc = loc_element.text.strip()
                if loc:
                    urls.append(loc)
        print(f"[*] Loaded {len(urls)} URLs from sitemap: '{sitemap_path}'")
        return urls
    except FileNotFoundError:
        print(f"ERROR: Sitemap file '{sitemap_path}' not found.")
        return []
    except ET.ParseError as e:
        print(f"ERROR: Failed to parse sitemap '{sitemap_path}': {e}")
        return []
    except Exception as e:
        print(f"An unexpected error occurred while loading sitemap: {e}")
        return []
async def main():
    """Interactive entry point for the scraping run.

    Asks the user for a URL source (urls.txt or an XML sitemap), then for
    each URL launches a fresh visible Chromium instance with JavaScript
    disabled, navigates with a timeout, extracts <p>-tag content as
    Markdown, records the result in SQLite and (on success) as a .md file,
    and sleeps a random delay before the next URL.
    """
    init_db()
    urls_to_scrape = []
    # Prompt user for sitemap or urls.txt
    print("\n--- URL Source Selection ---")
    print("1. Load URLs from 'urls.txt' (one URL per line)")
    print("2. Load URLs from an XML sitemap file")
    choice = input("Enter your choice (1 or 2): ").strip()
    if choice == '1':
        try:
            with open(URLS_FILE, 'r', encoding='utf-8') as f:
                # Keep only non-blank lines, trimmed of surrounding whitespace.
                urls_to_scrape = [line.strip() for line in f if line.strip()]
            if not urls_to_scrape:
                print(f"WARNING: '{URLS_FILE}' is empty. No URLs to scrape.")
                return
            print(f"[*] Using URLs from '{URLS_FILE}'.")
        except FileNotFoundError:
            print(f"ERROR: '{URLS_FILE}' not found. Please create the file with URLs, one per line.")
            return
    elif choice == '2':
        sitemap_path = input("Enter the path to the XML sitemap file: ").strip()
        if not sitemap_path:
            print("Sitemap path cannot be empty. Exiting.")
            return
        urls_to_scrape = load_urls_from_sitemap(sitemap_path)
        if not urls_to_scrape:
            print("No URLs loaded from sitemap. Exiting.")
            return
    else:
        print("Invalid choice. Please enter 1 or 2.")
        return
    total_urls = len(urls_to_scrape)
    if total_urls == 0:
        print("No URLs available for scraping. Exiting.")
        return
    start_total_time = time.time()
    print(f"--- Starting automated scraping of {total_urls} URLs ---")
    # This message is crucial for the user to understand what will happen visually
    print(f"[*] Browsers will launch non-headless (visible), process the page, and close automatically for each URL.")
    print(f"[*] No manual input required after starting. It will proceed to the next URL after a short delay.")
    print(f"[*] Delay between requests: {DELAY_MIN_SECONDS:.1f} - {DELAY_MAX_SECONDS:.1f} seconds.")
    print(f"[*] Navigation timeout set to {NAVIGATION_TIMEOUT_SECONDS} seconds per page.")
    # We will launch and close a browser/context for each URL.
    # This ensures a clean state and handles potential hangs more robustly when headless=False.
    for i, url in enumerate(urls_to_scrape):
        current_index = i + 1
        # Rough ETA: average time per URL so far times the number remaining.
        elapsed_time = time.time() - start_total_time
        avg_time_per_url = elapsed_time / current_index if current_index > 0 else 0
        remaining_urls = total_urls - current_index
        eta_seconds = remaining_urls * avg_time_per_url
        eta_display = str(datetime.timedelta(seconds=int(eta_seconds)))
        print(f"\n--- Progress: {current_index}/{total_urls} --- ETA: {eta_display} ---")
        print(f"[*] Attempting to navigate to: {url}")
        # Per-URL result state; the defaults describe "nothing scraped yet"
        # and are overwritten by the success/error branches below.
        browser = None
        status_code = 0
        scraped_data_title = "Not Scraped"
        scraped_data_markdown = ""
        scraped_data_error = "Unknown error"
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=False) # STILL headless=False
                # JavaScript is disabled: only server-rendered HTML is scraped.
                context = await browser.new_context(java_script_enabled=False)
                # Set realistic headers for the context
                await context.set_extra_http_headers({
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
                    "Accept-Language": "en,en-AU;q=0.9,sr;q=0.8,sr-RS;q=0.7,en-GB;q=0.6,en-US;q=0.5,hr;q=0.4",
                    "Cache-Control": "max-age=0",
                    "Sec-Ch-Ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
                    "Sec-Ch-Ua-Mobile": "?0",
                    "Sec-Ch-Ua-Platform": '"Windows"',
                    "Sec-Fetch-Dest": "document",
                    "Sec-Fetch-Mode": "navigate",
                    "Sec-Fetch-Site": "same-origin",
                    "Sec-Fetch-User": "?1",
                    "Upgrade-Insecure-Requests": "1"
                })
                page = await context.new_page()
                # --- Core Navigation with Timeout ---
                try:
                    response = await page.goto(url, wait_until="load", timeout=NAVIGATION_TIMEOUT_SECONDS * 1000) # Playwright timeout is in ms
                    # response can be None (e.g. for about:blank navigations).
                    status_code = response.status if response else 0
                    print(f"[*] Page loaded. HTTP Status: {status_code}")
                    if 200 <= status_code < 300: # Success codes
                        scraped_content_result = await process_page_content(page, url)
                        scraped_data_title = scraped_content_result["title"]
                        scraped_data_markdown = scraped_content_result["markdown_content"]
                        scraped_data_error = scraped_content_result["error_message"]
                        print(f"[*] Content extraction attempted for: {url}")
                    else:
                        scraped_data_title = f"HTTP Error {status_code}"
                        scraped_data_markdown = ""
                        scraped_data_error = f"Navigation failed with status {status_code}"
                        print(f"[*] WARNING: Non-2xx status code: {status_code}")
                except PlaywrightTimeoutError:
                    status_code = 408 # Request Timeout
                    scraped_data_title = "Navigation Timeout"
                    scraped_data_markdown = ""
                    scraped_data_error = f"Navigation timed out after {NAVIGATION_TIMEOUT_SECONDS} seconds."
                    print(f"[*] ERROR: Navigation timed out for {url}")
                except Exception as nav_error:
                    status_code = 0 # General error
                    scraped_data_title = "Navigation Error"
                    scraped_data_markdown = ""
                    scraped_data_error = f"Error during navigation: {nav_error}"
                    print(f"[*] ERROR during navigation for {url}: {nav_error}")
                # Ensure page and context are closed after each URL
                await page.close()
                await context.close()
        except Exception as browser_launch_error:
            # This catches errors if the browser itself fails to launch or something critical
            scraped_data_title = "Browser Launch Error"
            scraped_data_markdown = ""
            scraped_data_error = f"Browser or context launch failed: {browser_launch_error}"
            print(f"[*] CRITICAL ERROR (Browser/Context Launch) for {url}: {browser_launch_error}")
        finally:
            if browser:
                await browser.close() # Ensure the browser instance is closed
        # Store result in DB
        insert_scraped_data(
            url,
            scraped_data_title,
            scraped_data_markdown,
            status_code,
            scraped_data_error
        )
        print(f"[*] Data for {url} saved to '{DATABASE_FILE}'.")
        # Save to MD file if successful content was extracted and no content parsing error
        if scraped_data_markdown and not scraped_data_error:
            # Derive a filesystem-safe filename from the page title; truncate
            # to 100 characters to stay within filesystem limits.
            safe_filename = re.sub(r'[\\/:*?"<>| ]', '_', scraped_data_title)[:100]
            md_filename = os.path.join(MARKDOWN_OUTPUT_DIR, f"{safe_filename}.md")
            try:
                with open(md_filename, 'w', encoding='utf-8') as f:
                    f.write(scraped_data_markdown)
                print(f"[*] Markdown saved to {md_filename}")
            except Exception as file_error:
                print(f"[*] ERROR: Failed to save MD file for {url}: {file_error}")
        elif scraped_data_error:
            print(f"[*] Skipping MD file creation for {url} due to an error.")
        # Add a random delay between requests, except after the very last URL
        if i < total_urls - 1:
            delay = random.uniform(DELAY_MIN_SECONDS, DELAY_MAX_SECONDS)
            print(f"[*] Waiting {delay:.2f} seconds before next URL...")
            await asyncio.sleep(delay)
    end_total_time = time.time()
    total_duration = str(datetime.timedelta(seconds=int(end_total_time - start_total_time)))
    print(f"\n=== Automated scraping process complete! ===")
    print(f"Total URLs processed: {total_urls}")
    print(f"Total duration: {total_duration}")
    print(f"Scraped data saved to '{DATABASE_FILE}' and markdown files in '{MARKDOWN_OUTPUT_DIR}/'.")
if __name__ == "__main__":
    # Script entry point: run the interactive scraping workflow once.
    asyncio.run(main())