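# Breadth-first website scraper built on Selenium: it loads each page in Chrome,
# saves the visible page text to a Markdown file, and follows same-domain links.
# Third-party dependencies (assumed installed): selenium, webdriver-manager
#   pip install selenium webdriver-manager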
import time
import os
import re
import urllib.parse
import sys
from datetime import datetime

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

from cleaning import clean_text
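# Note: `cleaning` (imported above) is assumed to be a local helper module, not a
# PyPI package; clean_text(text, college) presumably normalizes the scraped page
# text before it is written to disk.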


class WebScraper:
    def __init__(self, base_url, college, output_dir=None, max_depth=None, max_pages=None, delay=1, headless=True):
        """Initialize the web scraper with configuration parameters."""
        parsed_url = urllib.parse.urlparse(base_url)
        self.base_url = base_url
        self.base_domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
        self.college = college
        self.max_depth = max_depth  # None means unlimited depth
        self.max_pages = max_pages  # None means unlimited pages
        self.delay = delay
        self.visited_links = set()
        self.page_count = 0
        self.start_time = datetime.now()

        if output_dir is None:
            domain_name = parsed_url.netloc.replace(".", "_")
            self.output_dir = f"{domain_name}_scraped_content"
        else:
            self.output_dir = output_dir
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

        self.log_path = os.path.join(self.output_dir, "scrape_log.txt")
        with open(self.log_path, "w", encoding="utf-8") as log_file:
            log_file.write(f"Scraping started at: {self.start_time}\n")
            log_file.write(f"Base URL: {self.base_url}\n")
            log_file.write(f"Max depth: {self.max_depth if self.max_depth is not None else 'Unlimited'}\n")
            log_file.write(f"Max pages: {self.max_pages if self.max_pages is not None else 'Unlimited'}\n")
            log_file.write(f"Delay between requests: {self.delay} seconds\n\n")

        # Configure Selenium
        options = Options()
        if headless:
            options.add_argument("--headless")
        options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")
        options.add_argument("--window-size=1920,1080")
        options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36")

        # Initialize the WebDriver
        self.driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

    def log_message(self, message):
        """Write a message to both console and log file."""
        print(message)
        with open(self.log_path, "a", encoding="utf-8") as log_file:
            log_file.write(f"{message}\n")

    def is_valid_url(self, url):
        """Check if a URL is valid and should be scraped."""
        # Skip URLs that are not part of the same domain
        if not url.startswith(self.base_domain):
            return False
        # Skip common file types that aren't web pages
        if re.search(r"\.(pdf|jpg|jpeg|png|gif|svg|css|js|xml|json)$", url, re.IGNORECASE):
            return False
        # Skip URLs with fragments (#) as they often point to the same page
        if "#" in url:
            url = url.split("#")[0]
        if url in self.visited_links:
            return False
        return True

    def sanitize_filename(self, text):
        """Convert text to a safe filename."""
        if not text or len(text.strip()) == 0:
            return f"Page_{self.page_count}"
        # Replace invalid filename characters
        safe_name = re.sub(r'[^\w\s()-]', "_", text)
        safe_name = re.sub(r'\s+', "_", safe_name)
        # Limit filename length
        return safe_name[:100]

    def extract_links(self):
        """Extract all valid links from the current page."""
        links = self.driver.find_elements(By.TAG_NAME, "a")
        valid_links = []
        for link in links:
            try:
                href = link.get_attribute("href")
                if href and self.is_valid_url(href) and href not in self.visited_links:
                    valid_links.append(href)
            except Exception:
                continue
        return list(set(valid_links))  # Remove duplicates

    def scrape_page(self, url):
        """Scrape content from a single page and return the links found on it."""
        if url in self.visited_links:
            return []
        # Handle page_count overflow - extremely unlikely but possible during unlimited scraping
        if self.page_count >= sys.maxsize - 1:
            self.log_message("Warning: Page count approaching integer limit. Resetting to avoid overflow.")
            self.page_count = 0
        self.page_count += 1
        self.visited_links.add(url)

        # Create status message
        if self.max_pages:
            status = f"Scraping [{self.page_count}/{self.max_pages}]: {url}"
        else:
            status = f"Scraping page #{self.page_count}: {url}"
        self.log_message(status)

        try:
            self.driver.get(url)
            WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
            time.sleep(self.delay)  # Respectful delay

            # Extract page title
            try:
                page_title = self.driver.title
                if not page_title or page_title.strip() == "":
                    page_title = f"Page_{self.page_count}"
            except Exception:
                page_title = f"Page_{self.page_count}"

            # Sanitize for filename
            safe_filename = self.sanitize_filename(page_title)
            # Add a counter suffix if there's a filename collision
            original_filename = safe_filename
            counter = 1
            while os.path.exists(os.path.join(self.output_dir, f"{safe_filename}.md")):
                safe_filename = f"{original_filename}_{counter}"
                counter += 1

            # Extract page content
            page_text = self.driver.find_element(By.TAG_NAME, "body").text

            # Clean the text and save it as a Markdown file
            page_text = clean_text(page_text, self.college)
            file_path = os.path.join(self.output_dir, f"{safe_filename}.md")
            with open(file_path, "w", encoding="utf-8") as file:
                file.write(f"URL: {url}\nTitle: {page_title}\nScraped at: {datetime.now()}\n\n{page_text}\n")
            self.log_message(f"✓ Saved: {safe_filename}.md")

            # Extract links for further crawling
            new_links = self.extract_links()
            self.log_message(f"Found {len(new_links)} new links on this page")
            return new_links
        except Exception as e:
            self.log_message(f"✗ Error scraping {url}: {str(e)}")
            return []

    def save_progress_state(self, queue):
        """Save current progress to allow for resuming."""
        progress_file = os.path.join(self.output_dir, "scrape_progress.txt")
        with open(progress_file, "w", encoding="utf-8") as file:
            file.write(f"page_count={self.page_count}\n")
            file.write(f"visited_links={len(self.visited_links)}\n")
            file.write(f"queue_size={len(queue)}\n")
            file.write(f"timestamp={datetime.now()}\n")
            # Save a sample of queued URLs (first 100)
            file.write("\nQueue sample (first 100):\n")
            for url, depth in queue[:100]:
                file.write(f"{url} (depth {depth})\n")

    def start(self):
        """Start the scraping process using a queue-based approach to avoid recursion limits."""
        try:
            self.log_message(f"Starting web scraping from: {self.base_url}")
            self.log_message(f"Output directory: {self.output_dir}")
            if self.max_depth is None:
                self.log_message("Depth limit: Unlimited")
            else:
                self.log_message(f"Maximum depth: {self.max_depth}")
            if self.max_pages is None:
                self.log_message("Page limit: Unlimited")
            else:
                self.log_message(f"Maximum pages: {self.max_pages}")

            # Initialize queue with starting URL and depth
            queue = [(self.base_url, 1)]
            last_progress_save = time.time()

            # Process the queue
            while queue:
                # Check if we've hit the maximum pages limit
                if self.max_pages is not None and self.page_count >= self.max_pages:
                    self.log_message(f"Reached maximum page limit ({self.max_pages})")
                    break

                # Get the next URL and its depth from the queue
                current_url, current_depth = queue.pop(0)

                # Skip if already visited
                if current_url in self.visited_links:
                    continue

                # Check depth limit
                if self.max_depth is not None and current_depth > self.max_depth:
                    continue

                # Scrape the page
                new_links = self.scrape_page(current_url)

                # Add new links to the queue
                for link in new_links:
                    if link not in self.visited_links:
                        queue.append((link, current_depth + 1))

                # Save progress every 5 minutes
                if time.time() - last_progress_save > 300:  # 300 seconds = 5 minutes
                    self.save_progress_state(queue)
                    last_progress_save = time.time()

                # Memory management - clear visited links that are not in the queue.
                # Only do this if we've visited a lot of pages.
                if len(self.visited_links) > 10000:
                    self.log_message("Performing memory management...")
                    queue_urls = set(url for url, _ in queue)
                    self.visited_links = set(url for url in self.visited_links if url in queue_urls or url == self.base_url)
                    self.log_message(f"Memory cleaned. Keeping track of {len(self.visited_links)} visited URLs.")

                # Periodically report progress
                if self.page_count % 10 == 0:
                    elapsed = datetime.now() - self.start_time
                    self.log_message(f"Progress: {self.page_count} pages scraped, {len(queue)} links in queue, running for {elapsed}")

            # Final progress save
            self.save_progress_state(queue)
            elapsed_time = datetime.now() - self.start_time
            self.log_message(f"\n✅ Scraping complete! Scraped {self.page_count} pages in {elapsed_time}.")
            self.log_message(f"Content saved in '{self.output_dir}' directory")
        except Exception as e:
            self.log_message(f"Error during scraping: {str(e)}")
        finally:
            self.driver.quit()


# Example usage
if __name__ == "__main__":
    # Set your base URLs here
    BASE_URLS = [
        "https://iiitranchi.ac.in/",
        "https://www.shiksha.com/college/indian-institute-of-information-technology-ranchi-53869",
    ]

    # Create and configure a scraper for each base URL
    for index, base_url in enumerate(BASE_URLS, start=1):
        scraper = WebScraper(
            base_url=base_url,
            college="IIIT RANCHI",
            output_dir=f"IIIT RANCHI/{index}",
            max_depth=None,  # Set to None for unlimited depth
            max_pages=5,     # Set to None for unlimited pages
            delay=5,         # Seconds to wait between requests
            headless=True,   # Run browser in headless mode
        )
        # Start scraping
        scraper.start()