# File size: 6,702 Bytes | c89a139
import pandas as pd
import os
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class DataProductManager:
    """Split a master CSV into sellable data products and price them.

    Products are written beneath ``data_dir`` in one sub-directory per tier
    (``bundles``, ``yearly``, ``quarterly``, ``monthly``). Monthly products
    are currently not generated; the directory and pricing tier are kept so
    that tier can be enabled later without changing the layout.
    """

    # Price per tier: flat base fee plus a surcharge for every full block of
    # 10,000 rows, never exceeding the cap.
    _PRICING_MODEL = {
        'monthly': {'base': 99, 'per_10k': 5, 'cap': 299},
        'quarterly': {'base': 249, 'per_10k': 10, 'cap': 699},
        'yearly': {'base': 899, 'per_10k': 20, 'cap': 1999},
        'bundle': {'base': 2999, 'per_10k': 50, 'cap': 4999},
    }

    # Display order of product types in the catalog; unknown types sort last.
    _CATALOG_ORDER = {'bundle': 0, 'yearly': 1, 'quarterly': 2, 'monthly': 3}

    def __init__(self, data_dir=None):
        """Create the output directory structure.

        Args:
            data_dir: Root directory for generated products. When falsy, the
                ``DATA_DIR`` environment variable is used, falling back to
                ``"data"``.
        """
        self.data_dir = data_dir or os.getenv("DATA_DIR", "data")
        self.dirs = {
            name: os.path.join(self.data_dir, name)
            for name in ('bundles', 'yearly', 'quarterly', 'monthly')
        }
        for directory in self.dirs.values():
            os.makedirs(directory, exist_ok=True)

    def calculate_price(self, file_type: str, row_count: int) -> int:
        """Return the price for a product of ``file_type`` with ``row_count`` rows.

        Unknown ``file_type`` values are priced with the ``'monthly'`` tier.
        Negative row counts are treated as zero so the price never drops
        below the tier's base fee.
        """
        model = self._PRICING_MODEL.get(file_type, self._PRICING_MODEL['monthly'])
        full_blocks = max(row_count, 0) // 10000
        return min(model['base'] + full_blocks * model['per_10k'], model['cap'])

    def _write_product(self, frame, path, file_type, period, description):
        """Write ``frame`` to ``path`` as CSV and return its catalog metadata."""
        frame.to_csv(path, index=False)
        rows = len(frame)
        return {
            'type': file_type,
            'period': period,
            'rows': rows,
            # Size is measured on disk, after the file has been written.
            'size_mb': os.path.getsize(path) / (1024 * 1024),
            'price': self.calculate_price(file_type, rows),
            'description': description,
        }

    def smart_split_csv(self, master_file: str, product_type: str) -> dict:
        """Split a master CSV into bundle, yearly and quarterly product files.

        The date used for splitting is read from the ``date`` column, or from
        ``scraped_date`` when ``date`` is absent; in both cases the parsed
        values are stored in a ``date`` column that is written to every
        product file. Rows whose date is missing stay in the bundle but are
        not part of any yearly or quarterly file.

        Args:
            master_file: Path of the CSV to split.
            product_type: Prefix used for all generated file names.

        Returns:
            A dict mapping each created file path to its metadata (``type``,
            ``period``, ``rows``, ``size_mb``, ``price``, ``description``).
            An empty dict is returned when the file does not exist, has no
            usable date column, or any error occurs while processing.
        """
        if not os.path.exists(master_file):
            logger.warning("Master file not found: %s", master_file)
            return {}

        try:
            df = pd.read_csv(master_file)

            # Normalize the date column used for grouping.
            if 'date' in df.columns:
                df['date'] = pd.to_datetime(df['date'])
            elif 'scraped_date' in df.columns:
                df['date'] = pd.to_datetime(df['scraped_date'])
            else:
                logger.warning("No date column found in %s", master_file)
                return {}

            created_files = {}

            # 1. Bundle: the complete master data set.
            bundle_path = os.path.join(self.dirs['bundles'], f'{product_type}_FULL.csv')
            created_files[bundle_path] = self._write_product(
                df, bundle_path, 'bundle', 'All Time', 'Complete Historical Bundle'
            )

            # 2. One file per calendar year.
            for year_key, year_data in df.groupby(df['date'].dt.year):
                # With missing dates present, .dt.year is float (2023.0);
                # cast so file names and periods read "2023", not "2023.0".
                year = int(year_key)
                yearly_path = os.path.join(self.dirs['yearly'], f'{product_type}_{year}.csv')
                created_files[yearly_path] = self._write_product(
                    year_data, yearly_path, 'yearly', str(year), f'{year} Full Year Dataset'
                )

                # 3. One file per quarter that actually contains rows.
                quarters = year_data.groupby(year_data['date'].dt.quarter)
                for quarter_key, q_data in quarters:
                    quarter = int(quarter_key)
                    q_path = os.path.join(
                        self.dirs['quarterly'], f'{product_type}_{year}_Q{quarter}.csv'
                    )
                    created_files[q_path] = self._write_product(
                        q_data, q_path, 'quarterly', f'{year} Q{quarter}',
                        f'{year} Q{quarter} Dataset'
                    )

                # NOTE: monthly splitting is intentionally not performed.

            return created_files

        except Exception as e:
            # Best-effort boundary: report the failure (with traceback) and
            # signal "no products" to the caller instead of raising.
            logger.exception("Error processing %s: %s", master_file, e)
            return {}

    def generate_catalog(self, all_products: dict) -> list:
        """Build the UI product list from ``smart_split_csv`` style metadata.

        Args:
            all_products: Mapping of file path to product metadata.

        Returns:
            A list of dicts sorted by type (bundle, yearly, quarterly,
            monthly, then anything else) and then by period string.
            ``size_mb`` is formatted as a string with two decimals.
        """
        catalog = [
            {
                'filename': os.path.basename(filepath),
                'path': filepath,  # Internal use
                'type': info['type'],
                'period': info['period'],
                'rows': info['rows'],
                'size_mb': f"{info['size_mb']:.2f}",
                'price': info['price'],
                'description': info['description'],
                'download_url': f"/download/{os.path.basename(filepath)}",
            }
            for filepath, info in all_products.items()
        ]
        catalog.sort(
            key=lambda item: (self._CATALOG_ORDER.get(item['type'], 99), item['period'])
        )
        return catalog