AFML / afml /util /misc.py
akshayboora's picture
Upload 940 files
669d6a1 verified
"""
Various useful functions
"""
import functools
import io
import os
import re
import time
from datetime import datetime as dt
from datetime import timedelta
from itertools import product
from pathlib import Path
from typing import Callable, Literal, Optional, Tuple, Union
import holidays
import nbformat as nbf
import numpy as np
import pandas as pd
import psutil
from loguru import logger
from numba import njit
def crop_data_frame_in_batches(df: pd.DataFrame, chunksize: int):
    # pylint: disable=invalid-name
    """
    Splits df into consecutive chunks of at most chunksize rows.

    Rows are assigned to chunks by position (row ``i`` goes to chunk
    ``i // chunksize``), so only the last chunk can be shorter than
    ``chunksize``.

    :param df: (pd.DataFrame) Dataframe to split
    :param chunksize: (int) Number of rows in chunk, must be positive
    :return: (list) Chunks (pd.DataFrames)
    :raises ValueError: If chunksize is not positive
    """
    if chunksize <= 0:
        # NumPy integer floor-division by zero only warns and yields zeros, so a
        # zero chunksize would silently return the whole frame as one chunk;
        # negative sizes produce meaningless group ids. Fail loudly instead.
        raise ValueError(f"chunksize must be a positive integer, got {chunksize!r}")

    # Positional batch id for every row: 0,0,...,1,1,...,2,...
    batch_ids = np.arange(len(df)) // chunksize
    return [chunk for _, chunk in df.groupby(batch_ids)]
def indices_to_mask(indices, length):
    """
    Build a boolean mask of size ``length`` that is True at ``indices``.

    Parameters
    ----------
    indices : array-like of integers
        Positions to switch on. They are used directly as a NumPy index
        into the mask.
    length : int
        Number of elements in the returned mask.

    Returns
    -------
    mask : np.ndarray
        Boolean array of shape ``(length,)``; True at the requested positions,
        False everywhere else.
    """
    flags = np.full(length, False, dtype=bool)
    flags[indices] = True
    return flags
# NOTE: ``parallel=True`` was dropped: the loop below is a plain ``for`` (no
# ``prange`` / array expressions), so there is nothing for Numba to parallelise.
@njit(cache=True)
def _count_max_decimals_numba(values: np.ndarray, max_places: int = 10) -> int:
    """
    Return the largest number of decimal places needed by any value in ``values``.

    For each value the smallest ``i`` in ``0..max_places`` is searched such that
    ``np.isclose(val, round(val, i))`` holds (NumPy's default tolerances, so the
    result is approximate). A value that matches none of the tested precisions
    needs more than ``max_places`` decimals and is counted as ``max_places``.

    :param values: Array of values to inspect (callers are expected to have
        removed NaNs beforehand).
    :param max_places: Highest number of decimal places to test.
    :return: Maximum decimal count found, capped at ``max_places``; 0 for an
        empty array.
    """
    max_dec = 0
    for val in values:
        # Default to the cap: if no precision matches, the value needs at least
        # ``max_places`` decimals (previously such values were counted as 0).
        places = max_places
        for i in range(max_places + 1):
            if np.isclose(val, round(val, i)):
                places = i
                break
        if places > max_dec:
            max_dec = places
        if max_dec >= max_places:
            # Cannot get any larger; skip the remaining values.
            break
    return max_dec
def count_max_decimals(
    values: Union[pd.Series, np.ndarray], max_places: int = 10
) -> int:
    """
    Find how many decimal places are needed to represent the values numerically.

    The input is converted to a NumPy array, NaN entries are dropped, and the
    remaining values are handed to the compiled helper
    ``_count_max_decimals_numba``; no string formatting is involved.

    :param values: Input array or Series of floating-point values. NaNs are ignored.
    :type values: Union[pd.Series, np.ndarray]
    :param max_places: Maximum number of decimal places to test for. Defaults to 10.
    :type max_places: int
    :return: The largest number of decimal places found among the values.
    :rtype: int
    """
    if isinstance(values, pd.Series):
        data = values.to_numpy()
    else:
        data = np.asarray(values)

    # Boolean indexing keeps only the non-NaN entries.
    is_nan = np.isnan(data)
    non_nan = data[~is_nan]
    return _count_max_decimals_numba(non_nan, max_places)
