# scrapper/webscrap.py
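#
# Third-party requirements (inferred from the imports below): selenium and
# webdriver-manager (pip install selenium webdriver-manager), plus a local
# Chrome install for the browser itself.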
import time
import os
import re
import urllib.parse
import sys
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime
from cleaning import clean_text
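# Note: `clean_text` lives in the project's local `cleaning` module, which is not part
# of this file. Judging only from the call site below (clean_text(page_text, self.college)),
# a hypothetical stand-in for running this file in isolation could be:
#
#     def clean_text(text: str, college: str) -> str:
#         # Placeholder (assumption): collapse whitespace and prefix the college name.
#         return f"{college}\n" + re.sub(r"\s+", " ", text).strip()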
class WebScraper:
    def __init__(self, base_url, college, output_dir=None, max_depth=None, max_pages=None, delay=1, headless=True):
"""Initialize the web scraper with configuration parameters."""
parsed_url = urllib.parse.urlparse(base_url)
self.base_url = base_url
self.base_domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
self.college = college
self.max_depth = max_depth # None means unlimited depth
self.max_pages = max_pages # None means unlimited pages
self.delay = delay
self.visited_links = set()
self.page_count = 0
self.start_time = datetime.now()
if output_dir is None:
domain_name = parsed_url.netloc.replace(".", "_")
self.output_dir = f"{domain_name}_scraped_content"
else:
self.output_dir = output_dir
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
self.log_path = os.path.join(self.output_dir, "scrape_log.txt")
with open(self.log_path, "w", encoding="utf-8") as log_file:
log_file.write(f"Scraping started at: {self.start_time}\n")
log_file.write(f"Base URL: {self.base_url}\n")
log_file.write(f"Max depth: {self.max_depth if self.max_depth is not None else 'Unlimited'}\n")
log_file.write(f"Max pages: {self.max_pages if self.max_pages is not None else 'Unlimited'}\n")
log_file.write(f"Delay between requests: {self.delay} seconds\n\n")
# Configure Selenium
options = Options()
if headless:
options.add_argument("--headless")
options.add_argument("--disable-gpu")
options.add_argument("--no-sandbox")
options.add_argument("--window-size=1920,1080")
options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36")
# Initialize the WebDriver
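        # webdriver-manager downloads and caches a chromedriver build matching the
        # installed Chrome, so no manual driver path is required on first run.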
self.driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
def log_message(self, message):
"""Write a message to both console and log file."""
print(message)
with open(self.log_path, "a", encoding="utf-8") as log_file:
log_file.write(f"{message}\n")
def is_valid_url(self, url):
"""Check if a URL is valid and should be scraped."""
# Skip URLs that are not part of the same domain
if not url.startswith(self.base_domain):
return False
# Skip common file types that aren't web pages
if re.search(r"\.(pdf|jpg|jpeg|png|gif|svg|css|js|xml|json)$", url, re.IGNORECASE):
return False
        # Strip fragments (#...) since they usually point to the same page
if "#" in url:
url = url.split("#")[0]
if url in self.visited_links:
return False
return True
def sanitize_filename(self, text):
"""Convert text to a safe filename."""
if not text or len(text.strip()) == 0:
return f"Page_{self.page_count}"
# Replace invalid filename characters
safe_name = re.sub(r'[^\w\s()-]', "_", text)
safe_name = re.sub(r'\s+', "_", safe_name)
# Limit filename length
return safe_name[:100] if len(safe_name) > 100 else safe_name
def extract_links(self):
"""Extract all valid links from the current page."""
links = self.driver.find_elements(By.TAG_NAME, "a")
valid_links = []
        for link in links:
            try:
                href = link.get_attribute("href")
                if href:
                    # Normalize away fragments so "page#a" and "page#b" dedupe to the same URL
                    href = href.split("#")[0]
                if href and self.is_valid_url(href) and href not in self.visited_links:
                    valid_links.append(href)
            except Exception:
                continue
return list(set(valid_links)) # Remove duplicates
def scrape_page(self, url):
"""Scrape content from a single page."""
if url in self.visited_links:
return []
        # Defensive reset of the page counter on extremely long unlimited runs
        # (Python ints do not overflow, so this only keeps the number bounded)
if self.page_count >= sys.maxsize - 1:
self.log_message("Warning: Page count approaching integer limit. Resetting to avoid overflow.")
self.page_count = 0
self.page_count += 1
self.visited_links.add(url)
# Create status message
if self.max_pages:
status = f"Scraping [{self.page_count}/{self.max_pages}]: {url}"
else:
status = f"Scraping page #{self.page_count}: {url}"
self.log_message(status)
try:
self.driver.get(url)
WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
time.sleep(self.delay) # Respectful delay
# Extract page title
            try:
                page_title = self.driver.title
                if not page_title or page_title.strip() == "":
                    page_title = f"Page_{self.page_count}"
            except Exception:
                page_title = f"Page_{self.page_count}"
# Sanitize for filename
safe_filename = self.sanitize_filename(page_title)
# Add a counter suffix if there's a filename collision
original_filename = safe_filename
counter = 1
            while os.path.exists(os.path.join(self.output_dir, f"{safe_filename}.md")):
                safe_filename = f"{original_filename}_{counter}"
                counter += 1
            # Extract page content
            page_text = self.driver.find_element(By.TAG_NAME, "body").text
            # Save to file (cleaned content is written as Markdown)
            file_path = os.path.join(self.output_dir, f"{safe_filename}.md")
            with open(file_path, "w", encoding="utf-8") as file:
                page_text = clean_text(page_text, self.college)
                file.write(f"URL: {url}\nTitle: {page_title}\nScraped at: {datetime.now()}\n\n{page_text}\n")
            self.log_message(f"✓ Saved: {safe_filename}.md")
# Extract links for further crawling
new_links = self.extract_links()
self.log_message(f"Found {len(new_links)} new links on this page")
return new_links
except Exception as e:
self.log_message(f"✗ Error scraping {url}: {str(e)}")
return []
def save_progress_state(self, queue):
"""Save current progress to allow for resuming."""
progress_file = os.path.join(self.output_dir, "scrape_progress.txt")
with open(progress_file, "w", encoding="utf-8") as file:
file.write(f"page_count={self.page_count}\n")
file.write(f"visited_links={len(self.visited_links)}\n")
file.write(f"queue_size={len(queue)}\n")
file.write(f"timestamp={datetime.now()}\n")
# Save a sample of queued URLs (first 100)
file.write("\nQueue sample (first 100):\n")
            for url, depth in queue[:100]:
                file.write(f"{url} (depth {depth})\n")
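    # Note: this progress file is informational only; nothing in the class reads it back.
    # A true resume feature would need to persist the full queue and visited set (e.g. as
    # JSON) and reload them before start() - left here as a possible extension.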
def start(self):
"""Start the scraping process using a queue-based approach to avoid recursion limits."""
try:
self.log_message(f"Starting unlimited web scraping from: {self.base_url}")
self.log_message(f"Output directory: {self.output_dir}")
            if self.max_depth is None:
                self.log_message("Depth limit: Unlimited")
            else:
                self.log_message(f"Maximum depth: {self.max_depth}")
            if self.max_pages is None:
                self.log_message("Page limit: Unlimited")
            else:
                self.log_message(f"Maximum pages: {self.max_pages}")
# Initialize queue with starting URL and depth
queue = [(self.base_url, 1)]
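            # Each entry is a (url, depth) tuple and pop(0) below gives a breadth-first
            # crawl. For very large crawls a collections.deque with popleft() would avoid
            # the O(n) cost of pop(0), though the queue[:100] slice in save_progress_state
            # would then need itertools.islice instead.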
last_progress_save = time.time()
# Process the queue
while queue:
# Check if we've hit the maximum pages limit
if self.max_pages is not None and self.page_count >= self.max_pages:
self.log_message(f"Reached maximum page limit ({self.max_pages})")
break
# Get the next URL and its depth from the queue
current_url, current_depth = queue.pop(0)
# Skip if already visited
if current_url in self.visited_links:
continue
# Check depth limit
if self.max_depth is not None and current_depth > self.max_depth:
continue
# Scrape the page
new_links = self.scrape_page(current_url)
# Add new links to the queue
for link in new_links:
if link not in self.visited_links:
queue.append((link, current_depth + 1))
# Save progress every 5 minutes
if time.time() - last_progress_save > 300: # 300 seconds = 5 minutes
self.save_progress_state(queue)
last_progress_save = time.time()
# Memory management - clear visited links that are not in queue
# Only do this if we've visited a lot of pages
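                # Trade-off: pruning visited_links saves memory, but a pruned URL can be
                # re-queued and re-scraped if a later page links to it again.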
if len(self.visited_links) > 10000:
self.log_message("Performing memory management...")
queue_urls = set(url for url, _ in queue)
self.visited_links = set(url for url in self.visited_links if url in queue_urls or url == self.base_url)
self.log_message(f"Memory cleaned. Keeping track of {len(self.visited_links)} visited URLs.")
# Periodically report progress
if self.page_count % 10 == 0:
elapsed = datetime.now() - self.start_time
self.log_message(f"Progress: {self.page_count} pages scraped, {len(queue)} links in queue, running for {elapsed}")
# Final progress save
self.save_progress_state(queue)
elapsed_time = datetime.now() - self.start_time
self.log_message(f"\n✅ Scraping complete! Scraped {self.page_count} pages in {elapsed_time}.")
self.log_message(f"Content saved in '{self.output_dir}' directory")
except Exception as e:
self.log_message(f"Error during scraping: {str(e)}")
finally:
self.driver.quit()
# Example usage
if __name__ == "__main__":
# Set your base URL here
    BASE_URLS = ["https://iiitranchi.ac.in/", "https://www.shiksha.com/college/indian-institute-of-information-technology-ranchi-53869"]
    # Create and configure a scraper per base URL (unlimited depth, capped at max_pages)
    for index, base_url in enumerate(BASE_URLS, start=1):
        scraper = WebScraper(
            base_url=base_url,
            college="IIIT RANCHI",
            output_dir=f"IIIT RANCHI/{index}",
            max_depth=None,   # Set to None for unlimited depth
            max_pages=5,      # Set to None for unlimited pages
            delay=5,          # Seconds to wait between requests
            headless=True     # Run browser in headless mode
        )
        # Start scraping
        scraper.start()
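# Each run produces one directory per base URL ("IIIT RANCHI/1", "IIIT RANCHI/2"),
# holding the scraped .md files plus scrape_log.txt and scrape_progress.txt.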