Spaces:
Runtime error
Runtime error
| """ | |
| Extract HTML Features - Direct from Files (No Metadata Needed) | |
| Simplified version that scans directories directly | |
| WITH QUALITY FILTERING to remove low-quality HTML files | |
| """ | |
| import pandas as pd | |
| from pathlib import Path | |
| import logging | |
| from tqdm import tqdm | |
| import sys | |
| import re | |
| from bs4 import BeautifulSoup | |
| # Add scripts directory to path | |
| sys.path.append(str(Path(__file__).parent)) | |
| from html_features import HTMLFeatureExtractor | |
# Setup logging
# Configure the root logger once at import time: INFO level and above,
# each record rendered as "HH:MM:SS - LEVEL - message".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)
# Module-level logger used by every function in this script.
logger = logging.getLogger(__name__)
# Quality filter constants
# Thresholds consumed by is_quality_html(); a document below any of them is
# rejected.
# NOTE: MIN_FILE_SIZE is compared against len() of the already-decoded text,
# so it counts characters rather than raw bytes on disk.
MIN_FILE_SIZE = 1000 # Minimum 1KB
MIN_WORDS = 50 # Minimum 50 words of text content
MIN_TAGS = 10 # Minimum 10 HTML tags
# Lower-case substrings that mark a page as an error / parked / placeholder
# page. They are matched with a plain `in` test against the lower-cased first
# 2000 characters of the page text, so short entries such as '404' or
# 'this domain' match anywhere inside that window.
ERROR_PATTERNS = [
    'page not found', '404', '403', 'forbidden', 'access denied',
    'error occurred', 'server error', 'not available', 'suspended',
    'domain for sale', 'this site can', 'website expired',
    'coming soon', 'under construction', 'parked domain',
    'buy this domain', 'this domain', 'domain has expired'
]
def is_quality_html(html_content, filename=""):
    """
    Decide whether an HTML document is worth extracting features from.

    The checks run in a fixed order and the first failing one determines
    the returned reason:
      1. length of ``html_content`` is at least MIN_FILE_SIZE
      2. a <body> tag is present
      3. at least MIN_TAGS tags in total
      4. at least MIN_WORDS whitespace-separated words of text
      5. none of ERROR_PATTERNS occurs in the first 2000 characters of
         the lower-cased text
      6. at least one <a>, <form>, <input> or <img>, or more than three
         <div> elements
      7. the page is not dominated by inline script content

    Args:
        html_content: HTML document as text.
        filename: Accepted for the caller's convenience; not used here.

    Returns:
        tuple: (is_valid, reason) - reason is "OK" when is_valid is True,
        otherwise a short description of the failed check. Any exception
        raised while parsing or inspecting the document is reported as a
        "Parse error: ..." reason instead of propagating.
    """
    content_length = len(html_content)
    if content_length < MIN_FILE_SIZE:
        return False, f"Too small ({content_length} bytes)"

    try:
        document = BeautifulSoup(html_content, 'html.parser')

        # Basic structure: there has to be a body element.
        if not document.find('body'):
            return False, "No body tag"

        # Overall tag count.
        tag_total = len(document.find_all())
        if tag_total < MIN_TAGS:
            return False, f"Too few tags ({tag_total})"

        # Text is lower-cased once and reused by the later checks.
        visible_text = document.get_text(separator=' ', strip=True).lower()
        word_total = len(visible_text.split())
        if word_total < MIN_WORDS:
            return False, f"Too few words ({word_total})"

        # Error / parked page detection: the first pattern, in list order,
        # found near the top of the page is the one reported.
        leading_text = visible_text[:2000]
        matched = next(
            (candidate for candidate in ERROR_PATTERNS if candidate in leading_text),
            None,
        )
        if matched is not None:
            return False, f"Error page pattern: '{matched}'"

        # Needs some interactive element, or enough <div> structure.
        has_element = any(
            document.find_all(tag_name)
            for tag_name in ('a', 'form', 'input', 'img')
        )
        if not has_element and len(document.find_all('div')) <= 3:
            return False, "No interactive elements"

        # Reject pages that are mostly inline JavaScript with little text.
        script_chars = sum(
            len(node.string or '') for node in document.find_all('script')
        )
        text_chars = len(visible_text)
        if text_chars < 200 and script_chars > text_chars * 3:
            return False, "Mostly JavaScript, little content"
    except Exception as exc:
        return False, f"Parse error: {str(exc)[:50]}"

    return True, "OK"
