# scripts/process_data.py
"""Load candlestick CSV data from MinIO into Spark and resample it into OHLC bars.

Pipeline (see ``process_financial_data``):
    1. fetch the CSV lines from a MinIO bucket,
    2. parse them chunk by chunk into a Parquet staging directory,
    3. aggregate every ``track_each`` consecutive rows into one OHLC row,
    4. write the aggregated result as Parquet and return its path.
"""
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, IntegerType
# NOTE: ``max``, ``min`` and ``sum`` imported here shadow the Python builtins
# for the rest of this module; they are the Spark column aggregates.
from pyspark.sql.functions import col, row_number, floor, first, max, min, last, sum, struct
from pyspark.sql.window import Window
import os
import sys
import shutil
import pandas as pd

# Add the project root directory to the Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from minio_api.client import sign_in
from minio_api.minio_utils import get_minio_data

# Column layout of the incoming CSV rows (all columns nullable).
_CSV_SCHEMA = StructType([
    StructField("Open time", LongType(), True),
    StructField("Open", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("Volume", DoubleType(), True),
    StructField("Close time", LongType(), True),
    StructField("Quote asset volume", DoubleType(), True),
    StructField("Number of trades", IntegerType(), True),
    StructField("Taker buy base asset volume", DoubleType(), True),
    StructField("Taker buy quote asset volume", DoubleType(), True),
    StructField("Ignore", IntegerType(), True),
])

# Number of RDD partitions (and therefore Parquet part files) per staged chunk.
_CHUNK_PARTITIONS = 8


def initialize_spark_session(app_name: str = "MinIO to Spark DataFrame",
                             driver_memory: str = "4g",
                             executor_memory: str = "4g") -> SparkSession:
    """Return a SparkSession (existing or new) configured with the given memory limits.

    Args:
        app_name: Application name passed to the session builder.
        driver_memory: Value for ``spark.driver.memory``.
        executor_memory: Value for ``spark.executor.memory``.
    """
    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.driver.memory", driver_memory)
        .config("spark.executor.memory", executor_memory)
        .getOrCreate()
    )


