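"""Streamlit web scraper app.

Crawls a site in two phases: phase 1 discovers page URLs (following
pagination links or, for documentation sites, the internal link graph);
phase 2 downloads each page in parallel and converts its main content to
Markdown for preview and download.
"""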
import streamlit as st
import requests
from bs4 import BeautifulSoup
import time
import csv
import random
import re
import io
import base64
import html2text
import threading
import queue
from urllib.parse import urlparse, urljoin
# add_script_run_ctx lets worker threads update Streamlit elements (progress
# bar, status text) without "missing ScriptRunContext" warnings; available in
# recent Streamlit versions.
from streamlit.runtime.scriptrunner import add_script_run_ctx
st.set_page_config(
page_title="Web Scraper",
page_icon="🕸️",
layout="wide",
initial_sidebar_state="collapsed"
)
# Apply basic CSS
st.markdown("""
<style>
header {visibility: hidden;}
footer {visibility: hidden;}
.stButton button {
background-color: #4CAF50;
color: white;
padding: 0.5rem 1rem;
font-size: 1rem;
border-radius: 5px;
}
.scroll-container {
max-height: 500px;
overflow-y: auto;
border: 1px solid #ddd;
padding: 10px;
margin-bottom: 10px;
}
</style>
""", unsafe_allow_html=True)
def scrape_website(base_url, max_pages=None, progress_bar=None, status_text=None, request_delay=2):
    """
    Scrape a website in two phases: discover page URLs (phase 1), then
    download each page and convert its main content to Markdown (phase 2).
    Returns a (list of page dicts, list of log messages) tuple.
    """
all_data = []
current_page = 1
has_next_page = True
# Create a pool of user agents to rotate
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:94.0) Gecko/20100101 Firefox/94.0'
]
# Session to maintain cookies
session = requests.Session()
# Parse the base URL to help with navigation
parsed_url = urlparse(base_url)
domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
    # For documentation sites, ensure we get the full base URL
    is_docs_site = '/docs/' in parsed_url.path or parsed_url.path.endswith('/docs')
    if is_docs_site:
domain_with_path = base_url
# Make sure it ends with a slash
if not domain_with_path.endswith('/'):
domain_with_path += '/'
else:
domain_with_path = domain
# Store debug logs
logs = []
logs.append(f"Finding URLs from {base_url}")
# Initialize the HTML to text converter
html_converter = html2text.HTML2Text()
html_converter.ignore_links = False
html_converter.ignore_images = False
html_converter.ignore_tables = False
html_converter.body_width = 0 # No line wrapping
# Set up tracking for visited URLs to avoid duplicates
visited_urls = set()
all_found_urls = [] # Separate list to track all found URLs
# Phase 1: Find all URLs
while has_next_page and (max_pages is None or current_page <= max_pages):
logs.append(f"Scanning page {current_page}...")
if status_text:
status_text.text(f"Finding URLs from page {current_page}...")
# Update progress bar if available
if progress_bar is not None and max_pages:
progress_value = min(current_page / (max_pages * 2), 0.5) # First half of progress is finding pages
progress_bar.progress(progress_value)
# Rotate user agents
current_agent = random.choice(user_agents)
session.headers.update({
'User-Agent': current_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Referer': domain,
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
})
# Different pagination patterns
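        # Pagination strategy: page 1 is the base URL itself. On later pages,
        # docs sites are crawled by walking the list of discovered-but-unvisited
        # URLs, while regular sites fall back to a ?page=N query parameter.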
if current_page == 1:
page_url = base_url
else:
# Try different pagination patterns
if is_docs_site:
# For docs sites, we need a different approach to pagination
# If we've found URLs but still need more pages
                if all_found_urls and (max_pages is None or len(all_data) < max_pages):
# Get the next URL from our list that we haven't visited yet
unvisited_urls = [u for u in all_found_urls if u['url'] not in visited_urls]
if unvisited_urls:
next_item = unvisited_urls[0]
page_url = next_item['url']
# Mark this URL as being processed
visited_urls.add(page_url)
# Add this item to our data (we'll get its content later)
if not any(d['url'] == page_url for d in all_data):
all_data.append(next_item)
else:
has_next_page = False
logs.append("No more unvisited URLs. Ending search phase.")
break
else:
# Use standard pagination for the first few pages
if '?' in base_url:
page_url = f"{base_url}&page={current_page}"
else:
page_url = f"{base_url}?page={current_page}"
else:
# Standard pagination
if '?' in base_url:
page_url = f"{base_url}&page={current_page}"
else:
page_url = f"{base_url}?page={current_page}"
# Mark this URL as visited
visited_urls.add(page_url)
        # Add a randomized delay between requests (uses the UI-configured base delay)
        delay = request_delay + random.random() * 2
        time.sleep(delay)
try:
# Make request with timeout
response = session.get(page_url, timeout=30)
response.raise_for_status()
# Parse HTML
soup = BeautifulSoup(response.text, 'html.parser')
# Extract URLs from the page
page_urls = extract_urls(soup, domain_with_path)
if not page_urls:
logs.append(f"No links found on page {current_page}. Trying alternate extraction method...")
if status_text:
status_text.text(f"No links found on page {current_page}. Trying alternate method...")
# Try alternate extraction method if no links found
page_urls = extract_alternate_urls(soup, domain_with_path)
if not page_urls:
logs.append(f"Still no links found on page {current_page}.")
if current_page > 2 and not all_data:
logs.append("Failed to extract links after multiple pages. Check site structure.")
break
else:
logs.append(f"Found {len(page_urls)} links on page {current_page}")
# Keep track of all found URLs
for url_item in page_urls:
if not any(u['url'] == url_item['url'] for u in all_found_urls):
all_found_urls.append(url_item)
# If not using docs-site approach or this is the first page
if not is_docs_site or current_page == 1:
# Add new URLs to our data collection
for url_item in page_urls:
if not any(d['url'] == url_item['url'] for d in all_data):
all_data.append(url_item)
# For regular sites, check for next page link
if not is_docs_site or current_page == 1:
# Check for next page without using complex selectors
next_page_link = None
                # Simple approach: look for links whose text or attributes
                # suggest a "next page" control
                for a_tag in soup.find_all('a'):
                    link_text = a_tag.text.strip().lower()
                    if 'next' in link_text or '»' in link_text or link_text in ('>', '>>'):
                        next_page_link = a_tag
                        break
                    # Check attributes (class, rel, aria-label) for hints
                    for attr, value in a_tag.attrs.items():
                        if isinstance(value, str) and ('next' in value.lower() or 'pagination-next' in value.lower()):
                            next_page_link = a_tag
                            break
                    if next_page_link:
                        break
# If we found a next link
if next_page_link and 'href' in next_page_link.attrs:
next_url = next_page_link['href']
# Handle relative URLs
if not next_url.startswith(('http://', 'https://')):
next_url = urljoin(page_url, next_url)
logs.append(f"Found next page link: {next_url}")
# If the next URL is the same as current, we may be at the end
if next_url == page_url:
if is_docs_site:
# In docs site mode, continue with our list of found URLs
pass
else:
has_next_page = False
else:
# For direct links, update base_url and reset counter
if '/page/' in next_url or 'page=' in next_url:
pass # Continue with our pagination pattern
else:
base_url = next_url
current_page = 1 # Will be incremented to 2 below
else:
# No next link found
if is_docs_site:
# In docs site mode, continue with our list of found URLs
pass
else:
has_next_page = False
logs.append("No next page link found. Reached the end.")
# Check if we've reached our max pages limit for docs sites
if is_docs_site and max_pages and len(all_data) >= max_pages:
has_next_page = False
logs.append(f"Reached maximum number of pages ({max_pages}). Ending search phase.")
break
current_page += 1
except requests.exceptions.RequestException as e:
logs.append(f"Error scanning page {current_page}: {str(e)}")
if status_text:
status_text.text(f"Error on page {current_page}: {str(e)}")
# If we got blocked (403 error), try with more delay
if hasattr(e, 'response') and e.response is not None and e.response.status_code == 403:
logs.append("Possible blocking detected. Waiting longer...")
time.sleep(10) # Wait 10 seconds in the Streamlit app to avoid freezing
continue
else:
# For other errors, if we're in docs site mode, try the next URL
if is_docs_site and all_found_urls:
continue
else:
break
# Limit the number of URLs to process if max_pages is set
if max_pages and len(all_data) > max_pages:
all_data = all_data[:max_pages]
logs.append(f"Found {len(all_data)} URLs to download. Starting content download...")
if status_text:
status_text.text(f"Found {len(all_data)} URLs. Starting content download...")
# Phase 2: Download the actual content from each URL
# Update progress bar for the second phase
total_urls = len(all_data)
if total_urls == 0:
if progress_bar:
progress_bar.progress(1.0) # Complete the progress bar
return all_data, logs
# Use a thread pool to download content in parallel
def download_content(item_queue, result_list, session, html_converter, logs_list, progress_bar, status_text, max_pages):
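        """Worker: pull (index, item) pairs off the shared queue, fetch each
        URL, extract the main content as Markdown, and append the result to
        result_list until the queue is drained."""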
while not item_queue.empty():
idx, item = item_queue.get()
try:
# Update status
if status_text:
status_text.text(f"Downloading content: {idx+1}/{total_urls} - {item['title']}")
# Update progress bar for second half of the process
                if progress_bar:
# Calculate progress: 50% for finding URLs + 50% for downloading content
progress_value = 0.5 + 0.5 * ((idx + 1) / total_urls)
progress_bar.progress(progress_value)
# Get the URL content
url = item['url']
# Special handling for documentation sites
if is_docs_site:
# Make sure the URL ends with / or .html for documentation sites
if not url.endswith('/') and not url.endswith('.html') and not url.endswith('.htm'):
url = url + '/'
# Handle documentation sites that might have special URL structures
try:
# First, try the URL as provided
response = session.get(url, timeout=30)
response.raise_for_status()
                except requests.exceptions.HTTPError as http_err:
                    if http_err.response is not None and http_err.response.status_code == 404:
# For 404 errors, try different URL variations
alt_urls = []
# Try with trailing slash
if not url.endswith('/'):
alt_urls.append(url + '/')
# Try with index.html
if url.endswith('/'):
alt_urls.append(url + 'index.html')
# Try removing trailing slash and adding .html
if url.endswith('/'):
alt_urls.append(url[:-1] + '.html')
# Try with different extensions
if not url.endswith(('.html', '.htm', '.php', '.asp', '.aspx', '.jsp')):
for ext in ['.html', '.php', '.aspx']:
alt_urls.append(url + ext)
# Try each alternative URL
found = False
for alt_url in alt_urls:
try:
logs_list.append(f"Original URL {url} not found, trying {alt_url}")
response = session.get(alt_url, timeout=30)
response.raise_for_status()
url = alt_url # Update URL if successful
found = True
break
                            except requests.exceptions.RequestException:
                                continue
if not found:
# If all alternatives failed, raise the original error
raise
else:
# For other HTTP errors, just raise
raise
# Parse the content
soup = BeautifulSoup(response.text, 'html.parser')
# Remove unwanted elements
for tag in soup.find_all(['script', 'style', 'meta', 'iframe', 'noscript']):
tag.decompose()
# Try to find the main content
main_content = None
# Common content containers
content_selectors = [
'article', 'main', '.content', '#content', '.post-content',
'.entry-content', '.article-content', '.post', '.page-content',
'#main', '.main-content', '.body', '#post', '.blog-post'
]
for selector in content_selectors:
try:
if selector.startswith('.'):
element = soup.find(class_=selector[1:])
elif selector.startswith('#'):
element = soup.find(id=selector[1:])
else:
element = soup.find(selector)
if element and element.text.strip():
main_content = element
break
                    except Exception:
                        continue
# If we couldn't find a content container, use the whole body
if not main_content:
# Remove navigation, footer, etc.
for tag in soup.find_all(['nav', 'footer', 'header', 'aside']):
tag.decompose()
main_content = soup.body or soup
# Get the page title
title = soup.title.string if soup.title else item['title']
# Convert HTML to Markdown
markdown_content = html_converter.handle(str(main_content))
# Clean up the markdown
markdown_content = re.sub(r'\n\s*\n\s*\n', '\n\n', markdown_content) # Remove excessive newlines
# Check if we actually got useful content
if len(markdown_content.strip()) < 50: # If content is very short, it might be empty or just navigation
# Try one more time with the whole body
markdown_content = html_converter.handle(str(soup.body or soup))
markdown_content = re.sub(r'\n\s*\n\s*\n', '\n\n', markdown_content)
# Update the item with the markdown content
item['content'] = markdown_content
item['title'] = title.strip() if title else item['title']
result_list.append(item)
# Add a small delay to avoid overwhelming the server
time.sleep(0.5 + random.random())
except Exception as e:
error_message = str(e)
logs_list.append(f"Error downloading content for {item['url']}: {error_message}")
item['content'] = f"*Error downloading content: {error_message}*"
result_list.append(item)
finally:
item_queue.task_done()
# Prepare for parallel downloading
item_queue = queue.Queue()
result_list = []
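    # A plain list is fine for cross-thread results: list.append is atomic
    # under CPython's GIL, so workers can append without an explicit lock.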
# Fill the queue with items to download
for idx, item in enumerate(all_data):
item_queue.put((idx, item))
# Create threads for downloading (limit to 5 concurrent threads)
num_threads = min(5, total_urls)
threads = []
    for _ in range(num_threads):
        thread = threading.Thread(
            target=download_content,
            args=(item_queue, result_list, session, html_converter, logs, progress_bar, status_text, max_pages)
        )
        thread.daemon = True
        # Attach the Streamlit script-run context so the worker thread can
        # update the progress bar and status text without warnings
        add_script_run_ctx(thread)
        thread.start()
        threads.append(thread)
# Wait for all downloads to complete
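    # queue.join() blocks until every enqueued item has been marked done via
    # task_done(), i.e. until all workers have finished their items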
item_queue.join()
if progress_bar:
progress_bar.progress(1.0) # Make sure progress is complete
logs.append(f"Content download complete. Downloaded {len(result_list)} pages.")
if status_text:
status_text.text(f"Scraping complete! Downloaded {len(result_list)} pages with full content.")
return result_list, logs
def extract_urls(soup, domain):
"""
Extract URLs and titles from page
"""
urls = []
base_url = domain
# Look for a base tag which might change the base URL for relative links
base_tag = soup.find('base')
if base_tag and 'href' in base_tag.attrs:
base_href = base_tag['href']
if base_href:
# If it's relative, join with domain
if not base_href.startswith(('http://', 'https://')):
base_url = urljoin(domain, base_href)
else:
base_url = base_href
# Find all links
for link in soup.find_all('a'):
try:
url = link.get('href')
if not url:
continue
            # Skip anchors, javascript, mailto, etc.
            if url.startswith(('#', 'javascript:', 'mailto:')):
                continue
# Handle relative URLs - use the base_url from base tag if available
if not url.startswith(('http://', 'https://')):
url = urljoin(base_url, url)
# Handle URLs with fragments - remove the fragment
url = url.split('#')[0]
# Skip empty URLs after removing fragment
if not url:
continue
# For documentation sites, make some adjustments
parsed_domain = urlparse(domain).netloc
parsed_url = urlparse(url)
# Stay within the same domain
if parsed_url.netloc != parsed_domain:
continue
# For documentation sites, ensure we respect path structure
# Check if this is a docs site by checking the path
domain_path = urlparse(domain).path
if '/docs/' in domain_path or domain_path.endswith('/docs'):
# Make sure we stay within the docs section
if not parsed_url.path.startswith(domain_path):
continue
# Get title from link text or image alt
title = link.text.strip()
if not title or len(title) < 3: # If text is empty or very short
img = link.find('img')
if img and img.get('alt'):
title = img['alt'].strip()
if not title:
# Try to extract title from URL
path_parts = parsed_url.path.rstrip('/').split('/')
if path_parts:
last_part = path_parts[-1].replace('-', ' ').replace('_', ' ').capitalize()
if last_part:
title = last_part
else:
title = "No title"
else:
title = "No title"
# Only add if URL is unique
if not any(x['url'] == url for x in urls):
urls.append({
'title': title,
'url': url,
'content': '' # Will be filled later
})
        except Exception:
            continue
return urls
def extract_alternate_urls(soup, domain):
"""
Alternative URL extraction for complex pages
"""
urls = []
# Try to find clickable areas or javascript links
for element in soup.find_all(['div', 'span', 'button']):
try:
# Look for onclick attributes
if 'onclick' in element.attrs:
onclick = element['onclick']
                url_match = re.search(r'(location|window)\.href\s*=\s*[\'"]([^\'"]+)[\'"]', onclick)
if url_match:
url = url_match.group(2)
# Handle relative URLs
if not url.startswith(('http://', 'https://')):
url = urljoin(domain, url)
title = element.text.strip()
if not title:
title = "No title"
# Only add if URL is unique
if not any(x['url'] == url for x in urls):
urls.append({
'title': title,
'url': url,
'content': '' # Will be filled later
})
        except Exception:
            continue
# If still no URLs, try to find all possible internal links
if not urls:
all_links = re.findall(r'href=[\'"]([^\'"]+)[\'"]', str(soup))
for url in all_links:
try:
                # Skip anchors, javascript, mailto, etc.
                if url.startswith(('#', 'javascript:', 'mailto:')):
                    continue
# Handle relative URLs
if not url.startswith(('http://', 'https://')):
url = urljoin(domain, url)
# Stay within the same domain
if urlparse(url).netloc != urlparse(domain).netloc:
continue
# Only add if URL is unique
if not any(x['url'] == url for x in urls):
urls.append({
'title': "Link " + str(len(urls) + 1),
'url': url,
'content': '' # Will be filled later
})
            except Exception:
                continue
return urls
def generate_markdown(data, site_url):
"""
Generate a markdown representation of the scraped data with full content
"""
parsed_url = urlparse(site_url)
domain_name = parsed_url.netloc
# Start with a header
md = f"# Content from {domain_name}\n\n"
md += f"Source: {site_url}\n\n"
md += f"Total pages: {len(data)}\n\n"
md += f"Generated on: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
md += "---\n\n"
# Sort data by title
sorted_data = sorted(data, key=lambda x: x['title'])
# Process each item
for i, item in enumerate(sorted_data):
md += f"# {i+1}. {item['title']}\n\n"
# Add URL reference
md += f"URL: {item['url']}\n\n"
        # Add content section if available (error placeholders are kept so
        # failed pages remain visible in the output)
        content = item.get('content', '')
        if content:
            md += content + "\n\n"
        else:
            md += "*No content available*\n\n"
# Add separator between items
md += "---\n\n"
return md
def download_file(content, filename, mime_type):
"""
Create a download link for a file
"""
b64 = base64.b64encode(content.encode()).decode()
href = f'<a href="data:{mime_type};base64,{b64}" download="{filename}" style="background-color: #4CAF50; color: white; padding: 10px 15px; text-decoration: none; display: inline-block; border-radius: 5px; margin: 10px 0;">Download {filename}</a>'
return href
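# Note: st.download_button would also work here; a styled data-URI link is
# used instead so the button appearance can be controlled with inline CSS.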
def main():
# Input form
with st.form("scraper_form"):
url = st.text_input("Enter URL to scrape:", placeholder="https://example.com/products")
col1, col2 = st.columns(2)
with col1:
max_pages = st.number_input("Maximum pages to scrape (0 for unlimited):", min_value=0, value=3)
with col2:
delay_between_requests = st.slider("Delay between requests (seconds):", min_value=1, max_value=10, value=2)
submit_button = st.form_submit_button("Start Scraping")
# Initialize session state to store results
if 'scraped_data' not in st.session_state:
st.session_state.scraped_data = None
if 'markdown_content' not in st.session_state:
st.session_state.markdown_content = None
if 'is_scraping_complete' not in st.session_state:
st.session_state.is_scraping_complete = False
if submit_button:
# Reset session state
st.session_state.is_scraping_complete = False
st.session_state.scraped_data = None
st.session_state.markdown_content = None
if not url:
st.error("Please enter a valid URL")
else:
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
# Convert 0 to None for unlimited pages
if max_pages == 0:
max_pages = None
# Show progress
progress = st.progress(0)
status = st.empty()
# Add a placeholder for logs
log_expander = st.expander("View Logs", expanded=False)
logs_placeholder = log_expander.empty()
try:
status.text("Scraping in progress... Please wait.")
# Perform scraping
start_time = time.time()
                scraped_data, logs = scrape_website(url, max_pages, progress, status, delay_between_requests)
end_time = time.time()
# Display logs
logs_placeholder.text('\n'.join(logs))
if scraped_data:
status.text(f"Scraping completed! Found {len(scraped_data)} pages in {end_time - start_time:.2f} seconds.")
# Generate markdown
markdown_content = generate_markdown(scraped_data, url)
# Save to session state
st.session_state.scraped_data = scraped_data
st.session_state.markdown_content = markdown_content
st.session_state.is_scraping_complete = True
# Show a success message
st.success("✅ Scraping completed successfully! Results are ready below.")
else:
status.error("No data was found. Try adjusting the URL or increasing the page limit.")
except Exception as e:
status.error(f"An error occurred: {str(e)}")
st.exception(e)
# Display results if available
if st.session_state.is_scraping_complete and st.session_state.scraped_data and st.session_state.markdown_content:
st.subheader("Scraped Content")
# Check if we have errors
error_count = sum(1 for item in st.session_state.scraped_data if item.get('content', '').startswith('*Error'))
success_count = len(st.session_state.scraped_data) - error_count
# Show status with error info
if error_count > 0:
st.warning(f"⚠️ Downloaded {success_count} pages successfully. {error_count} pages had errors. See the preview or download to view details.")
else:
st.success(f"✅ Successfully downloaded all {len(st.session_state.scraped_data)} pages!")
# Provide download links
st.markdown("### Download Options")
# Download as Markdown
markdown_filename = f"content_{urlparse(url).netloc.replace('.', '_')}.md"
        st.markdown(download_file(st.session_state.markdown_content, markdown_filename, "text/markdown"), unsafe_allow_html=True)
# Download as plain text
txt_filename = f"content_{urlparse(url).netloc.replace('.', '_')}.txt"
        st.markdown(download_file(st.session_state.markdown_content, txt_filename, "text/plain"), unsafe_allow_html=True)
# Download as CSV (URLs only)
if st.session_state.scraped_data:
csv_buffer = io.StringIO()
fieldnames = ['title', 'url']
writer = csv.DictWriter(csv_buffer, fieldnames=fieldnames)
writer.writeheader()
# Write rows without the content field
for row in st.session_state.scraped_data:
writer.writerow({'title': row['title'], 'url': row['url']})
csv_filename = f"urls_{urlparse(url).netloc.replace('.', '_')}.csv"
csv_content = csv_buffer.getvalue()
            st.markdown(download_file(csv_content, csv_filename, "text/csv"), unsafe_allow_html=True)
# Preview section
st.subheader("Content Preview")
with st.container():
st.markdown('<div class="scroll-container">' + st.session_state.markdown_content + '</div>', unsafe_allow_html=True)
# Minimal footer
if st.session_state.is_scraping_complete:
st.markdown("---")
st.markdown("<small>**Note:** Be respectful of website terms of service when scraping.</small>", unsafe_allow_html=True)
if __name__ == "__main__":
main()