# -*- coding: utf-8 -*-
"""processing.py

Functions for processing QuantConnect JSON backtest result files.

Automatically generated by Colab. Original file is located at
https://colab.research.google.com/drive/13EcoLMljb9XzVBELmFC0EBDknuHS79Vy
"""

import json
import traceback

import numpy as np
import pandas as pd

from utils import get_nested_value, process_timeseries_chart  # Import helpers


def _process_closed_trades(closed_trades_list):
    """Convert a raw ``closedTrades`` list into a typed DataFrame.

    Numeric columns are coerced to numbers (NaN on failure), time columns
    to datetimes (NaT on failure), and ``duration_td`` / ``duration_days``
    columns are derived when both entry and exit times are usable.

    Args:
        closed_trades_list: List of trade dicts from the QC JSON.

    Returns:
        pd.DataFrame: Processed trades (empty if the input list is empty).
    """
    trades_df = pd.DataFrame(closed_trades_list)
    if trades_df.empty:
        return trades_df

    # Convert relevant columns to numeric, coercing errors to NaN
    for col in ('profitLoss', 'entryPrice', 'exitPrice', 'quantity', 'totalFees'):
        if col in trades_df.columns:
            trades_df[col] = pd.to_numeric(trades_df[col], errors='coerce')

    # Convert time columns to datetime, coercing errors to NaT
    for col in ('entryTime', 'exitTime'):
        if col not in trades_df.columns:
            continue
        try:
            # Handles ISO 8601 strings, including timezone offsets
            trades_df[col] = pd.to_datetime(trades_df[col], errors='coerce', utc=True)
        except ValueError:
            # Fallback: parse only the first 19 chars ("YYYY-MM-DDTHH:MM:SS"),
            # i.e. without any timezone suffix.
            trades_df[col] = pd.to_datetime(
                trades_df[col].str.slice(0, 19), errors='coerce')
            # BUG FIX: the fallback parse is already tz-naive; calling
            # tz_localize(None) on a naive series raises TypeError, which
            # previously aborted all remaining processing. Only strip a
            # timezone when one is actually present.
            if trades_df[col].notna().any() and trades_df[col].dt.tz is not None:
                trades_df[col] = trades_df[col].dt.tz_localize(None)

    # Calculate trade duration only if both columns are datetimes with at
    # least one non-null value each.
    have_times = (
        'entryTime' in trades_df.columns and 'exitTime' in trades_df.columns
        and pd.api.types.is_datetime64_any_dtype(trades_df['entryTime'])
        and pd.api.types.is_datetime64_any_dtype(trades_df['exitTime'])
        and not trades_df['entryTime'].isnull().all()
        and not trades_df['exitTime'].isnull().all()
    )
    if have_times:
        # Make times timezone-naive so subtraction yields plain timedeltas
        if trades_df['entryTime'].dt.tz is not None:
            trades_df['entryTime'] = trades_df['entryTime'].dt.tz_convert(None)
        if trades_df['exitTime'].dt.tz is not None:
            trades_df['exitTime'] = trades_df['exitTime'].dt.tz_convert(None)
        trades_df['duration_td'] = trades_df['exitTime'] - trades_df['entryTime']
        trades_df['duration_days'] = (
            trades_df['duration_td'].dt.total_seconds() / (24 * 60 * 60))
    else:
        # Duration is undefined when times are invalid/missing
        trades_df['duration_td'] = pd.NaT
        trades_df['duration_days'] = np.nan

    return trades_df


