|
|
import os |
|
|
import duckdb |
|
|
import shutil |
|
|
import pandas as pd |
|
|
|
|
|
def push_to_duckdb(duckdb_path, parquet_path, temp_parquet_path="temp/duckdb_temp_parquet"): |
|
|
""" |
|
|
Push the aggregated data from a Parquet directory to DuckDB. |
|
|
|
|
|
Args: |
|
|
duckdb_path (str): Path to the DuckDB database file |
|
|
parquet_path (str): Path to the Parquet directory containing the aggregated data |
|
|
temp_parquet_path (str): Temporary path for storing Parquet files |
|
|
""" |
|
|
|
|
|
if not isinstance(parquet_path, str): |
|
|
raise ValueError(f"parquet_path must be a string, got {type(parquet_path)}: {parquet_path}") |
|
|
if not os.path.exists(parquet_path): |
|
|
raise FileNotFoundError(f"Parquet directory does not exist at {parquet_path}") |
|
|
if not os.path.isdir(parquet_path): |
|
|
raise ValueError(f"parquet_path must be a directory, got a file at {parquet_path}") |
|
|
|
|
|
|
|
|
if os.path.exists(temp_parquet_path): |
|
|
shutil.rmtree(temp_parquet_path) |
|
|
os.makedirs(temp_parquet_path, exist_ok=True) |
|
|
|
|
|
|
|
|
try: |
|
|
shutil.copytree(parquet_path, temp_parquet_path, dirs_exist_ok=True) |
|
|
print(f"Copied Parquet directory from {parquet_path} to {temp_parquet_path}") |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Failed to copy Parquet directory from {parquet_path} to {temp_parquet_path}: {e}") |
|
|
|
|
|
|
|
|
directory = os.path.dirname(duckdb_path) |
|
|
if directory and not os.path.exists(directory): |
|
|
os.makedirs(directory) |
|
|
|
|
|
con = duckdb.connect(duckdb_path) |
|
|
|
|
|
|
|
|
try: |
|
|
con.execute(f""" |
|
|
CREATE OR REPLACE TABLE aggregated_financial_data AS |
|
|
SELECT * FROM parquet_scan('{temp_parquet_path}/*.parquet') |
|
|
""") |
|
|
print(f"Successfully loaded data into DuckDB table from {temp_parquet_path}") |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Failed to load Parquet files into DuckDB: {e}") |
|
|
finally: |
|
|
con.close() |
|
|
|
|
|
|
|
|
if os.path.exists(temp_parquet_path): |
|
|
shutil.rmtree(temp_parquet_path) |
|
|
print(f"Cleaned up temporary directory {temp_parquet_path}") |
|
|
|
|
|
if __name__ == "__main__": |
|
|
from process_data import process_financial_data |
|
|
duckdb_path = "duckdb_databases/financial_data.db" |
|
|
parquet_path = process_financial_data() |
|
|
|
|
|
try: |
|
|
push_to_duckdb(duckdb_path, parquet_path) |
|
|
except Exception as e: |
|
|
print(f"Error pushing to DuckDB: {e}") |
|
|
|
|
|
|