# --- Pandas Utilities ---
class DataFrameFormatter:
    """
    Factory of small value formatters for pandas DataFrames.

    Every static method returns a one-argument callable. The callables can be
    passed to ``Series.apply()`` / ``DataFrame.apply()`` or used as values in the
    mapping given to ``DataFrame.style.format()``.

    :Example:
    >>> formatter = DataFrameFormatter()
    >>> # Convert seconds to hh:mm:ss as string and as timedelta
    >>> df["duration_str"] = df["duration_sec"].apply(formatter.to_timecode("string"))
    >>> df["duration_td"] = df["duration_sec"].apply(formatter.to_timecode("object"))
    >>> # Format columns for display in styled DataFrame
    >>> df.style.format({
    ...     "sales": formatter.with_commas(),
    ...     "profit_margin": formatter.percentage(2),
    ...     "revenue": formatter.currency("€"),
    ... })

    The class holds no state; the methods can be called on the class itself.
    """

    @staticmethod
    def with_commas():
        """Returns a formatter that renders a number with thousands separators."""

        def _format(x):
            return f"{x:,}"

        return _format

    @staticmethod
    def to_timecode(
        mode: Literal["string", "object"] = "string",
    ) -> Callable[[Union[int, float]], Union[str, timedelta]]:
        """
        Build a converter from a number of seconds to a time code.

        The seconds are truncated with ``int()`` before conversion.

        :param mode: 'string' for the ``str()`` of a timedelta (e.g. '1:01:01'),
            'object' for the timedelta itself.
        :type mode: Literal["string", "object"]
        :return: A callable that converts seconds accordingly.
        :rtype: Callable[[int | float], str | timedelta]
        :raises ValueError: If ``mode`` is neither 'string' nor 'object'.

        :Example:
        >>> formatter = DataFrameFormatter()
        >>> print(formatter.to_timecode("string")(3661))
        1:01:01
        >>> print(formatter.to_timecode("object")(3661))
        1:01:01
        """

        def _as_timedelta(x):
            return timedelta(seconds=int(x))

        def _as_string(x):
            return str(timedelta(seconds=int(x)))

        if mode == "string":
            return _as_string
        if mode == "object":
            return _as_timedelta
        raise ValueError("mode must be either 'string' or 'object'")

    @staticmethod
    def percentage(decimal_places=2):
        """Returns a formatter that shows a fraction as a percentage (x * 100)."""

        def _format(x):
            return f"{x * 100:.{decimal_places}f}%"

        return _format

    @staticmethod
    def currency(symbol="$"):
        """Returns a formatter that prefixes ``symbol`` and uses commas and 2 decimals."""

        def _format(x):
            return f"{symbol}{x:,.2f}"

        return _format
def optimize_dtypes(df: pd.DataFrame, verbose: bool = True) -> pd.DataFrame:
    """
    Return a copy of ``df`` whose columns use smaller dtypes where possible.

    Rules applied per column:

    * integer columns are downcast with ``pd.to_numeric(..., downcast="integer")``;
    * float columns that contain only whole numbers (no NaN, no infinities, and
      within the int64 range) are converted to the smallest integer dtype,
      all other float columns are downcast with ``downcast="float"``;
    * object columns whose unique/total ratio is below 0.5 become ``category``.

    Other dtypes are left untouched. The input frame is not modified.

    :param df: DataFrame to optimise.
    :type df: pd.DataFrame
    :param verbose: If True, print the memory usage before and after.
    :type verbose: bool
    :return: Optimised copy of ``df``.
    :rtype: pd.DataFrame
    """
    # Floats with |x| >= 2**63 (including +/-inf) cannot be stored in int64.
    int64_limit = 2.0**63

    optimized_df = df.copy()
    start_mem = optimized_df.memory_usage(deep=True).sum() / 1024**2

    for col in optimized_df.columns:
        series = optimized_df[col]
        col_dtype = series.dtype

        if pd.api.types.is_integer_dtype(col_dtype):
            optimized_df[col] = pd.to_numeric(series, downcast="integer")

        elif pd.api.types.is_float_dtype(col_dtype):
            # Whole-number check. The magnitude test rejects infinities and
            # out-of-range values, which would make ``astype("int64")`` fail
            # or overflow.
            is_whole = (
                not series.isna().any()
                and (series.abs() < int64_limit).all()
                and (series == series.round()).all()
            )
            if is_whole:
                optimized_df[col] = pd.to_numeric(
                    series.astype("int64"), downcast="integer"
                )
            else:
                optimized_df[col] = pd.to_numeric(series, downcast="float")

        elif pd.api.types.is_object_dtype(col_dtype):
            num_total_values = len(series)
            if num_total_values > 0:  # avoid 0 / 0 on empty frames
                try:
                    num_unique_values = series.nunique()
                except TypeError:
                    # Unhashable cell values (lists, dicts, ...) cannot be
                    # counted or turned into categories; leave the column as is.
                    num_unique_values = None
                if (
                    num_unique_values is not None
                    and num_unique_values / num_total_values < 0.5
                ):
                    optimized_df[col] = series.astype("category")

    end_mem = optimized_df.memory_usage(deep=True).sum() / 1024**2
    if verbose:
        # Guard against a zero baseline (nothing to reduce).
        reduction_pct = 100 * (start_mem - end_mem) / start_mem if start_mem else 0.0
        print(
            f"Memory usage reduced from {start_mem:.2f} MB to {end_mem:.2f} MB ({reduction_pct:.1f}% reduction)"
        )
    return optimized_df
