# -*- coding: utf-8 -*-
"""processing.py
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/13EcoLMljb9XzVBELmFC0EBDknuHS79Vy
"""
# processing.py
# Functions for processing QuantConnect JSON data.
import json
import os
import traceback

import numpy as np
import pandas as pd

from utils import get_nested_value, process_timeseries_chart  # Import helpers
def _process_closed_trades(closed_trades_list):
    """Convert a list of closed-trade dicts into a typed DataFrame.

    Numeric columns are coerced to numbers and time columns to datetimes
    (bad values become NaN/NaT rather than raising). When both entry and
    exit times are usable, per-trade 'duration_td' (timedelta) and
    'duration_days' (float) columns are added; otherwise they are set to
    NaT / NaN.

    Returns an empty DataFrame when the input contains no trades.
    """
    trades_df = pd.DataFrame(closed_trades_list)
    if trades_df.empty:
        return trades_df

    # Coerce the known money/size columns to numeric; bad cells -> NaN.
    numeric_cols = ['profitLoss', 'entryPrice', 'exitPrice', 'quantity', 'totalFees']
    for col in numeric_cols:
        if col in trades_df.columns:
            trades_df[col] = pd.to_numeric(trades_df[col], errors='coerce')

    # Coerce the time columns to datetime; bad cells -> NaT.
    time_cols = ['entryTime', 'exitTime']
    for col in time_cols:
        if col in trades_df.columns:
            # Values are typically ISO 8601 strings, possibly with a
            # timezone offset, so parse as UTC first.
            try:
                trades_df[col] = pd.to_datetime(trades_df[col], errors='coerce', utc=True)
            except ValueError:
                # Fallback: strip any timezone suffix (keep 'YYYY-MM-DDTHH:MM:SS')
                # and retry as naive timestamps.
                trades_df[col] = pd.to_datetime(trades_df[col].str.slice(0, 19), errors='coerce')
                # If some values converted, drop tz info for consistency
                # before the duration calculation below.
                if trades_df[col].notna().any():
                    trades_df[col] = trades_df[col].dt.tz_localize(None)

    # Only compute durations when both columns exist, parsed to datetime
    # dtype, and contain at least one non-null value each.
    have_times = (
        'entryTime' in trades_df.columns
        and 'exitTime' in trades_df.columns
        and pd.api.types.is_datetime64_any_dtype(trades_df['entryTime'])
        and pd.api.types.is_datetime64_any_dtype(trades_df['exitTime'])
        and not trades_df['entryTime'].isnull().all()
        and not trades_df['exitTime'].isnull().all()
    )
    if have_times:
        # Make both columns timezone-naive so the subtraction is well defined.
        if trades_df['entryTime'].dt.tz is not None:
            trades_df['entryTime'] = trades_df['entryTime'].dt.tz_convert(None)
        if trades_df['exitTime'].dt.tz is not None:
            trades_df['exitTime'] = trades_df['exitTime'].dt.tz_convert(None)
        trades_df['duration_td'] = trades_df['exitTime'] - trades_df['entryTime']
        trades_df['duration_days'] = trades_df['duration_td'].dt.total_seconds() / (24 * 60 * 60)
    else:
        # Times invalid/missing: keep the columns present but empty.
        trades_df['duration_td'] = pd.NaT
        trades_df['duration_days'] = np.nan

    return trades_df


