import logging
import os
from datetime import datetime

import pandas as pd

logger = logging.getLogger(__name__)


class DataProductManager:
    """Split a master CSV into sellable data products and describe them.

    Products are written below ``data_dir`` into one sub-directory per tier
    (``bundles``, ``yearly``, ``quarterly``, ``monthly``). Each product is
    described by a metadata dict (type, period, rows, size, price,
    description) that ``generate_catalog`` turns into UI-ready entries.
    """

    # Price per tier: a base price plus ``per_10k`` for every full 10,000
    # rows, never exceeding ``cap``.
    _PRICING_MODEL = {
        'monthly': {'base': 99, 'per_10k': 5, 'cap': 299},
        'quarterly': {'base': 249, 'per_10k': 10, 'cap': 699},
        'yearly': {'base': 899, 'per_10k': 20, 'cap': 1999},
        'bundle': {'base': 2999, 'per_10k': 50, 'cap': 4999},
    }

    def __init__(self, data_dir=None):
        """Resolve the data directory and create the per-tier sub-directories.

        Args:
            data_dir: Root output directory. When falsy, the ``DATA_DIR``
                environment variable is used, falling back to ``"data"``.
        """
        self.data_dir = data_dir or os.getenv("DATA_DIR", "data")

        # Maps tier directory name -> absolute/relative path under data_dir.
        self.dirs = {
            tier: os.path.join(self.data_dir, tier)
            for tier in ('bundles', 'yearly', 'quarterly', 'monthly')
        }
        for directory in self.dirs.values():
            os.makedirs(directory, exist_ok=True)

    def calculate_price(self, file_type, row_count):
        """Return the price for a product of ``file_type`` with ``row_count`` rows.

        Args:
            file_type: One of ``'monthly'``, ``'quarterly'``, ``'yearly'`` or
                ``'bundle'``. Any other value is priced as ``'monthly'``.
            row_count: Number of data rows in the product. Negative values
                are treated as 0 so the price never drops below the base.

        Returns:
            The tier's base price plus the per-10k-rows increment, limited
            to the tier's cap.
        """
        model = self._PRICING_MODEL.get(file_type, self._PRICING_MODEL['monthly'])
        full_blocks = max(row_count, 0) // 10000
        return min(model['base'] + full_blocks * model['per_10k'], model['cap'])

    def _export_product(self, frame, path, file_type, period, description):
        """Write ``frame`` to ``path`` as CSV and return its product metadata."""
        frame.to_csv(path, index=False)
        row_count = len(frame)
        return {
            'type': file_type,
            'period': period,
            'rows': row_count,
            # Size is measured on the file just written, in mebibytes.
            'size_mb': os.path.getsize(path) / (1024 * 1024),
            'price': self.calculate_price(file_type, row_count),
            'description': description,
        }

    def smart_split_csv(self, master_file, product_type):
        """Split a master CSV into bundle, yearly and quarterly products.

        The date used for splitting is taken from the ``date`` column, or
        from ``scraped_date`` when ``date`` is absent; in the latter case a
        parsed ``date`` column is added to the exported files. Rows whose
        date is missing are kept in the bundle but belong to no year or
        quarter. Monthly products are intentionally not generated.

        Args:
            master_file: Path of the CSV file to split.
            product_type: Prefix used for every generated file name.

        Returns:
            A dict mapping each written file path to its metadata dict
            (``type``, ``period``, ``rows``, ``size_mb``, ``price``,
            ``description``). An empty dict is returned when the file does
            not exist, has no usable date column, or processing fails;
            failures are logged rather than raised.
        """
        if not os.path.exists(master_file):
            logger.warning(f"Master file not found: {master_file}")
            return {}

        try:
            df = pd.read_csv(master_file)

            # Normalize the date column.
            if 'date' in df.columns:
                df['date'] = pd.to_datetime(df['date'])
            elif 'scraped_date' in df.columns:
                df['date'] = pd.to_datetime(df['scraped_date'])
            else:
                logger.warning(f"No date column found in {master_file}")
                return {}

            created_files = {}

            # 1. Bundle: the complete master file.
            bundle_path = os.path.join(
                self.dirs['bundles'], f'{product_type}_FULL.csv'
            )
            created_files[bundle_path] = self._export_product(
                df, bundle_path, 'bundle', 'All Time',
                'Complete Historical Bundle',
            )

            # 2. One product per calendar year.
            for year_key, year_data in df.groupby(df['date'].dt.year):
                # With missing dates present the year key is a float
                # (e.g. 2023.0); cast so file names and labels stay "2023".
                year = int(year_key)
                yearly_path = os.path.join(
                    self.dirs['yearly'], f'{product_type}_{year}.csv'
                )
                created_files[yearly_path] = self._export_product(
                    year_data, yearly_path, 'yearly', str(year),
                    f'{year} Full Year Dataset',
                )

                # 3. One product per quarter that actually has rows.
                quarters = year_data.groupby(year_data['date'].dt.quarter)
                for quarter_key, q_data in quarters:
                    quarter = int(quarter_key)
                    q_path = os.path.join(
                        self.dirs['quarterly'],
                        f'{product_type}_{year}_Q{quarter}.csv',
                    )
                    created_files[q_path] = self._export_product(
                        q_data, q_path, 'quarterly', f'{year} Q{quarter}',
                        f'{year} Q{quarter} Dataset',
                    )

            # NOTE: the monthly split is disabled to avoid an explosion of
            # small files; the 'monthly' directory and pricing tier remain
            # available should it be re-enabled.
            return created_files

        except Exception as e:
            # Boundary handler: callers expect {} instead of an exception.
            # Files written before the failure are left on disk.
            logger.exception("Error processing %s: %s", master_file, e)
            return {}

    def generate_catalog(self, all_products):
        """Generate a sorted list of product entries for the UI.

        Args:
            all_products: Dict mapping file path to metadata, as returned
                by ``smart_split_csv``.

        Returns:
            A list of dicts with ``filename``, ``path``, ``type``,
            ``period``, ``rows``, ``size_mb`` (string with two decimals),
            ``price``, ``description`` and ``download_url``. Entries are
            ordered bundle, yearly, quarterly, monthly (unknown types
            last), then by period label.
        """
        type_rank = {'bundle': 0, 'yearly': 1, 'quarterly': 2, 'monthly': 3}

        catalog = []
        for filepath, info in all_products.items():
            filename = os.path.basename(filepath)
            catalog.append({
                'filename': filename,
                'path': filepath,  # Internal use
                'type': info['type'],
                'period': info['period'],
                'rows': info['rows'],
                'size_mb': f"{info['size_mb']:.2f}",
                'price': info['price'],
                'description': info['description'],
                'download_url': f"/download/{filename}",
            })

        catalog.sort(
            key=lambda entry: (type_rank.get(entry['type'], 99), entry['period'])
        )
        return catalog