|
|
import os |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
from datetime import datetime, timedelta |
|
|
import logging |
|
|
import random |
|
|
from faker import Faker |
|
|
from google_play_scraper import app as play_app |
|
|
import concurrent.futures |
|
|
|
|
|
|
|
|
# --- Module setup: logging, data helpers, output directory ---------------
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
_CONSOLE_HANDLER = logging.StreamHandler()
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=[_CONSOLE_HANDLER])
logger = logging.getLogger(__name__)

# Module-level Faker instance (not referenced by the generators below).
fake = Faker()

# Every CSV/JSON artifact lands here; override via the DATA_DIR env var.
DATA_DIR = os.getenv("DATA_DIR", "data")
os.makedirs(DATA_DIR, exist_ok=True)
|
|
|
|
|
class PremiumDataEngine:
    """Synthetic premium-data generator covering five market verticals."""

    def __init__(self):
        # Map each vertical key to its generator method; every generator
        # follows the generate_<key>_data naming convention.
        keys = ("fintech", "ai_talent", "esg", "regulatory", "supply_chain")
        self.verticals = {k: getattr(self, f"generate_{k}_data") for k in keys}

        # Per-company persistent state used by the fintech signal simulation.
        self.fintech_state = {}

    def generate_date_range(self, days_back=365):
        """Return every date from *days_back* days ago through now, inclusive."""
        now = datetime.now()
        return list(pd.date_range(start=now - timedelta(days=days_back), end=now))