def create_dataframe_from_csv(spark, csv_lines, temp_parquet_path="temp/temp_parquet_chunks",
                              chunk_size=int(3e+6)) -> DataFrame:
    """Parse CSV text lines into a Spark DataFrame, staging the rows as Parquet.

    The lines are parsed in slices of ``chunk_size`` and appended to
    ``temp_parquet_path``; the returned DataFrame reads from that directory.
    Anything already present at ``temp_parquet_path`` is deleted first.

    Args:
        spark: Active SparkSession.
        csv_lines: Sequence of CSV text lines (must support ``len`` and
            slicing). A first line starting with ``"Open time,"`` is treated
            as a header and skipped.
        temp_parquet_path: Local directory used as the Parquet staging area.
        chunk_size: Maximum number of lines parsed per chunk; must be positive.

    Returns:
        A DataFrame with the module's CSV schema. When there are no data
        lines, an empty DataFrame with that schema is returned.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    if csv_lines and csv_lines[0].startswith("Open time,"):
        data_lines = csv_lines[1:]
    else:
        data_lines = csv_lines

    # Drop chunks left over from a previous run so "append" starts clean.
    if os.path.exists(temp_parquet_path):
        shutil.rmtree(temp_parquet_path)

    # Nothing to stage: reading a non-existent Parquet directory would fail,
    # so hand back an empty DataFrame with the expected schema instead.
    if not data_lines:
        return spark.createDataFrame([], _CSV_SCHEMA)

    parent_dir = os.path.dirname(temp_parquet_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    for start in range(0, len(data_lines), chunk_size):
        chunk = data_lines[start:start + chunk_size]
        # Ask for the partition count up front rather than repartitioning
        # afterwards, which would add a shuffle for the same layout.
        rdd_chunk = spark.sparkContext.parallelize(chunk, _CHUNK_PARTITIONS)
        df_chunk = spark.read.schema(_CSV_SCHEMA).csv(rdd_chunk, header=False)
        df_chunk.write.mode("append").parquet(temp_parquet_path)

    return spark.read.parquet(temp_parquet_path)


def resample_dataframe(df, track_each=3600) -> DataFrame:
    """Aggregate every ``track_each`` consecutive rows into one OHLC row.

    Rows are numbered in ascending "Open time" order and grouped into
    consecutive runs of ``track_each`` rows. For each run the result holds the
    first row's "Open time" and "Open", the maximum "High", the minimum "Low",
    the last row's "Close" and the summed "Number of trades".

    Args:
        df: DataFrame containing at least the columns "Open time", "Open",
            "High", "Low", "Close" and "Number of trades".
        track_each: Number of input rows per output row; must be positive.

    Returns:
        DataFrame with columns "Open time", "Open", "High", "Low", "Close",
        "Number of trades", sorted by group.

    Raises:
        ValueError: If ``track_each`` is not positive.
    """
    if track_each <= 0:
        raise ValueError(f"track_each must be a positive integer, got {track_each!r}")

    keep_cols = ["Open time", "Open", "High", "Low", "Close", "Number of trades"]
    df = df.select(keep_cols)

    # NOTE: a window without partitionBy pulls all rows into one partition.
    window_spec = Window.orderBy("Open time")
    df = df.withColumn("row_number", row_number().over(window_spec))
    df = df.withColumn("group_id", floor((col("row_number") - 1) / track_each))

    # first()/last() after groupBy do not follow the window order, so they can
    # pick arbitrary rows. row_number is unique, so the min/max of a struct
    # that leads with it selects exactly the first/last row of each group.
    aggregated_df = df.groupBy("group_id").agg(
        min(struct("row_number", "Open time", "Open")).alias("first_row"),
        max("High").alias("High"),
        min("Low").alias("Low"),
        max(struct("row_number", "Close")).alias("last_row"),
        sum("Number of trades").alias("Number of trades"),
    )

    return aggregated_df.orderBy("group_id").select(
        col("first_row").getField("Open time").alias("Open time"),
        col("first_row").getField("Open").alias("Open"),
        col("High"),
        col("Low"),
        col("last_row").getField("Close").alias("Close"),
        col("Number of trades"),
    )


def process_financial_data(bucket_name="minio-ngrok-bucket",
                           file_name="BTCUSDT-1s-2025-09.csv",
                           temp_parquet_path="temp/temp_parquet_chunks",
                           output_parquet_path="temp/aggregated_output") -> str:
    """Fetch a CSV from MinIO, resample it and save the result as Parquet.

    Args:
        bucket_name: MinIO bucket holding the CSV object.
        file_name: Name of the CSV object inside the bucket.
        temp_parquet_path: Staging directory for the parsed CSV chunks.
        output_parquet_path: Directory the aggregated Parquet data is written to.

    Returns:
        ``output_parquet_path`` once the output directory has been verified.

    Raises:
        FileNotFoundError: If the output directory does not exist after the write.
        Exception: Any error from the fetch, parse or write steps is printed
            and re-raised unchanged.
    """
    minio_client = sign_in()
    spark = initialize_spark_session()

    try:
        csv_lines = get_minio_data(minio_client, bucket_name, file_name)
        print(f"Fetched CSV data from MinIO: {len(csv_lines)} lines")

        df = create_dataframe_from_csv(spark, csv_lines, temp_parquet_path)
        print("Created Spark DataFrame from CSV data.")

        aggregated_df = resample_dataframe(df)
        print("Resampled DataFrame with OHLC aggregations.")

        # Save aggregated DataFrame to a temporary Parquet directory.
        # dirname() is "" for a bare relative name, and makedirs("") raises,
        # so only create the parent when there is one.
        output_parent = os.path.dirname(output_parquet_path)
        if output_parent:
            os.makedirs(output_parent, exist_ok=True)
        aggregated_df.write.mode("overwrite").parquet(output_parquet_path)
        print(f"Saved aggregated DataFrame to {output_parquet_path}")

        # Verify that the Parquet directory exists
        if not os.path.isdir(output_parquet_path):
            raise FileNotFoundError(
                f"Parquet directory {output_parquet_path} was not created or is not a directory."
            )
        print(f"Verified: Parquet directory exists at {output_parquet_path}")

        return output_parquet_path
    except Exception as e:
        print(f"Error in process_financial_data: {e}")
        raise
    finally:
        # Always release the Spark session, whether the run succeeded or not.
        spark.stop()


if __name__ == "__main__":
    # Example usage
    output_parquet_path = process_financial_data()
    print(output_parquet_path)