import pandas as pd import asyncio import math import json from datetime import datetime, date from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from passlib.context import CryptContext from src.core.database import async_session from src.core.models import Users, Teams, Roles, UserTeamsRole, Assets, AssetStatus # --------------------------------------------- # PASSWORD HASHER # --------------------------------------------- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def hash_password(password: str) -> str: return pwd_context.hash(password) # --------------------------------------------- # CLEAN NaN / EMPTY VALUES # --------------------------------------------- def clean(value): if value is None: return None if isinstance(value, float) and math.isnan(value): return None if isinstance(value, str) and value.strip() == "": return None return value # --------------------------------------------- # NORMALIZE ASSET ID (NEW) # --------------------------------------------- def normalize_asset_id(value: str) -> str: """Convert messy input to clean format YB-73-M or YB-77-MS""" if not value: return None value = value.strip().replace(" ", "") value = value.upper() while "--" in value: value = value.replace("--", "-") if "-" not in value: prefix = value[:2] number = "".join(filter(str.isdigit, value)) suffix = value[len(prefix) + len(number) :] return f"{prefix}-{number}-{suffix}" return value # --------------------------------------------- # ASSET TYPE MAP (fallback) # --------------------------------------------- ASSET_TYPE_MAP = { "M": "Monitor", "L": "Laptop", "H": "Headphone", "MS": "Mouse", } # --------------------------------------------- # get_or_create # --------------------------------------------- async def get_or_create(session: AsyncSession, model, name: str): result = await session.exec(select(model).where(model.name == name)) row = result.first() if row: return row row = model(name=name) session.add(row) await session.commit() await session.refresh(row) return row # --------------------------------------------- # PARSE ASSETS JSON # --------------------------------------------- def parse_assets_from_excel(raw_value): if raw_value is None: return [] raw = str(raw_value).strip() raw = raw.replace('""', '"') if raw.startswith('"') and raw.endswith('"'): raw = raw[1:-1] try: parsed = json.loads(raw) if isinstance(parsed, list): return parsed return [] except: return [] # --------------------------------------------- # FORMAT JOIN DATE (NEW) # --------------------------------------------- def format_join_date(value): """Always return YYYY-MM-DD string, no time.""" if value is None: return None if isinstance(value, datetime): return value.date().isoformat() # remove time if isinstance(value, date): return value.isoformat() if isinstance(value, str): value = value.strip() # Try standard formats for fmt in ("%Y-%m-%d", "%d.%m.%Y", "%d-%m-%Y", "%m/%d/%Y"): try: d = datetime.strptime(value, fmt).date() return d.isoformat() except: continue # Fallback: return cleaned string return value return str(value) # --------------------------------------------- # MAIN IMPORT # --------------------------------------------- async def seed_from_excel(session: AsyncSession, excel_path="src/data_add/users.xlsx"): df = pd.read_excel(excel_path) print("\nšŸ“Œ Importing users from Excel...\n") for _, row in df.iterrows(): # --- DOB --- raw_dob = clean(row["dob"]) dob = None if isinstance(raw_dob, str): try: dob = datetime.strptime(raw_dob, "%d.%m.%Y").date() except: dob = None elif isinstance(raw_dob, datetime): dob = raw_dob.date() else: dob = raw_dob # --- JOIN DATE (FIXED) --- raw_join = clean(row["join_date"]) join_date = format_join_date(raw_join) # --- CREATE USER --- user = Users( email_id=row["email"], password=hash_password(row["password"]), user_name=row["User_name"], dob=dob, address=clean(row["address"]), join_date=join_date, # <--- NOW ONLY YYYY-MM-DD is_verified=True, ) session.add(user) await session.commit() await session.refresh(user) # --- TEAM / ROLE --- team = await get_or_create(session, Teams, row["team"]) role = await get_or_create(session, Roles, row["role"]) mapping = UserTeamsRole( user_id=user.id, team_id=team.id, role_id=role.id, ) session.add(mapping) # --- ASSETS JSON --- assets_list = parse_assets_from_excel(clean(row["assets"])) for asset in assets_list: raw_id = clean(asset.get("id")) raw_type = clean(asset.get("type")) if not raw_id: continue # Clean & normalize asset id (your YB-12 format) asset_id = normalize_asset_id(raw_id) asset_type = raw_type or "Unknown" # Skip duplicates (because of unique constraint asset_id + type) existing = await session.exec( select(Assets).where( Assets.asset_id == asset_id, Assets.type == asset_type ) ) if existing.first(): print(f"⚠ Skipping duplicate asset: {asset_id} ({asset_type})") continue # Insert new asset asset_obj = Assets( user_id=user.id, asset_id=asset_id, type=asset_type, status=AssetStatus.ACTIVE, ) session.add(asset_obj) await session.commit() print("\nšŸŽ‰ Excel Import Completed Successfully!\n") # --------------------------------------------- # RUN SEEDS # --------------------------------------------- async def run_all_seeds(): async with async_session() as session: print("\n🟦 Seeding TEAMS...") for t in [ "AI/Tech", "Shared Services", "Digital Marketing", "Bevolve", "Bridge", "Quilt", "HR", ]: await get_or_create(session, Teams, t) print("\n🟦 Seeding ROLES...") for r in ["Mentor", "Team Lead", "HR", "Member"]: await get_or_create(session, Roles, r) print("\n🟦 Importing USERS from Excel...") await seed_from_excel(session, "src/data_add/users.xlsx") print("\nāœ” All seeding complete!\n") if __name__ == "__main__": asyncio.run(run_all_seeds())