def extract_features_from_directory(html_dir, label, limit=None, apply_filter=True):
    """
    Extract features from the HTML files located directly in a directory.

    Files matching ``*.html`` (non-recursive) are processed in sorted
    order. Each file is read as UTF-8 with undecodable bytes dropped,
    optionally screened with is_quality_html(), and then passed to
    HTMLFeatureExtractor.extract_features(). A failure on one file is
    counted and logged but never aborts the run.

    Args:
        html_dir: Directory containing HTML files
        label: Label for these files (0=legitimate, 1=phishing)
        limit: Maximum number of accepted files to return; a falsy value
            (None or 0) means no limit
        apply_filter: Apply quality filter to remove bad HTML files

    Returns:
        List of feature dictionaries, each extended with 'filename' and
        'label' keys. (Assumes extract_features() returns a dict-like
        object - confirm against HTMLFeatureExtractor.)
    """
    html_dir = Path(html_dir)
    label_name = 'Phishing' if label == 1 else 'Legitimate'
    logger.info(f"\nProcessing: {html_dir}")
    logger.info(f" Label: {label_name}")
    logger.info(f" Quality filter: {'ENABLED' if apply_filter else 'DISABLED'}")

    # Get all HTML files (sorted so that runs are reproducible)
    html_files = sorted(html_dir.glob('*.html'))
    total_files = len(html_files)
    logger.info(f" Found {total_files:,} HTML files")

    # Initialize extractor
    extractor = HTMLFeatureExtractor()
    results = []
    errors = 0
    filtered_out = 0
    filter_reasons = {}
    max_logged_errors = 10  # Only the first few errors are logged individually

    # Process each HTML file
    for html_path in tqdm(html_files, desc=f"Extracting {label_name} features"):
        try:
            # Read HTML content
            html_content = html_path.read_text(encoding='utf-8', errors='ignore')

            # Apply quality filter if enabled
            if apply_filter:
                is_valid, reason = is_quality_html(html_content, html_path.name)
                if not is_valid:
                    filtered_out += 1
                    filter_reasons[reason] = filter_reasons.get(reason, 0) + 1
                    continue

            # Extract features
            features = extractor.extract_features(html_content, url=None)

            # Add metadata
            features['filename'] = html_path.name  # type: ignore
            features['label'] = label
            results.append(features)

            # Check if we reached the limit
            if limit and len(results) >= limit:
                logger.info(f" Reached limit of {limit:,} quality files")
                break
        except Exception as e:
            # Best effort: one unreadable or unparsable file must not stop
            # the whole extraction.
            errors += 1
            # `errors` has already been incremented, so `<=` is required to
            # actually show the first 10 errors.
            if errors <= max_logged_errors:
                logger.warning(f" Error processing {html_path.name}: {e}")

    # Guard against an empty directory: avoid dividing by zero.
    filtered_pct = filtered_out / total_files * 100 if total_files else 0.0
    logger.info(f" Quality files extracted: {len(results):,}")
    logger.info(f" Filtered out (low quality): {filtered_out:,} ({filtered_pct:.1f}%)")

    if filter_reasons and apply_filter:
        logger.info(f" Filter reasons (top 5):")
        for reason, count in sorted(filter_reasons.items(), key=lambda x: -x[1])[:5]:
            logger.info(f" - {reason}: {count:,}")

    if errors > 0:
        logger.warning(f" Errors: {errors:,}")

    return results
