replit2 / product_manager.py
Nhughes09
deploy: clean force push
c89a139
import pandas as pd
import os
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class DataProductManager:
def __init__(self, data_dir=None):
self.data_dir = data_dir or os.getenv("DATA_DIR", "data")
# Create directory structure
self.dirs = {
'bundles': os.path.join(self.data_dir, 'bundles'),
'yearly': os.path.join(self.data_dir, 'yearly'),
'quarterly': os.path.join(self.data_dir, 'quarterly'),
'monthly': os.path.join(self.data_dir, 'monthly')
}
for d in self.dirs.values():
os.makedirs(d, exist_ok=True)
def calculate_price(self, file_type, row_count):
"""Calculate optimal pricing based on data volume"""
pricing_model = {
'monthly': {'base': 99, 'per_10k': 5, 'cap': 299},
'quarterly': {'base': 249, 'per_10k': 10, 'cap': 699},
'yearly': {'base': 899, 'per_10k': 20, 'cap': 1999},
'bundle': {'base': 2999, 'per_10k': 50, 'cap': 4999}
}
model = pricing_model.get(file_type, pricing_model['monthly'])
price = model['base'] + ((row_count // 10000) * model['per_10k'])
return min(price, model['cap'])
def smart_split_csv(self, master_file, product_type):
"""
Intelligently split master CSV into marketable products
"""
if not os.path.exists(master_file):
logger.warning(f"Master file not found: {master_file}")
return {}
try:
df = pd.read_csv(master_file)
# Normalize date column
if 'date' in df.columns:
df['date'] = pd.to_datetime(df['date'])
elif 'scraped_date' in df.columns:
df['date'] = pd.to_datetime(df['scraped_date'])
else:
# Fallback if no date column
logger.warning(f"No date column found in {master_file}")
return {}
created_files = {}
# 1. Create Bundle (Master File)
bundle_path = os.path.join(self.dirs['bundles'], f'{product_type}_FULL.csv')
df.to_csv(bundle_path, index=False)
created_files[bundle_path] = {
'type': 'bundle',
'period': 'All Time',
'rows': len(df),
'size_mb': os.path.getsize(bundle_path) / (1024*1024),
'price': self.calculate_price('bundle', len(df)),
'description': f'Complete Historical Bundle'
}
# 2. Split by Year
for year, year_data in df.groupby(df['date'].dt.year):
yearly_path = os.path.join(self.dirs['yearly'], f'{product_type}_{year}.csv')
year_data.to_csv(yearly_path, index=False)
created_files[yearly_path] = {
'type': 'yearly',
'period': str(year),
'rows': len(year_data),
'size_mb': os.path.getsize(yearly_path) / (1024*1024),
'price': self.calculate_price('yearly', len(year_data)),
'description': f'{year} Full Year Dataset'
}
# 3. Split by Quarter
for quarter in range(1, 5):
q_data = year_data[year_data['date'].dt.quarter == quarter]
if len(q_data) > 0:
q_path = os.path.join(self.dirs['quarterly'], f'{product_type}_{year}_Q{quarter}.csv')
q_data.to_csv(q_path, index=False)
created_files[q_path] = {
'type': 'quarterly',
'period': f'{year} Q{quarter}',
'rows': len(q_data),
'size_mb': os.path.getsize(q_path) / (1024*1024),
'price': self.calculate_price('quarterly', len(q_data)),
'description': f'{year} Q{quarter} Dataset'
}
# 4. Split by Month (only if we have quarterly data)
# Optimization: Only do this if requested, but for now we do it.
# Actually, let's stick to Q/Y/Bundle to avoid file explosion for this demo
# unless the user explicitly wants monthly. The prompt said "Tier 3: Monthly".
# Okay, let's do monthly.
# 4. Split by Month (DISABLED per user request)
# for month in range((quarter-1)*3 + 1, quarter*3 + 1):
# m_data = q_data[q_data['date'].dt.month == month]
# if len(m_data) > 0:
# m_path = os.path.join(self.dirs['monthly'], f'{product_type}_{year}_{month:02d}.csv')
# m_data.to_csv(m_path, index=False)
# created_files[m_path] = {
# 'type': 'monthly',
# 'period': f'{year}-{month:02d}',
# 'rows': len(m_data),
# 'size_mb': os.path.getsize(m_path) / (1024*1024),
# 'price': self.calculate_price('monthly', len(m_data)),
# 'description': f'{year}-{month:02d} Dataset'
# }
return created_files
except Exception as e:
logger.error(f"Error processing {master_file}: {e}")
return {}
def generate_catalog(self, all_products):
"""Generate a list of products for the UI."""
catalog = []
for filepath, info in all_products.items():
catalog.append({
'filename': os.path.basename(filepath),
'path': filepath, # Internal use
'type': info['type'],
'period': info['period'],
'rows': info['rows'],
'size_mb': f"{info['size_mb']:.2f}",
'price': info['price'],
'description': info['description'],
'download_url': f"/download/{os.path.basename(filepath)}"
})
# Sort by type (Bundle -> Yearly -> Quarterly -> Monthly)
order = {'bundle': 0, 'yearly': 1, 'quarterly': 2, 'monthly': 3}
catalog.sort(key=lambda x: (order.get(x['type'], 99), x['period']))
return catalog