# -*- coding: utf-8 -*-
"""utils.py
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1RyRghhbleQJ01USX_0O4uUALsuFM10hJ
"""
# utils.py
# Helper functions for data manipulation and plotting defaults.
import pandas as pd
import plotly.graph_objects as go
import re
import numpy as np
import traceback
def get_nested_value(data_dict, keys, default=None):
    """Safely get a value from a nested structure of dicts and lists/tuples.

    Walks ``data_dict`` one path component at a time and never raises for a
    path that cannot be followed.

    Args:
        data_dict: Root container to walk (dict, list or tuple).
        keys: Iterable of path components. A component is used as a dict key
            when the current level is a dict, or as a non-negative integer
            position when the current level is a list or tuple.
        default: Value returned when any component cannot be resolved.

    Returns:
        The value found at the end of the path, or ``default`` if a key is
        missing, an index is out of range or negative, a component is
        unhashable, or an intermediate level is not a supported container.
        A value that is present but ``None`` is returned as ``None``, not
        replaced by ``default``. An empty ``keys`` returns ``data_dict``.
    """
    current_level = data_dict
    for key in keys:
        if isinstance(current_level, dict):
            # Membership test (rather than try/index) so that defaultdict-like
            # mappings are not mutated by a failed lookup.
            try:
                found = key in current_level
            except TypeError:
                # Unhashable path component (e.g. a list) can never be a key.
                return default
            if not found:
                return default
            current_level = current_level[key]
        elif (
            isinstance(current_level, (list, tuple))
            and isinstance(key, int)
            and 0 <= key < len(current_level)
        ):
            current_level = current_level[key]
        else:
            return default
    return current_level
def parse_numeric_string(value_str, default=None):
    """Parse a number from a string, tolerating ``$``, ``%`` and commas.

    Args:
        value_str: The value to parse. Strings are cleaned and converted;
            values that are already numeric are passed through.
        default: Value returned when ``value_str`` cannot be interpreted as
            a number.

    Returns:
        * For a string: a ``float`` built from the text after removing every
          ``$``, ``,`` and ``%`` character and stripping surrounding
          whitespace (so ``"12%"`` yields ``12.0``; no division by 100 is
          performed). ``default`` if the remaining text is not a valid float.
        * For a built-in ``int``/``float`` or a NumPy integer/floating
          scalar: the value itself, unchanged.
        * For anything else (``None``, lists, ...): ``default``.
    """
    if not isinstance(value_str, str):
        # NumPy integer scalars (np.int64 etc.) are not subclasses of int, so
        # they must be listed explicitly or values pulled out of pandas/NumPy
        # containers would be treated as non-numeric.
        if isinstance(value_str, (int, float, np.integer, np.floating)):
            return value_str
        return default

    # Remove currency symbols, percentage signs, and thousands separators.
    cleaned_str = re.sub(r'[$,%]', '', value_str).strip()
    try:
        return float(cleaned_str)
    except ValueError:
        # Empty or non-numeric text.
        return default
def create_empty_figure(title="No Data Available"):
    """Build a placeholder Plotly figure that carries only a message.

    Args:
        title: Text used both as the layout title and as the text of a
            single arrow-less annotation referenced to paper coordinates.

    Returns:
        A ``go.Figure`` with no traces, both axes hidden, and the title and
        annotation set as described above.
    """
    hidden_axis = {'visible': False}
    message = {
        'text': title,
        'xref': 'paper',
        'yref': 'paper',
        'showarrow': False,
        'font': {'size': 16},
    }

    placeholder = go.Figure()
    placeholder.update_layout(
        title=title,
        xaxis=hidden_axis,
        yaxis=dict(hidden_axis),  # separate dict so the axes do not share one object
        annotations=[message],
    )
    return placeholder
def _empty_timeseries_frame(value_col_name):
    """Return an empty frame shaped like a successful result (UTC 'Time' index, float column)."""
    return pd.DataFrame(
        index=pd.DatetimeIndex([], tz='UTC', name='Time'),
        columns=[value_col_name],
        dtype='float64',
    )


def process_timeseries_chart(chart_data, value_col_name='Value'):
    """
    Processes timeseries chart data shaped like [[timestamp, value, ...], ...].

    The first element of each row is treated as a timestamp in SECONDS since
    the Unix epoch and the second element as the value; any further elements
    are ignored.

    Args:
        chart_data: List of rows (lists or tuples). Rows that are not a
            list/tuple, have fewer than two elements, or whose timestamp or
            value is None are skipped, wherever they occur in the list.
        value_col_name: Name of the single value column in the result.

    Returns:
        A DataFrame indexed by a timezone-aware (UTC) DatetimeIndex named
        'Time', with one numeric column ``value_col_name``, sorted by time.
        Rows whose timestamp or value cannot be converted to a number/date
        are dropped. If the input is empty, is not a list, yields no usable
        rows, or an unexpected error occurs, an EMPTY DataFrame with the same
        index type and column is returned, so callers can treat both cases
        uniformly.

    Errors raised during processing are not propagated: they are printed
    (with traceback) and an empty DataFrame is returned.
    """
    # Check if input data is valid list format
    if not chart_data or not isinstance(chart_data, list):
        return _empty_timeseries_frame(value_col_name)

    try:
        # Extract timestamp (index 0) and value (index 1), skipping malformed
        # rows. No separate check of the first row is made: one bad leading
        # row must not discard an otherwise valid series.
        processed_data = [
            [item[0], item[1]]
            for item in chart_data
            if isinstance(item, (list, tuple))
            and len(item) >= 2
            and item[0] is not None
            and item[1] is not None
        ]
        if not processed_data:
            return _empty_timeseries_frame(value_col_name)

        df = pd.DataFrame(processed_data, columns=['Time_Raw', value_col_name])

        # Timestamp: coerce to numeric seconds, then to UTC datetimes.
        # Anything unparseable or out of range becomes NaT and is dropped below.
        seconds = pd.to_numeric(df['Time_Raw'], errors='coerce')
        df['Time'] = pd.to_datetime(seconds, unit='s', errors='coerce', utc=True)

        # Value: coerce to numeric; unparseable entries become NaN.
        df[value_col_name] = pd.to_numeric(df[value_col_name], errors='coerce')

        # Drop rows where either conversion failed.
        df = df.dropna(subset=['Time', value_col_name])
        if df.empty:
            return _empty_timeseries_frame(value_col_name)

        # Return only the value column, indexed and sorted by time.
        return df.set_index('Time')[[value_col_name]].sort_index()
    except Exception as e:
        # Deliberate best-effort boundary: report and fall back to empty.
        print(f"Error creating/processing DataFrame for {value_col_name}: {e}")
        traceback.print_exc()
        return _empty_timeseries_frame(value_col_name)