| import streamlit as st |
| import requests |
| from bs4 import BeautifulSoup |
| import time |
| import csv |
| import random |
| import re |
| import os |
| import io |
| import base64 |
| import html2text |
| import threading |
| import queue |
| from urllib.parse import urlparse, urljoin |
|
|
| st.set_page_config( |
| page_title="Web Scraper", |
| page_icon="🕸️", |
| layout="wide", |
| initial_sidebar_state="collapsed" |
| ) |
|
|
| |
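# Custom CSS: hide Streamlit's default header/footer and style the buttons and the content preview box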
| st.markdown(""" |
| <style> |
| header {visibility: hidden;} |
| footer {visibility: hidden;} |
| .stButton button { |
| background-color: #4CAF50; |
| color: white; |
| padding: 0.5rem 1rem; |
| font-size: 1rem; |
| border-radius: 5px; |
| } |
| .scroll-container { |
| max-height: 500px; |
| overflow-y: auto; |
| border: 1px solid #ddd; |
| padding: 10px; |
| margin-bottom: 10px; |
| } |
| </style> |
| """, unsafe_allow_html=True) |
|
|
def scrape_website(base_url, max_pages=None, progress_bar=None, status_text=None, request_delay=2):
| """ |
| Scrape all pages of a website and download all content |
| """ |
| all_data = [] |
| current_page = 1 |
| has_next_page = True |
| |
| |
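    # Rotate through a few common desktop user agents to reduce the chance of being blocked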
| user_agents = [ |
| 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36', |
| 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15', |
| 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36', |
| 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:94.0) Gecko/20100101 Firefox/94.0' |
| ] |
| |
| |
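    # Reuse one HTTP session so cookies and keep-alive connections persist across requests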
| session = requests.Session() |
| |
| |
| parsed_url = urlparse(base_url) |
| domain = f"{parsed_url.scheme}://{parsed_url.netloc}" |
| |
| |
| if '/docs/' in parsed_url.path or parsed_url.path.endswith('/docs'): |
| domain_with_path = base_url |
| |
| if not domain_with_path.endswith('/'): |
| domain_with_path += '/' |
| else: |
| domain_with_path = domain |
| |
| |
| logs = [] |
| logs.append(f"Finding URLs from {base_url}") |
| |
| |
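    # html2text converter: keep links, images and tables; body_width=0 disables hard line wrapping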
| html_converter = html2text.HTML2Text() |
| html_converter.ignore_links = False |
| html_converter.ignore_images = False |
| html_converter.ignore_tables = False |
| html_converter.body_width = 0 |
| |
| |
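    # Track URLs that have already been fetched and every URL discovered so far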
| visited_urls = set() |
| all_found_urls = [] |
| |
| |
| is_docs_site = '/docs/' in parsed_url.path or parsed_url.path.endswith('/docs') |
| |
| |
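    # Phase 1: discover URLs, by collecting in-site links on docs-style sites
    # or by following ?page=N pagination on everything else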
| while has_next_page and (max_pages is None or current_page <= max_pages): |
| logs.append(f"Scanning page {current_page}...") |
| if status_text: |
| status_text.text(f"Finding URLs from page {current_page}...") |
| |
| |
| if progress_bar is not None and max_pages: |
| progress_value = min(current_page / (max_pages * 2), 0.5) |
| progress_bar.progress(progress_value) |
| |
| |
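        # Send browser-like headers with a randomly chosen user agent on every request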
| current_agent = random.choice(user_agents) |
| session.headers.update({ |
| 'User-Agent': current_agent, |
| 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', |
| 'Accept-Language': 'en-US,en;q=0.5', |
| 'Referer': domain, |
| 'DNT': '1', |
| 'Connection': 'keep-alive', |
| 'Upgrade-Insecure-Requests': '1' |
| }) |
| |
| |
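        # Work out which URL to fetch on this iteration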
| if current_page == 1: |
| page_url = base_url |
| else: |
| |
| if is_docs_site: |
| |
| |
                if all_found_urls and (max_pages is None or len(all_data) < max_pages):
| |
| unvisited_urls = [u for u in all_found_urls if u['url'] not in visited_urls] |
| if unvisited_urls: |
| next_item = unvisited_urls[0] |
| page_url = next_item['url'] |
| |
| visited_urls.add(page_url) |
| |
| if not any(d['url'] == page_url for d in all_data): |
| all_data.append(next_item) |
| else: |
| has_next_page = False |
| logs.append("No more unvisited URLs. Ending search phase.") |
| break |
| else: |
| |
| if '?' in base_url: |
| page_url = f"{base_url}&page={current_page}" |
| else: |
| page_url = f"{base_url}?page={current_page}" |
| else: |
| |
| if '?' in base_url: |
| page_url = f"{base_url}&page={current_page}" |
| else: |
| page_url = f"{base_url}?page={current_page}" |
| |
| |
| visited_urls.add(page_url) |
| |
| |
        # Randomized politeness delay between page requests (configurable from the UI)
        delay = request_delay + random.random()
        time.sleep(delay)
| |
| try: |
| |
| response = session.get(page_url, timeout=30) |
| response.raise_for_status() |
| |
| |
| soup = BeautifulSoup(response.text, 'html.parser') |
| |
| |
| page_urls = extract_urls(soup, domain_with_path) |
| |
| if not page_urls: |
| logs.append(f"No links found on page {current_page}. Trying alternate extraction method...") |
| if status_text: |
| status_text.text(f"No links found on page {current_page}. Trying alternate method...") |
| |
| page_urls = extract_alternate_urls(soup, domain_with_path) |
| |
| if not page_urls: |
| logs.append(f"Still no links found on page {current_page}.") |
| if current_page > 2 and not all_data: |
| logs.append("Failed to extract links after multiple pages. Check site structure.") |
| break |
| else: |
| logs.append(f"Found {len(page_urls)} links on page {current_page}") |
| |
| |
| for url_item in page_urls: |
| if not any(u['url'] == url_item['url'] for u in all_found_urls): |
| all_found_urls.append(url_item) |
| |
| |
| if not is_docs_site or current_page == 1: |
| |
| for url_item in page_urls: |
| if not any(d['url'] == url_item['url'] for d in all_data): |
| all_data.append(url_item) |
| |
| |
| if not is_docs_site or current_page == 1: |
| |
| next_page_link = None |
| |
| |
            # Look for a "next page" link: first by link text, then by class/rel/attribute hints
            for a_tag in soup.find_all('a'):
                link_text = a_tag.text.lower()
                if 'next' in link_text or '»' in link_text or '>' in link_text:
                    next_page_link = a_tag
                    break

                # Fall back to attribute values such as class="pagination-next" or rel="next"
                for attr, value in a_tag.attrs.items():
                    if isinstance(value, list):
                        value = ' '.join(value)
                    if isinstance(value, str) and ('next' in value.lower() or 'pagination-next' in value.lower()):
                        next_page_link = a_tag
                        break
                if next_page_link:
                    break
| |
| |
| if next_page_link and 'href' in next_page_link.attrs: |
| next_url = next_page_link['href'] |
| |
| if not next_url.startswith(('http://', 'https://')): |
| next_url = urljoin(page_url, next_url) |
| |
| logs.append(f"Found next page link: {next_url}") |
| |
| if next_url == page_url: |
| if is_docs_site: |
| |
| pass |
| else: |
| has_next_page = False |
| else: |
| |
| if '/page/' in next_url or 'page=' in next_url: |
| pass |
| else: |
| base_url = next_url |
| current_page = 1 |
| else: |
| |
| if is_docs_site: |
| |
| pass |
| else: |
| has_next_page = False |
| logs.append("No next page link found. Reached the end.") |
| |
| |
| if is_docs_site and max_pages and len(all_data) >= max_pages: |
| has_next_page = False |
| logs.append(f"Reached maximum number of pages ({max_pages}). Ending search phase.") |
| break |
| |
| current_page += 1 |
| |
| except requests.exceptions.RequestException as e: |
| logs.append(f"Error scanning page {current_page}: {str(e)}") |
| if status_text: |
| status_text.text(f"Error on page {current_page}: {str(e)}") |
| |
| |
| if hasattr(e, 'response') and e.response is not None and e.response.status_code == 403: |
| logs.append("Possible blocking detected. Waiting longer...") |
| time.sleep(10) |
| continue |
| else: |
| |
| if is_docs_site and all_found_urls: |
| continue |
| else: |
| break |
| |
| |
| if max_pages and len(all_data) > max_pages: |
| all_data = all_data[:max_pages] |
| |
| logs.append(f"Found {len(all_data)} URLs to download. Starting content download...") |
| if status_text: |
| status_text.text(f"Found {len(all_data)} URLs. Starting content download...") |
| |
| |
| |
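    # Phase 2: download and convert the content of every discovered URL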
| total_urls = len(all_data) |
| if total_urls == 0: |
| if progress_bar: |
| progress_bar.progress(1.0) |
| return all_data, logs |
| |
| |
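    # Worker run by each downloader thread: pulls (index, item) pairs off the queue, fetches the page,
    # extracts the main content and converts it to Markdown. Streamlit calls made from worker threads
    # (the progress/status updates below) may log "missing ScriptRunContext" warnings on some versions.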
| def download_content(item_queue, result_list, session, html_converter, logs_list, progress_bar, status_text, max_pages): |
        while True:
            # Pull the next queued item; stop when the queue has been drained
            try:
                idx, item = item_queue.get_nowait()
            except queue.Empty:
                break
| try: |
| |
| if status_text: |
| status_text.text(f"Downloading content: {idx+1}/{total_urls} - {item['title']}") |
| |
| |
| if progress_bar and max_pages: |
| |
| progress_value = 0.5 + 0.5 * ((idx + 1) / total_urls) |
| progress_bar.progress(progress_value) |
| |
| |
| url = item['url'] |
| |
| |
| if is_docs_site: |
| |
| if not url.endswith('/') and not url.endswith('.html') and not url.endswith('.htm'): |
| url = url + '/' |
| |
| |
| try: |
| |
| response = session.get(url, timeout=30) |
| response.raise_for_status() |
                except requests.exceptions.HTTPError as http_err:
                    if http_err.response is not None and http_err.response.status_code == 404:
| |
| alt_urls = [] |
| |
| |
| if not url.endswith('/'): |
| alt_urls.append(url + '/') |
| |
| |
| if url.endswith('/'): |
| alt_urls.append(url + 'index.html') |
| |
| |
| if url.endswith('/'): |
| alt_urls.append(url[:-1] + '.html') |
| |
| |
| if not url.endswith(('.html', '.htm', '.php', '.asp', '.aspx', '.jsp')): |
| for ext in ['.html', '.php', '.aspx']: |
| alt_urls.append(url + ext) |
| |
| |
| found = False |
| for alt_url in alt_urls: |
| try: |
| logs_list.append(f"Original URL {url} not found, trying {alt_url}") |
| response = session.get(alt_url, timeout=30) |
| response.raise_for_status() |
| url = alt_url |
| found = True |
| break |
                            except requests.exceptions.RequestException:
| continue |
| |
| if not found: |
| |
| raise |
| else: |
| |
| raise |
| |
| |
| soup = BeautifulSoup(response.text, 'html.parser') |
| |
| |
| for tag in soup.find_all(['script', 'style', 'meta', 'iframe', 'noscript']): |
| tag.decompose() |
| |
| |
| |
| main_content = None |
| |
| |
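                # Try common containers for the main article body before falling back to <body>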
| content_selectors = [ |
| 'article', 'main', '.content', '#content', '.post-content', |
| '.entry-content', '.article-content', '.post', '.page-content', |
| '#main', '.main-content', '.body', '#post', '.blog-post' |
| ] |
| |
| for selector in content_selectors: |
| try: |
| if selector.startswith('.'): |
| element = soup.find(class_=selector[1:]) |
| elif selector.startswith('#'): |
| element = soup.find(id=selector[1:]) |
| else: |
| element = soup.find(selector) |
| |
| if element and element.text.strip(): |
| main_content = element |
| break |
                    except Exception:
| continue |
| |
| |
| if not main_content: |
| |
| for tag in soup.find_all(['nav', 'footer', 'header', 'aside']): |
| tag.decompose() |
| |
| main_content = soup.body or soup |
| |
| |
| title = soup.title.string if soup.title else item['title'] |
| |
| |
| markdown_content = html_converter.handle(str(main_content)) |
| |
| |
| markdown_content = re.sub(r'\n\s*\n\s*\n', '\n\n', markdown_content) |
| |
| |
| if len(markdown_content.strip()) < 50: |
| |
| markdown_content = html_converter.handle(str(soup.body or soup)) |
| markdown_content = re.sub(r'\n\s*\n\s*\n', '\n\n', markdown_content) |
| |
| |
| item['content'] = markdown_content |
| item['title'] = title.strip() if title else item['title'] |
| result_list.append(item) |
| |
| |
| time.sleep(0.5 + random.random()) |
| |
| except Exception as e: |
| error_message = str(e) |
| logs_list.append(f"Error downloading content for {item['url']}: {error_message}") |
| item['content'] = f"*Error downloading content: {error_message}*" |
| result_list.append(item) |
| |
| finally: |
| item_queue.task_done() |
| |
| |
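    # Fan the downloads out across a small pool of daemon threads fed from a shared queue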
| item_queue = queue.Queue() |
| result_list = [] |
| |
| |
| for idx, item in enumerate(all_data): |
| item_queue.put((idx, item)) |
| |
| |
| num_threads = min(5, total_urls) |
| threads = [] |
| for _ in range(num_threads): |
| thread = threading.Thread( |
| target=download_content, |
| args=(item_queue, result_list, session, html_converter, logs, progress_bar, status_text, max_pages) |
| ) |
| thread.daemon = True |
| thread.start() |
| threads.append(thread) |
| |
| |
| item_queue.join() |
| |
| if progress_bar: |
| progress_bar.progress(1.0) |
| |
| logs.append(f"Content download complete. Downloaded {len(result_list)} pages.") |
| if status_text: |
| status_text.text(f"Scraping complete! Downloaded {len(result_list)} pages with full content.") |
| |
| return result_list, logs |
|
|
| def extract_urls(soup, domain): |
| """ |
| Extract URLs and titles from page |
| """ |
| urls = [] |
| base_url = domain |
| |
| |
| base_tag = soup.find('base') |
| if base_tag and 'href' in base_tag.attrs: |
| base_href = base_tag['href'] |
| if base_href: |
| |
| if not base_href.startswith(('http://', 'https://')): |
| base_url = urljoin(domain, base_href) |
| else: |
| base_url = base_href |
| |
| |
| for link in soup.find_all('a'): |
| try: |
| url = link.get('href') |
| if not url: |
| continue |
| |
| |
| if url.startswith('#') or url.startswith('javascript:') or url.startswith('mailto:'): |
| continue |
| |
| |
| if not url.startswith(('http://', 'https://')): |
| url = urljoin(base_url, url) |
| |
| |
| url = url.split('#')[0] |
| |
| |
| if not url: |
| continue |
| |
| |
| parsed_domain = urlparse(domain).netloc |
| parsed_url = urlparse(url) |
| |
| |
| if parsed_url.netloc != parsed_domain: |
| continue |
| |
| |
| |
| domain_path = urlparse(domain).path |
| if '/docs/' in domain_path or domain_path.endswith('/docs'): |
| |
| if not parsed_url.path.startswith(domain_path): |
| continue |
| |
| |
| title = link.text.strip() |
| if not title or len(title) < 3: |
| img = link.find('img') |
| if img and img.get('alt'): |
| title = img['alt'].strip() |
| if not title: |
| |
| path_parts = parsed_url.path.rstrip('/').split('/') |
| if path_parts: |
| last_part = path_parts[-1].replace('-', ' ').replace('_', ' ').capitalize() |
| if last_part: |
| title = last_part |
| else: |
| title = "No title" |
| else: |
| title = "No title" |
| |
| |
| if not any(x['url'] == url for x in urls): |
| urls.append({ |
| 'title': title, |
| 'url': url, |
| 'content': '' |
| }) |
        except Exception:
| continue |
| |
| return urls |
|
|
| def extract_alternate_urls(soup, domain): |
| """ |
| Alternative URL extraction for complex pages |
| """ |
| urls = [] |
| |
| |
| for element in soup.find_all(['div', 'span', 'button']): |
| try: |
| |
| if 'onclick' in element.attrs: |
| onclick = element['onclick'] |
                url_match = re.search(r'(location|window)\.href\s*=\s*[\'"]([^\'"]+)[\'"]', onclick)
| if url_match: |
| url = url_match.group(2) |
| |
| if not url.startswith(('http://', 'https://')): |
| url = urljoin(domain, url) |
| |
| title = element.text.strip() |
| if not title: |
| title = "No title" |
| |
| |
| if not any(x['url'] == url for x in urls): |
| urls.append({ |
| 'title': title, |
| 'url': url, |
| 'content': '' |
| }) |
        except Exception:
| continue |
| |
| |
| if not urls: |
| all_links = re.findall(r'href=[\'"]([^\'"]+)[\'"]', str(soup)) |
| for url in all_links: |
| try: |
| |
| if url.startswith('#') or url.startswith('javascript:') or url.startswith('mailto:'): |
| continue |
| |
| |
| if not url.startswith(('http://', 'https://')): |
| url = urljoin(domain, url) |
| |
| |
| if urlparse(url).netloc != urlparse(domain).netloc: |
| continue |
| |
| |
| if not any(x['url'] == url for x in urls): |
| urls.append({ |
| 'title': "Link " + str(len(urls) + 1), |
| 'url': url, |
| 'content': '' |
| }) |
            except Exception:
| continue |
| |
| return urls |
|
|
| def generate_markdown(data, site_url): |
| """ |
| Generate a markdown representation of the scraped data with full content |
| """ |
| parsed_url = urlparse(site_url) |
| domain_name = parsed_url.netloc |
| |
| |
| md = f"# Content from {domain_name}\n\n" |
| md += f"Source: {site_url}\n\n" |
| md += f"Total pages: {len(data)}\n\n" |
| md += f"Generated on: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n" |
| md += "---\n\n" |
| |
| |
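    # Sort pages alphabetically by title so the output reads like a table of contents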
| sorted_data = sorted(data, key=lambda x: x['title']) |
| |
| |
| for i, item in enumerate(sorted_data): |
| md += f"# {i+1}. {item['title']}\n\n" |
| |
| |
| md += f"URL: {item['url']}\n\n" |
| |
| |
| content = item.get('content', '') |
| if content and not content.startswith('*Error downloading content'): |
| md += content + "\n\n" |
| else: |
| if content.startswith('*Error downloading content'): |
| md += content + "\n\n" |
| else: |
| md += "*No content available*\n\n" |
| |
| |
| md += "---\n\n" |
| |
| return md |
|
|
| def download_file(content, filename, mime_type): |
| """ |
| Create a download link for a file |
| """ |
| b64 = base64.b64encode(content.encode()).decode() |
| href = f'<a href="data:{mime_type};base64,{b64}" download="{filename}" style="background-color: #4CAF50; color: white; padding: 10px 15px; text-decoration: none; display: inline-block; border-radius: 5px; margin: 10px 0;">Download {filename}</a>' |
| return href |
|
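# Note: st.download_button(label, data, file_name=..., mime=...) is a simpler built-in alternative
# to the base64 data-URI link above; the HTML link approach allows the custom green styling.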
|
| def main(): |
| |
| with st.form("scraper_form"): |
| url = st.text_input("Enter URL to scrape:", placeholder="https://example.com/products") |
| |
| col1, col2 = st.columns(2) |
| with col1: |
| max_pages = st.number_input("Maximum pages to scrape (0 for unlimited):", min_value=0, value=3) |
| with col2: |
| delay_between_requests = st.slider("Delay between requests (seconds):", min_value=1, max_value=10, value=2) |
| |
| submit_button = st.form_submit_button("Start Scraping") |
| |
| |
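    # Session state keeps results across Streamlit reruns (e.g. when a download link is clicked)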
| if 'scraped_data' not in st.session_state: |
| st.session_state.scraped_data = None |
| if 'markdown_content' not in st.session_state: |
| st.session_state.markdown_content = None |
| if 'is_scraping_complete' not in st.session_state: |
| st.session_state.is_scraping_complete = False |
| |
| if submit_button: |
| |
| st.session_state.is_scraping_complete = False |
| st.session_state.scraped_data = None |
| st.session_state.markdown_content = None |
| |
| if not url: |
| st.error("Please enter a valid URL") |
| else: |
| if not url.startswith(('http://', 'https://')): |
| url = 'https://' + url |
| |
| |
| if max_pages == 0: |
| max_pages = None |
| |
| |
| progress = st.progress(0) |
| status = st.empty() |
| |
| |
| log_expander = st.expander("View Logs", expanded=False) |
| logs_placeholder = log_expander.empty() |
| |
| try: |
| status.text("Scraping in progress... Please wait.") |
| |
| |
| start_time = time.time() |
                scraped_data, logs = scrape_website(url, max_pages, progress, status, delay_between_requests)
| end_time = time.time() |
| |
| |
| logs_placeholder.text('\n'.join(logs)) |
| |
| if scraped_data: |
| status.text(f"Scraping completed! Found {len(scraped_data)} pages in {end_time - start_time:.2f} seconds.") |
| |
| |
| markdown_content = generate_markdown(scraped_data, url) |
| |
| |
                    st.session_state.scraped_data = scraped_data
                    st.session_state.markdown_content = markdown_content
                    st.session_state.scraped_url = url
                    st.session_state.is_scraping_complete = True
| |
| |
| st.success("✅ Scraping completed successfully! Results are ready below.") |
| |
| else: |
| status.error("No data was found. Try adjusting the URL or increasing the page limit.") |
| |
| except Exception as e: |
| status.error(f"An error occurred: {str(e)}") |
| st.exception(e) |
| |
| |
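    # Render results from session state so they survive reruns triggered after scraping completes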
| if st.session_state.is_scraping_complete and st.session_state.scraped_data and st.session_state.markdown_content: |
| st.subheader("Scraped Content") |
| |
| |
| error_count = sum(1 for item in st.session_state.scraped_data if item.get('content', '').startswith('*Error')) |
| success_count = len(st.session_state.scraped_data) - error_count |
| |
| |
| if error_count > 0: |
| st.warning(f"⚠️ Downloaded {success_count} pages successfully. {error_count} pages had errors. See the preview or download to view details.") |
| else: |
| st.success(f"✅ Successfully downloaded all {len(st.session_state.scraped_data)} pages!") |
| |
| |
| st.markdown("### Download Options") |
| |
| |
| markdown_filename = f"content_{urlparse(url).netloc.replace('.', '_')}.md" |
| st.markdown(download_file(st.session_state.markdown_content, markdown_filename, "file/markdown"), unsafe_allow_html=True) |
| |
| |
| txt_filename = f"content_{urlparse(url).netloc.replace('.', '_')}.txt" |
| st.markdown(download_file(st.session_state.markdown_content, txt_filename, "file/plain"), unsafe_allow_html=True) |
| |
| |
| if st.session_state.scraped_data: |
| csv_buffer = io.StringIO() |
| fieldnames = ['title', 'url'] |
| writer = csv.DictWriter(csv_buffer, fieldnames=fieldnames) |
| writer.writeheader() |
| |
| |
| for row in st.session_state.scraped_data: |
| writer.writerow({'title': row['title'], 'url': row['url']}) |
| |
| csv_filename = f"urls_{urlparse(url).netloc.replace('.', '_')}.csv" |
| csv_content = csv_buffer.getvalue() |
| st.markdown(download_file(csv_content, csv_filename, "file/csv"), unsafe_allow_html=True) |
| |
| |
| st.subheader("Content Preview") |
| with st.container(): |
| st.markdown('<div class="scroll-container">' + st.session_state.markdown_content + '</div>', unsafe_allow_html=True) |
| |
| |
| if st.session_state.is_scraping_complete: |
| st.markdown("---") |
| st.markdown("<small>**Note:** Be respectful of website terms of service when scraping.</small>", unsafe_allow_html=True) |
|
|
| if __name__ == "__main__": |
| main() |