ETL_pipeline / components /duckdb_api.py
heboya8's picture
Upload folder using huggingface_hub
2eee82e verified
import os
import duckdb
import shutil
import pandas as pd
def push_to_duckdb(duckdb_path, parquet_path, temp_parquet_path="temp/duckdb_temp_parquet"):
"""
Push the aggregated data from a Parquet directory to DuckDB.
Args:
duckdb_path (str): Path to the DuckDB database file
parquet_path (str): Path to the Parquet directory containing the aggregated data
temp_parquet_path (str): Temporary path for storing Parquet files
"""
# Validate input parquet_path
if not isinstance(parquet_path, str):
raise ValueError(f"parquet_path must be a string, got {type(parquet_path)}: {parquet_path}")
if not os.path.exists(parquet_path):
raise FileNotFoundError(f"Parquet directory does not exist at {parquet_path}")
if not os.path.isdir(parquet_path):
raise ValueError(f"parquet_path must be a directory, got a file at {parquet_path}")
# Ensure the temporary directory is clean before copying
if os.path.exists(temp_parquet_path):
shutil.rmtree(temp_parquet_path)
os.makedirs(temp_parquet_path, exist_ok=True)
# Copy the input Parquet directory to the temporary directory
try:
shutil.copytree(parquet_path, temp_parquet_path, dirs_exist_ok=True)
print(f"Copied Parquet directory from {parquet_path} to {temp_parquet_path}")
except Exception as e:
raise RuntimeError(f"Failed to copy Parquet directory from {parquet_path} to {temp_parquet_path}: {e}")
# Connect to DuckDB
directory = os.path.dirname(duckdb_path)
if directory and not os.path.exists(directory):
os.makedirs(directory)
con = duckdb.connect(duckdb_path)
# Create or replace the table in DuckDB by reading the Parquet files
try:
con.execute(f"""
CREATE OR REPLACE TABLE aggregated_financial_data AS
SELECT * FROM parquet_scan('{temp_parquet_path}/*.parquet')
""")
print(f"Successfully loaded data into DuckDB table from {temp_parquet_path}")
except Exception as e:
raise RuntimeError(f"Failed to load Parquet files into DuckDB: {e}")
finally:
con.close()
# Clean up temporary Parquet directory
if os.path.exists(temp_parquet_path):
shutil.rmtree(temp_parquet_path)
print(f"Cleaned up temporary directory {temp_parquet_path}")
if __name__ == "__main__":
from process_data import process_financial_data
duckdb_path = "duckdb_databases/financial_data.db"
parquet_path = process_financial_data()
try:
push_to_duckdb(duckdb_path, parquet_path)
except Exception as e:
print(f"Error pushing to DuckDB: {e}")