|
|
|
|
|
|
|
|
def generate_fintech_data(self, date_obj): |
|
|
""" |
|
|
Product 1: Fintech Growth Intelligence |
|
|
Columns: company, date, download_velocity, review_sentiment, hiring_spike, |
|
|
feature_lead_score, adoption_velocity, churn_risk, funding_signal, |
|
|
cac_proxy, premium_insight, alpha_window_days, smart_money_score, |
|
|
# NEW ML FEATURES |
|
|
download_acceleration, review_sentiment_trend, engineer_hiring_spike, |
|
|
executive_departure_score, recruiting_intensity, burn_rate_proxy, |
|
|
competitor_funding_gap, investor_engagement_score, api_traffic_growth, |
|
|
feature_release_velocity, tech_stack_modernization |
|
|
""" |
|
|
companies = { |
|
|
"Revolut": "com.revolut.revolut", |
|
|
"Chime": "com.chime.mobile", |
|
|
"N26": "de.number26.android", |
|
|
"Monzo": "co.uk.getmondo", |
|
|
"SoFi": "com.sofi.mobile" |
|
|
} |
|
|
|
|
|
data = [] |
|
|
for name, pkg in companies.items(): |
|
|
|
|
|
if name not in self.fintech_state: |
|
|
self.fintech_state[name] = { |
|
|
"signal_phase": 0, |
|
|
"base_velocity": 75, |
|
|
"sentiment_trend": 4.2, |
|
|
"prev_downloads": 75 |
|
|
} |
|
|
|
|
|
state = self.fintech_state[name] |
|
|
|
|
|
|
|
|
hiring_spike = "No" |
|
|
if state["signal_phase"] > 0: |
|
|
state["signal_phase"] -= 1 |
|
|
if state["signal_phase"] == 12: |
|
|
hiring_spike = "Yes" |
|
|
else: |
|
|
if random.random() < 0.02: |
|
|
state["signal_phase"] = 14 |
|
|
hiring_spike = "Yes" |
|
|
|
|
|
|
|
|
growth_factor = 1.02 |
|
|
days_passed = (date_obj - datetime(2025, 1, 1)).days |
|
|
exponential_boost = state["base_velocity"] * (growth_factor ** max(0, days_passed/30)) |
|
|
|
|
|
if state["signal_phase"] > 0: |
|
|
signal_maturity = (14 - state["signal_phase"]) / 14 |
|
|
velocity_boost = 50 * signal_maturity |
|
|
smart_money_score = int(85 + (10 * (1 - signal_maturity)) + random.uniform(-2, 2)) |
|
|
insight = f"Accumulation detected: {state['signal_phase']} days remaining in Alpha Window" |
|
|
else: |
|
|
velocity_boost = 0 |
|
|
smart_money_score = int(random.normalvariate(50, 10)) |
|
|
insight = "Stable accumulation - no institutional anomalies" |
|
|
|
|
|
download_velocity = int(np.random.normal(exponential_boost + velocity_boost, 10)) |
|
|
|
|
|
|
|
|
download_acceleration = download_velocity - state["prev_downloads"] |
|
|
state["prev_downloads"] = download_velocity |
|
|
|
|
|
|
|
|
state["sentiment_trend"] += random.uniform(-0.05, 0.05) |
|
|
state["sentiment_trend"] = max(3.5, min(4.9, state["sentiment_trend"])) |
|
|
review_sentiment = round(state["sentiment_trend"], 1) |
|
|
review_sentiment_trend = random.uniform(-0.1, 0.1) |
|
|
|
|
|
feature_lead = random.randint(60, 95) |
|
|
adoption_velocity = int((download_velocity * 0.6) + (feature_lead * 0.4)) |
|
|
churn_risk = max(1, min(10, int((5.0 - review_sentiment) * 10))) |
|
|
funding_signal = "Strong" if hiring_spike == "Yes" else "Moderate" if adoption_velocity > 100 else "Weak" |
|
|
cac_proxy = random.randint(35, 85) |
|
|
alpha_window_days = state["signal_phase"] if state["signal_phase"] > 0 else 0 |
|
|
|
|
|
|
|
|
engineer_hiring_spike = 1 if hiring_spike == "Yes" else 0 |
|
|
executive_departure_score = random.randint(0, 100) |
|
|
recruiting_intensity = random.uniform(0.5, 5.0) |
|
|
burn_rate_proxy = random.uniform(1.0, 10.0) |
|
|
competitor_funding_gap = random.randint(0, 365) |
|
|
investor_engagement_score = random.randint(0, 100) |
|
|
api_traffic_growth = random.uniform(-10, 50) |
|
|
feature_release_velocity = random.randint(1, 10) |
|
|
tech_stack_modernization = random.choice([0, 1]) |
|
|
|
|
|
data.append({ |
|
|
"company": name, |
|
|
"date": date_obj.strftime("%Y-%m-%d"), |
|
|
"download_velocity": download_velocity, |
|
|
"review_sentiment": review_sentiment, |
|
|
"hiring_spike": hiring_spike, |
|
|
"feature_lead_score": feature_lead, |
|
|
"adoption_velocity": adoption_velocity, |
|
|
"churn_risk": churn_risk, |
|
|
"funding_signal": funding_signal, |
|
|
"cac_proxy": cac_proxy, |
|
|
"premium_insight": insight, |
|
|
"alpha_window_days": alpha_window_days, |
|
|
"smart_money_score": smart_money_score, |
|
|
|
|
|
"download_acceleration": download_acceleration, |
|
|
"review_sentiment_trend": review_sentiment_trend, |
|
|
"engineer_hiring_spike": engineer_hiring_spike, |
|
|
"executive_departure_score": executive_departure_score, |
|
|
"recruiting_intensity": recruiting_intensity, |
|
|
"burn_rate_proxy": burn_rate_proxy, |
|
|
"competitor_funding_gap": competitor_funding_gap, |
|
|
"investor_engagement_score": investor_engagement_score, |
|
|
"api_traffic_growth": api_traffic_growth, |
|
|
"feature_release_velocity": feature_release_velocity, |
|
|
"tech_stack_modernization": tech_stack_modernization |
|
|
}) |
|
|
return data |
|
|
|
|
|
|
|
|
def generate_ai_talent_data(self, date_obj): |
|
|
""" |
|
|
Product 2: AI Talent & Capital Prediction |
|
|
Columns: company, date, github_stars_7d, arxiv_papers, citations, patents_filed, |
|
|
investor_engagement, funding_probability, technical_momentum, talent_score, premium_insight, |
|
|
innovation_delay_days, benchmark_inflation_pct, flight_status, |
|
|
# ML FEATURES |
|
|
performance_leap_magnitude, commercialization_timeline |
|
|
""" |
|
|
companies = ["OpenAI", "Anthropic", "StabilityAI", "Cohere", "Hugging Face"] |
|
|
|
|
|
data = [] |
|
|
for co in companies: |
|
|
|
|
|
days_passed = (date_obj - datetime(2025, 1, 1)).days |
|
|
interest_compound = 1.015 ** max(0, days_passed/7) |
|
|
|
|
|
base_stars = 200 |
|
|
github_stars = f"+{int(np.random.exponential(base_stars * interest_compound))}" |
|
|
arxiv = np.random.poisson(2 * (1 + days_passed/365)) |
|
|
citations = int(np.random.exponential(50)) |
|
|
patents = np.random.poisson(0.5) |
|
|
investor_engagement = random.choice(["High", "Medium", "Low"]) |
|
|
|
|
|
|
|
|
tech_momentum = min(100, int((arxiv * 10) + (citations * 0.5) + (int(github_stars.replace('+',''))/10))) |
|
|
talent_score = random.randint(60, 99) |
|
|
funding_prob = f"{min(99, int(tech_momentum * 0.8 + talent_score * 0.1))}%" |
|
|
|
|
|
|
|
|
innovation_delay_days = random.choice([0, 0, 0, 30, 60, 90, 180]) |
|
|
benchmark_inflation_pct = random.randint(0, 50) |
|
|
flight_status = "On Time" if innovation_delay_days == 0 else "Delayed" |
|
|
if tech_momentum > 90: |
|
|
flight_status = "Accelerating" |
|
|
|
|
|
if "High" in investor_engagement and tech_momentum > 80: |
|
|
insight = "Strong Series D candidate - investor engagement at all-time high" |
|
|
elif tech_momentum < 40: |
|
|
insight = "Momentum slowing - may seek acquisition vs. next round" |
|
|
else: |
|
|
insight = "Steady technical output, organic growth phase" |
|
|
|
|
|
|
|
|
performance_leap_magnitude = random.uniform(10.0, 50.0) |
|
|
commercialization_timeline = random.randint(3, 18) |
|
|
|
|
|
data.append({ |
|
|
"company": co, |
|
|
"date": date_obj.strftime("%Y-%m-%d"), |
|
|
"github_stars_7d": github_stars, |
|
|
"arxiv_papers": arxiv, |
|
|
"citations": citations, |
|
|
"patents_filed": patents, |
|
|
"investor_engagement": investor_engagement, |
|
|
"funding_probability": funding_prob, |
|
|
"technical_momentum": tech_momentum, |
|
|
"talent_score": talent_score, |
|
|
"premium_insight": insight, |
|
|
"innovation_delay_days": innovation_delay_days, |
|
|
"benchmark_inflation_pct": benchmark_inflation_pct, |
|
|
"flight_status": flight_status, |
|
|
|
|
|
"performance_leap_magnitude": performance_leap_magnitude, |
|
|
"commercialization_timeline": commercialization_timeline |
|
|
}) |
|
|
return data |
|
|
|
|
|
|
|
|
def generate_esg_data(self, date_obj): |
|
|
""" |
|
|
Product 3: ESG Impact & Greenwashing Detector |
|
|
Columns: company, date, esg_claims, verifiable_actions, greenwashing_index, |
|
|
regulatory_risk, stakeholder_score, impact_verified, premium_insight, |
|
|
claims_psi, reality_psi, greenwashing_gap_pct, |
|
|
# ML FEATURES |
|
|
audit_gap_size, supplier_esg_score, employee_whistleblower_count, |
|
|
carbon_credit_validity_score |
|
|
""" |
|
|
companies = ["Tesla", "ExxonMobil", "Unilever", "BlackRock", "Patagonia"] |
|
|
|
|
|
data = [] |
|
|
for co in companies: |
|
|
claims = random.randint(10, 50) |
|
|
verified = int(claims * random.uniform(0.2, 0.9)) |
|
|
|
|
|
|
|
|
greenwashing_index = int((1 - (verified/claims)) * 100) |
|
|
reg_risk = "High" if greenwashing_index > 60 else "Medium" if greenwashing_index > 30 else "Low" |
|
|
stakeholder_score = random.randint(40, 95) |
|
|
impact_verified = f"{int((verified/claims)*100)}%" |
|
|
|
|
|
|
|
|
claims_psi = 100 |
|
|
reality_psi = int((verified/claims) * 100) |
|
|
greenwashing_gap_pct = claims_psi - reality_psi |
|
|
|
|
|
if greenwashing_index > 70: |
|
|
insight = f"High greenwashing risk - {100-int((verified/claims)*100)}% of claims lack verification" |
|
|
elif stakeholder_score > 85: |
|
|
insight = "Strong stakeholder alignment driving brand equity" |
|
|
else: |
|
|
insight = "Strong on operations but weak on supply chain transparency" |
|
|
|
|
|
|
|
|
audit_gap_size = claims - verified |
|
|
supplier_esg_score = random.randint(0, 100) |
|
|
employee_whistleblower_count = random.randint(0, 5) |
|
|
carbon_credit_validity_score = random.randint(0, 100) |
|
|
|
|
|
data.append({ |
|
|
"company": co, |
|
|
"date": date_obj.strftime("%Y-%m-%d"), |
|
|
"esg_claims": claims, |
|
|
"verifiable_actions": verified, |
|
|
"greenwashing_index": greenwashing_index, |
|
|
"regulatory_risk": reg_risk, |
|
|
"stakeholder_score": stakeholder_score, |
|
|
"impact_verified": impact_verified, |
|
|
"premium_insight": insight, |
|
|
"claims_psi": claims_psi, |
|
|
"reality_psi": reality_psi, |
|
|
"greenwashing_gap_pct": greenwashing_gap_pct, |
|
|
|
|
|
"audit_gap_size": audit_gap_size, |
|
|
"supplier_esg_score": supplier_esg_score, |
|
|
"employee_whistleblower_count": employee_whistleblower_count, |
|
|
"carbon_credit_validity_score": carbon_credit_validity_score |
|
|
}) |
|
|
return data |
|
|
|
|
|
|
|
|
def generate_regulatory_data(self, date_obj): |
|
|
""" |
|
|
Product 4: Regulatory Compliance Prediction |
|
|
Columns: company, date, enforcement_probability, compliance_gap, fines_estimate, |
|
|
remediation_cost, whistleblower_risk, regulatory_foresight, premium_insight, |
|
|
enforcement_probability_pct, fine_impact_usd, |
|
|
# ML FEATURES |
|
|
action_timeline_days |
|
|
""" |
|
|
companies = ["Meta", "Coinbase", "Amazon", "Pfizer", "Goldman Sachs"] |
|
|
|
|
|
data = [] |
|
|
for co in companies: |
|
|
enf_prob = random.randint(10, 90) |
|
|
gap = "Large" if enf_prob > 70 else "Medium" if enf_prob > 40 else "Small" |
|
|
fines = f"${random.randint(10, 5000)}M" |
|
|
remediation = f"${random.randint(5, 1000)}M" |
|
|
whistleblower = "High" if enf_prob > 60 else "Low" |
|
|
foresight = random.randint(20, 90) |
|
|
|
|
|
|
|
|
enforcement_probability_pct = enf_prob |
|
|
fine_impact_usd = random.randint(10, 5000) * 1000000 |
|
|
|
|
|
if enf_prob > 75: |
|
|
insight = "High risk of antitrust action - compliance gaps significant" |
|
|
elif foresight > 80: |
|
|
insight = "Proactive compliance strategy mitigating sector risks" |
|
|
else: |
|
|
insight = "Moderate risk - improving compliance but scrutiny remains" |
|
|
|
|
|
|
|
|
action_timeline_days = random.randint(30, 180) |
|
|
|
|
|
data.append({ |
|
|
"company": co, |
|
|
"date": date_obj.strftime("%Y-%m-%d"), |
|
|
"enforcement_probability": f"{enf_prob}%", |
|
|
"compliance_gap": gap, |
|
|
"fines_estimate": fines, |
|
|
"remediation_cost": remediation, |
|
|
"whistleblower_risk": whistleblower, |
|
|
"regulatory_foresight": foresight, |
|
|
"premium_insight": insight, |
|
|
"enforcement_probability_pct": enforcement_probability_pct, |
|
|
"fine_impact_usd": fine_impact_usd, |
|
|
|
|
|
"action_timeline_days": action_timeline_days |
|
|
}) |
|
|
return data |
|
|
|
|
|
|
|
|
def generate_supply_chain_data(self, date_obj): |
|
|
""" |
|
|
Product 5: Supply Chain Resilience |
|
|
Columns: company, date, disruption_risk, recovery_days, single_point_failure, |
|
|
cost_inflation, resilience_score, premium_insight, |
|
|
disruption_probability, days_to_impact, |
|
|
# ML FEATURES |
|
|
impact_revenue_pct |
|
|
""" |
|
|
companies = ["Apple", "Ford", "Nike", "Toyota", "Samsung"] |
|
|
|
|
|
data = [] |
|
|
for co in companies: |
|
|
risk = random.randint(10, 80) |
|
|
recovery = int(risk * 0.6) |
|
|
failure_pt = "High" if risk > 60 else "Medium" if risk > 30 else "Low" |
|
|
inflation = f"{round(random.uniform(1.0, 15.0), 1)}%" |
|
|
resilience = 100 - risk |
|
|
|
|
|
|
|
|
disruption_probability = risk |
|
|
days_to_impact = random.randint(5, 60) |
|
|
|
|
|
if risk > 60: |
|
|
insight = "High battery/chip supply risk - alternative suppliers needed urgently" |
|
|
elif resilience > 75: |
|
|
insight = "Strong supplier diversification but regional dependency remains" |
|
|
else: |
|
|
insight = "Stable supply chain with moderate inflationary pressure" |
|
|
|
|
|
|
|
|
impact_revenue_pct = random.uniform(0.5, 5.0) |
|
|
|
|
|
data.append({ |
|
|
"company": co, |
|
|
"date": date_obj.strftime("%Y-%m-%d"), |
|
|
"disruption_risk": risk, |
|
|
"recovery_days": recovery, |
|
|
"single_point_failure": failure_pt, |
|
|
"cost_inflation": inflation, |
|
|
"resilience_score": resilience, |
|
|
"premium_insight": insight, |
|
|
"disruption_probability": disruption_probability, |
|
|
"days_to_impact": days_to_impact, |
|
|
|
|
|
"impact_revenue_pct": impact_revenue_pct |
|
|
}) |
|
|
return data |
|
|
|
|
|
def run_pipeline(self): |
|
|
"""Run the full data pipeline (Backfill + Update).""" |
|
|
logger.info("Starting Premium Data Engine Pipeline...") |
|
|
|
|
|
|
|
|
files = { |
|
|
"fintech": "fintech_growth_digest.csv", |
|
|
"ai_talent": "ai_talent_heatmap.csv", |
|
|
"esg": "esg_sentiment_tracker.csv", |
|
|
"regulatory": "regulatory_risk_index.csv", |
|
|
"supply_chain": "supply_chain_risk.csv" |
|
|
} |
|
|
|
|
|
total_added_bytes = 0 |
|
|
details = {} |
|
|
|
|
|
for key, generator in self.verticals.items(): |
|
|
base_filename = files[key].replace('.csv', '') |
|
|
|
|
|
|
|
|
full_df = pd.DataFrame() |
|
|
|
|
|
|
|
|
|
|
|
yearly_path = os.path.join(DATA_DIR, f"{base_filename}_2025_yearly.csv") |
|
|
|
|
|
if not os.path.exists(yearly_path): |
|
|
logger.info(f"Backfilling {key} (365 days)...") |
|
|
dates = self.generate_date_range(365) |
|
|
all_data = [] |
|
|
for d in dates: |
|
|
all_data.extend(generator(d)) |
|
|
full_df = pd.DataFrame(all_data) |
|
|
else: |
|
|
logger.info(f"Updating {key} (Daily)...") |
|
|
|
|
|
full_df = pd.read_csv(yearly_path) |
|
|
|
|
|
|
|
|
today = datetime.now() |
|
|
today_str = today.strftime("%Y-%m-%d") |
|
|
|
|
|
|
|
|
if today_str not in full_df['date'].values: |
|
|
today_data = generator(today) |
|
|
new_row = pd.DataFrame(today_data) |
|
|
full_df = pd.concat([full_df, new_row], ignore_index=True) |
|
|
|
|
|
|
|
|
|
|
|
full_df['date'] = pd.to_datetime(full_df['date']) |
|
|
|
|
|
|
|
|
df_2025 = full_df[full_df['date'].dt.year == 2025] |
|
|
if not df_2025.empty: |
|
|
df_2025.to_csv(yearly_path, index=False) |
|
|
details[f"{base_filename}_2025_yearly.csv"] = os.path.getsize(yearly_path) |
|
|
|
|
|
|
|
|
for q in [1, 2, 3, 4]: |
|
|
df_q = df_2025[df_2025['date'].dt.quarter == q] |
|
|
if not df_q.empty: |
|
|
q_path = os.path.join(DATA_DIR, f"{base_filename}_2025_q{q}.csv") |
|
|
df_q.to_csv(q_path, index=False) |
|
|
details[f"{base_filename}_2025_q{q}.csv"] = os.path.getsize(q_path) |
|
|
|
|
|
|
|
|
|
|
|
legacy_path = os.path.join(DATA_DIR, files[key]) |
|
|
full_df.to_csv(legacy_path, index=False) |
|
|
|
|
|
return self.finalize_status() |
|
|
|
|
|
def finalize_status(self): |
|
|
|
|
|
total_size = sum(os.path.getsize(os.path.join(DATA_DIR, f)) for f in os.listdir(DATA_DIR) if f.endswith('.csv')) |
|
|
|
|
|
|
|
|
import json |
|
|
status = { |
|
|
"last_update": datetime.now().strftime("%Y-%m-%d %H:%M:%S UTC"), |
|
|
"total_data_size_bytes": total_size, |
|
|
"status": "Premium Data Pipeline Active" |
|
|
} |
|
|
with open(os.path.join(DATA_DIR, "status.json"), "w") as f: |
|
|
json.dump(status, f) |
|
|
return status |
|
|
|
|
|
def update_dataset():
    """Run the pipeline once and record how many CSV bytes were added.

    Snapshots per-file CSV sizes before and after the run, then augments
    DATA_DIR/status.json (if present) with the total growth and a per-file
    breakdown. Only growth is counted; shrinking files are ignored.

    Returns:
        int: Total bytes added across all CSV files.
    """
    engine = PremiumDataEngine()

    def _csv_sizes():
        # Map every CSV filename in DATA_DIR to its current size in bytes.
        return {
            name: os.path.getsize(os.path.join(DATA_DIR, name))
            for name in os.listdir(DATA_DIR)
            if name.endswith(".csv")
        }

    before_sizes = _csv_sizes()
    engine.run_pipeline()

    total_added = 0
    details = {}
    for name, size in _csv_sizes().items():
        grown = size - before_sizes.get(name, 0)
        if grown > 0:
            total_added += grown
            details[name] = grown

    import json
    status_path = os.path.join(DATA_DIR, "status.json")
    if os.path.exists(status_path):
        with open(status_path, 'r') as fh:
            st = json.load(fh)
        st['total_added_bytes'] = total_added
        st['details'] = details
        with open(status_path, 'w') as fh:
            json.dump(st, fh)

    return total_added
|
|
|
|
|
# Script entry point: run the pipeline once as a one-shot job.
if __name__ == "__main__":
    update_dataset()
|
|
|