# NOTE: removed non-Python artifact lines (Hugging Face page header: uploader name,
# "Upload 50 files", commit hash) that were accidentally pasted above the module docstring.
"""
Download HTML Content from Verified Online Phishing URLs
This script downloads HTML content from phishing URLs that are verified and online.
Saves HTML files for later feature extraction.
"""
import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time
import hashlib
import logging
from datetime import datetime
from bs4 import BeautifulSoup
import re
import urllib3
import random
from collections import defaultdict
from threading import Lock
import json
# Disable SSL warnings (expected when downloading phishing sites with invalid certificates)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Setup logging: INFO level, timestamped console output shared by the whole script.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)
# Module-level logger used by HTMLDownloader and main().
logger = logging.getLogger("html_downloader")
class HTMLDownloader:
    """Optimized HTML downloader with retry, checkpointing, and rate limiting.

    Downloads pages concurrently with a thread pool, writes size-optimized
    HTML into ``legitimate/`` and ``phishing/`` subdirectories of
    ``output_dir``, and persists a JSON checkpoint so interrupted runs can
    resume without re-downloading.
    """

    def __init__(self, output_dir='data/html', max_workers=20, timeout=8, checkpoint_interval=100):
        """
        Initialize optimized HTML downloader.

        Args:
            output_dir: Base directory to save HTML files
            max_workers: Number of parallel download threads (increased to 20)
            timeout: Request timeout in seconds (reduced to 8s for faster failure)
            checkpoint_interval: Save progress every N URLs
        """
        self.output_dir = Path(output_dir)
        self.legit_dir = self.output_dir / 'legitimate'
        self.phishing_dir = self.output_dir / 'phishing'
        self.legit_dir.mkdir(parents=True, exist_ok=True)
        self.phishing_dir.mkdir(parents=True, exist_ok=True)
        self.max_workers = max_workers
        self.timeout = timeout
        self.checkpoint_interval = checkpoint_interval
        # Download counters, updated from worker threads.
        # NOTE(review): increments are not lock-protected; under CPython's GIL
        # the counts are close enough for progress reporting, but they are not
        # guaranteed exact. 'retried' is currently never incremented; kept for
        # backward compatibility with any consumer of this dict.
        self.stats = {
            'total': 0,
            'success': 0,
            'failed': 0,
            'timeout': 0,
            'error': 0,
            'retried': 0,
            'http_fallback': 0
        }
        # User agents rotation (avoid blocks)
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/120.0.0.0',
        ]
        # Domain rate limiting (delay per domain)
        self.domain_last_access = defaultdict(float)
        self.domain_lock = Lock()
        self.min_domain_delay = 0.5  # 500ms between requests to same domain
        # Session pool for connection reuse (assigned round-robin to tasks).
        # NOTE(review): with more in-flight tasks than workers, two threads can
        # occasionally share a session; requests.Session is not formally
        # thread-safe — confirm acceptable for this workload.
        self.sessions = [self._create_session() for _ in range(max_workers)]
        # Checkpoint file
        self.checkpoint_file = self.output_dir / 'download_checkpoint.json'
        self.completed_urls = self._load_checkpoint()

    def _create_session(self):
        """Create optimized requests session with retry and compression."""
        session = requests.Session()
        # Retry strategy: 3 retries with exponential backoff
        retry_strategy = Retry(
            total=3,
            backoff_factor=0.5,  # 0.5s, 1s, 2s
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "HEAD"]
        )
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=100,
            pool_maxsize=100,
            pool_block=False
        )
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        # Enable compression and send browser-like default headers.
        session.headers.update({
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Connection': 'keep-alive',
        })
        return session

    def _get_random_user_agent(self):
        """Get random user agent to avoid detection."""
        return random.choice(self.user_agents)

    def _load_checkpoint(self):
        """Load checkpoint of already downloaded URLs.

        Returns:
            Set of URLs recorded as successfully downloaded; empty set when no
            checkpoint exists or it cannot be parsed.
        """
        if self.checkpoint_file.exists():
            try:
                with open(self.checkpoint_file, 'r') as f:
                    data = json.load(f)
                completed = set(data.get('completed_urls', []))
                logger.info(f"Loaded checkpoint: {len(completed):,} URLs already downloaded")
                return completed
            except Exception as e:
                # A corrupt checkpoint only costs re-downloads, so don't abort.
                logger.warning(f"Failed to load checkpoint: {e}")
        return set()

    def _save_checkpoint(self, results):
        """Save checkpoint of completed URLs.

        Args:
            results: List of per-URL result dicts (only 'success' entries are
                added to the checkpoint).
        """
        try:
            completed = [r['url'] for r in results if r['status'] == 'success']
            self.completed_urls.update(completed)
            with open(self.checkpoint_file, 'w') as f:
                json.dump({
                    'completed_urls': list(self.completed_urls),
                    'timestamp': datetime.now().isoformat(),
                    'total_completed': len(self.completed_urls)
                }, f)
        except Exception as e:
            logger.warning(f"Failed to save checkpoint: {e}")

    def _rate_limit_domain(self, url):
        """Apply per-domain rate limiting.

        Fix: the previous implementation slept while holding
        ``self.domain_lock``, which stalled EVERY worker thread (including
        those targeting other domains) for the duration of the sleep. Now the
        next access slot is reserved under the lock, and the sleep happens
        after the lock is released.
        """
        try:
            from urllib.parse import urlparse
            domain = urlparse(url).netloc
            sleep_time = 0.0
            with self.domain_lock:
                now = time.time()
                time_since_last = now - self.domain_last_access[domain]
                if time_since_last < self.min_domain_delay:
                    sleep_time = self.min_domain_delay - time_since_last
                # Reserve the slot before releasing the lock so concurrent
                # requests to the same domain queue behind each other.
                self.domain_last_access[domain] = now + sleep_time
            if sleep_time > 0:
                time.sleep(sleep_time)
        except Exception:
            pass  # Rate limiting is best-effort; never fail a download over it.

    def _url_to_filename(self, url):
        """Convert URL to safe filename using hash.

        MD5 is used purely as a filename hash (not for security); changing it
        would orphan files from previous runs.
        """
        url_hash = hashlib.md5(url.encode()).hexdigest()
        return f"{url_hash}.html"

    def _optimize_html(self, html_content):
        """
        Aggressively optimize HTML for feature extraction.

        Removes unnecessary content while preserving structure:
        - Comments, excessive whitespace
        - Inline styles (keeps style tags for counting)
        - Large script/style content (keeps tags for counting)
        - Base64 embedded images (huge size, not needed for features)

        Args:
            html_content: Raw HTML content

        Returns:
            Optimized HTML string (typically 60-80% smaller)
        """
        try:
            # Quick regex cleanup before parsing (faster than BeautifulSoup for some tasks)
            # Remove HTML comments
            html_content = re.sub(r'<!--.*?-->', '', html_content, flags=re.DOTALL)
            # Remove base64 embedded images (can be huge, not needed for features)
            html_content = re.sub(r'data:image/[^;]+;base64,[A-Za-z0-9+/=]+', 'data:image', html_content)
            # Parse HTML (use lxml parser if available, it's faster)
            try:
                soup = BeautifulSoup(html_content, 'lxml')
            except Exception:
                # lxml not installed or failed on this document; fall back to
                # the pure-Python stdlib-backed parser.
                soup = BeautifulSoup(html_content, 'html.parser')
            # Remove inline styles (but keep style tags for counting)
            for tag in soup.find_all(style=True):
                del tag['style']
            # Truncate large script/style content (keep tags for counting, trim content)
            for script in soup.find_all('script'):
                if script.string and len(script.string) > 500:
                    script.string = script.string[:500] + '...'
            for style in soup.find_all('style'):
                if style.string and len(style.string) > 500:
                    style.string = style.string[:500] + '...'
            # Normalize whitespace in text nodes (skip script/style contents,
            # which were already truncated above). Guard against detached
            # nodes whose parent is None.
            for text in soup.find_all(string=True):
                parent = text.parent
                if parent is not None and parent.name not in ['script', 'style']:
                    normalized = re.sub(r'\s+', ' ', str(text).strip())
                    if normalized:
                        text.replace_with(normalized)
            # Convert back to string
            optimized = str(soup)
            # Final cleanup: remove excessive blank lines
            optimized = re.sub(r'\n\s*\n+', '\n', optimized)
            return optimized
        except Exception as e:
            logger.warning(f"HTML optimization failed: {e}, returning original")
            # Fallback: at least remove comments and excessive whitespace
            html_content = re.sub(r'<!--.*?-->', '', html_content, flags=re.DOTALL)
            html_content = re.sub(r'\n\s*\n+', '\n', html_content)
            return html_content

    def download_single_url(self, url, label, url_id=None, session=None):
        """
        Download HTML with retry logic and HTTP fallback.

        Args:
            url: URL to download
            label: Label (0=legitimate, 1=phishing)
            url_id: Optional ID from dataset
            session: Requests session (for connection pooling)

        Returns:
            Dictionary with download result ('status' is one of
            'success', 'skipped', 'failed', 'timeout', 'error')
        """
        result = {
            'url': url,
            'label': label,
            'url_id': url_id,
            'status': 'failed',
            'error': None,
            'filename': None,
            'size': 0,
            'original_size': 0
        }
        # Skip if already downloaded
        if url in self.completed_urls:
            result['status'] = 'skipped'
            result['error'] = 'Already downloaded'
            return result
        # Apply rate limiting
        self._rate_limit_domain(url)
        # Use provided session or create temporary one
        if session is None:
            session = self._create_session()
        # Add scheme if missing (default HTTPS)
        original_url = url
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url
        attempts = [url]
        # If HTTPS, also try HTTP as fallback
        if url.startswith('https://'):
            http_url = url.replace('https://', 'http://', 1)
            attempts.append(http_url)
        last_attempt = len(attempts) - 1
        # Try each URL variant
        for attempt_num, attempt_url in enumerate(attempts):
            try:
                # Random user agent for each attempt
                headers = {'User-Agent': self._get_random_user_agent()}
                # Download with timeout and retries (handled by session)
                response = session.get(
                    attempt_url,
                    headers=headers,
                    timeout=(3, self.timeout),  # (connect timeout, read timeout)
                    allow_redirects=True,
                    verify=False,  # Phishing sites often have invalid SSL
                    stream=False   # We need full content
                )
                # Check if successful
                if response.status_code == 200:
                    # Check content type (skip if not HTML)
                    content_type = response.headers.get('Content-Type', '')
                    if 'text/html' not in content_type.lower() and 'application/xhtml' not in content_type.lower():
                        result['status'] = 'failed'
                        result['error'] = f'Non-HTML content: {content_type}'
                        # Fix: count the failure when this was the final attempt
                        # (previously these rejections were missing from stats).
                        if attempt_num == last_attempt:
                            self.stats['failed'] += 1
                        continue
                    # Get HTML content
                    html_content = response.text
                    result['original_size'] = len(html_content)
                    # Skip if too small (likely error page)
                    if len(html_content) < 200:
                        result['status'] = 'failed'
                        result['error'] = 'HTML too small (< 200 bytes)'
                        if attempt_num == last_attempt:
                            self.stats['failed'] += 1
                        continue
                    # Optimize HTML for feature extraction
                    optimized_html = self._optimize_html(html_content)
                    # Save to appropriate directory (filename derived from the
                    # URL as given, before any scheme was prepended).
                    filename = self._url_to_filename(original_url)
                    target_dir = self.legit_dir if label == 0 else self.phishing_dir
                    filepath = target_dir / filename
                    with open(filepath, 'w', encoding='utf-8', errors='ignore') as f:
                        f.write(optimized_html)
                    result['status'] = 'success'
                    result['filename'] = filename
                    result['size'] = len(optimized_html)
                    result['target_dir'] = str(target_dir.name)
                    result['compression_ratio'] = f"{(1 - len(optimized_html) / max(result['original_size'], 1)) * 100:.1f}%"
                    if attempt_num > 0:
                        result['http_fallback'] = True
                        self.stats['http_fallback'] += 1
                    self.stats['success'] += 1
                    return result  # Success!
                else:
                    result['error'] = f"HTTP {response.status_code}"
                    if attempt_num == last_attempt:
                        result['status'] = 'failed'
                        self.stats['failed'] += 1
            except requests.Timeout:
                result['error'] = 'Timeout'
                if attempt_num == last_attempt:
                    result['status'] = 'timeout'
                    self.stats['timeout'] += 1
            except requests.RequestException as e:
                result['error'] = f"{type(e).__name__}: {str(e)[:80]}"
                if attempt_num == last_attempt:
                    result['status'] = 'error'
                    self.stats['error'] += 1
            except Exception as e:
                result['error'] = f"Unknown: {str(e)[:80]}"
                if attempt_num == last_attempt:
                    result['status'] = 'error'
                    self.stats['error'] += 1
        return result

    def download_batch(self, urls_df, label_column='label', id_column=None, resume=True):
        """
        Download HTML with checkpointing and session pooling.

        Args:
            urls_df: DataFrame with URLs
            label_column: Column name for labels
            id_column: Optional column name for IDs
            resume: Resume from checkpoint if available

        Returns:
            DataFrame with download results (one row per URL)
        """
        self.stats['total'] = len(urls_df)
        # Filter already downloaded URLs if resuming
        if resume and self.completed_urls:
            url_column = 'url' if 'url' in urls_df.columns else 'URL'
            urls_df = urls_df[~urls_df[url_column].isin(self.completed_urls)].copy()
            skipped = self.stats['total'] - len(urls_df)
            if skipped > 0:
                logger.info(f"Resuming: {skipped:,} URLs already downloaded, {len(urls_df):,} remaining")
        logger.info(f"Starting optimized download of {len(urls_df):,} URLs...")
        logger.info(f"Workers: {self.max_workers} | Timeout: {self.timeout}s | Checkpoint: every {self.checkpoint_interval} URLs")
        logger.info(f"Output: {self.output_dir.absolute()}")
        logger.info(f"Features: Session pooling, retry logic, HTTP fallback, rate limiting, compression")
        results = []
        session_idx = 0
        # Prepare tasks as (url, label, url_id) tuples; fall back to the row
        # index when no id column is available and label=1 when unlabeled.
        tasks = []
        for idx, row in urls_df.iterrows():
            url = row['url'] if 'url' in row else row['URL']
            label = row[label_column] if label_column in row else 1
            url_id = row[id_column] if id_column and id_column in row else idx
            tasks.append((url, label, url_id))
        # Download in parallel with progress bar and checkpointing
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit tasks with round-robin session assignment for
            # connection reuse across requests to the same hosts.
            future_to_task = {}
            for url, label, url_id in tasks:
                session = self.sessions[session_idx % len(self.sessions)]
                session_idx += 1
                future = executor.submit(self.download_single_url, url, label, url_id, session)
                future_to_task[future] = (url, label, url_id)
            # Process completed tasks with progress bar
            with tqdm(total=len(tasks), desc="Downloading", unit="url") as pbar:
                checkpoint_counter = 0
                for future in as_completed(future_to_task):
                    result = future.result()
                    results.append(result)
                    pbar.update(1)
                    checkpoint_counter += 1
                    # Save checkpoint periodically so a crash loses at most
                    # checkpoint_interval downloads.
                    if checkpoint_counter >= self.checkpoint_interval:
                        self._save_checkpoint(results)
                        checkpoint_counter = 0
                    # Update progress bar with detailed stats
                    pbar.set_postfix({
                        'OK': self.stats['success'],
                        'Fail': self.stats['failed'],
                        'Timeout': self.stats['timeout'],
                        'HTTP↓': self.stats['http_fallback']
                    })
        # Final checkpoint save
        self._save_checkpoint(results)
        # Create results DataFrame
        results_df = pd.DataFrame(results)
        # Print summary
        self._print_summary(results_df)
        return results_df

    def _print_summary(self, results_df):
        """Print detailed download summary with optimization metrics."""
        logger.info("\n" + "="*80)
        logger.info("DOWNLOAD SUMMARY")
        logger.info("="*80)
        total = self.stats['total']
        success = self.stats['success']
        logger.info(f"\nTotal URLs processed: {total:,}")
        logger.info(f"  ✓ Successful: {success:,} ({success/max(total,1)*100:.1f}%)")
        logger.info(f"  ✗ Failed: {self.stats['failed']:,}")
        logger.info(f"  ⏱ Timeout: {self.stats['timeout']:,}")
        logger.info(f"  ⚠ Error: {self.stats['error']:,}")
        logger.info(f"  ↓ HTTP Fallback: {self.stats['http_fallback']:,}")
        # Detailed stats if we have results
        if not results_df.empty and 'status' in results_df.columns:
            # Success by label
            if 'label' in results_df.columns:
                success_by_label = results_df[results_df['status'] == 'success'].groupby('label').size()
                if not success_by_label.empty:
                    logger.info(f"\nSuccessful downloads by type:")
                    for label, count in success_by_label.items():
                        label_name = 'Phishing' if label == 1 else 'Legitimate'
                        logger.info(f"  {label_name}: {count:,}")
            # Size statistics (original vs. optimized on-disk size)
            successful = results_df[results_df['status'] == 'success']
            if not successful.empty and 'size' in successful.columns:
                total_optimized = successful['size'].sum()
                total_original = successful.get('original_size', successful['size']).sum()
                logger.info(f"\nStorage statistics:")
                logger.info(f"  Original size: {total_original/1024/1024:.2f} MB")
                logger.info(f"  Optimized size: {total_optimized/1024/1024:.2f} MB")
                if total_original > 0:
                    saved = (1 - total_optimized / total_original) * 100
                    logger.info(f"  Space saved: {saved:.1f}%")
            # Error breakdown
            failed = results_df[results_df['status'] != 'success']
            if not failed.empty and 'error' in failed.columns:
                error_counts = failed['error'].value_counts().head(5)
                if not error_counts.empty:
                    logger.info(f"\nTop failure reasons:")
                    for error, count in error_counts.items():
                        logger.info(f"  {error}: {count:,}")
        logger.info("="*80)
