import asyncio
import random
from datetime import datetime, timedelta, timezone

from database import init_db
from models import Employee, Anomaly, MitreMapping, MitigationStrategy

# Dummy employees are created whenever fewer than this many employees exist.
_MIN_EMPLOYEES = 5
_DUMMY_DEPARTMENTS = ["Engineering", "Sales", "HR", "Finance", "Marketing"]
_DUMMY_ROLES = ["Developer", "Manager", "Analyst", "Director", "Specialist"]

# Each risk level carries the inclusive risk-score range and the anomaly types
# that may be generated for it.
_RISK_LEVELS = [
    {"level": "low", "score_range": (0, 39), "types": ["policy_violation", "minor_access"]},
    {"level": "medium", "score_range": (40, 59), "types": ["unusual_login", "suspicious_download"]},
    {"level": "high", "score_range": (60, 79), "types": ["privilege_escalation", "mass_deletion"]},
    {"level": "critical", "score_range": (80, 100), "types": ["data_exfiltration", "malware_activity"]},
]
# Relative weights, aligned with _RISK_LEVELS: 40% low, 30% medium, 20% high,
# 10% critical (fewer criticals, more low/medium).
_RISK_WEIGHTS = [40, 30, 20, 10]

# Anomalies detected more than this many days ago are marked "resolved".
_OPEN_WINDOW_DAYS = 7


async def _ensure_employees():
    """Return all stored employees, adding dummy ones if fewer than five exist."""
    employees = await Employee.find_all().to_list()
    if len(employees) >= _MIN_EMPLOYEES:
        return employees

    print("Creating dummy employees...")
    # The dummy IDs are fixed, so a previous (possibly interrupted) run may
    # already have stored some of them; skip those instead of duplicating.
    existing_ids = {emp.employee_id for emp in employees}
    for i in range(_MIN_EMPLOYEES):
        employee_id = f"DUMMY_{i+100}"
        if employee_id in existing_ids:
            continue
        emp = Employee(
            employee_id=employee_id,
            name=f"Dummy Employee {i+1}",
            email=f"dummy{i+1}@company.com",
            department=random.choice(_DUMMY_DEPARTMENTS),
            role=random.choice(_DUMMY_ROLES),
            baseline_location="New York, US",
            is_isolated=False,
        )
        await emp.create()
        employees.append(emp)
    return employees


async def _create_anomaly(emp, now, days_back):
    """Store one random anomaly for *emp* together with a fixed MITRE mapping."""
    # Weighted random choice for risk level.
    risk_config = random.choices(_RISK_LEVELS, weights=_RISK_WEIGHTS, k=1)[0]
    score = random.randint(*risk_config["score_range"])
    anom_type = random.choice(risk_config["types"])

    # Whole days back (0..days_back) plus an extra 0-23 hour offset, so the
    # timestamp can lie up to days_back days and 23 hours in the past.
    days_ago = random.randint(0, days_back)
    detected_at = now - timedelta(days=days_ago, hours=random.randint(0, 23))

    # Status: older ones resolved, newer ones open.
    status = "resolved" if days_ago > _OPEN_WINDOW_DAYS else "open"

    anomaly = Anomaly(
        employee_id=emp.id,
        anomaly_score=-1.0,  # Dummy raw score
        risk_level=risk_config["level"],
        risk_score=score,
        description=f"Generated {risk_config['level']} risk anomaly: {anom_type}",
        anomaly_type=anom_type,
        status=status,
        detected_at=detected_at,
        top_features=[{"feature": "dummy_feature", "value": 0.0, "description": "Simulated value"}],
    )
    await anomaly.create()

    # Add a MITRE mapping for realism.
    await MitreMapping(
        anomaly_id=anomaly.id,
        technique_id="T1078",
        technique_name="Valid Accounts",
        tactic="Defense Evasion",
        description="Simulated usage of valid accounts",
        confidence=0.8,
    ).create()


async def generate_data(*, num_anomalies: int = 50, days_back: int = 30) -> None:
    """Populate the database with simulated anomalies for demo/testing.

    Initializes the database, makes sure some employees exist (creating
    dummy ones when fewer than five are stored), then creates
    ``num_anomalies`` anomalies, each assigned to a randomly chosen employee
    and accompanied by one MITRE mapping document.

    Args:
        num_anomalies: Number of anomalies to create. Defaults to 50.
        days_back: Maximum number of whole days in the past for an anomaly's
            detection date (an extra 0-23 hour offset is added on top).
            Defaults to 30.

    Raises:
        ValueError: If ``num_anomalies`` or ``days_back`` is negative.
    """
    # Validate before touching the database so bad arguments have no side effects.
    if num_anomalies < 0:
        raise ValueError("num_anomalies must be non-negative")
    if days_back < 0:
        raise ValueError("days_back must be non-negative")

    print("Starting data generation...")
    await init_db()

    # 1. Ensure we have some employees
    employees = await _ensure_employees()
    print(f"Working with {len(employees)} employees")

    # 2. Generate anomalies spread over the requested window
    anomalies_created = 0
    now = datetime.now(timezone.utc)
    for _ in range(num_anomalies):
        emp = random.choice(employees)
        await _create_anomaly(emp, now, days_back)
        anomalies_created += 1

    print(f"✅ Successfully generated {anomalies_created} anomalies.")


if __name__ == "__main__":
    asyncio.run(generate_data())