def process_single_file(file_path):
    """Process a single QuantConnect backtest JSON file.

    Extracts statistics, equity, drawdown, benchmark, closed trades,
    exposure, and portfolio-turnover data from the backtest result.

    Parameters
    ----------
    file_path : str
        Path to the QuantConnect result JSON file.

    Returns
    -------
    dict
        Keys:
        - "filename": base name of the input file (or "Unknown File").
        - "stats_df": DataFrame with 'Metric'/'Value' columns.
        - "equity_df": equity curve with a 'Time' column.
        - "daily_returns": Series of daily pct returns (DatetimeIndex, UTC)
          or None if unavailable.
        - "drawdown_df", "benchmark_df", "turnover_df": chart DataFrames
          with a 'Time' column.
        - "trades_df": closed trades (see _process_closed_trades).
        - "exposure_series": raw exposure series data (often needs further
          parsing before plotting) or None.
        - "error": error message string, or None on success.

    Never raises: all errors are caught, printed, and reported via the
    returned dict's "error" key.
    """
    # os.path.basename handles both '/' and the OS-specific separator.
    filename = os.path.basename(file_path) if file_path else "Unknown File"

    # Initialize results dictionary with default empty structures so
    # callers always get the full schema, even on failure.
    results = {
        "filename": filename,
        "stats_df": pd.DataFrame(columns=['Metric', 'Value']),  # Overall statistics
        "equity_df": pd.DataFrame(),    # Equity curve data (with 'Time' column)
        "daily_returns": None,          # Series of daily pct returns (DatetimeIndex)
        "drawdown_df": pd.DataFrame(),  # Drawdown curve data (with 'Time' column)
        "benchmark_df": pd.DataFrame(), # Benchmark data (with 'Time' column)
        "trades_df": pd.DataFrame(),    # Closed trades data
        "exposure_series": None,        # Raw exposure data series
        "turnover_df": pd.DataFrame(),  # Portfolio turnover data (with 'Time' column)
        "error": None,                  # Stores any error message during processing
    }

    try:
        # Open and load the JSON file.
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # --- Extract Statistics ---
        # Try primary location, then fallback location for statistics.
        stats_dict = get_nested_value(data, ['statistics']) or \
                     get_nested_value(data, ['totalPerformance', 'portfolioStatistics'])
        if stats_dict:
            # Convert the flat dict to a two-column DataFrame.
            results["stats_df"] = pd.DataFrame(list(stats_dict.items()), columns=['Metric', 'Value'])

        # --- Process Equity Curve and Calculate Daily Returns ---
        equity_values = get_nested_value(data, ['charts', 'Strategy Equity', 'series', 'Equity', 'values'])
        equity_df_indexed = process_timeseries_chart(equity_values, 'Equity')  # DF with DatetimeIndex
        if not equity_df_indexed.empty:
            # Store equity curve with 'Time' as a column for easier plotting.
            results["equity_df"] = equity_df_indexed.reset_index()
            # Daily percentage returns derived from the indexed equity data.
            returns_series = equity_df_indexed['Equity'].pct_change().dropna()
            if not returns_series.empty:
                results["daily_returns"] = returns_series  # Has DatetimeIndex (UTC)

        # --- Process Drawdown Curve ---
        drawdown_values = get_nested_value(data, ['charts', 'Drawdown', 'series', 'Equity Drawdown', 'values'])
        drawdown_df_indexed = process_timeseries_chart(drawdown_values, 'Drawdown')
        if not drawdown_df_indexed.empty:
            results["drawdown_df"] = drawdown_df_indexed.reset_index()  # Store with 'Time' column

        # --- Process Benchmark Curve ---
        benchmark_values = get_nested_value(data, ['charts', 'Benchmark', 'series', 'Benchmark', 'values'])
        benchmark_df_indexed = process_timeseries_chart(benchmark_values, 'Benchmark')
        if not benchmark_df_indexed.empty:
            results["benchmark_df"] = benchmark_df_indexed.reset_index()  # Store with 'Time' column

        # --- Process Closed Trades ---
        closed_trades_list = get_nested_value(data, ['totalPerformance', 'closedTrades'])
        if closed_trades_list and isinstance(closed_trades_list, list):
            results["trades_df"] = _process_closed_trades(closed_trades_list)

        # --- Extract Exposure Series Data ---
        # Note: this is often nested and may need specific parsing for plotting.
        results["exposure_series"] = get_nested_value(data, ['charts', 'Exposure', 'series'])

        # --- Process Portfolio Turnover ---
        turnover_values = get_nested_value(data, ['charts', 'Portfolio Turnover', 'series', 'Portfolio Turnover', 'values'])
        turnover_df_indexed = process_timeseries_chart(turnover_values, 'Turnover')
        if not turnover_df_indexed.empty:
            results["turnover_df"] = turnover_df_indexed.reset_index()  # Store with 'Time' column

    except FileNotFoundError:
        error_msg = f"Error: File not found at {file_path}"
        print(error_msg)
        results["error"] = error_msg
    except json.JSONDecodeError:
        # Include the filename so the failing file is identifiable in logs.
        error_msg = f"Error: Could not decode JSON from {filename}"
        print(error_msg)
        results["error"] = error_msg
    except Exception as e:
        # Catch any other unexpected errors during processing.
        error_msg = f"Error processing file {filename}: {e}"
        print(error_msg)
        traceback.print_exc()
        results["error"] = error_msg

    return results