def main():
    """
    Build a class-balanced, quality-filtered HTML feature dataset.

    Steps: log the filter criteria, verify that both input directories
    exist, extract features for the phishing and legitimate HTML files,
    truncate both classes to the size of the smaller one, shuffle the
    combined rows with a fixed seed, write them to a CSV file and log a
    summary. Returns early (after logging an error) when an input
    directory is missing or no features were extracted.
    """
    banner = "=" * 80

    logger.info(banner)
    logger.info("BALANCED HTML FEATURES EXTRACTION (WITH QUALITY FILTER)")
    logger.info(banner)

    # Quality filter info
    logger.info("\nQuality Filter Criteria:")
    criteria_lines = (
        f" - Minimum file size: {MIN_FILE_SIZE} bytes",
        f" - Minimum word count: {MIN_WORDS} words",
        f" - Minimum HTML tags: {MIN_TAGS}",
        " - Must have body tag",
        " - Not an error/parked page",
        " - Has interactive elements (links/forms/images)",
    )
    for criterion in criteria_lines:
        logger.info(criterion)

    # Input and output locations (relative to the working directory)
    phishing_html_dir = Path('data/html/phishing_v1')
    legit_html_dir = Path('data/html/legitimate_v1')
    output_path = Path('data/features/html_features_old.csv')

    # Both input directories must exist before any work is done
    if not phishing_html_dir.exists():
        logger.error(f"Phishing directory not found: {phishing_html_dir}")
        return
    if not legit_html_dir.exists():
        logger.error(f"Legitimate directory not found: {legit_html_dir}")
        return

    # Report how many raw files are available per class
    logger.info("\n1. Checking available HTML files...")
    phishing_count = sum(1 for _ in phishing_html_dir.glob('*.html'))
    legit_count = sum(1 for _ in legit_html_dir.glob('*.html'))
    logger.info(f" Phishing HTML files: {phishing_count:,}")
    logger.info(f" Legitimate HTML files: {legit_count:,}")

    # Phishing class (label 1): keep every file that passes the filter
    logger.info("\n2. Extracting PHISHING HTML features (with quality filter)...")
    phishing_features = extract_features_from_directory(
        phishing_html_dir, label=1, limit=None, apply_filter=True
    )

    # Legitimate class (label 0): keep every file that passes the filter
    logger.info("\n3. Extracting LEGITIMATE HTML features (with quality filter)...")
    legit_features = extract_features_from_directory(
        legit_html_dir, label=0, limit=None, apply_filter=True
    )

    # Balance by truncating both classes to the smaller class size
    logger.info("\n4. Balancing dataset...")
    phishing_total = len(phishing_features)
    legit_total = len(legit_features)
    min_count = min(phishing_total, legit_total)
    logger.info(f" Quality phishing samples: {phishing_total:,}")
    logger.info(f" Quality legitimate samples: {legit_total:,}")
    logger.info(f" Balancing to: {min_count:,} per class")
    phishing_features = phishing_features[:min_count]
    legit_features = legit_features[:min_count]

    # Combine results
    logger.info("\n5. Combining datasets...")
    all_features = phishing_features + legit_features
    if not all_features:
        logger.error("No features extracted! Check error messages above.")
        return

    # Create DataFrame with 'filename' and 'label' as the leading columns
    logger.info("\n6. Creating features DataFrame...")
    features_df = pd.DataFrame(all_features)
    leading_cols = ['filename', 'label']
    feature_cols = [name for name in features_df.columns if name not in leading_cols]
    features_df = features_df[leading_cols + feature_cols]

    # Shuffle with a fixed seed so the output is reproducible
    features_df = features_df.sample(frac=1, random_state=42).reset_index(drop=True)
    total_rows = len(features_df)
    logger.info(f" Shape: {features_df.shape}")
    logger.info(f" Features: {len(feature_cols)}")

    # Show label distribution
    logger.info("\n Label distribution:")
    label_counts = features_df['label'].value_counts()
    for label, count in label_counts.items():
        label_name = 'Phishing' if label == 1 else 'Legitimate'
        share = count / total_rows * 100
        logger.info(f" {label_name}: {count:,} ({share:.1f}%)")

    # Save to CSV
    logger.info(f"\n7. Saving features to: {output_path}")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    features_df.to_csv(output_path, index=False)
    logger.info(" ✓ Saved!")

    # Summary
    logger.info("\n" + banner)
    logger.info("EXTRACTION SUMMARY")
    logger.info(banner)
    logger.info(f"\nTotal samples: {total_rows:,}")
    logger.info(f" Phishing: {len(phishing_features):,}")
    logger.info(f" Legitimate: {len(legit_features):,}")
    logger.info(f"\nFeatures extracted: {len(feature_cols)}")
    balance_pct = (label_counts[0] / label_counts[1]) * 100
    logger.info(f"Dataset balance: {balance_pct:.1f}%")

    # Descriptive statistics for the first ten int64/float64 columns
    logger.info("\nFeature statistics (first 10 features):")
    numeric_frame = features_df.select_dtypes(include=['int64', 'float64'])
    numeric_cols = numeric_frame.columns[:10]
    stats = features_df[numeric_cols].describe()
    logger.info(f"\n{stats.to_string()}")

    logger.info("\n" + banner)
    logger.info("✓ QUALITY-FILTERED HTML FEATURES EXTRACTION COMPLETE!")
    logger.info(banner)
    logger.info(f"\nOutput file: {output_path}")
    logger.info(f"Shape: {features_df.shape}")
    logger.info("Quality filter removed low-quality HTML files")
    logger.info(banner)
# Run the extraction pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()