firermsdata-agent / feed_data.py
shahf8604's picture
Feed tables data into database table
73f3860
import asyncio
from datetime import time, timedelta, datetime
import pandas as pd
import numpy as np
from sqlalchemy import select
from src.configs import DatabaseConfig
from src.entities.incident import IncidentModel
from src.entities.apparatus import ApparatusModel
from src.entities.personnel import PersonnelModel
from src.entities.auv_incidentbase import IncidentBaseModel
from src.entities.auv_incidentapparatus import IncidentApparatusModel
from src.entities.auv_incidentpersonnel import IncidentPersonnelModel
# Workbook read by ingest_excel().
EXCEL_FILE = "ProjectTablesData.xlsx"

# Models in the same order as the workbook's sheets; a model's position in
# this tuple is the zero-based sheet index it is loaded from.
_SHEET_MODELS_IN_ORDER = (
    IncidentModel,
    ApparatusModel,
    PersonnelModel,
    IncidentBaseModel,
    IncidentApparatusModel,
    IncidentPersonnelModel,
)

# Maps sheet index -> model class.
EXCEL_SHEET_MODEL_MAP = dict(enumerate(_SHEET_MODELS_IN_ORDER))
def clean_value(value):
    """Normalize a single spreadsheet cell into a database-friendly value.

    Conversions are applied in this order:

    * values ``pd.isna`` reports as missing -> ``None``
    * ``pd.Timestamp`` -> ``datetime`` (midnight 1900-01-01 -> ``None``)
    * ``datetime.time`` -> ``"HH:MM:SS"`` string
    * ``timedelta`` -> ``"[-]HH:MM:SS"`` string; hours are not wrapped at 24
    * numpy scalar -> the equivalent plain Python scalar
    * ``str`` -> stripped string, or ``None`` when nothing is left

    Anything else is returned unchanged.
    """
    # pd.isna yields an array for list-like input, and testing its truth
    # value can raise; such values are not "missing", so fall through.
    try:
        if pd.isna(value):
            return None
    except (TypeError, ValueError):
        pass

    # Timestamps become real datetime objects (not strings).
    if isinstance(value, pd.Timestamp):
        if value == pd.Timestamp("1900-01-01"):  # Excel empty date default
            return None
        return value.to_pydatetime()

    if isinstance(value, time):
        return value.strftime("%H:%M:%S")

    if isinstance(value, timedelta):
        total_seconds = int(value.total_seconds())
        # Format the magnitude and prepend the sign: floor division on a
        # negative total would otherwise render -1s as "-1:59:59".
        sign = "-" if total_seconds < 0 else ""
        hours, remainder = divmod(abs(total_seconds), 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{sign}{hours:02}:{minutes:02}:{seconds:02}"

    # Unwrap numpy scalars first so numpy strings get the same stripping
    # as built-in strings below.
    if isinstance(value, np.generic):
        value = value.item()

    if isinstance(value, str):
        stripped = value.strip()
        return stripped if stripped else None

    return value
def get_model_columns(model):
    """Return the set of column names declared on the model's table."""
    names = set()
    for col in model.__table__.columns:
        names.add(col.name)
    return names
def get_primary_keys(model):
    """Return the model's primary key column names as a list, in table order."""
    pk_constraint = model.__table__.primary_key
    names = []
    for col in pk_constraint.columns:
        names.append(col.name)
    return names
def row_to_model(model, row):
    """Build a model instance from one spreadsheet row.

    Only keys of ``row`` that match a column name of ``model`` are used;
    each matching value is passed through ``clean_value`` first.
    """
    known_columns = get_model_columns(model)
    kwargs = {
        name: clean_value(row[name])
        for name in known_columns
        if name in row
    }
    return model(**kwargs)
async def get_existing_primary_keys(session, model, pk_column):
    """Return the set of values currently stored in ``model``'s ``pk_column``.

    ``pk_column`` is the attribute name to select; the query is executed
    on the given ``session``.
    """
    pk_attribute = getattr(model, pk_column)
    result = await session.execute(select(pk_attribute))

    keys = set()
    for record in result.fetchall():
        keys.add(record[0])
    return keys
async def _fetch_existing_keys(session, model, pk_columns):
    """Return the primary keys already stored for ``model`` as a set of tuples.

    Each tuple holds one value per name in ``pk_columns``, in that order, so
    single-column and composite keys can be compared the same way.
    """
    if len(pk_columns) == 1:
        single_values = await get_existing_primary_keys(session, model, pk_columns[0])
        return {(value,) for value in single_values}

    pk_attributes = [getattr(model, name) for name in pk_columns]
    result = await session.execute(select(*pk_attributes))
    return {tuple(record) for record in result.fetchall()}


async def ingest_excel():
    """Load the mapped sheets of the Excel workbook and insert their new rows.

    Every sheet of ``EXCEL_FILE`` is read up front. For each
    ``sheet index -> model`` pair in ``EXCEL_SHEET_MODEL_MAP`` the rows are
    converted with ``row_to_model`` and staged on the session. A row is
    skipped when its full primary key (all key columns) already exists in
    the database or appeared earlier in the same sheet. Rows with a missing
    key component are never treated as duplicates, because there is nothing
    to compare them by. All staged rows are committed once, after the last
    sheet has been processed.

    Raises:
        IndexError: If a mapped sheet index does not exist in the workbook.
    """
    print("Loading Excel file...")
    excel_data = pd.read_excel(EXCEL_FILE, sheet_name=None)
    # Sheet names in workbook order; built once instead of per iteration.
    sheet_names = list(excel_data)

    async with DatabaseConfig.async_session() as session:
        total_inserted = 0

        for sheet_index, model in EXCEL_SHEET_MODEL_MAP.items():
            if sheet_index >= len(sheet_names):
                raise IndexError(
                    f"{EXCEL_FILE} has {len(sheet_names)} sheet(s); "
                    f"no sheet at index {sheet_index} for {model.__name__}"
                )
            sheet_name = sheet_names[sheet_index]
            print(f"\nProcessing Sheet: {sheet_name}")
            print(f"Mapped Model: {model.__name__}")
            df = excel_data[sheet_name]

            pk_columns = get_primary_keys(model)
            existing_keys = set()
            seen_keys = set()
            if pk_columns:
                existing_keys = await _fetch_existing_keys(session, model, pk_columns)

            objects = []
            skipped = 0
            for _, row in df.iterrows():
                obj = row_to_model(model, row)
                if pk_columns:
                    key = tuple(getattr(obj, name, None) for name in pk_columns)
                    # Only a fully populated key can identify a duplicate;
                    # comparing None keys would drop every keyless row
                    # after the first one.
                    if all(part is not None for part in key):
                        if key in existing_keys or key in seen_keys:
                            skipped += 1
                            continue
                        seen_keys.add(key)
                objects.append(obj)

            session.add_all(objects)
            inserted = len(objects)
            print(f"Inserted: {inserted}")
            print(f"Skipped duplicates: {skipped}")
            total_inserted += inserted

        await session.commit()

    print("\nTotal rows inserted:", total_inserted)
# Script entry point: run the ingestion coroutine when executed directly.
if __name__ == "__main__":
    asyncio.run(ingest_excel())