| import os |
| import pandas as pd |
| import time |
| from sqlalchemy import create_engine, Column, String, Integer, Float, DateTime, inspect, MetaData |
| from sqlalchemy.orm import declarative_base |
| from sqlalchemy.exc import SQLAlchemyError |
|
|
# Database connection URL, read from the DATABASE_URL environment variable
# (None when the variable is unset).
DATABASE_URL = os.environ.get('DATABASE_URL')

# Module-wide SQLAlchemy engine used by the table-creation and data-loading
# functions in this file.
engine = create_engine(DATABASE_URL)
|
|
|
|
def get_df_from_csv(csv_file_path):
    """Read the CSV at *csv_file_path* and return it as a pandas DataFrame.

    The argument is handed to ``pd.read_csv`` unchanged, with all parsing
    options left at their defaults.
    """
    return pd.read_csv(csv_file_path)
|
|
def get_schema_from_df(df):
    """Return the Table Schema dictionary that pandas derives from *df*.

    The result comes straight from ``pd.io.json.build_table_schema``; the
    rest of this module reads its ``'fields'`` list, where each entry
    carries a ``'name'`` and a ``'type'``.
    """
    return pd.io.json.build_table_schema(df)
|
|
def _schema_kind(field_type):
    """Collapse a Table Schema field type onto the four kinds this module stores."""
    return field_type if field_type in ('integer', 'number', 'datetime') else 'string'


def _reflected_kind(sql_type):
    """Classify a reflected SQLAlchemy type as a schema kind, or None if unrecognised."""
    known = (
        (Integer, 'integer'),
        (Float, 'number'),
        (DateTime, 'datetime'),
        (String, 'string'),
    )
    for base, kind in known:
        if isinstance(sql_type, base):
            return kind
    # Anything else (e.g. BOOLEAN, NUMERIC) is never produced by this module,
    # so it must not compare equal to any wanted kind.
    return None


def _sql_type_for(kind):
    """Return the SQLAlchemy column type used to store a schema kind."""
    return {'integer': Integer, 'number': Float, 'datetime': DateTime}.get(kind, String)


def create_table_from_schema(table_name, schema):
    """Create a table matching *schema*, reusing an existing compatible table.

    The table gets an integer primary key ``id`` plus one column per entry in
    ``schema['fields']`` (a field itself named ``id`` is skipped). Field types
    ``integer``, ``number`` and ``datetime`` map to Integer, Float and
    DateTime columns; every other type is stored as String.

    If a table called *table_name* already exists with the same column names
    and kinds, nothing is created. If it exists with a different layout, a
    new table named ``<table_name>_<unix timestamp>`` is created instead.

    Args:
        table_name: Requested table name.
        schema: Table Schema dict with a ``'fields'`` list of
            ``{'name': ..., 'type': ...}`` entries.

    Returns:
        The name of the table that matches *schema* (either *table_name* or
        the timestamp-suffixed replacement), or ``None`` if creation failed.
    """
    # Table is only needed here; imported locally so this function does not
    # depend on the file-level import list.
    from sqlalchemy import Table

    # Column layout this call wants, expressed as name -> kind so it can be
    # compared against what the database reports.
    wanted = {'id': 'integer'}
    for field in schema['fields']:
        if field['name'] != 'id':
            wanted[field['name']] = _schema_kind(field['type'])

    inspector = inspect(engine)
    if table_name in inspector.get_table_names():
        # Reflected columns carry SQLAlchemy type objects, so normalise them
        # to the same kind strings before comparing.
        existing = {
            column['name']: _reflected_kind(column['type'])
            for column in inspector.get_columns(table_name)
        }
        if existing == wanted:
            print(f"Table '{table_name}' with the same schema already exists. Skipping creation.")
            return table_name
        print(f"Table '{table_name}' exists but has a different schema. Creating a new table with a timestamp suffix.")
        table_name = f"{table_name}_{int(time.time())}"

    # Build the table explicitly instead of injecting attributes into a
    # declarative class, so any column name (including ones reserved by the
    # declarative API) is accepted.
    metadata = MetaData()
    columns = [Column('id', Integer, primary_key=True)]
    columns.extend(
        Column(name, _sql_type_for(kind))
        for name, kind in wanted.items()
        if name != 'id'
    )
    Table(table_name, metadata, *columns)

    try:
        metadata.create_all(engine)
    except SQLAlchemyError as e:
        print(f"Error creating table: {str(e)}")
        return None
    print(f"Table '{table_name}' created successfully.")
    return table_name
|
|
|
|
def save_data_to_table(table_name, df):
    """Append the rows of *df* to the database table *table_name*.

    Writes through the module-level ``engine`` using ``DataFrame.to_sql``
    with ``if_exists='append'``. A SQLAlchemy error is reported on stdout
    and not re-raised.
    """
    try:
        df.to_sql(name=table_name, con=engine, if_exists='append')
    except SQLAlchemyError as exc:
        print(f"Error saving data to table: {str(exc)}")
| |
|
|
|
|
if __name__ == "__main__":
    # Load the CSV, derive its schema, make sure a matching table exists,
    # then append the rows to it.
    filename = 'funder_dataset_300_rows.csv'
    df = get_df_from_csv(filename)
    schema = get_schema_from_df(df)

    # Table name is the file name without directory or extension; splitext
    # only strips the final suffix, so dotted names and paths are handled.
    table_name = os.path.splitext(os.path.basename(filename))[0]

    # create_table_from_schema may pick a different (timestamp-suffixed)
    # name when the requested table exists with another layout. Use the
    # name it reports, falling back to the requested one when it returns
    # nothing, so the rows are written to the table that fits the schema.
    target_table = create_table_from_schema(table_name, schema) or table_name
    save_data_to_table(target_table, df)
|
|
|
|
|
|