RazHadas's picture
Upload 6 files
76317bb verified
# -*- coding: utf-8 -*-
"""processing.py
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/13EcoLMljb9XzVBELmFC0EBDknuHS79Vy
"""
# processing.py
# Functions for processing QuantConnect JSON data.
import json
import os
import traceback

import numpy as np
import pandas as pd

from utils import get_nested_value, process_timeseries_chart # Import helpers
def _prepare_closed_trades(closed_trades):
    """Build a cleaned DataFrame from a list of raw closed-trade records.

    Numeric columns are coerced with ``pd.to_numeric`` and the entry/exit
    time columns are parsed to timezone-naive datetimes (UTC wall clock).
    ``duration_td`` / ``duration_days`` columns are always added; they hold
    NaT / NaN when durations cannot be computed.

    Returns:
        The processed DataFrame, or ``None`` when the records produce an
        empty frame (so the caller can keep its default).
    """
    trades = pd.DataFrame(closed_trades)
    if trades.empty:
        return None

    # Unparseable numbers become NaN rather than raising.
    for col in ('profitLoss', 'entryPrice', 'exitPrice', 'quantity', 'totalFees'):
        if col in trades.columns:
            trades[col] = pd.to_numeric(trades[col], errors='coerce')

    time_cols = ('entryTime', 'exitTime')
    for col in time_cols:
        if col not in trades.columns:
            continue
        try:
            # Handles ISO 8601 strings with or without an offset; result is UTC-aware.
            parsed = pd.to_datetime(trades[col], errors='coerce', utc=True)
        except ValueError:
            # Fallback: drop everything after 'YYYY-MM-DDTHH:MM:SS' and parse as naive.
            parsed = pd.to_datetime(trades[col].str.slice(0, 19), errors='coerce')
        # Normalise once, unconditionally, so both columns are always
        # tz-naive and can be subtracted from each other directly.
        if pd.api.types.is_datetime64_any_dtype(parsed) and parsed.dt.tz is not None:
            parsed = parsed.dt.tz_convert(None)
        trades[col] = parsed

    # Durations need both columns present, datetime-typed, and not entirely NaT.
    can_compute_duration = all(
        col in trades.columns
        and pd.api.types.is_datetime64_any_dtype(trades[col])
        and trades[col].notna().any()
        for col in time_cols
    )
    if can_compute_duration:
        trades['duration_td'] = trades['exitTime'] - trades['entryTime']
        trades['duration_days'] = trades['duration_td'].dt.total_seconds() / (24 * 60 * 60)
    else:
        # Assigning a bare pd.NaT would create a datetime64 column; build an
        # explicit timedelta column so the dtype matches the success branch.
        trades['duration_td'] = pd.Series(pd.NaT, index=trades.index, dtype='timedelta64[ns]')
        trades['duration_days'] = np.nan
    return trades


def process_single_file(file_path):
    """
    Processes a single QuantConnect JSON file.

    Extracts statistics, equity, drawdown, benchmark, trades, exposure, and
    turnover data. Errors are never raised to the caller: they are printed
    and recorded under the ``"error"`` key, and whatever was extracted before
    the failure is still returned.

    Args:
        file_path: Path of the JSON file to read. A falsy value yields the
            filename ``"Unknown File"`` and an error entry in the result.

    Returns:
        dict with the keys:
            "filename"        - base name of ``file_path``.
            "stats_df"        - DataFrame with 'Metric' / 'Value' columns.
            "equity_df"       - equity chart with the time index reset to a column.
            "daily_returns"   - percentage change between consecutive equity
                                points (NaNs dropped), or None if unavailable.
            "drawdown_df"     - drawdown chart, index reset to a column.
            "benchmark_df"    - benchmark chart, index reset to a column.
            "trades_df"       - processed closed trades.
            "exposure_series" - raw value found at charts/Exposure/series.
            "turnover_df"     - portfolio turnover chart, index reset to a column.
            "error"           - error message string, or None on success.
    """
    # os.path.basename handles the platform's separators and path-like objects.
    filename = os.path.basename(file_path) if file_path else "Unknown File"

    # Defaults returned for any section that is missing or fails.
    results = {
        "filename": filename,
        "stats_df": pd.DataFrame(columns=['Metric', 'Value']),
        "equity_df": pd.DataFrame(),
        "daily_returns": None,
        "drawdown_df": pd.DataFrame(),
        "benchmark_df": pd.DataFrame(),
        "trades_df": pd.DataFrame(),
        "exposure_series": None,
        "turnover_df": pd.DataFrame(),
        "error": None,
    }

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # --- Statistics: primary location first, then the fallback location ---
        stats_dict = get_nested_value(data, ['statistics']) or \
            get_nested_value(data, ['totalPerformance', 'portfolioStatistics'])
        if stats_dict:
            results["stats_df"] = pd.DataFrame(list(stats_dict.items()), columns=['Metric', 'Value'])

        # --- Equity curve and returns ---
        # process_timeseries_chart is a project helper; it is expected to
        # return a time-indexed DataFrame with the given value column.
        equity_values = get_nested_value(data, ['charts', 'Strategy Equity', 'series', 'Equity', 'values'])
        equity_df_indexed = process_timeseries_chart(equity_values, 'Equity')
        if not equity_df_indexed.empty:
            results["equity_df"] = equity_df_indexed.reset_index()
            returns_series = equity_df_indexed['Equity'].pct_change().dropna()
            if not returns_series.empty:
                results["daily_returns"] = returns_series

        # --- Drawdown curve ---
        drawdown_values = get_nested_value(data, ['charts', 'Drawdown', 'series', 'Equity Drawdown', 'values'])
        drawdown_df_indexed = process_timeseries_chart(drawdown_values, 'Drawdown')
        if not drawdown_df_indexed.empty:
            results["drawdown_df"] = drawdown_df_indexed.reset_index()

        # --- Benchmark curve ---
        benchmark_values = get_nested_value(data, ['charts', 'Benchmark', 'series', 'Benchmark', 'values'])
        benchmark_df_indexed = process_timeseries_chart(benchmark_values, 'Benchmark')
        if not benchmark_df_indexed.empty:
            results["benchmark_df"] = benchmark_df_indexed.reset_index()

        # --- Closed trades ---
        closed_trades_list = get_nested_value(data, ['totalPerformance', 'closedTrades'])
        if closed_trades_list and isinstance(closed_trades_list, list):
            trades_df = _prepare_closed_trades(closed_trades_list)
            if trades_df is not None:
                results["trades_df"] = trades_df

        # --- Exposure: stored raw; callers parse it further for plotting ---
        results["exposure_series"] = get_nested_value(data, ['charts', 'Exposure', 'series'])

        # --- Portfolio turnover ---
        turnover_values = get_nested_value(data, ['charts', 'Portfolio Turnover', 'series', 'Portfolio Turnover', 'values'])
        turnover_df_indexed = process_timeseries_chart(turnover_values, 'Turnover')
        if not turnover_df_indexed.empty:
            results["turnover_df"] = turnover_df_indexed.reset_index()

    except FileNotFoundError:
        error_msg = f"Error: File not found at {file_path}"
        print(error_msg)
        results["error"] = error_msg
    except json.JSONDecodeError:
        error_msg = f"Error: Could not decode JSON from {filename}"
        print(error_msg)
        results["error"] = error_msg
    except Exception as e:
        # Boundary handler: report any other failure and still return partial results.
        error_msg = f"Error processing file {filename}: {e}"
        print(error_msg)
        traceback.print_exc()
        results["error"] = error_msg

    return results