sales_analytics / data /data_generator.py
cryogenic22's picture
Create data/data_generator.py
bbe7455 verified
raw
history blame
27 kB
import sqlite3
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
import random
# Ensure data directory exists
os.makedirs("data", exist_ok=True)
# Connect to SQLite database
conn = sqlite3.connect("data/pharma_db.sqlite")
# Helper functions
def create_date_range(start_date, end_date):
"""Create a range of dates"""
return pd.date_range(start=start_date, end=end_date, freq='D')
def apply_trend(series, trend_factor=0.001):
"""Apply a trend to a time series"""
trend = np.arange(len(series)) * trend_factor
return series * (1 + trend)
def apply_seasonality(series, period=365, amplitude=0.1):
"""Apply seasonality to a time series"""
seasonality = amplitude * np.sin(2 * np.pi * np.arange(len(series)) / period)
return series * (1 + seasonality)
def apply_event_impact(series, event_date, impact_factor, recovery_days):
"""Apply an event impact to a time series"""
event_idx = (event_date - series.index[0]).days
if event_idx < 0 or event_idx >= len(series):
return series
impact = np.ones(len(series))
for i in range(len(series)):
if i >= event_idx:
days_since_event = i - event_idx
if days_since_event <= recovery_days:
impact[i] = 1 + impact_factor * (1 - days_since_event / recovery_days)
return series * impact
def generate_regions():
"""Generate region data"""
regions = pd.DataFrame({
'region_id': ['NE', 'SE', 'MW', 'SW', 'W'],
'region_name': ['Northeast', 'Southeast', 'Midwest', 'Southwest', 'West'],
'country': ['USA'] * 5,
'division': ['East', 'East', 'Central', 'Central', 'West'],
'population': [55000000, 62000000, 70000000, 42000000, 65000000]
})
return regions
def generate_territories(regions):
"""Generate territory data based on regions"""
territories = []
territory_mapping = {
'NE': ['NE-NYC', 'NE-BOS', 'NE-PHL', 'NE-DCA'],
'SE': ['SE-ATL', 'SE-MIA', 'SE-CLT', 'SE-NSH'],
'MW': ['MW-CHI', 'MW-DET', 'MW-MIN', 'MW-STL'],
'SW': ['SW-DAL', 'SW-HOU', 'SW-PHX', 'SW-DEN'],
'W': ['W-LAX', 'W-SFO', 'W-SEA', 'W-PDX']
}
territory_names = {
'NE-NYC': 'New York Metro', 'NE-BOS': 'New England', 'NE-PHL': 'Philadelphia', 'NE-DCA': 'DC-Baltimore',
'SE-ATL': 'Atlanta', 'SE-MIA': 'Florida', 'SE-CLT': 'Carolinas', 'SE-NSH': 'Tennessee Valley',
'MW-CHI': 'Chicago', 'MW-DET': 'Great Lakes', 'MW-MIN': 'Upper Midwest', 'MW-STL': 'Missouri Valley',
'SW-DAL': 'North Texas', 'SW-HOU': 'Gulf Coast', 'SW-PHX': 'Southwest Desert', 'SW-DEN': 'Mountain',
'W-LAX': 'Southern California', 'W-SFO': 'Northern California', 'W-SEA': 'Pacific Northwest', 'W-PDX': 'Northwest'
}
sales_reps = ['REP' + str(i).zfill(3) for i in range(1, 41)]
rep_idx = 0
for region_id, territory_ids in territory_mapping.items():
for territory_id in territory_ids:
territories.append({
'territory_id': territory_id,
'territory_name': territory_names[territory_id],
'region_id': region_id,
'sales_rep_id': sales_reps[rep_idx]
})
rep_idx += 1
return pd.DataFrame(territories)
def generate_products():
"""Generate product data"""
products = pd.DataFrame({
'product_id': ['DRX', 'PRX', 'TRX', 'ZRX', 'NRX'],
'product_name': ['DrugX', 'PainRex', 'TranquiX', 'ZymoRex', 'NeuroRex'],
'therapeutic_area': ['Cardiology', 'Pain Management', 'Neurology', 'Immunology', 'Neurology'],
'molecule': ['moleculeX', 'moleculeP', 'moleculeT', 'moleculeZ', 'moleculeN'],
'launch_date': ['2020-01-01', '2018-06-15', '2021-03-10', '2019-11-05', '2022-01-20'],
'status': ['Active', 'Active', 'Active', 'Active', 'Active'],
'list_price': [299.99, 199.99, 499.99, 399.99, 599.99]
})
# Convert date strings to datetime objects
products['launch_date'] = pd.to_datetime(products['launch_date'])
return products
def generate_competitor_products(products):
"""Generate competitor product data"""
competitor_products = pd.DataFrame({
'competitor_product_id': ['CP1', 'CP2', 'CP3', 'CP4', 'CP5', 'CP6'],
'product_name': ['CompDrug1', 'CompDrug2', 'CompDrug3', 'CompDrug4', 'CompDrug5', 'CompDrug6'],
'manufacturer': ['CompPharma', 'MedCorp', 'BioSolutions', 'GeneriCo', 'PharmGiant', 'MoleCorp'],
'therapeutic_area': ['Cardiology', 'Cardiology', 'Pain Management', 'Neurology', 'Immunology', 'Neurology'],
'molecule': ['moleculeC1', 'moleculeC2', 'moleculeC3', 'moleculeC4', 'moleculeC5', 'moleculeC6'],
'launch_date': ['2019-05-10', '2023-01-15', '2017-11-20', '2020-08-05', '2021-03-15', '2022-07-10'],
'list_price': [279.99, 259.99, 189.99, 459.99, 379.99, 549.99],
'competing_with_product_id': ['DRX', 'DRX', 'PRX', 'TRX', 'ZRX', 'NRX']
})
# Convert date strings to datetime objects
competitor_products['launch_date'] = pd.to_datetime(competitor_products['launch_date'])
return competitor_products
def generate_prescribers(territories):
"""Generate prescriber data based on territories"""
prescribers = []
specialties = ['Cardiologist', 'Neurologist', 'Internal Medicine', 'Primary Care', 'Psychiatrist',
'Rheumatologist', 'Oncologist', 'Pediatrician', 'Geriatrician', 'Endocrinologist']
practice_types = ['Hospital', 'Private Practice', 'Clinic', 'Academic', 'Group Practice']
prescriber_id = 1
for _, territory in territories.iterrows():
# Generate between 50-150 prescribers per territory
n_prescribers = np.random.randint(50, 151)
for _ in range(n_prescribers):
prescribers.append({
'prescriber_id': f'PRE{str(prescriber_id).zfill(5)}',
'name': f'Dr. LastName{prescriber_id}',
'specialty': np.random.choice(specialties),
'practice_type': np.random.choice(practice_types),
'territory_id': territory['territory_id'],
'decile': np.random.randint(1, 11) # 1-10 decile ranking
})
prescriber_id += 1
return pd.DataFrame(prescribers)
def generate_pharmacies(territories):
"""Generate pharmacy data based on territories"""
pharmacies = []
pharmacy_types = ['Chain', 'Independent', 'Hospital', 'Mail Order', 'Specialty']
pharmacy_id = 1
for _, territory in territories.iterrows():
# Generate between 30-100 pharmacies per territory
n_pharmacies = np.random.randint(30, 101)
for _ in range(n_pharmacies):
rx_volume = np.random.randint(500, 10001)
pharmacies.append({
'pharmacy_id': f'PHA{str(pharmacy_id).zfill(5)}',
'name': f'Pharmacy{pharmacy_id}',
'address': f'Address{pharmacy_id}',
'territory_id': territory['territory_id'],
'pharmacy_type': np.random.choice(pharmacy_types),
'monthly_rx_volume': rx_volume
})
pharmacy_id += 1
return pd.DataFrame(pharmacies)
def generate_distribution_centers(regions):
"""Generate distribution center data based on regions"""
distribution_centers = []
for idx, region in regions.iterrows():
# 1-2 distribution centers per region
n_dcs = np.random.randint(1, 3)
for i in range(n_dcs):
capacity = np.random.randint(50000, 200001)
distribution_centers.append({
'dc_id': f'DC{idx+1}{i+1}',
'dc_name': f'{region["region_name"]} DC {i+1}',
'region_id': region['region_id'],
'inventory_capacity': capacity
})
return pd.DataFrame(distribution_centers)
def generate_marketing_campaigns(products, start_date, end_date):
"""Generate marketing campaign data for products"""
campaigns = []
campaign_types = ['TV', 'Digital', 'Print', 'HCP Detailing', 'Patient Support', 'Conference']
target_audiences = ['Patients', 'Physicians', 'Payers', 'Pharmacists']
channels = ['Television', 'Social Media', 'Medical Journals', 'Direct Mail', 'Sales Force', 'Online']
campaign_id = 1
for _, product in products.iterrows():
# Generate 3-8 campaigns per product over the time period
n_campaigns = np.random.randint(3, 9)
time_range = (end_date - start_date).days
for _ in range(n_campaigns):
campaign_start = start_date + timedelta(days=np.random.randint(0, time_range - 60))
duration = np.random.randint(30, 121) # 30-120 day campaigns
campaign_end = campaign_start + timedelta(days=duration)
budget = np.random.randint(100000, 5000001)
spend = budget * np.random.uniform(0.85, 1.05) # 85-105% of budget
campaigns.append({
'campaign_id': campaign_id,
'campaign_name': f'{product["product_name"]} Campaign {campaign_id}',
'start_date': campaign_start,
'end_date': campaign_end,
'product_id': product['product_id'],
'campaign_type': np.random.choice(campaign_types),
'target_audience': np.random.choice(target_audiences),
'channels': np.random.choice(channels),
'budget': budget,
'spend': spend
})
campaign_id += 1
return pd.DataFrame(campaigns)
def generate_market_events(start_date, end_date):
"""Generate market events data"""
events = []
event_types = ['Competitor Launch', 'FDA Approval', 'Patent Expiry', 'Safety Alert', 'Guideline Change', 'Formulary Change']
event_id = 1
# Generate 10-20 market events
n_events = np.random.randint(10, 21)
time_range = (end_date - start_date).days
for _ in range(n_events):
event_date = start_date + timedelta(days=np.random.randint(0, time_range))
event_type = np.random.choice(event_types)
# Special event: Competitor launch of CompDrug2 targeting DrugX
if event_id == 5:
event_type = 'Competitor Launch'
event_date = end_date - timedelta(days=45) # 45 days before end date
description = 'CompDrug2 launched by MedCorp targeting the same indication as DrugX'
affected_products = 'DRX'
affected_regions = 'NE' # Northeast region
impact_score = -0.35 # Significant negative impact
else:
description = f'Market event {event_id}: {event_type}'
affected_products = np.random.choice(['DRX', 'PRX', 'TRX', 'ZRX', 'NRX', 'ALL'], p=[0.2, 0.15, 0.15, 0.15, 0.15, 0.2])
affected_regions = np.random.choice(['NE', 'SE', 'MW', 'SW', 'W', 'ALL'], p=[0.15, 0.15, 0.15, 0.15, 0.15, 0.25])
impact_score = np.random.uniform(-0.2, 0.2)
events.append({
'event_id': event_id,
'event_date': event_date,
'event_type': event_type,
'description': description,
'affected_products': affected_products,
'affected_regions': affected_regions,
'impact_score': impact_score
})
event_id += 1
return pd.DataFrame(events)
def generate_sales_targets(products, regions, start_date, end_date):
"""Generate sales target data"""
targets = []
target_id = 1
# Generate quarterly targets
quarters = pd.date_range(start=start_date, end=end_date, freq='Q')
for _, product in products.iterrows():
for _, region in regions.iterrows():
base_target_units = np.random.randint(5000, 20001)
base_target_revenue = base_target_units * product['list_price']
for quarter_start in quarters:
quarter = f'Q{quarter_start.quarter}-{quarter_start.year}'
# Add growth expectations
growth_factor = 1 + (0.03 * (quarter_start - pd.Timestamp(start_date)).days / 90)
target_units = int(base_target_units * growth_factor)
target_revenue = base_target_revenue * growth_factor
targets.append({
'target_id': target_id,
'product_id': product['product_id'],
'region_id': region['region_id'],
'period': quarter,
'target_units': target_units,
'target_revenue': target_revenue
})
target_id += 1
return pd.DataFrame(targets)
def generate_inventory(products, distribution_centers, date_range):
"""Generate inventory data"""
inventory = []
inventory_id = 1
for date in date_range:
for _, product in products.iterrows():
for _, dc in distribution_centers.iterrows():
base_units = np.random.randint(2000, 10001)
# Add some variability over time
time_factor = 1 + (0.1 * np.sin(2 * np.pi * date.dayofyear / 365))
# Special case for Northeast DCs and DrugX to simulate supply issues
supply_issue = False
if product['product_id'] == 'DRX' and dc['region_id'] == 'NE' and date >= (date_range[-1] - timedelta(days=60)):
supply_issue = True
time_factor *= 0.6 # Significant inventory reduction
units_available = int(base_units * time_factor)
units_allocated = int(units_available * np.random.uniform(0.1, 0.4))
units_in_transit = int(base_units * np.random.uniform(0.05, 0.2))
days_of_supply = np.random.uniform(15, 45)
if supply_issue:
days_of_supply *= 0.5 # Reduced days of supply during issue
inventory.append({
'inventory_id': inventory_id,
'product_id': product['product_id'],
'dc_id': dc['dc_id'],
'date': date,
'units_available': units_available,
'units_allocated': units_allocated,
'units_in_transit': units_in_transit,
'days_of_supply': days_of_supply
})
inventory_id += 1
# Sample every 7 days to reduce data volume
inventory_df = pd.DataFrame(inventory)
inventory_df = inventory_df[inventory_df['date'].dt.dayofweek == 0]
return inventory_df
def generate_external_factors(regions, date_range):
"""Generate external factors data"""
factors = []
factor_id = 1
factor_types = ['Weather Index', 'Disease Prevalence', 'Economic Index', 'Healthcare Utilization']
for date in date_range:
for _, region in regions.iterrows():
for factor_type in factor_types:
base_value = np.random.uniform(30, 70)
# Add seasonality
if factor_type == 'Weather Index':
# Weather follows annual cycle
seasonal_factor = np.sin(2 * np.pi * date.dayofyear / 365)
base_value += 20 * seasonal_factor
elif factor_type == 'Disease Prevalence':
# Disease prevalence peaks in winter
winter_factor = -np.cos(2 * np.pi * date.dayofyear / 365)
base_value += 15 * winter_factor
description = f'{factor_type} in {region["region_name"]}'
factors.append({
'factor_id': factor_id,
'date': date,
'region_id': region['region_id'],
'factor_type': factor_type,
'factor_value': base_value,
'description': description
})
factor_id += 1
# Sample every 7 days to reduce data volume
factors_df = pd.DataFrame(factors)
factors_df = factors_df[factors_df['date'].dt.dayofweek == 0]
return factors_df
def generate_daily_sales(products, territories, prescribers, pharmacies, start_date, end_date):
"""Generate daily sales data"""
print("Generating daily sales data...")
date_range = create_date_range(start_date, end_date)
# Main function to generate all data and save to SQLite
def generate_all_data():
"""Generate all synthetic data and save to SQLite database"""
print("Starting data generation...")
# Set date range for a full year of data
start_date = datetime.now() - timedelta(days=365)
end_date = datetime.now()
# Generate base data
regions = generate_regions()
territories = generate_territories(regions)
products = generate_products()
competitor_products = generate_competitor_products(products)
prescribers = generate_prescribers(territories)
pharmacies = generate_pharmacies(territories)
distribution_centers = generate_distribution_centers(regions)
# Generate time-series data (sampled weekly to reduce volume)
weekly_dates = create_date_range(start_date, end_date)[::7]
marketing_campaigns = generate_marketing_campaigns(products, start_date, end_date)
market_events = generate_market_events(start_date, end_date)
sales_targets = generate_sales_targets(products, regions, start_date, end_date)
inventory = generate_inventory(products, distribution_centers, weekly_dates)
external_factors = generate_external_factors(regions, weekly_dates)
# Generate daily sales data (most granular and largest dataset)
sales = generate_daily_sales(products, territories, prescribers, pharmacies, start_date, end_date)
# Save all data to SQLite
print("Saving data to SQLite...")
regions.to_sql('regions', conn, if_exists='replace', index=False)
territories.to_sql('territories', conn, if_exists='replace', index=False)
products.to_sql('products', conn, if_exists='replace', index=False)
competitor_products.to_sql('competitor_products', conn, if_exists='replace', index=False)
prescribers.to_sql('prescribers', conn, if_exists='replace', index=False)
pharmacies.to_sql('pharmacies', conn, if_exists='replace', index=False)
distribution_centers.to_sql('distribution_centers', conn, if_exists='replace', index=False)
marketing_campaigns.to_sql('marketing_campaigns', conn, if_exists='replace', index=False)
market_events.to_sql('market_events', conn, if_exists='replace', index=False)
sales_targets.to_sql('sales_targets', conn, if_exists='replace', index=False)
inventory.to_sql('inventory', conn, if_exists='replace', index=False)
external_factors.to_sql('external_factors', conn, if_exists='replace', index=False)
# Save sales data in batches to handle large volume
batch_size = 100000
for i in range(0, len(sales), batch_size):
batch = sales.iloc[i:i+batch_size]
if i == 0:
batch.to_sql('sales', conn, if_exists='replace', index=False)
else:
batch.to_sql('sales', conn, if_exists='append', index=False)
print(f"Saved sales batch {i//batch_size + 1}/{(len(sales)-1)//batch_size + 1}")
# Create indexes for faster queries
print("Creating database indexes...")
cursor = conn.cursor()
cursor.execute("CREATE INDEX IF NOT EXISTS idx_sales_date ON sales (sale_date)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_sales_product ON sales (product_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_sales_region ON sales (region_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_sales_territory ON sales (territory_id)")
conn.commit()
print("Data generation complete!")
if __name__ == "__main__":
generate_all_data()
conn.close() end_date)
sales = []
sale_id = 1
# Get list of prescribers by territory
prescribers_by_territory = {territory: prescribers[prescribers['territory_id'] == territory].index.tolist()
for territory in territories['territory_id']}
# Get list of pharmacies by territory
pharmacies_by_territory = {territory: pharmacies[pharmacies['territory_id'] == territory].index.tolist()
for territory in territories['territory_id']}
# Base sales by product (units per day across all regions)
base_sales = {
'DRX': 1000, # DrugX - our main product
'PRX': 800,
'TRX': 600,
'ZRX': 700,
'NRX': 400
}
# Sales distribution by region (percentage of total)
region_distribution = {
'NE': 0.25, # Northeast has highest share initially
'SE': 0.20,
'MW': 0.22,
'SW': 0.15,
'W': 0.18
}
# Get territories by region
territories_by_region = {region: territories[territories['region_id'] == region]['territory_id'].tolist()
for region in regions['region_id']}
# Create a time series for each product-region combination
for product_id, base_sale in base_sales.items():
product = products[products['product_id'] == product_id].iloc[0]
for region_id, region_share in region_distribution.items():
region_territories = territories_by_region[region_id]
# Initial daily units for this product-region
initial_daily_units = base_sale * region_share
# Create time series with trend and seasonality
days = (end_date - start_date).days + 1
daily_units = pd.Series([initial_daily_units] * days, index=date_range)
# Apply general trend (slight growth)
daily_units = apply_trend(daily_units, trend_factor=0.0005)
# Apply seasonality
daily_units = apply_seasonality(daily_units, period=365, amplitude=0.1)
# Apply specific events for DrugX in Northeast
if product_id == 'DRX' and region_id == 'NE':
# 1. Competitor launch impact (45 days before end)
competitor_launch_date = end_date - timedelta(days=45)
daily_units = apply_event_impact(daily_units, competitor_launch_date, -0.25, 30)
# 2. Supply chain issues (60 days before end)
supply_issue_date = end_date - timedelta(days=60)
daily_units = apply_event_impact(daily_units, supply_issue_date, -0.15, 45)
# Distribute to territories within region (randomly, but consistently)
territory_shares = np.random.dirichlet(np.ones(len(region_territories)) * 5) # Concentration parameter for less variability
territory_distribution = dict(zip(region_territories, territory_shares))
# Generate daily sales entries
for date in date_range:
date_idx = (date - date_range[0]).days
day_units = daily_units.iloc[date_idx]
for territory_id, territory_share in territory_distribution.items():
territory_units = int(day_units * territory_share)
if territory_units > 0:
# Get available prescribers and pharmacies for this territory
territory_prescribers = prescribers_by_territory.get(territory_id, [])
territory_pharmacies = pharmacies_by_territory.get(territory_id, [])
if not territory_prescribers or not territory_pharmacies:
continue
# Determine number of sales records for this territory-day
n_sales = min(territory_units, 50) # Cap at 50 records per territory-day
# Units per sale
units_per_sale = max(1, territory_units // n_sales)
for _ in range(n_sales):
# Select random prescriber and pharmacy
prescriber_idx = np.random.choice(territory_prescribers)
pharmacy_idx = np.random.choice(territory_pharmacies)
prescriber = prescribers.iloc[prescriber_idx]
pharmacy = pharmacies.iloc[pharmacy_idx]
# Calculate revenue and cost
units = max(1, int(np.random.normal(units_per_sale, units_per_sale * 0.2)))
revenue = units * product['list_price']
cost = revenue * np.random.uniform(0.15, 0.25) # 15-25% COGS
margin = revenue - cost
sales.append({
'sale_id': sale_id,
'sale_date': date,
'product_id': product_id,
'region_id': region_id,
'territory_id': territory_id,
'prescriber_id': prescriber['prescriber_id'],
'pharmacy_id': pharmacy['pharmacy_id'],
'units_sold': units,
'revenue': revenue,
'cost': cost,
'margin': margin
})
sale_id += 1
# Convert to DataFrame
sales_df = pd.DataFrame(sales)
# Add some random noise to make data more realistic
sales_df['revenue'] = sales_df['revenue'] * np.random.uniform(0.95, 1.05, size=len(sales_df))
sales_df['cost'] = sales_df['cost'] * np.random.uniform(0.97, 1.03, size=len(sales_df))
sales_df['margin'] = sales_df['revenue'] - sales_df['cost']
return sales_df
date_range = create_date_range(start_date,