def log_column_changes(func):
    """
    Decorator that logs column name changes made by a DataFrame transformation function.

    The column labels of the input frame are compared with those of the returned
    frame. Labels are paired by position; when the number of columns differs, the
    labels that have no positional counterpart are reported separately instead of
    being silently dropped.

    :param func: A function that takes a DataFrame as first argument and returns
        a DataFrame with potentially renamed or flattened columns.
    :type func: Callable[[pd.DataFrame], pd.DataFrame]
    :return: Wrapped function with logging.
    :rtype: Callable
    """

    @functools.wraps(func)
    def wrapper(df: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
        # Snapshot the labels before the call in case ``func`` mutates ``df``.
        old_cols = df.columns.tolist()
        result = func(df, *args, **kwargs)
        new_cols = result.columns.tolist()

        if old_cols == new_cols:
            logger.info("No column name changes detected.")
            return result

        logger.info("Column names changed:")
        for old, new in zip(old_cols, new_cols):
            if old != new:
                logger.info(f" '{old}' -> '{new}'")

        # ``zip`` stops at the shorter list; report whatever was left unpaired.
        paired = min(len(old_cols), len(new_cols))
        for old in old_cols[paired:]:
            logger.info(f" '{old}' -> (no column at this position)")
        for new in new_cols[paired:]:
            logger.info(f" (no column at this position) -> '{new}'")
        return result

    return wrapper
@log_column_changes
def flatten_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """
    Produce a copy of ``df`` whose column labels are plain strings.

    Tuple labels (as found in MultiIndex columns) are converted element-wise to
    ``str``, joined with underscores and stripped of surrounding whitespace; any
    other label is simply passed through ``str()``. Note that an empty level
    therefore leaves a leading/trailing underscore in the result.

    :param df: Input DataFrame with potentially MultiIndex columns.
    :type df: pd.DataFrame
    :return: A copy of the DataFrame with flattened column names.
    :rtype: pd.DataFrame

    :Example:
    >>> df_grouped = df.groupby("key").agg({"value": ["mean", "sum"]})
    >>> df_flat = flatten_column_names(df_grouped)
    >>> df_flat.columns
    Index(['value_mean', 'value_sum'], dtype='object')
    """

    def _flatten(label):
        if isinstance(label, tuple):
            return "_".join(str(part) for part in label).strip()
        return str(label)

    flattened = df.copy()
    flattened.columns = [_flatten(label) for label in df.columns]
    return flattened
def value_counts_data(
    series: pd.Series, verbose: bool = False, as_percentage: bool = True
) -> pd.DataFrame:
    """
    Summarise a Series as a table of value counts.

    The ``count`` column holds the counts formatted as strings with thousands
    separators (e.g. ``'1,240'``). When requested, a ``proportion`` column with
    the normalised counts is added.

    :param series: The input Series to analyze.
    :type series: pd.Series
    :param verbose: If True, the table is also printed. The table is returned
        in either case.
    :type verbose: bool
    :param as_percentage: Whether to include the ``proportion`` column.
    :type as_percentage: bool
    :return: A DataFrame with counts and optional proportion column, indexed by
        the distinct values of ``series``.
    :rtype: pd.DataFrame

    :Example:
    >>> value_counts_data(df["status"], verbose=True)
            count  proportion
    active  1,240        0.62
    closed    760        0.38
    """
    raw_counts = series.value_counts()
    table = pd.DataFrame({"count": raw_counts.apply("{:,}".format)})

    if as_percentage:
        table["proportion"] = series.value_counts(normalize=True)

    if verbose:
        print(f"\n{table}\n")
    return table
# --- Logging Utilities ---
def log_performance(func):
    """
    Decorator that logs the memory usage and execution time of a function.

    The resident set size (RSS) of the current process is sampled before and
    after the call and the difference is reported in megabytes together with
    the elapsed wall-clock time. Nothing is logged if ``func`` raises.

    :param func: The function to wrap and monitor.
    :type func: Callable
    :return: A wrapped function that logs performance metrics to the configured logger.
        The wrapper keeps the name and docstring of ``func``.
    :rtype: Callable

    :Example:
    >>> @log_performance
    ... def heavy_function():
    ...     return np.zeros((1000, 1000))
    >>> heavy_function()
    'heavy_function' - Time: 0:00:00.002345. Memory increment: 7.63 MB (123.45 MB -> 131.08 MB)
    """

    # Preserve ``__name__`` / ``__doc__`` of the wrapped function, consistent
    # with the other decorators in this module.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        process = psutil.Process()
        mem_before = process.memory_info().rss / 1024**2  # memory in MB
        start_time = time.perf_counter()

        result = func(*args, **kwargs)

        # timedelta gives a readable h:mm:ss.ffffff rendering in the log line.
        elapsed = timedelta(seconds=time.perf_counter() - start_time)
        mem_after = process.memory_info().rss / 1024**2
        logger.info(
            f"'{func.__name__}' - Time: {elapsed}. Memory increment: {mem_after - mem_before:.2f} MB ({mem_before:.2f} MB -> {mem_after:.2f} MB)"
        )
        return result

    return wrapper
def log_df_info(df: pd.DataFrame):
    """
    Send the text produced by ``df.info()`` to the logger.

    ``DataFrame.info`` writes to a stream rather than returning a string, so the
    report is captured in an in-memory buffer and then logged as a single INFO
    message that starts with a newline.

    :param df: The DataFrame whose structure will be logged.
    :type df: pd.DataFrame

    :Example:
    >>> log_df_info(df)
    <class 'pandas.core.frame.DataFrame'>
    RangeIndex: 100 entries, 0 to 99
    Data columns (total 4 columns):
    ...
    """
    with io.StringIO() as sink:
        df.info(buf=sink)
        report = sink.getvalue()
    logger.info("\n" + report)  # Log the captured output
# --- Time Helpers ---
def set_resampling_freq(timeframe: str):
    """
    Convert an MT5 timeframe string to a valid pandas resampling frequency.

    The first letter selects the unit and the digits found in the string form
    the multiplier. Weekly and daily codes ignore the multiplier and always map
    to ``'W-FRI'`` and ``'B'`` respectively. The input is case-insensitive.

    Monthly MT5 codes (``MN1``) are not supported and are rejected explicitly;
    they would otherwise be mistaken for a minute timeframe because they also
    start with ``M``.

    :param timeframe: MT5-style timeframe code (e.g., 'D1', 'W1', 'H1', 'M15', 'S30').
    :type timeframe: str
    :return: A string representing the pandas frequency alias (e.g., 'B', 'W-FRI', '15min').
    :rtype: str
    :raises ValueError: If the timeframe is not recognized or supported, has no
        numeric part, or has a non-positive multiplier for H/M/S codes.

    :Example:
    >>> set_resampling_freq("D1")
    'B'
    >>> set_resampling_freq("H4")
    '4h'
    >>> set_resampling_freq("M15")
    '15min'
    >>> set_resampling_freq("S30")
    '30s'
    >>> set_resampling_freq("W1")
    'W-FRI'
    """
    help_text = (
        f"Unsupported timeframe {timeframe!r}.\n"
        "Valid timeframe arguments:\n"
        "W1: weekly\n"
        "D1: daily\n"
        "H(x): resample x hours, e.g. H1, H4\n"
        "M(x): resample x minutes, e.g. M1, M5\n"
        "S(x): resample x seconds, e.g. S15, S30\n"
    )

    code = timeframe.upper()
    # isdecimal() only accepts characters that int() can actually parse.
    digits = "".join(ch for ch in code if ch.isdecimal())

    # A numeric part is mandatory; "MN..." is MT5's monthly code, not minutes.
    if not digits or code.startswith("MN"):
        raise ValueError(help_text)

    if code.startswith("W"):
        return "W-FRI"
    if code.startswith("D"):
        return "B"

    multiplier = int(digits)
    unit = {"H": "h", "M": "min", "S": "s"}.get(code[0])
    if unit is None or multiplier <= 0:
        # Unknown prefix, or something like "H0" that pandas cannot resample to.
        raise ValueError(help_text)
    return f"{multiplier}{unit}"
def date_conversion(
    start_date: Union[str, dt, pd.Timestamp],
    end_date: Union[str, dt, pd.Timestamp],
    default_tz: str = "UTC",
) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """
    Validates start and end dates, ensuring they are timezone-aware and in correct order.

    Strings and datetime-like objects are parsed with ``pd.to_datetime``; ints and
    floats are interpreted as Unix timestamps in seconds. Naive results are
    localized to ``default_tz``; aware results are converted to ``default_tz``.

    Args:
        start_date (Union[str, dt, pd.Timestamp]): The start date of the period.
        end_date (Union[str, dt, pd.Timestamp]): The end date of the period.
        default_tz (str): The timezone applied to naive dates and to which aware
            dates are converted.

    Returns:
        Tuple[pd.Timestamp, pd.Timestamp]: ``(start_date, end_date)`` as
        timezone-aware pandas Timestamps in ``default_tz``.

    Raises:
        ValueError: If a date is None, an empty string, cannot be parsed, parses
            to NaT, or if the start date is not strictly before the end date.
    """

    def convert_single_date(date_val, date_name):
        # Input checks live outside the try-block so their messages are not
        # re-wrapped as "Cannot parse ...".
        if date_val is None:
            raise ValueError(f"{date_name} cannot be None")
        if isinstance(date_val, str) and not date_val.strip():
            raise ValueError(f"{date_name} cannot be empty string")

        try:
            if isinstance(date_val, (int, float)):
                # Handle Unix timestamps
                ts = pd.to_datetime(date_val, unit="s")
            else:
                ts = pd.to_datetime(date_val)

            if ts is not pd.NaT:
                # Ensure timezone awareness; tz_convert is a no-op when the
                # timestamp is already expressed in default_tz.
                if ts.tzinfo is None:
                    ts = ts.tz_localize(default_tz)
                else:
                    ts = ts.tz_convert(default_tz)
        except Exception as e:
            # pandas raises several unrelated exception types for bad input;
            # normalise them to ValueError but keep the original as the cause.
            raise ValueError(f"Cannot parse {date_name} '{date_val}': {str(e)}") from e

        if ts is pd.NaT:
            # NaT compares False with everything and would slip through the
            # ordering check below.
            raise ValueError(
                f"Cannot parse {date_name} '{date_val}': not a valid date (NaT)"
            )
        return ts

    # Convert both dates
    start_ts = convert_single_date(start_date, "start_date")
    end_ts = convert_single_date(end_date, "end_date")

    # Validate date order
    if start_ts >= end_ts:
        raise ValueError(f"Start date ({start_ts}) must be before end date ({end_ts})")
    return start_ts, end_ts
def is_trading_day(date, countries=None):
    """
    Tell whether ``date`` is a weekday that is not a public holiday.

    :param date: Anything accepted by ``pd.Timestamp``.
    :param countries: Iterable of country identifiers that are attributes of the
        ``holidays`` package (e.g. ``"US"``). Defaults to ``["US"]``. The day is
        rejected if it is a holiday in any of the listed countries.
    :return: True if the date falls on Monday-Friday and is not a holiday in
        any of ``countries``; False otherwise.
    :rtype: bool
    """
    if countries is None:
        # Avoid a mutable default argument.
        countries = ["US"]

    ts = pd.Timestamp(date)
    if ts.weekday() >= 5:  # Saturday / Sunday
        return False

    # Holiday calendars are keyed by ``datetime.date``. The previous version
    # looked up the ``pd.Timestamp`` itself in a plain dict copy of the
    # calendars, which never matches a ``date`` key, so holidays were ignored.
    day = ts.date()
    return not any(
        day in getattr(holidays, country)(years=[ts.year]) for country in countries
    )
def is_first_weekday(date):
    """
    Check whether ``date`` is the first Monday-Friday day of its month.

    The first day of the month is used as is when it is a weekday; a Saturday
    or Sunday is rolled forward to the following Monday. The result is the
    equality of ``pd.Timestamp(date)`` with that day.

    :param date: Anything accepted by ``pd.Timestamp``.
    :return: True if ``date`` equals the first weekday of its month.
    """
    stamp = pd.Timestamp(date)
    candidate = stamp.replace(day=1)
    weekday = candidate.weekday()
    if weekday >= 5:
        # Saturday (5) -> +2 days, Sunday (6) -> +1 day: lands on Monday.
        candidate = candidate + pd.offsets.Day(7 - weekday)
    return stamp == candidate
def is_last_weekday(date):
    """
    Check whether ``date`` is the last Monday-Friday day of its month.

    The month end is obtained with ``pd.offsets.MonthEnd(0)``; when it falls on
    a Saturday or Sunday it is moved back to the preceding Friday. The result is
    the equality of ``pd.Timestamp(date)`` with that day.

    :param date: Anything accepted by ``pd.Timestamp``.
    :return: True if ``date`` equals the last weekday of its month.
    """
    stamp = pd.Timestamp(date)
    last_day = stamp + pd.offsets.MonthEnd(0)
    # Days past Friday (weekday 4): 1 for Saturday, 2 for Sunday.
    days_past_friday = last_day.weekday() - 4
    if days_past_friday > 0:
        last_day = last_day - pd.offsets.Day(days_past_friday)
    return stamp == last_day
# --- Convert Files ---
def markdown_to_notebook(markdown_content, output_filename="AFML Experiments.ipynb"):
    """
    Turn markdown text into a Jupyter notebook file.

    Every fenced block opened with ```` ```python ```` becomes a code cell; the
    text between such blocks becomes markdown cells (runs of three or more
    newlines are collapsed to a blank line). If no cell results from this, the
    whole content is stored as a single markdown cell.

    Args:
        markdown_content (str): Either the markdown text itself or the path of
            an existing file to read it from (UTF-8).
        output_filename (str): Path of the notebook to write.

    Returns:
        str: Output notebook filename
    """
    # A string that names an existing file is treated as a path.
    is_path = isinstance(markdown_content, str) and os.path.isfile(markdown_content)
    if is_path:
        with open(markdown_content, "r", encoding="utf-8") as source:
            content = source.read()
    else:
        content = markdown_content

    def _markdown_cell(text):
        # Collapse excessive blank lines before storing the text.
        return nbf.v4.new_markdown_cell(re.sub(r"\n{3,}", "\n\n", text))

    notebook = nbf.v4.new_notebook()
    cells = []
    cursor = 0  # end of the last consumed code fence

    for fence in re.finditer(r"```python\n(.*?)\n```", content, re.DOTALL):
        prose = content[cursor : fence.start()].strip()
        if prose:
            cells.append(_markdown_cell(prose))

        code = fence.group(1).strip()
        if code:
            cells.append(nbf.v4.new_code_cell(code))

        cursor = fence.end()

    # Text after the last code fence
    tail = content[cursor:].strip()
    if tail:
        cells.append(_markdown_cell(tail))

    # Nothing produced a cell: keep the content as one markdown cell
    if not cells:
        cells.append(_markdown_cell(content.strip()))

    notebook.cells = cells
    with open(output_filename, "w", encoding="utf-8") as target:
        nbf.write(notebook, target)
    print(f"Notebook saved as {output_filename}")
    return output_filename
# --- String Formatting ---
def to_subscript(text):
    """
    Replace characters of ``text`` by their Unicode subscript counterparts.

    ``text`` is converted with ``str()`` first. Digits, the operators ``+ - =``,
    parentheses, a subset of lowercase Latin letters and a few Greek letters are
    translated; every other character is kept unchanged.

    :param text: Value to convert.
    :return: The converted string.
    """
    plain = (
        "0123456789"  # numbers
        "+-=()"  # mathematical operators
        "aehijklmnoprstuvx"  # Latin letters
        "βγρφχ"  # Greek letters (only those with actual Unicode subscripts)
    )
    lowered = (
        "₀₁₂₃₄₅₆₇₈₉"
        "₊₋₌₍₎"
        "ₐₑₕᵢⱼₖₗₘₙₒₚᵣₛₜᵤᵥₓ"
        "ᵦᵧᵨᵩᵪ"
    )
    # Note: δ, α, σ, λ, μ, τ, θ, ε, ψ don't have Unicode subscripts
    return str(text).translate(str.maketrans(plain, lowered))
def smart_subscript(base, subscript):
    """
    Render ``base`` with ``subscript`` in several notations.

    The Unicode form appends the result of ``to_subscript(subscript)`` to
    ``base``. If that conversion changed nothing and the subscript contains one
    of the Greek letters δ α σ λ μ τ θ ε ψ, the plain ``base_subscript`` notation
    is used for the Unicode form instead.

    :param base: Symbol that carries the subscript.
    :param subscript: Subscript text.
    :return: Dict with the keys ``"unicode"``, ``"latex"``, ``"html"`` and
        ``"plain"``.
    """
    plain_form = f"{base}_{subscript}"  # Fallback notation
    converted = to_subscript(subscript)

    # The Greek-letter test is only evaluated when nothing was converted.
    if converted == subscript and any(
        letter in subscript for letter in "δασλμτθεψ"
    ):
        unicode_form = plain_form
    else:
        unicode_form = f"{base}{converted}"

    return {
        "unicode": unicode_form,
        "latex": f"${base}_{{{subscript}}}$",
        "html": f"{base}<sub>{subscript}</sub>",
        "plain": plain_form,
    }
def get_folder_size(path: str) -> float:
    """
    Sum the sizes of all files below ``path``, searched recursively.

    :param path: Folder to measure.
    :return: Total size in megabytes (bytes divided by 1024**2).
    """
    total_bytes = 0
    for entry in Path(path).rglob("*"):
        if entry.is_file():
            total_bytes += entry.stat().st_size
    return total_bytes / (1024**2)  # MB
# --- Dictionary helpers ---
def expand_params(params: dict) -> list[dict]:
    """
    Build every combination of the candidate values given per parameter.

    :param params: Dictionary where keys are parameter names and values are
        iterables of possible values.
    :return: List of dictionaries, one per element of the Cartesian product of
        the value iterables, each mapping every parameter name to one value.
    """
    names = list(params)
    candidates = (params[name] for name in names)
    return [dict(zip(names, choice)) for choice in product(*candidates)]
def dict_to_key(strategy="json"):
    """
    Create a function that turns a dict into a hashable key.

    Parameters
    ----------
    strategy : str
        "frozenset" -> ``frozenset`` of the items (unordered)
        "tuple"     -> ``tuple`` of the sorted items
        "json"      -> JSON string with sorted keys (default)

    Returns
    -------
    converter : function
        A function that takes a dict and returns the key. It raises
        ``TypeError`` for non-dict input and ``ValueError`` for an unknown
        strategy; the strategy is only checked when the converter is called.

    Usage
    -----
    # Create a converter using the 'tuple' strategy
    keyify = dict_to_key("tuple")

    # Use the converted dict as a key in another dictionary
    outer = {keyify({"a": 1, "b": 2}): "stored value"}

    # Equivalent dicts (different insertion order) resolve to the same key
    print(outer[keyify({"b": 2, "a": 1})])  # "stored value"

    # frozenset ignores order entirely
    print(dict_to_key("frozenset")({"x": 1, "y": 2}))  # frozenset({('x', 1), ('y', 2)})

    # json produces a human-readable string key
    print(dict_to_key("json")({"x": 1, "y": 2}))  # '{"x": 1, "y": 2}'
    """
    import json

    def converter(d):
        if not isinstance(d, dict):
            raise TypeError(f"Expected dict, got {type(d)}")

        if strategy == "frozenset":
            return frozenset(d.items())
        if strategy == "tuple":
            return tuple(sorted(d.items()))
        if strategy == "json":
            return json.dumps(d, sort_keys=True)
        raise ValueError(f"Unknown strategy: {strategy}")

    return converter