import json
import logging
from datetime import datetime
from typing import Dict, List
from urllib.parse import urlparse

import pandas as pd
import requests
from bs4 import BeautifulSoup

from config import RAW_DIR, PROCESSED_DIR, LOG_DIR


class DataProcessor:
    def __init__(self):
        # Set up logging
        log_file = LOG_DIR / f"processor_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            filename=log_file
        )
        self.processed_data = {}

    def _extract_domain(self, url: str) -> str:
        """Extract domain from URL"""
        try:
            return urlparse(url).netloc
        except Exception:
            return ""

    def _scrape_webpage(self, url: str) -> str:
        """Scrape additional content from webpage"""
        try:
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
            response = requests.get(url, headers=headers, timeout=10)
            soup = BeautifulSoup(response.text, 'lxml')

            # Remove unwanted elements
            for element in soup(['script', 'style', 'nav', 'footer']):
                element.decompose()

            return ' '.join(soup.stripped_strings)
        except Exception as e:
            logging.error(f"Error scraping {url}: {e}")
            return ""

    def process_category(self, category: str) -> List[Dict]:
        """Process data for a single category"""
        input_file = RAW_DIR / f"{category}_results.json"
        try:
            with open(input_file, 'r') as f:
                raw_results = json.load(f)
        except Exception as e:
            logging.error(f"Error loading {input_file}: {e}")
            return []

        processed_results = []
        for result in raw_results:
            processed_result = {
                'title': result.get('title', ''),
                'snippet': result.get('snippet', ''),
                'url': result.get('link', ''),
                'domain': self._extract_domain(result.get('link', '')),
                'category': category
            }

            # Add additional content for certain domains
            if any(domain in processed_result['domain']
                   for domain in ['visitbloomington.com', 'indiana.edu', 'bloomington.in.gov']):
                additional_content = self._scrape_webpage(processed_result['url'])
                processed_result['additional_content'] = additional_content[:5000]  # Limit content length

            processed_results.append(processed_result)

        # Save processed results
        output_file = PROCESSED_DIR / f"{category}_processed.json"
        with open(output_file, 'w') as f:
            json.dump(processed_results, f, indent=2)

        # Also save as CSV for easy viewing
        df = pd.DataFrame(processed_results)
        df.to_csv(PROCESSED_DIR / f"{category}_processed.csv", index=False)

        self.processed_data[category] = processed_results
        return processed_results

    def process_all_categories(self) -> Dict[str, List[Dict]]:
        """Process all categories"""
        categories = [f.stem.replace('_results', '') for f in RAW_DIR.glob('*_results.json')]

        for category in categories:
            logging.info(f"Processing category: {category}")
            self.process_category(category)

        # Save combined results
        all_results = []
        for category_results in self.processed_data.values():
            all_results.extend(category_results)

        combined_df = pd.DataFrame(all_results)
        combined_df.to_csv(PROCESSED_DIR / "all_processed.csv", index=False)

        # Generate and save statistics
        stats = {
            'total_results': len(all_results),
            'results_per_category': {
                category: len(results)
                for category, results in self.processed_data.items()
            },
            'domains_distribution': combined_df['domain'].value_counts().to_dict(),
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        with open(PROCESSED_DIR / "processing_stats.json", 'w') as f:
            json.dump(stats, f, indent=2)

        return self.processed_data
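

# Minimal usage sketch (assumptions: config defines RAW_DIR, PROCESSED_DIR, and
# LOG_DIR as existing pathlib.Path directories, and RAW_DIR contains one or more
# "<category>_results.json" files produced by an earlier search/collection step).
if __name__ == "__main__":
    processor = DataProcessor()
    processed = processor.process_all_categories()
    for category, results in processed.items():
        print(f"{category}: {len(results)} results processed")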