def main():
    """Main function to download HTML from verified online phishing URLs.

    Parses CLI arguments, loads the URL dataset, optionally balances/limits
    it, runs the batch download, and writes results + URL→filename metadata
    CSVs into the output directory.
    """
    import argparse
    parser = argparse.ArgumentParser(description='Download HTML content from URLs and organize by label')
    parser.add_argument('--input', type=str, default='data/processed/clean_dataset.csv',
                        help='Input CSV file with URLs (must have url,label,type columns)')
    parser.add_argument('--output', type=str, default='data/html',
                        help='Base output directory (will create legitimate/ and phishing/ subdirectories)')
    parser.add_argument('--workers', type=int, default=20,
                        help='Number of parallel download workers (default: 20)')
    parser.add_argument('--timeout', type=int, default=8,
                        help='Request timeout in seconds (default: 8s)')
    parser.add_argument('--checkpoint', type=int, default=100,
                        help='Save progress every N URLs (default: 100)')
    parser.add_argument('--resume', action='store_true', default=True,
                        help='Resume from checkpoint (default: True)')
    parser.add_argument('--no-resume', dest='resume', action='store_false',
                        help='Start fresh, ignore checkpoint')
    parser.add_argument('--limit', type=int, default=None,
                        help='Limit number of URLs to download (for testing)')
    parser.add_argument('--balance', action='store_true',
                        help='Download equal number of legitimate and phishing URLs')
    args = parser.parse_args()
    logger.info("="*80)
    logger.info("HTML CONTENT DOWNLOADER - Phishing Detection")
    logger.info("="*80)
    # Load URLs; paths are resolved relative to the repository root
    # (three levels above this script).
    script_dir = Path(__file__).parent.parent.parent
    input_path = (script_dir / args.input).resolve()
    logger.info(f"\nLoading URLs from: {input_path}")
    df = pd.read_csv(input_path)
    logger.info(f"Loaded: {len(df):,} URLs")
    # Show columns
    logger.info(f"Columns: {list(df.columns)}")
    # Verify required columns before doing any work
    if 'url' not in df.columns and 'URL' not in df.columns:
        logger.error("No 'url' or 'URL' column found in dataset!")
        return
    if 'label' not in df.columns:
        logger.error("No 'label' column found in dataset!")
        return
    # Show label distribution
    logger.info(f"\nLabel distribution in dataset:")
    label_counts = df['label'].value_counts()
    for label, count in label_counts.items():
        label_name = 'Legitimate' if label == 0 else 'Phishing'
        logger.info(f"  {label_name} (label={label}): {count:,}")
    # Balance dataset if requested: downsample both classes to the minority
    # class size, then shuffle (fixed seed for reproducibility).
    if args.balance:
        min_count = label_counts.min()
        df_balanced = pd.concat([
            df[df['label'] == 0].sample(n=min(min_count, len(df[df['label'] == 0])), random_state=42),
            df[df['label'] == 1].sample(n=min(min_count, len(df[df['label'] == 1])), random_state=42)
        ]).sample(frac=1, random_state=42).reset_index(drop=True)
        df = df_balanced
        logger.info(f"\nBalanced dataset to {min_count:,} samples per class")
        logger.info(f"Total URLs after balancing: {len(df):,}")
    # Limit for testing
    if args.limit:
        df = df.head(args.limit)
        logger.info(f"Limited to first {args.limit:,} URLs for testing")
    # Initialize optimized downloader
    output_dir = (script_dir / args.output).resolve()
    downloader = HTMLDownloader(
        output_dir=output_dir,
        max_workers=args.workers,
        timeout=args.timeout,
        checkpoint_interval=args.checkpoint
    )
    # Download HTML content with checkpointing ('label' is guaranteed present
    # by the check above).
    results_df = downloader.download_batch(
        df,
        label_column='label',
        id_column='phish_id' if 'phish_id' in df.columns else None,  # type: ignore
        resume=args.resume
    )
    # Save results
    results_file = output_dir / f'download_results_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
    results_df.to_csv(results_file, index=False)
    logger.info(f"\n✓ Results saved to: {results_file}")
    # Save metadata mapping (URL to filename).
    # Fix: results_df can be empty (e.g. every URL was already in the
    # checkpoint), in which case it has no 'status' column and indexing
    # would raise a KeyError — guard before extracting metadata.
    if not results_df.empty and 'status' in results_df.columns:
        metadata = results_df[results_df['status'] == 'success'][['url', 'label', 'filename', 'url_id']]
        metadata_file = output_dir / 'html_metadata.csv'
        metadata.to_csv(metadata_file, index=False)
        logger.info(f"✓ Metadata saved to: {metadata_file}")
    else:
        logger.warning("No download results; skipping metadata save")
    logger.info("\n" + "="*80)
    logger.info("✓ HTML DOWNLOAD COMPLETE!")
    logger.info("="*80)
    logger.info(f"\nFiles saved to:")
    logger.info(f"  Legitimate: {output_dir / 'legitimate'}")
    logger.info(f"  Phishing: {output_dir / 'phishing'}")
    logger.info(f"\nHTML files have been optimized for feature extraction:")
    logger.info(f"  - Comments removed")
    logger.info(f"  - Whitespace normalized")
    logger.info(f"  - Inline styles removed")
    logger.info(f"  - Structure preserved for feature extraction")
    logger.info("="*80)
# Run only when executed as a script, so importing this module has no side effects.
if __name__ == "__main__":
    main()