def process_single_file(file_path):
    """Process a single QuantConnect JSON backtest file.

    Extracts statistics, equity, drawdown, benchmark, trades, exposure,
    and turnover data.

    Args:
        file_path: Path to the QC backtest JSON file.

    Returns:
        dict: Processed dataframes/series under the keys ``filename``,
        ``stats_df``, ``equity_df``, ``daily_returns``, ``drawdown_df``,
        ``benchmark_df``, ``trades_df``, ``exposure_series``,
        ``turnover_df``, and ``error`` (None unless processing failed).
    """
    # Extract filename from the full path
    filename = file_path.split('/')[-1] if file_path else "Unknown File"

    # Initialize results dictionary with default empty structures
    results = {
        "filename": filename,
        "stats_df": pd.DataFrame(columns=['Metric', 'Value']),  # Overall statistics
        "equity_df": pd.DataFrame(),    # Equity curve data (with 'Time' column)
        "daily_returns": None,          # Series of daily pct returns (DatetimeIndex)
        "drawdown_df": pd.DataFrame(),  # Drawdown curve data (with 'Time' column)
        "benchmark_df": pd.DataFrame(), # Benchmark data (with 'Time' column)
        "trades_df": pd.DataFrame(),    # Closed trades data
        "exposure_series": None,        # Raw exposure data (needs further parsing)
        "turnover_df": pd.DataFrame(),  # Portfolio turnover data (with 'Time' column)
        "error": None,                  # Stores any error message during processing
    }

    try:
        # Open and load the JSON file
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # --- Extract Statistics ---
        # Try primary location, then fallback location for statistics
        stats_dict = get_nested_value(data, ['statistics']) or \
            get_nested_value(data, ['totalPerformance', 'portfolioStatistics'])
        if stats_dict:
            # Convert dictionary to DataFrame
            results["stats_df"] = pd.DataFrame(
                list(stats_dict.items()), columns=['Metric', 'Value'])

        # --- Process Equity Curve and Calculate Daily Returns ---
        equity_values = get_nested_value(
            data, ['charts', 'Strategy Equity', 'series', 'Equity', 'values'])
        # Gets DF with DatetimeIndex
        equity_df_indexed = process_timeseries_chart(equity_values, 'Equity')
        if not equity_df_indexed.empty:
            # Store equity curve with 'Time' as a column for easier plotting
            results["equity_df"] = equity_df_indexed.reset_index()
            # Calculate daily percentage returns from the indexed equity data
            returns_series = equity_df_indexed['Equity'].pct_change().dropna()
            if not returns_series.empty:
                results["daily_returns"] = returns_series  # DatetimeIndex (UTC)

        # --- Process Drawdown Curve ---
        drawdown_values = get_nested_value(
            data, ['charts', 'Drawdown', 'series', 'Equity Drawdown', 'values'])
        drawdown_df_indexed = process_timeseries_chart(drawdown_values, 'Drawdown')
        if not drawdown_df_indexed.empty:
            results["drawdown_df"] = drawdown_df_indexed.reset_index()  # 'Time' column

        # --- Process Benchmark Curve ---
        benchmark_values = get_nested_value(
            data, ['charts', 'Benchmark', 'series', 'Benchmark', 'values'])
        benchmark_df_indexed = process_timeseries_chart(benchmark_values, 'Benchmark')
        if not benchmark_df_indexed.empty:
            results["benchmark_df"] = benchmark_df_indexed.reset_index()  # 'Time' column

        # --- Process Closed Trades ---
        closed_trades_list = get_nested_value(data, ['totalPerformance', 'closedTrades'])
        if closed_trades_list and isinstance(closed_trades_list, list):
            temp_trades_df = _process_closed_trades(closed_trades_list)
            if not temp_trades_df.empty:
                results["trades_df"] = temp_trades_df

        # --- Extract Exposure Series Data ---
        # Note: often nested and might need specific parsing for plotting
        results["exposure_series"] = get_nested_value(data, ['charts', 'Exposure', 'series'])

        # --- Process Portfolio Turnover ---
        turnover_values = get_nested_value(
            data, ['charts', 'Portfolio Turnover', 'series', 'Portfolio Turnover', 'values'])
        turnover_df_indexed = process_timeseries_chart(turnover_values, 'Turnover')
        if not turnover_df_indexed.empty:
            results["turnover_df"] = turnover_df_indexed.reset_index()  # 'Time' column

    except FileNotFoundError:
        error_msg = f"Error: File not found at {file_path}"
        print(error_msg)
        results["error"] = error_msg
    except json.JSONDecodeError:
        # BUG FIX: message previously printed the placeholder "(unknown)"
        # instead of the offending path.
        error_msg = f"Error: Could not decode JSON from {file_path}"
        print(error_msg)
        results["error"] = error_msg
    except Exception as e:
        # Catch any other unexpected errors during processing
        # BUG FIX: message previously printed "(unknown)" instead of the path.
        error_msg = f"Error processing file {file_path}: {e}"
        print(error_msg)
        traceback.print_exc()
        results["error"] = error_msg

    return results