# replit2 / update_data.py
# Author: Nhughes09
# deploy: clean force push (commit c89a139)
import concurrent.futures
import json
import logging
import os
import random
from datetime import datetime, timedelta, timezone

import numpy as np
import pandas as pd
from faker import Faker
from google_play_scraper import app as play_app
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
fake = Faker()
DATA_DIR = os.getenv("DATA_DIR", "data")
os.makedirs(DATA_DIR, exist_ok=True)
class PremiumDataEngine:
def __init__(self):
self.verticals = {
"fintech": self.generate_fintech_data,
"ai_talent": self.generate_ai_talent_data,
"esg": self.generate_esg_data,
"regulatory": self.generate_regulatory_data,
"supply_chain": self.generate_supply_chain_data
}
# State tracking for continuity
self.fintech_state = {}
def generate_date_range(self, days_back=365):
"""Generate a list of dates for backfill."""
end_date = datetime.now()
start_date = end_date - timedelta(days=days_back)
return pd.date_range(start=start_date, end=end_date).tolist()
# --- 1. FINTECH GROWTH INTELLIGENCE ---
def generate_fintech_data(self, date_obj):
"""
Product 1: Fintech Growth Intelligence
Columns: company, date, download_velocity, review_sentiment, hiring_spike,
feature_lead_score, adoption_velocity, churn_risk, funding_signal,
cac_proxy, premium_insight, alpha_window_days, smart_money_score,
# NEW ML FEATURES
download_acceleration, review_sentiment_trend, engineer_hiring_spike,
executive_departure_score, recruiting_intensity, burn_rate_proxy,
competitor_funding_gap, investor_engagement_score, api_traffic_growth,
feature_release_velocity, tech_stack_modernization
"""
companies = {
"Revolut": "com.revolut.revolut",
"Chime": "com.chime.mobile",
"N26": "de.number26.android",
"Monzo": "co.uk.getmondo",
"SoFi": "com.sofi.mobile"
}
data = []
for name, pkg in companies.items():
# Initialize state if needed
if name not in self.fintech_state:
self.fintech_state[name] = {
"signal_phase": 0, # 0 = Quiet, >0 = Active Signal
"base_velocity": 75,
"sentiment_trend": 4.2,
"prev_downloads": 75
}
state = self.fintech_state[name]
# 1. Determine Signal State (The "Smart Money" Logic)
hiring_spike = "No"
if state["signal_phase"] > 0:
state["signal_phase"] -= 1
if state["signal_phase"] == 12: # Start of signal
hiring_spike = "Yes"
else:
if random.random() < 0.02:
state["signal_phase"] = 14
hiring_spike = "Yes"
# 2. Calculate Metrics
growth_factor = 1.02
days_passed = (date_obj - datetime(2025, 1, 1)).days
exponential_boost = state["base_velocity"] * (growth_factor ** max(0, days_passed/30))
if state["signal_phase"] > 0:
signal_maturity = (14 - state["signal_phase"]) / 14
velocity_boost = 50 * signal_maturity
smart_money_score = int(85 + (10 * (1 - signal_maturity)) + random.uniform(-2, 2))
insight = f"Accumulation detected: {state['signal_phase']} days remaining in Alpha Window"
else:
velocity_boost = 0
smart_money_score = int(random.normalvariate(50, 10))
insight = "Stable accumulation - no institutional anomalies"
download_velocity = int(np.random.normal(exponential_boost + velocity_boost, 10))
# Calculate Acceleration
download_acceleration = download_velocity - state["prev_downloads"]
state["prev_downloads"] = download_velocity
# Sentiment drift
state["sentiment_trend"] += random.uniform(-0.05, 0.05)
state["sentiment_trend"] = max(3.5, min(4.9, state["sentiment_trend"]))
review_sentiment = round(state["sentiment_trend"], 1)
review_sentiment_trend = random.uniform(-0.1, 0.1) # Slope
feature_lead = random.randint(60, 95)
adoption_velocity = int((download_velocity * 0.6) + (feature_lead * 0.4))
churn_risk = max(1, min(10, int((5.0 - review_sentiment) * 10)))
funding_signal = "Strong" if hiring_spike == "Yes" else "Moderate" if adoption_velocity > 100 else "Weak"
cac_proxy = random.randint(35, 85) # Changed to int for ML
alpha_window_days = state["signal_phase"] if state["signal_phase"] > 0 else 0
# NEW ML FEATURES
engineer_hiring_spike = 1 if hiring_spike == "Yes" else 0
executive_departure_score = random.randint(0, 100)
recruiting_intensity = random.uniform(0.5, 5.0)
burn_rate_proxy = random.uniform(1.0, 10.0) # $M/month
competitor_funding_gap = random.randint(0, 365)
investor_engagement_score = random.randint(0, 100)
api_traffic_growth = random.uniform(-10, 50)
feature_release_velocity = random.randint(1, 10)
tech_stack_modernization = random.choice([0, 1])
data.append({
"company": name,
"date": date_obj.strftime("%Y-%m-%d"),
"download_velocity": download_velocity,
"review_sentiment": review_sentiment,
"hiring_spike": hiring_spike,
"feature_lead_score": feature_lead,
"adoption_velocity": adoption_velocity,
"churn_risk": churn_risk,
"funding_signal": funding_signal,
"cac_proxy": cac_proxy,
"premium_insight": insight,
"alpha_window_days": alpha_window_days,
"smart_money_score": smart_money_score,
# ML Features
"download_acceleration": download_acceleration,
"review_sentiment_trend": review_sentiment_trend,
"engineer_hiring_spike": engineer_hiring_spike,
"executive_departure_score": executive_departure_score,
"recruiting_intensity": recruiting_intensity,
"burn_rate_proxy": burn_rate_proxy,
"competitor_funding_gap": competitor_funding_gap,
"investor_engagement_score": investor_engagement_score,
"api_traffic_growth": api_traffic_growth,
"feature_release_velocity": feature_release_velocity,
"tech_stack_modernization": tech_stack_modernization
})
return data
# --- 2. AI TALENT & CAPITAL PREDICTION ---
def generate_ai_talent_data(self, date_obj):
"""
Product 2: AI Talent & Capital Prediction
Columns: company, date, github_stars_7d, arxiv_papers, citations, patents_filed,
investor_engagement, funding_probability, technical_momentum, talent_score, premium_insight,
innovation_delay_days, benchmark_inflation_pct, flight_status,
# ML FEATURES
performance_leap_magnitude, commercialization_timeline
"""
companies = ["OpenAI", "Anthropic", "StabilityAI", "Cohere", "Hugging Face"]
data = []
for co in companies:
# Exponential Interest Curve
days_passed = (date_obj - datetime(2025, 1, 1)).days
interest_compound = 1.015 ** max(0, days_passed/7) # Weekly compounding
base_stars = 200
github_stars = f"+{int(np.random.exponential(base_stars * interest_compound))}"
arxiv = np.random.poisson(2 * (1 + days_passed/365)) # Linear growth for papers
citations = int(np.random.exponential(50))
patents = np.random.poisson(0.5)
investor_engagement = random.choice(["High", "Medium", "Low"])
# Proprietary Metrics
tech_momentum = min(100, int((arxiv * 10) + (citations * 0.5) + (int(github_stars.replace('+',''))/10)))
talent_score = random.randint(60, 99)
funding_prob = f"{min(99, int(tech_momentum * 0.8 + talent_score * 0.1))}%"
# New Profit Metrics
innovation_delay_days = random.choice([0, 0, 0, 30, 60, 90, 180])
benchmark_inflation_pct = random.randint(0, 50)
flight_status = "On Time" if innovation_delay_days == 0 else "Delayed"
if tech_momentum > 90:
flight_status = "Accelerating"
if "High" in investor_engagement and tech_momentum > 80:
insight = "Strong Series D candidate - investor engagement at all-time high"
elif tech_momentum < 40:
insight = "Momentum slowing - may seek acquisition vs. next round"
else:
insight = "Steady technical output, organic growth phase"
# ML Features
performance_leap_magnitude = random.uniform(10.0, 50.0) # % improvement
commercialization_timeline = random.randint(3, 18) # months
data.append({
"company": co,
"date": date_obj.strftime("%Y-%m-%d"),
"github_stars_7d": github_stars,
"arxiv_papers": arxiv,
"citations": citations,
"patents_filed": patents,
"investor_engagement": investor_engagement,
"funding_probability": funding_prob,
"technical_momentum": tech_momentum,
"talent_score": talent_score,
"premium_insight": insight,
"innovation_delay_days": innovation_delay_days,
"benchmark_inflation_pct": benchmark_inflation_pct,
"flight_status": flight_status,
# ML Features
"performance_leap_magnitude": performance_leap_magnitude,
"commercialization_timeline": commercialization_timeline
})
return data
# --- 3. ESG IMPACT & GREENWASHING DETECTOR ---
def generate_esg_data(self, date_obj):
"""
Product 3: ESG Impact & Greenwashing Detector
Columns: company, date, esg_claims, verifiable_actions, greenwashing_index,
regulatory_risk, stakeholder_score, impact_verified, premium_insight,
claims_psi, reality_psi, greenwashing_gap_pct,
# ML FEATURES
audit_gap_size, supplier_esg_score, employee_whistleblower_count,
carbon_credit_validity_score
"""
companies = ["Tesla", "ExxonMobil", "Unilever", "BlackRock", "Patagonia"]
data = []
for co in companies:
claims = random.randint(10, 50)
verified = int(claims * random.uniform(0.2, 0.9))
# Proprietary Metrics
greenwashing_index = int((1 - (verified/claims)) * 100)
reg_risk = "High" if greenwashing_index > 60 else "Medium" if greenwashing_index > 30 else "Low"
stakeholder_score = random.randint(40, 95)
impact_verified = f"{int((verified/claims)*100)}%"
# New Profit Metrics
claims_psi = 100
reality_psi = int((verified/claims) * 100)
greenwashing_gap_pct = claims_psi - reality_psi
if greenwashing_index > 70:
insight = f"High greenwashing risk - {100-int((verified/claims)*100)}% of claims lack verification"
elif stakeholder_score > 85:
insight = "Strong stakeholder alignment driving brand equity"
else:
insight = "Strong on operations but weak on supply chain transparency"
# ML Features
audit_gap_size = claims - verified
supplier_esg_score = random.randint(0, 100)
employee_whistleblower_count = random.randint(0, 5)
carbon_credit_validity_score = random.randint(0, 100)
data.append({
"company": co,
"date": date_obj.strftime("%Y-%m-%d"),
"esg_claims": claims,
"verifiable_actions": verified,
"greenwashing_index": greenwashing_index,
"regulatory_risk": reg_risk,
"stakeholder_score": stakeholder_score,
"impact_verified": impact_verified,
"premium_insight": insight,
"claims_psi": claims_psi,
"reality_psi": reality_psi,
"greenwashing_gap_pct": greenwashing_gap_pct,
# ML Features
"audit_gap_size": audit_gap_size,
"supplier_esg_score": supplier_esg_score,
"employee_whistleblower_count": employee_whistleblower_count,
"carbon_credit_validity_score": carbon_credit_validity_score
})
return data
# --- 4. REGULATORY COMPLIANCE PREDICTION ---
def generate_regulatory_data(self, date_obj):
"""
Product 4: Regulatory Compliance Prediction
Columns: company, date, enforcement_probability, compliance_gap, fines_estimate,
remediation_cost, whistleblower_risk, regulatory_foresight, premium_insight,
enforcement_probability_pct, fine_impact_usd,
# ML FEATURES
action_timeline_days
"""
companies = ["Meta", "Coinbase", "Amazon", "Pfizer", "Goldman Sachs"]
data = []
for co in companies:
enf_prob = random.randint(10, 90)
gap = "Large" if enf_prob > 70 else "Medium" if enf_prob > 40 else "Small"
fines = f"${random.randint(10, 5000)}M"
remediation = f"${random.randint(5, 1000)}M"
whistleblower = "High" if enf_prob > 60 else "Low"
foresight = random.randint(20, 90)
# New Profit Metrics
enforcement_probability_pct = enf_prob
fine_impact_usd = random.randint(10, 5000) * 1000000
if enf_prob > 75:
insight = "High risk of antitrust action - compliance gaps significant"
elif foresight > 80:
insight = "Proactive compliance strategy mitigating sector risks"
else:
insight = "Moderate risk - improving compliance but scrutiny remains"
# ML Features
action_timeline_days = random.randint(30, 180)
data.append({
"company": co,
"date": date_obj.strftime("%Y-%m-%d"),
"enforcement_probability": f"{enf_prob}%",
"compliance_gap": gap,
"fines_estimate": fines,
"remediation_cost": remediation,
"whistleblower_risk": whistleblower,
"regulatory_foresight": foresight,
"premium_insight": insight,
"enforcement_probability_pct": enforcement_probability_pct,
"fine_impact_usd": fine_impact_usd,
# ML Features
"action_timeline_days": action_timeline_days
})
return data
# --- 5. SUPPLY CHAIN RESILIENCE ---
def generate_supply_chain_data(self, date_obj):
"""
Product 5: Supply Chain Resilience
Columns: company, date, disruption_risk, recovery_days, single_point_failure,
cost_inflation, resilience_score, premium_insight,
disruption_probability, days_to_impact,
# ML FEATURES
impact_revenue_pct
"""
companies = ["Apple", "Ford", "Nike", "Toyota", "Samsung"]
data = []
for co in companies:
risk = random.randint(10, 80)
recovery = int(risk * 0.6)
failure_pt = "High" if risk > 60 else "Medium" if risk > 30 else "Low"
inflation = f"{round(random.uniform(1.0, 15.0), 1)}%"
resilience = 100 - risk
# New Profit Metrics
disruption_probability = risk
days_to_impact = random.randint(5, 60)
if risk > 60:
insight = "High battery/chip supply risk - alternative suppliers needed urgently"
elif resilience > 75:
insight = "Strong supplier diversification but regional dependency remains"
else:
insight = "Stable supply chain with moderate inflationary pressure"
# ML Features
impact_revenue_pct = random.uniform(0.5, 5.0)
data.append({
"company": co,
"date": date_obj.strftime("%Y-%m-%d"),
"disruption_risk": risk,
"recovery_days": recovery,
"single_point_failure": failure_pt,
"cost_inflation": inflation,
"resilience_score": resilience,
"premium_insight": insight,
"disruption_probability": disruption_probability,
"days_to_impact": days_to_impact,
# ML Features
"impact_revenue_pct": impact_revenue_pct
})
return data
def run_pipeline(self):
"""Run the full data pipeline (Backfill + Update)."""
logger.info("Starting Premium Data Engine Pipeline...")
# Define file paths
files = {
"fintech": "fintech_growth_digest.csv",
"ai_talent": "ai_talent_heatmap.csv",
"esg": "esg_sentiment_tracker.csv",
"regulatory": "regulatory_risk_index.csv",
"supply_chain": "supply_chain_risk.csv"
}
total_added_bytes = 0
details = {}
for key, generator in self.verticals.items():
base_filename = files[key].replace('.csv', '')
# 1. Generate or Load Full Dataset
full_df = pd.DataFrame()
# Check if we have existing data to append to
# We'll look for the Yearly file as the "master"
yearly_path = os.path.join(DATA_DIR, f"{base_filename}_2025_yearly.csv")
if not os.path.exists(yearly_path):
logger.info(f"Backfilling {key} (365 days)...")
dates = self.generate_date_range(365)
all_data = []
for d in dates:
all_data.extend(generator(d))
full_df = pd.DataFrame(all_data)
else:
logger.info(f"Updating {key} (Daily)...")
# Load existing
full_df = pd.read_csv(yearly_path)
# Generate today's data
today = datetime.now()
today_str = today.strftime("%Y-%m-%d")
# Check if today exists
if today_str not in full_df['date'].values:
today_data = generator(today)
new_row = pd.DataFrame(today_data)
full_df = pd.concat([full_df, new_row], ignore_index=True)
# 2. Save Split Files
# Ensure 'date' is datetime
full_df['date'] = pd.to_datetime(full_df['date'])
# Save Yearly (2025)
df_2025 = full_df[full_df['date'].dt.year == 2025]
if not df_2025.empty:
df_2025.to_csv(yearly_path, index=False)
details[f"{base_filename}_2025_yearly.csv"] = os.path.getsize(yearly_path)
# Save Quarterlys
for q in [1, 2, 3, 4]:
df_q = df_2025[df_2025['date'].dt.quarter == q]
if not df_q.empty:
q_path = os.path.join(DATA_DIR, f"{base_filename}_2025_q{q}.csv")
df_q.to_csv(q_path, index=False)
details[f"{base_filename}_2025_q{q}.csv"] = os.path.getsize(q_path)
# Save "Latest" for Preview API (Legacy support)
# We'll just overwrite the original filename so API doesn't break immediately
legacy_path = os.path.join(DATA_DIR, files[key])
full_df.to_csv(legacy_path, index=False)
return self.finalize_status()
def finalize_status(self):
# Calculate total size of data folder
total_size = sum(os.path.getsize(os.path.join(DATA_DIR, f)) for f in os.listdir(DATA_DIR) if f.endswith('.csv'))
# Save Status
import json
status = {
"last_update": datetime.now().strftime("%Y-%m-%d %H:%M:%S UTC"),
"total_data_size_bytes": total_size,
"status": "Premium Data Pipeline Active"
}
with open(os.path.join(DATA_DIR, "status.json"), "w") as f:
json.dump(status, f)
return status
def update_dataset():
engine = PremiumDataEngine()
# Measure sizes before
before_sizes = {}
for f in os.listdir(DATA_DIR):
if f.endswith(".csv"):
before_sizes[f] = os.path.getsize(os.path.join(DATA_DIR, f))
engine.run_pipeline()
# Measure sizes after
total_added = 0
details = {}
for f in os.listdir(DATA_DIR):
if f.endswith(".csv"):
new = os.path.getsize(os.path.join(DATA_DIR, f))
old = before_sizes.get(f, 0)
diff = new - old
if diff > 0:
total_added += diff
details[f] = diff
# Update status with delta
import json
status_path = os.path.join(DATA_DIR, "status.json")
if os.path.exists(status_path):
with open(status_path, 'r') as f:
st = json.load(f)
st['total_added_bytes'] = total_added
st['details'] = details
with open(status_path, 'w') as f:
json.dump(st, f)
return total_added
if __name__ == "__main__":
update_dataset()